feat: implement P&ID extraction and tag mapping, update MCP server and web UI

This commit is contained in:
windpacer
2026-05-02 14:56:04 +09:00
parent fb11359b4c
commit 30182bf020
110 changed files with 3553252 additions and 226 deletions

View File

@@ -60,6 +60,121 @@ def _llm():
from openai import OpenAI
return OpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
# ── PaddleOCR 싱글톤 (PDF fallback용) ──────────────────────────────────────────
@lru_cache(maxsize=1)
def _ocr():
    """Return the shared PaddleOCR engine, building it on first use.

    The engine is created with the ``korean`` language model and angle
    classification enabled. GPU use is requested unless the
    ``PADDLE_USE_GPU`` environment variable is set to something other than
    ``"true"`` (case-insensitive). If building the GPU engine fails for any
    reason, the failure is logged and a CPU engine is built instead.

    The original author's note says the first call downloads roughly 50 MB
    of model files; that is not verifiable from this code.

    Returns:
        The PaddleOCR instance. ``lru_cache(maxsize=1)`` keeps a single
        instance for the lifetime of the process.

    Raises:
        Exception: Whatever PaddleOCR raised while building the CPU engine
            (either because CPU was requested or as the fallback).
    """
    import logging
    import os

    from paddleocr import PaddleOCR

    def _build(use_gpu: bool):
        return PaddleOCR(
            use_angle_cls=True,
            lang="korean",
            use_gpu=use_gpu,
            show_log=False,
        )

    wants_gpu = os.environ.get("PADDLE_USE_GPU", "true").lower() == "true"
    if not wants_gpu:
        return _build(False)

    try:
        return _build(True)
    except Exception as gpu_error:
        # Fall back explicitly instead of rewriting os.environ and recursing:
        # the process environment stays untouched and the GPU error is kept
        # in the log rather than silently discarded.
        logging.warning("PaddleOCR GPU init failed, falling back to CPU: %s", gpu_error)
        return _build(False)
# ── DXF/PDF 텍스트 추출 헬퍼 ───────────────────────────────────────────────────
def _extract_text_from_dxf(filepath: str) -> str:
    """Collect the text of every TEXT and MTEXT entity in a DXF modelspace.

    TEXT entities contribute their raw ``dxf.text`` value. MTEXT entities are
    passed through ezdxf's ``plain_mtext`` to strip inline formatting codes,
    and are skipped when the result is blank.

    Args:
        filepath: Path to the DXF file.

    Returns:
        The collected strings joined with newlines ("" when none are found).

    Raises:
        Exception: Whatever ``ezdxf.readfile`` raises for an unreadable or
            invalid file.
    """
    import logging

    import ezdxf
    from ezdxf.tools.text import plain_mtext

    doc = ezdxf.readfile(filepath)
    texts = []
    for entity in doc.modelspace():
        kind = entity.dxftype()
        if kind == "TEXT":
            texts.append(entity.dxf.text)
        elif kind == "MTEXT":
            # ezdxf documents the MTEXT content on the entity itself
            # (MText.text); reading it via the dxf namespace can fail, and the
            # previous blanket `except: pass` hid that by dropping the entity.
            try:
                plain = plain_mtext(entity.text)
            except Exception as exc:
                # Best effort: one bad entity must not abort the whole file,
                # but leave a trace so missing text can be diagnosed.
                logging.warning("Skipping unreadable MTEXT entity in %s: %s", filepath, exc)
                continue
            if plain.strip():
                texts.append(plain)
    return "\n".join(texts)
def _extract_text_from_pdf(filepath: str) -> str:
    """Return the embedded text layer of a PDF, one chunk per page.

    Args:
        filepath: Path to the PDF file.

    Returns:
        The result of ``page.get_text()`` for every page, joined with
        newlines.

    Raises:
        Exception: Whatever ``fitz.open`` raises for a missing or invalid
            file.
    """
    import fitz  # pymupdf

    # Use the document as a context manager so the file handle is released
    # even if text extraction fails part-way through.
    with fitz.open(filepath) as doc:
        return "\n".join(page.get_text() for page in doc)
def _extract_text_from_pdf_ocr(filepath: str) -> str:
    """Rasterise each PDF page at 300 DPI and run PaddleOCR on the image.

    Args:
        filepath: Path to the PDF file.

    Returns:
        The recognised text of every detected line, in the order PaddleOCR
        reports them, joined with newlines ("" when nothing is recognised).

    Raises:
        Exception: Whatever ``fitz.open`` raises for a missing or invalid
            file, or whatever the OCR engine raises.
    """
    import io

    import fitz  # pymupdf
    import numpy as np
    from PIL import Image

    # PDF user space is 72 units per inch, so 300/72 on BOTH axes gives a
    # 300 DPI render. A single-number Matrix is interpreted by PyMuPDF as a
    # rotation angle in degrees, which is why both factors are passed.
    zoom = 300 / 72
    render_matrix = fitz.Matrix(zoom, zoom)

    all_texts = []
    with fitz.open(filepath) as doc:
        for page in doc:
            pix = page.get_pixmap(matrix=render_matrix)
            img = Image.open(io.BytesIO(pix.tobytes("png")))

            result = _ocr().ocr(np.array(img), cls=True)
            # Guard against an empty result as well as an empty first entry
            # (a page with no recognised text).
            if not result or not result[0]:
                continue
            # Each detected line is indexed as line[1][0] for its text,
            # exactly as the original code did.
            all_texts.extend(line[1][0] for line in result[0])
    return "\n".join(all_texts)
def _convert_dwg_to_dxf_dxflib(filepath: str) -> str:
"""libreoffice로 DWG를 DXF로 변환."""
import subprocess
import os
dxf_path = filepath.replace(".dwg", ".dxf")
try:
# LibreOffice로 변환
result = subprocess.run(
[
"libreoffice",
"--headless",
"--convert-to", "dxf:AutoCAD DXF",
"--outdir", os.path.dirname(filepath) or ".",
filepath
],
check=True,
timeout=120,
capture_output=True,
text=True
)
if os.path.exists(dxf_path):
return dxf_path
else:
raise FileNotFoundError("DXF 변환 파일이 생성되지 않았습니다.")
except subprocess.CalledProcessError as e:
raise Exception(f"LibreOffice 변환 실패: {e.stderr}")
# ── Qdrant 검색 헬퍼 ──────────────────────────────────────────────────────────
def _search(collection: str, query: str, top_k: int, threshold: float = 0.25) -> str:
@@ -442,6 +557,423 @@ def query_with_nl(question: str) -> str:
return json.dumps(result, ensure_ascii=False, default=str)
# ── P&ID 추출 도구 ──────────────────────────────────────────────────────────────
@mcp.tool()
def extract_pid_tags(text: str, source_type: str) -> str:
    """Extract instrument and equipment tags from P&ID drawing text via the LLM.

    The text (capped at 100,000 characters) is sent to the chat model with a
    prompt asking for a bare JSON array. The reply is then cleaned up:
    a Markdown code fence is removed, a reply cut off by the token limit is
    closed after its last complete object, and the longest balanced ``[...]``
    span is parsed. If that span is not valid JSON, every flat ``{...}``
    object that parses on its own is kept instead.

    Args:
        text: Text extracted from a DXF or PDF drawing.
        source_type: ``'dxf'`` or ``'pdf'``; passed to the model as context.

    Returns:
        A JSON string. On success:
        ``{success: true, count, tags: [{tagNo, equipmentName,
        instrumentType, lineNumber, pidDrawingNo, confidence}]}``.
        When nothing parseable is found: ``{success: false, count: 0,
        tags: []}``. On any exception: ``{success: false, error}``.
    """
    import json as json_module
    import logging
    import re

    system = (
        "You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
        "Extract all instrument and equipment tags from the provided text.\n"
        "Return ONLY a valid JSON array. Each element must have exactly these fields:\n"
        '{"tagNo":"FCV-101","equipmentName":null,"instrumentType":"FCV","lineNumber":null,"pidDrawingNo":null,"confidence":0.95}\n'
        "Rules:\n"
        "- tagNo: any token matching [LETTERS]-[DIGITS] or [LETTERS]-[DIGITS]-[SUFFIX]\n"
        "  Examples: FCV-101, P-10101, T-10100, VG-6203-15A-F1A-n, BT-6200, DP-10101\n"
        "- instrumentType: leading letters of tagNo (e.g. FCV, P, T, VG, BT, DP, PSV)\n"
        "- equipmentName: descriptive name if present in text near the tag, else null\n"
        "- lineNumber: null unless a line number is explicitly associated\n"
        "- pidDrawingNo: null unless a drawing number is explicitly associated\n"
        "- confidence: 0.95 for clear tags, lower for ambiguous ones\n"
        "- Output ONLY the JSON array, no markdown, no explanation.\n"
        "- If no tags found, return: []\n"
    )

    def _extract_array(s: str) -> str:
        """Return the longest balanced top-level ``[...]`` span, or ``"[]"``.

        Brackets inside JSON string values are not distinguished from
        structural ones; this is a heuristic, not a parser.
        """
        depth = 0
        start = -1
        best = ""
        for i, c in enumerate(s):
            if c == "[":
                if depth == 0:
                    start = i
                depth += 1
            elif c == "]" and depth > 0:
                # Ignore a stray "]" at depth 0: letting depth go negative
                # would stop every later array from being recognised.
                depth -= 1
                if depth == 0:
                    cand = s[start:i + 1]
                    if len(cand) > len(best):
                        best = cand
        return best if best else "[]"

    # Bound before the try so the error handler can log it even when the
    # LLM call itself is what failed.
    raw = ""
    try:
        truncated_text = text[:100000]
        resp = _llm().chat.completions.create(
            model=VLLM_MODEL,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": f"Source: {source_type}\n\nText:\n{truncated_text}"},
            ],
            max_tokens=32768,
            temperature=0.1,
            extra_body={"chat_template_kwargs": {"enable_thinking": False}},
        )
        choice = resp.choices[0]
        raw = (choice.message.content or "").strip()
        finish_reason = choice.finish_reason

        # Strip a Markdown code fence (opening line, and closing line if present).
        if raw.startswith("```"):
            lines = raw.splitlines()
            raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()

        # Reply cut off by max_tokens: keep everything up to the last
        # complete object and close the array.
        if finish_reason == "length":
            last_close = raw.rfind("}")
            if last_close != -1:
                raw = raw[:last_close + 1] + "]"

        raw = _extract_array(raw)

        try:
            data = json_module.loads(raw)
        except json_module.JSONDecodeError:
            # Fall back to salvaging each flat object individually.
            data = []
            for obj in re.findall(r'\{[^{}]*\}', raw, re.DOTALL):
                try:
                    data.append(json_module.loads(obj))
                except json_module.JSONDecodeError:
                    continue
            if not data:
                return json_module.dumps({"success": False, "count": 0, "tags": []}, ensure_ascii=False)

        logging.info("[extract_pid_tags] source=%s count=%s", source_type, len(data))
        return json_module.dumps({
            "success": True,
            "count": len(data),
            "tags": data,
        }, ensure_ascii=False, indent=2)
    except Exception as e:
        logging.error("P&ID 태그 추출 실패: %s", e)
        logging.error("Raw response: %s", raw[:1000])
        return json_module.dumps({"success": False, "error": f"P&ID 태그 추출 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str:
    """Map P&ID tags to Experion tags using the LLM.

    Both tag lists are sent to the chat model, one tag per line, with a
    prompt asking for a bare JSON array of mappings. A Markdown code fence is
    stripped from the reply, a reply cut off by the token limit is closed
    after its last complete object, and the outermost ``[...]`` span is
    parsed.

    Args:
        pid_tags: Tags extracted from the P&ID, e.g. ``["FT-101", "PT-201"]``.
        experion_tags: Experion system tags, e.g.
            ``["ficq-6113.pv", "pt-201.pv"]``.

    Returns:
        A JSON string. On success:
        ``{success: true, count, mappings: [{pidTag, experionTag,
        confidence}]}``. On any exception: ``{success: false, error}``.
        When either input list is empty the result is an empty mapping and
        the model is not called.
    """
    import json as json_module
    import re

    # Nothing to match: skip the round-trip. With an empty side the model
    # could only return nothing or invent matches.
    if not pid_tags or not experion_tags:
        return json_module.dumps({"success": True, "count": 0, "mappings": []},
                                 ensure_ascii=False, indent=2)

    system = (
        "You are a P&ID to Experion tag matching expert.\n"
        "Match P&ID tags to Experion tags based on similarity.\n"
        "Return ONLY a JSON array of objects with the following structure:\n"
        '[{"pidTag":"FT-101","experionTag":"ft-101.pv","confidence":0.92},...]\n'
        "IMPORTANT rules:\n"
        "- pidTag: The original P&ID tag from input\n"
        "- experionTag: The matched Experion tag (lowercase, with .pv/.sp/.mv suffix)\n"
        "- confidence: 0.0 to 1.0 based on match quality\n"
        "- If no good match found, set confidence < 0.5 and leave experionTag null\n"
        "- Do NOT include any explanation, only the JSON array.\n"
        "- If no matches found, return an empty array: []\n"
        "- temperature=0.1 for deterministic output.\n"
    )
    try:
        pid_str = "\n".join(pid_tags)
        experion_str = "\n".join(experion_tags)
        resp = _llm().chat.completions.create(
            model=VLLM_MODEL,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": f"P&ID Tags:\n{pid_str}\n\nExperion Tags:\n{experion_str}"},
            ],
            max_tokens=16384,
            temperature=0.1,
            extra_body={"chat_template_kwargs": {"enable_thinking": False}},
        )
        choice = resp.choices[0]
        raw = (choice.message.content or "").strip()

        # Strip a Markdown code fence (opening line, and closing line if present).
        if raw.startswith("```"):
            lines = raw.splitlines()
            raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()

        # Reply cut off by max_tokens: keep everything up to the last
        # complete object and close the array.
        if choice.finish_reason == "length":
            last_close = raw.rfind("}")
            if last_close != -1:
                raw = raw[:last_close + 1] + "]"

        # Greedy match: from the first "[" to the last "]".
        match = re.search(r'\[.*\]', raw, re.DOTALL)
        data = json_module.loads(match.group(0) if match else "[]")
        return json_module.dumps({"success": True, "count": len(data), "mappings": data},
                                 ensure_ascii=False, indent=2)
    except Exception as e:
        return json_module.dumps({"success": False, "error": f"P&ID 태그 매핑 실패: {e}"}, ensure_ascii=False)
# ── P&ID 파싱 도구 (DXF/PDF/DWG) ───────────────────────────────────────────────
# System prompt shared by parse_pid_dxf and parse_pid_pdf (both used the same
# text verbatim). Kept byte-identical to the original prompt.
# NOTE(review): the example object on the fourth line is not valid JSON
# ('"FT" OR "FIT OR "TIA"'); left untouched because changing it alters what
# the model sees — confirm whether it should be cleaned up.
_PID_PARSE_SYSTEM_PROMPT = (
    "You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
    "Extract instrument and equipment tags from the provided text.\n"
    "Return ONLY a JSON array of objects with the following structure:\n"
    '[{"tagNo":"FIT-10115","equipmentName":"Flow Transmitter","instrumentType":"FT" OR "FIT OR "TIA","lineNumber":"L-101","pidDrawingNo":"P&ID-001","confidence":0.95},...]\n'
    "IMPORTANT rules:\n"
    "- tagNo: Standard tag format with these patterns:\n"
    "  * Instrument: [Function][Loop]-[Number] (e.g., FT-101, PT-201, LI-301, FICQ-6113)\n"
    "  * Equipment: [Type]-[Number] (e.g., P-10101, T-10100, C-9111, E-10119)\n"
    "  * Complex: [Type]-[Number]-[Size]-[Class]-[Material]-[Option] (e.g., VG-6203-15A-F1A-n, CD-10513-40A-S1A-H50)\n"
    "  * Real examples from DXF: BT-6200, SARF-#6-PID-002, P-6101, DP-10101, CHS-6630-100A-F-C50\n"
    "- instrumentType: First 2-4 letters of tagNo (FIT, PT, LI, FICQ, TCV, FCV, PCV, PG, TG, etc.)\n"
    "- equipmentName: Descriptive name if available, otherwise null\n"
    "- lineNumber: Line number if available, otherwise null\n"
    "- pidDrawingNo: Drawing number if available, otherwise null\n"
    "- confidence: 0.0 to 1.0 based on how clearly the tag was identified\n"
    "- Do NOT include any explanation, only the JSON array.\n"
    "- If no tags found, return an empty array: []\n"
    "- temperature=0.1 for deterministic output.\n"
)


def _pid_parse_tag_array(raw: str) -> list:
    """Parse an LLM reply into a list of tag objects, salvaging what it can."""
    import re

    # Strip a Markdown code fence (opening line, and closing line if present).
    if raw.startswith("```"):
        lines = raw.splitlines()
        raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()

    # Narrow to the outermost [...] span when there is one (greedy match).
    match = re.search(r'\[.*\]', raw, re.DOTALL)
    if match:
        raw = match.group(0)

    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        data = None
        # Second attempt: the shortest "[ {...} ]" span.
        strict = re.search(r'\[\s*\{.*?\}\s*\]', raw, re.DOTALL)
        if strict:
            try:
                data = json.loads(strict.group(0))
            except json.JSONDecodeError:
                data = None
        if data is None:
            # Last resort: keep every flat {...} object that parses on its
            # own. This also covers replies with no closing "]" (e.g. cut off
            # by max_tokens), which previously left `data` unassigned.
            data = []
            for obj in re.findall(r'\{[^{}]*\}', raw, re.DOTALL):
                try:
                    data.append(json.loads(obj))
                except json.JSONDecodeError:
                    continue

    return data if isinstance(data, list) else []


def _pid_llm_extract_tags(text: str, source: str) -> list:
    """Send the first 12,000 characters of *text* to the LLM and return the parsed tags."""
    truncated_text = text[:12000]
    resp = _llm().chat.completions.create(
        model=VLLM_MODEL,
        messages=[
            {"role": "system", "content": _PID_PARSE_SYSTEM_PROMPT},
            {"role": "user", "content": f"Source: {source}\n\nText:\n{truncated_text}"},
        ],
        max_tokens=4096,
        temperature=0.1,
    )
    raw = (resp.choices[0].message.content or "").strip()
    return _pid_parse_tag_array(raw)


def _pid_parse_result(text: str, tags: list) -> str:
    """Build the success payload shared by the DXF and PDF parsers."""
    return json.dumps({
        "success": True,
        "text": text[:10000],  # cap the echoed text
        "count": len(tags),  # number of tags, matching the documented contract
        "tags": tags,
    }, ensure_ascii=False, indent=2)


@mcp.tool()
def parse_pid_dxf(filepath: str) -> str:
    """Parse a DXF drawing: extract its text with ezdxf, then tags with the LLM.

    Args:
        filepath: Path to the DXF file.

    Returns:
        A JSON string. On success:
        ``{success: true, text, count, tags: [{tagNo, equipmentName, ...}]}``
        where ``text`` is the extracted text capped at 10,000 characters and
        ``count`` is the number of tags. A drawing with no text yields an
        empty ``text`` and ``tags`` without calling the model. On any
        exception: ``{success: false, error}``.
    """
    try:
        text = _extract_text_from_dxf(filepath)
        if not text.strip():
            return _pid_parse_result("", [])
        return _pid_parse_result(text, _pid_llm_extract_tags(text, "dxf"))
    except Exception as e:
        return json.dumps({"success": False, "error": f"DXF 파싱 실패: {e}"}, ensure_ascii=False)


@mcp.tool()
def parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
    """Parse a PDF drawing: extract its text, then tags with the LLM.

    Args:
        filepath: Path to the PDF file.
        use_ocr: When true (default), rasterise the pages and run OCR;
            otherwise read the PDF's embedded text layer.

    Returns:
        A JSON string. On success:
        ``{success: true, text, count, tags: [{tagNo, equipmentName, ...}]}``
        where ``text`` is the extracted text capped at 10,000 characters and
        ``count`` is the number of tags. A document with no text yields an
        empty ``text`` and ``tags`` without calling the model. On any
        exception: ``{success: false, error}``.
    """
    try:
        if use_ocr:
            text = _extract_text_from_pdf_ocr(filepath)
        else:
            text = _extract_text_from_pdf(filepath)
        if not text.strip():
            return _pid_parse_result("", [])
        return _pid_parse_result(text, _pid_llm_extract_tags(text, "pdf"))
    except Exception as e:
        return json.dumps({"success": False, "error": f"PDF 파싱 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
def parse_pid_drawing(filepath: str) -> str:
    """Parse a P&ID drawing, choosing the parser from the file extension.

    Args:
        filepath: Path to a DXF, DWG or PDF file (extension is matched
            case-insensitively).

    Returns:
        A JSON string. For ``.dxf`` and ``.pdf`` this is whatever
        ``parse_pid_dxf`` / ``parse_pid_pdf`` return. For ``.dwg`` and for
        any other extension it is ``{success: false, error}``.
    """
    import os

    suffix = os.path.splitext(filepath)[1].lower()

    if suffix == ".pdf":
        return parse_pid_pdf(filepath)

    if suffix == ".dxf":
        return parse_pid_dxf(filepath)

    if suffix == ".dwg":
        # Per the original note: DWG files must be converted to DXF before
        # upload, because DWG-to-DXF tooling on Linux is limited.
        guidance = "\n".join([
            "DWG 파일은 현재 직접 파싱할 수 없습니다.",
            "사전에 DXF로 변환하여 업로드해 주세요.",
            "",
            "변환 방법:",
            "1. Windows에서 AutoCAD 또는 ODA File Converter 사용",
            "2. 온라인 DWG → DXF 변환기 사용",
            "3. LibreOffice Draw (Windows/macOS 전용) 사용",
        ])
        failure = {"success": False, "error": guidance}
        return json.dumps(failure, ensure_ascii=False)

    failure = {
        "success": False,
        "error": f"Unsupported format: {suffix}. Supported: .dxf, .dwg, .pdf",
    }
    return json.dumps(failure, ensure_ascii=False)
# ── 엔트리포인트 ──────────────────────────────────────────────────────────────
def main():