Files
ExperionCrawler/futurePlan/End-to-End P&ID Graph Pipeline/PID_Parser_작업지시서_v3.md

46 KiB

작업지시서: P&ID 도면 파서 MCP 도구 (DWG/DXF/PDF 통합)

0. 변경 이력

버전 날짜 변경 내용
v1 2026-05-01 초안: PyMuPDF 텍스트 추출 기반 (PDF만)
v2 2026-05-01 OCR 기반 재설계 (현장 PDF는 SHX→path 변환되어 텍스트 추출 불가)
v3 2026-05-01 DWG/DXF/PDF 통합 — DXF가 있으면 91.6% 자동 매칭, PDF는 OCR fallback

1. 목적 (Why)

현장 P&ID 입력 포맷은 다양하다:

  • DXF: ezdxf로 직독, 좌표/레이어/블록 정보 100% 정확
  • DWG: ODA File Converter로 DXF 변환 후 처리
  • PDF (벡터): PyMuPDF로 텍스트 추출 (SHX 폰트 미사용 시)
  • PDF (SHX path 변환): OCR 필수 (현장에서 가장 흔함)

각 포맷별로 추출 정확도와 처리 비용이 크게 다르므로 포맷 우선순위 자동 라우팅통합 출력 스키마가 핵심이다.

검증 결과 (실제 P&ID DXF, p-9100.dxf, 11MB, 3,925개 텍스트):

  • INSTRUMENT 레이어 CIRCLE(ISA balloon) 215개 검출
  • 좌표 매칭만으로 197개 (91.6%) 태그 자동 정규화 성공 (LLM 호출 없이)
  • 처리 시간: <1초 (OCR 대비 100배 빠름)
  • DCS vs 현장계기 구분 가능 (사각형 검출 추가 시)

2. 작업 범위 (Scope)

2.1. 포함

  • server.py에 신규 도구 3종 추가:
    • parse_pid_dxf — ezdxf 기반 (메인, 가장 정확)
    • parse_pid_dwg — ODA Converter로 DXF 변환 후 위 함수 호출
    • parse_pid_pdf — PaddleOCR 기반 (PDF만 있을 때 fallback)
  • 통합 디스패처 parse_pid_drawing — 파일 확장자로 자동 라우팅
  • 기존 extract_pid_tags 확장: source_type="clusters" 모드 (DXF 사전정규화 태그 처리 포함)
  • 의존성 추가: ezdxf, paddleocr, paddlepaddle-gpu, pymupdf, scikit-learn, numpy, Pillow
  • ODA File Converter 설치 가이드 (시스템 도구)
  • 단위 테스트: DXF 1개, PDF 1개

2.2. 제외 (별도 phase)

  • LEGEND 시트 자동 파싱 → 약어 사전 빌드 (phase 2)
  • C# 클라이언트 측 호출 코드 (별도 작업)
  • pid_tag_mapping PostgreSQL 테이블 (별도 작업)
  • 비동기 처리/큐잉 (phase 2)
  • 도면 변경 diff 추적 (phase 3)

3. 컨텍스트 (Context)

3.1. 대상 파일

  • 편집 대상: server.py (현재 ExperionCrawler MCP 서버)
  • 위치: 기존 # ── P&ID 추출 도구 ── 섹션 바로 위에 신규 도구 추가

3.2. 통합 출력 스키마

세 파서 모두 다음 형식으로 통일:

{
  "success": true,
  "source": "dxf",                      // "dxf" | "dwg" | "pdf"
  "source_path": "/path/to/file.dxf",
  "total_pages": 1,                     // DXF/DWG는 항상 1, PDF는 가변
  "processed_pages": [1],
  "pages": [{
    "page": 1,
    "image_size": [width, height],      // PDF는 픽셀, DXF는 도면 단위
    "drawing_no": "P-9100",             // 추출 가능 시
    "clusters": [{
      "id": "p1c0",
      "texts": ["FIC", "10101"],
      "bbox": [x0, y0, x1, y1],
      "ocr_confidence": 1.0,            // DXF는 항상 1.0, PDF는 OCR 신뢰도
      "has_korean": false,
      "layer": "INSTRUMENT",            // DXF/DWG만, PDF는 null
      "balloon_type": "FIELD",          // "DCS" | "FIELD" | null
      "balloon_radius": 0.4,            // DXF/DWG만
      "pre_normalized_tag": "FIC-10101" // DXF는 좌표매칭으로 미리 합쳐줌, PDF는 null
    }]
  }]
}

pre_normalized_tag가 핵심입니다. DXF에서는 LLM 호출 없이도 91.6%의 태그가 이 필드로 미리 정규화됩니다. LLM은 나머지 8.4%(매칭 실패 cluster)와 한글 설명 매핑만 처리하면 됩니다.

3.3. 데이터 흐름

Input: file.dxf | file.dwg | file.pdf
       ↓
parse_pid_drawing(path)  ← 확장자로 자동 라우팅
       ↓
{dxf|dwg|pdf}_parser
       ↓ 통합 cluster JSON
extract_pid_tags(clusters_json, "clusters")
       ↓ 정규화된 태그 리스트
match_pid_tags(pid_tags, experion_tags)  ← 기존
       ↓
PostgreSQL pid_tag_mapping (별도 작업)

4. 상세 작업 항목 (Tasks)

4.1. 시스템 도구 설치 (DWG 처리용)

ODA File Converter 설치 (Linux):

# https://www.opendesign.com/guestfiles/oda_file_converter 에서 다운로드
# (등록 필요, 무료)
wget <ODA_DOWNLOAD_URL> -O oda_converter.deb
sudo dpkg -i oda_converter.deb

# 검증
which ODAFileConverter
ODAFileConverter --help

설치 경로 확인 후 server.py 상수에 등록:

ODA_CONVERTER_PATH = "/usr/bin/ODAFileConverter"  # 환경에 맞게 조정

⚠️ ODA File Converter는 무료지만 상용 라이선스가 아닌 무료 사용 약관 적용. 내부 운영은 무방하나 상용 SaaS로 재배포할 경우 ODA Teigha SDK 검토 필요.

4.2. Python 의존성 설치

# DXF 처리
pip install ezdxf>=1.3.0

# OCR (PDF fallback용, GPU 환경)
pip install paddlepaddle-gpu==2.6.* -i https://www.paddlepaddle.org.cn/packages/stable/cu118/
pip install paddleocr>=2.7.0

# 공통
pip install pymupdf>=1.24.0 scikit-learn>=1.3.0 numpy>=1.24.0 Pillow>=10.0.0

requirements.txt:

ezdxf>=1.3.0
paddlepaddle-gpu>=2.6.0
paddleocr>=2.7.0
pymupdf>=1.24.0
scikit-learn>=1.3.0
numpy>=1.24.0
Pillow>=10.0.0

4.3. 공통 헬퍼 (server.py 상단에 추가)

기존 _llm() 근처에 추가:

# ── PaddleOCR 싱글톤 (PDF fallback용) ──────────────────────────────────────────

@lru_cache(maxsize=1)
def _ocr():
    """PaddleOCR 인스턴스 (한/영, GPU). 첫 호출 시 ~50MB 모델 다운로드."""
    from paddleocr import PaddleOCR
    import os

    use_gpu = os.environ.get("PADDLE_USE_GPU", "true").lower() == "true"
    try:
        return PaddleOCR(
            use_angle_cls=True, lang="korean", use_gpu=use_gpu,
            show_log=False, det_db_box_thresh=0.3, det_db_unclip_ratio=1.6,
        )
    except Exception as e:
        logging.warning(f"PaddleOCR GPU 초기화 실패, CPU fallback: {e}")
        return PaddleOCR(use_angle_cls=True, lang="korean", use_gpu=False, show_log=False)


# ── ODA File Converter 경로 ────────────────────────────────────────────────────

ODA_CONVERTER_PATH = "/usr/bin/ODAFileConverter"  # 환경별 조정


def _parse_page_range(page_range: str, total: int) -> list[int]:
    """페이지 범위 문자열 → 0-based 인덱스 리스트.
    "" → 전체, "1" → [0], "1-3" → [0,1,2], "1,3" → [0,2], "2-" → [1..total-1]
    """
    if not page_range or not page_range.strip():
        return list(range(total))
    indices = set()
    for part in page_range.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            a, b = part.split("-", 1)
            start = int(a) - 1 if a.strip() else 0
            end = int(b) - 1 if b.strip() else total - 1
            for i in range(max(0, start), min(total - 1, end) + 1):
                indices.add(i)
        else:
            i = int(part) - 1
            if 0 <= i < total:
                indices.add(i)
    return sorted(indices)

4.4. parse_pid_dxf 도구 (메인)

# ── P&ID 추출 도구 ── 섹션 바로 위에 추가:

# ── P&ID DXF 파서 (ezdxf 기반, 가장 정확) ─────────────────────────────────────

@mcp.tool()
def parse_pid_dxf(
    dxf_path: str,
    instrument_layer: str = "INSTRUMENT",
    balloon_text_tolerance: float = 1.5,
) -> str:
    """DXF 도면을 직접 파싱해서 cluster JSON을 반환합니다.

    ezdxf로 TEXT/MTEXT/CIRCLE/INSERT 엔티티를 직독.
    INSTRUMENT 레이어의 CIRCLE을 ISA balloon으로 보고 내부 텍스트와 좌표 매칭하여
    'FIC' + '10101' → 'FIC-10101'을 LLM 호출 없이 사전 정규화한다.

    Args:
        dxf_path:                DXF 파일 절대 경로
        instrument_layer:        ISA balloon이 위치한 레이어명. 회사마다 다름.
                                 (예: 'INSTRUMENT', 'INST', 'INSTR', 'I-1')
        balloon_text_tolerance:  balloon 반지름 대비 텍스트 검색 범위 배수.
                                 1.5 = 반지름의 1.5배 거리까지 내부로 간주.

    Returns:
        통합 cluster JSON. pre_normalized_tag 필드에 자동 정규화 결과 포함.
    """
    try:
        import ezdxf
        import re as _re
        import os

        if not os.path.exists(dxf_path):
            return json.dumps(
                {"success": False, "error": f"파일을 찾을 수 없습니다: {dxf_path}"},
                ensure_ascii=False,
            )

        doc = ezdxf.readfile(dxf_path)
        msp = doc.modelspace()

        # 1. 모든 텍스트 추출
        all_texts = []
        for e in msp:
            etype = e.dxftype()
            if etype == "TEXT":
                all_texts.append({
                    "text": e.dxf.text.strip(),
                    "layer": e.dxf.layer,
                    "x": e.dxf.insert.x,
                    "y": e.dxf.insert.y,
                    "rotation": e.dxf.rotation,
                    "height": e.dxf.height,
                })
            elif etype == "MTEXT":
                # MTEXT는 폰트 escape 코드 자동 처리됨
                all_texts.append({
                    "text": e.text.strip(),
                    "layer": e.dxf.layer,
                    "x": e.dxf.insert.x,
                    "y": e.dxf.insert.y,
                    "rotation": e.dxf.rotation,
                    "height": e.dxf.char_height,
                })
            elif etype == "ATTRIB":
                all_texts.append({
                    "text": e.dxf.text.strip(),
                    "layer": e.dxf.layer,
                    "x": e.dxf.insert.x,
                    "y": e.dxf.insert.y,
                    "rotation": e.dxf.rotation,
                    "height": e.dxf.height,
                })

        all_texts = [t for t in all_texts if t["text"]]

        # MTEXT 폰트 escape 코드 청소 ({\fMalgun Gothic|...; 한울} → 한울)
        font_escape = _re.compile(r'\{\\f[^;]*;([^}]*)\}')
        for t in all_texts:
            cleaned = font_escape.sub(r'\1', t["text"])
            cleaned = _re.sub(r'\\[A-Za-z]\d*[\.\d]*[xX]?;?', '', cleaned)
            t["text"] = cleaned.strip()

        # 2. INSTRUMENT 레이어 CIRCLE = ISA balloon
        balloons = [
            {
                "x": c.dxf.center.x, "y": c.dxf.center.y, "r": c.dxf.radius,
                "layer": c.dxf.layer,
            }
            for c in msp.query("CIRCLE")
            if c.dxf.layer == instrument_layer
        ]

        # 3. INSTRUMENT 레이어 LWPOLYLINE/SOLID 중 사각형 = DCS balloon
        # (사각형+내접원 = DCS, 단일 원 = FIELD)
        # 사각형 검출은 LWPOLYLINE이 4점 닫힌 형태인지로 판별
        squares = []
        for poly in msp.query("LWPOLYLINE"):
            if poly.dxf.layer != instrument_layer:
                continue
            if not poly.is_closed:
                continue
            points = list(poly.get_points())
            if len(points) == 4:
                xs = [p[0] for p in points]
                ys = [p[1] for p in points]
                squares.append({
                    "x_center": sum(xs) / 4,
                    "y_center": sum(ys) / 4,
                    "size": max(max(xs) - min(xs), max(ys) - min(ys)),
                })

        # 각 balloon이 사각형 안에 있는지 확인 → DCS vs FIELD 분류
        for b in balloons:
            b["balloon_type"] = "FIELD"  # 기본값
            for sq in squares:
                if (abs(sq["x_center"] - b["x"]) < sq["size"] / 2
                        and abs(sq["y_center"] - b["y"]) < sq["size"] / 2):
                    b["balloon_type"] = "DCS"
                    break

        # 4. 각 balloon에 대해 내부 텍스트 매칭
        inst_texts = [t for t in all_texts if t["layer"] == instrument_layer]
        balloon_clusters = []
        used_text_keys = set()  # balloon에 매칭된 텍스트 추적

        tag_pat = _re.compile(r'^[A-Z]{1,4}$')          # function code
        loop_pat = _re.compile(r'^\d{2,5}[A-Z]?$')      # loop number
        full_pat = _re.compile(r'^[A-Z]{1,4}-?\d{2,5}[A-Z]?$')  # 합쳐진 형태

        for idx, b in enumerate(balloons):
            r_lim = b["r"] * balloon_text_tolerance
            inside = []
            for t in inst_texts:
                dx, dy = t["x"] - b["x"], t["y"] - b["y"]
                if (dx * dx + dy * dy) ** 0.5 <= r_lim:
                    inside.append(t)
                    used_text_keys.add((t["text"], round(t["x"], 2), round(t["y"], 2)))

            # y 내림차순 (위→아래)
            inside.sort(key=lambda t: -t["y"])

            # 좌표 매칭 시뮬레이션 — function code(위) + loop number(아래)
            pre_normalized = None
            confidence = 0.5
            if len(inside) >= 2:
                top, bot = inside[0]["text"], inside[1]["text"]
                if tag_pat.match(top) and loop_pat.match(bot):
                    pre_normalized = f"{top}-{bot}"
                    confidence = 0.95
                elif full_pat.match(top):
                    pre_normalized = top if "-" in top else top  # 이미 합쳐진 형태
                    confidence = 0.9
            elif len(inside) == 1 and full_pat.match(inside[0]["text"]):
                pre_normalized = inside[0]["text"]
                confidence = 0.85

            xs = [t["x"] for t in inside] + [b["x"] - b["r"], b["x"] + b["r"]]
            ys = [t["y"] for t in inside] + [b["y"] - b["r"], b["y"] + b["r"]]

            balloon_clusters.append({
                "id": f"p1c{idx}",
                "texts": [t["text"] for t in inside],
                "bbox": [round(min(xs), 2), round(min(ys), 2),
                         round(max(xs), 2), round(max(ys), 2)],
                "ocr_confidence": confidence,
                "has_korean": any(any('\uac00' <= ch <= '\ud7a3' for ch in t["text"])
                                  for t in inside),
                "layer": instrument_layer,
                "balloon_type": b["balloon_type"],
                "balloon_radius": round(b["r"], 3),
                "pre_normalized_tag": pre_normalized,
            })

        # 5. balloon에 못 잡힌 나머지 텍스트는 레이어별로 묶음
        # (장비태그 P-10101, 라인번호, 한글 설명 등)
        remaining = [
            t for t in all_texts
            if (t["text"], round(t["x"], 2), round(t["y"], 2)) not in used_text_keys
        ]

        # 레이어별 그룹핑 후 인접한 것끼리 DBSCAN
        from collections import defaultdict
        from sklearn.cluster import DBSCAN
        import numpy as np

        layer_groups = defaultdict(list)
        for t in remaining:
            layer_groups[t["layer"]].append(t)

        non_balloon_clusters = []
        cluster_idx = len(balloon_clusters)

        for layer, items in layer_groups.items():
            if len(items) == 0:
                continue

            # 메타데이터 레이어 스킵 (REV, BOM, 8-치수선 등)
            if any(skip in layer.upper() for skip in
                   ["REV", "BOM", "DIMENSION", "치수"]):
                continue

            if len(items) == 1:
                t = items[0]
                non_balloon_clusters.append({
                    "id": f"p1c{cluster_idx}",
                    "texts": [t["text"]],
                    "bbox": [round(t["x"], 2), round(t["y"], 2),
                             round(t["x"] + len(t["text"]) * t["height"] * 0.6, 2),
                             round(t["y"] + t["height"], 2)],
                    "ocr_confidence": 1.0,
                    "has_korean": any('\uac00' <= ch <= '\ud7a3' for ch in t["text"]),
                    "layer": layer,
                    "balloon_type": None,
                    "balloon_radius": None,
                    "pre_normalized_tag": (t["text"]
                                           if full_pat.match(t["text"]) else None),
                })
                cluster_idx += 1
                continue

            # 좌표 클러스터링 (DBSCAN, eps는 텍스트 평균 높이의 5배)
            avg_h = sum(t["height"] for t in items) / len(items)
            eps = max(avg_h * 5, 5.0)
            centers = np.array([[t["x"], t["y"]] for t in items])
            labels = DBSCAN(eps=eps, min_samples=1).fit_predict(centers)

            for lbl in sorted(set(labels)):
                members = [items[i] for i, lb in enumerate(labels) if lb == lbl]
                xs = [m["x"] for m in members]
                ys = [m["y"] for m in members]
                pre_norm = None
                for m in members:
                    if full_pat.match(m["text"]):
                        pre_norm = m["text"]
                        break

                non_balloon_clusters.append({
                    "id": f"p1c{cluster_idx}",
                    "texts": [m["text"] for m in members],
                    "bbox": [round(min(xs), 2), round(min(ys), 2),
                             round(max(xs) + max(m["height"] * len(m["text"]) * 0.6
                                                 for m in members), 2),
                             round(max(ys) + max(m["height"] for m in members), 2)],
                    "ocr_confidence": 1.0,
                    "has_korean": any(any('\uac00' <= ch <= '\ud7a3' for ch in m["text"])
                                      for m in members),
                    "layer": layer,
                    "balloon_type": None,
                    "balloon_radius": None,
                    "pre_normalized_tag": pre_norm,
                })
                cluster_idx += 1

        # 6. 도면번호 추출 (TITLE 레이어 또는 파일명에서)
        drawing_no = None
        for t in all_texts:
            if t["layer"].upper() == "TITLE":
                if _re.match(r'^[A-Z]+-?\d+(-\d+)*$', t["text"]):
                    drawing_no = t["text"]
                    break
        if not drawing_no:
            base = os.path.splitext(os.path.basename(dxf_path))[0]
            drawing_no = base.upper()

        all_clusters = balloon_clusters + non_balloon_clusters

        return json.dumps({
            "success": True,
            "source": "dxf",
            "source_path": dxf_path,
            "total_pages": 1,
            "processed_pages": [1],
            "pages": [{
                "page": 1,
                "image_size": None,  # DXF는 무한 도면 단위
                "drawing_no": drawing_no,
                "clusters": all_clusters,
            }],
            "stats": {
                "total_texts": len(all_texts),
                "balloons_detected": len(balloons),
                "balloons_with_tag": sum(1 for c in balloon_clusters
                                         if c["pre_normalized_tag"]),
                "dcs_balloons": sum(1 for c in balloon_clusters
                                    if c["balloon_type"] == "DCS"),
                "field_balloons": sum(1 for c in balloon_clusters
                                      if c["balloon_type"] == "FIELD"),
                "non_balloon_clusters": len(non_balloon_clusters),
            },
        }, ensure_ascii=False)

    except Exception as e:
        import traceback
        return json.dumps({
            "success": False,
            "error": f"DXF 파싱 실패: {e}",
            "trace": traceback.format_exc()[-500:],
        }, ensure_ascii=False)

4.5. parse_pid_dwg 도구 (DWG → DXF 변환 후 처리)

# ── P&ID DWG 파서 (ODA Converter로 DXF 변환 후 처리) ──────────────────────────

@mcp.tool()
def parse_pid_dwg(
    dwg_path: str,
    instrument_layer: str = "INSTRUMENT",
) -> str:
    """DWG 파일을 ODA File Converter로 DXF로 변환 후 parse_pid_dxf 호출.

    ODA File Converter가 시스템에 설치되어 있어야 함 (ODA_CONVERTER_PATH 상수).
    임시 디렉토리에 변환된 DXF 생성 후 처리, 완료 시 자동 삭제.

    Args:
        dwg_path:         DWG 파일 절대 경로
        instrument_layer: ISA balloon 레이어명 (parse_pid_dxf와 동일)

    Returns:
        parse_pid_dxf 결과와 동일하나 source="dwg", source_path는 원본 DWG.
    """
    try:
        import os
        import subprocess
        import tempfile
        import shutil

        if not os.path.exists(dwg_path):
            return json.dumps(
                {"success": False, "error": f"파일을 찾을 수 없습니다: {dwg_path}"},
                ensure_ascii=False,
            )

        if not os.path.exists(ODA_CONVERTER_PATH):
            return json.dumps({
                "success": False,
                "error": (f"ODA File Converter가 {ODA_CONVERTER_PATH}에 없습니다. "
                          f"https://www.opendesign.com/guestfiles/oda_file_converter "
                          f"에서 다운로드 후 설치하세요."),
            }, ensure_ascii=False)

        # 임시 입출력 디렉토리 생성 (ODA Converter는 디렉토리 단위로 작동)
        with tempfile.TemporaryDirectory(prefix="pid_dwg_") as in_dir, \
             tempfile.TemporaryDirectory(prefix="pid_dxf_") as out_dir:

            # 입력 파일을 in_dir에 복사
            in_file = os.path.join(in_dir, os.path.basename(dwg_path))
            shutil.copy2(dwg_path, in_file)

            # ODA File Converter 실행
            # 인자: <Input> <Output> <Version> <Format> <Recurse> <Audit> [<Filter>]
            cmd = [
                ODA_CONVERTER_PATH,
                in_dir, out_dir,
                "ACAD2018",  # 출력 DXF 버전
                "DXF",       # 포맷
                "0",         # 재귀 비활성
                "1",         # audit 활성
                "*.DWG",     # 필터
            ]

            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=120,
            )

            if result.returncode != 0:
                return json.dumps({
                    "success": False,
                    "error": (f"DWG → DXF 변환 실패 (returncode={result.returncode}). "
                              f"stderr: {result.stderr[:500]}"),
                }, ensure_ascii=False)

            # 변환된 DXF 찾기
            base_name = os.path.splitext(os.path.basename(dwg_path))[0]
            dxf_path = os.path.join(out_dir, base_name + ".dxf")
            if not os.path.exists(dxf_path):
                # 대소문자 변형 시도
                candidates = [f for f in os.listdir(out_dir) if f.endswith(".dxf")]
                if candidates:
                    dxf_path = os.path.join(out_dir, candidates[0])
                else:
                    return json.dumps({
                        "success": False,
                        "error": "DWG → DXF 변환은 성공했으나 출력 DXF를 찾을 수 없음",
                    }, ensure_ascii=False)

            # parse_pid_dxf 호출 (변환된 임시 DXF 처리)
            dxf_result_json = parse_pid_dxf(dxf_path, instrument_layer)
            dxf_result = json.loads(dxf_result_json)

            # source 정보 원본 DWG로 변경
            dxf_result["source"] = "dwg"
            dxf_result["source_path"] = dwg_path
            dxf_result["converted_via"] = "ODA File Converter"

            return json.dumps(dxf_result, ensure_ascii=False)

    except subprocess.TimeoutExpired:
        return json.dumps(
            {"success": False, "error": "DWG → DXF 변환 시간 초과 (120초)"},
            ensure_ascii=False,
        )
    except Exception as e:
        import traceback
        return json.dumps({
            "success": False,
            "error": f"DWG 파싱 실패: {e}",
            "trace": traceback.format_exc()[-500:],
        }, ensure_ascii=False)

4.6. parse_pid_pdf 도구 (OCR 기반, PDF만 있을 때 fallback)

(v2의 OCR 파이프라인 그대로. 단 출력 스키마를 통합 형식으로 맞춤)

# ── P&ID PDF 파서 (OCR 기반 fallback) ─────────────────────────────────────────

@mcp.tool()
def parse_pid_pdf(
    pdf_path: str,
    dpi: int = 300,
    cluster_eps: float = 50.0,
    page_range: str = "",
    min_confidence: float = 0.5,
) -> str:
    """P&ID PDF를 OCR로 파싱. SHX path 변환된 PDF 대응 (DXF/DWG 없을 때 fallback).

    출력 스키마는 parse_pid_dxf와 통합. 단 layer/balloon_type/pre_normalized_tag는 null.

    Args:
        pdf_path, dpi, cluster_eps, page_range, min_confidence: 기존과 동일
    """
    try:
        import fitz
        import numpy as np
        from PIL import Image
        from sklearn.cluster import DBSCAN
        import os
        import io

        if not os.path.exists(pdf_path):
            return json.dumps(
                {"success": False, "error": f"파일을 찾을 수 없습니다: {pdf_path}"},
                ensure_ascii=False,
            )

        doc = fitz.open(pdf_path)
        total_pages = len(doc)
        target_pages = _parse_page_range(page_range, total_pages)

        ocr = _ocr()
        pages_out = []
        processed = []

        for page_idx in target_pages:
            page = doc[page_idx]

            # 1. PDF → 이미지
            zoom = dpi / 72.0
            mat = fitz.Matrix(zoom, zoom)
            pix = page.get_pixmap(matrix=mat, alpha=False)
            img = Image.open(io.BytesIO(pix.tobytes("png"))).convert("RGB")
            img_np = np.array(img)
            img_w, img_h = img.size

            # 2. PaddleOCR
            ocr_result = ocr.ocr(img_np, cls=True)
            if ocr_result and isinstance(ocr_result[0], list):
                ocr_items = ocr_result[0] or []
            else:
                ocr_items = ocr_result or []

            # 3. 박스 정규화 + 신뢰도 필터
            boxes = []
            for item in ocr_items:
                if not item or len(item) < 2:
                    continue
                box_pts, (text, conf) = item[0], item[1]
                text = text.strip()
                if not text or conf < min_confidence:
                    continue
                xs = [p[0] for p in box_pts]
                ys = [p[1] for p in box_pts]
                bbox = [min(xs), min(ys), max(xs), max(ys)]
                has_korean = any('\uac00' <= ch <= '\ud7a3' for ch in text)
                boxes.append({
                    "text": text,
                    "bbox": [round(c, 1) for c in bbox],
                    "conf": round(float(conf), 3),
                    "has_korean": has_korean,
                })

            if not boxes:
                pages_out.append({
                    "page": page_idx + 1,
                    "image_size": [img_w, img_h],
                    "drawing_no": None,
                    "clusters": [],
                })
                processed.append(page_idx + 1)
                continue

            # 4. 공간 클러스터링 (DPI 비례 eps)
            centers = np.array([
                [(b["bbox"][0] + b["bbox"][2]) / 2,
                 (b["bbox"][1] + b["bbox"][3]) / 2]
                for b in boxes
            ])
            eps_scaled = cluster_eps * (dpi / 300.0)
            labels = DBSCAN(eps=eps_scaled, min_samples=1).fit_predict(centers)

            clusters_map = {}
            for box, lbl in zip(boxes, labels):
                clusters_map.setdefault(int(lbl), []).append(box)

            cluster_list = []
            for lbl, members in sorted(clusters_map.items()):
                xs = ([m["bbox"][0] for m in members]
                      + [m["bbox"][2] for m in members])
                ys = ([m["bbox"][1] for m in members]
                      + [m["bbox"][3] for m in members])
                avg_conf = sum(m["conf"] for m in members) / len(members)
                cluster_list.append({
                    "id": f"p{page_idx + 1}c{lbl}",
                    "texts": [m["text"] for m in members],
                    "bbox": [round(min(xs), 1), round(min(ys), 1),
                             round(max(xs), 1), round(max(ys), 1)],
                    "ocr_confidence": round(avg_conf, 3),
                    "has_korean": any(m["has_korean"] for m in members),
                    "layer": None,
                    "balloon_type": None,
                    "balloon_radius": None,
                    "pre_normalized_tag": None,  # PDF는 LLM 정규화 필요
                })

            pages_out.append({
                "page": page_idx + 1,
                "image_size": [img_w, img_h],
                "drawing_no": None,
                "clusters": cluster_list,
            })
            processed.append(page_idx + 1)

        doc.close()
        return json.dumps({
            "success": True,
            "source": "pdf",
            "source_path": pdf_path,
            "total_pages": total_pages,
            "processed_pages": processed,
            "pages": pages_out,
        }, ensure_ascii=False)

    except Exception as e:
        import traceback
        return json.dumps({
            "success": False,
            "error": f"PDF 파싱 실패: {e}",
            "trace": traceback.format_exc()[-500:],
        }, ensure_ascii=False)

4.7. parse_pid_drawing 통합 디스패처

# ── 통합 디스패처: 확장자로 자동 라우팅 ───────────────────────────────────────

@mcp.tool()
def parse_pid_drawing(file_path: str, **kwargs) -> str:
    """파일 확장자로 적절한 파서를 자동 선택.

    .dxf → parse_pid_dxf  (가장 정확, 추천)
    .dwg → parse_pid_dwg  (DXF로 변환 후 처리)
    .pdf → parse_pid_pdf  (OCR fallback, 정확도 낮음)

    Args:
        file_path: 도면 파일 경로
        **kwargs:  하위 파서로 전달되는 옵션 (instrument_layer, dpi, page_range 등)
    """
    import os
    ext = os.path.splitext(file_path)[1].lower()

    if ext == ".dxf":
        return parse_pid_dxf(
            file_path,
            instrument_layer=kwargs.get("instrument_layer", "INSTRUMENT"),
            balloon_text_tolerance=kwargs.get("balloon_text_tolerance", 1.5),
        )
    elif ext == ".dwg":
        return parse_pid_dwg(
            file_path,
            instrument_layer=kwargs.get("instrument_layer", "INSTRUMENT"),
        )
    elif ext == ".pdf":
        return parse_pid_pdf(
            file_path,
            dpi=kwargs.get("dpi", 300),
            cluster_eps=kwargs.get("cluster_eps", 50.0),
            page_range=kwargs.get("page_range", ""),
            min_confidence=kwargs.get("min_confidence", 0.5),
        )
    else:
        return json.dumps({
            "success": False,
            "error": f"지원하지 않는 확장자: {ext} (지원: .dxf, .dwg, .pdf)",
        }, ensure_ascii=False)

4.8. extract_pid_tags 확장 (DXF 사전정규화 활용)

@mcp.tool()
def extract_pid_tags(text: str, source_type: str) -> str:
    """P&ID 도면에서 태그 정보를 추출합니다.

    Args:
        text:        parse_pid_drawing/dxf/dwg/pdf의 JSON 결과(문자열) 또는 raw text
        source_type: 'clusters' (권장) | 'pdf' | 'dxf' (레거시)
    """
    if source_type == "clusters":
        # cluster JSON에 pre_normalized_tag가 있으면 LLM 호출 최소화
        try:
            import json as json_module
            parsed = json_module.loads(text)
            if parsed.get("source") in ("dxf", "dwg"):
                # DXF/DWG는 좌표 매칭으로 미리 정규화된 태그가 있음 → 그대로 활용
                return _extract_from_dxf_clusters(parsed)
        except Exception:
            pass  # JSON 파싱 실패 시 LLM 모드로

        # PDF 또는 DXF에서 매칭 안 된 cluster는 LLM 처리
        system = (
            "You are a P&ID expert. The input is clusters from a P&ID drawing.\n"
            "Each cluster represents ONE equipment/instrument with associated nearby texts.\n"
            "Texts may include both English (tags, codes) and Korean (descriptions).\n"
            "\n"
            "If a cluster has 'pre_normalized_tag', use it directly as tagNo.\n"
            "Otherwise, normalize ISA-5.1 two-line tags:\n"
            "    Top:    function code (FIC, PT, LT, ...)\n"
            "    Bottom: loop number (10101, 6113, 201A)\n"
            "    Merge: 'FIC-10101'\n"
            "\n"
            "If cluster has 'balloon_type', use it as instrumentLocation:\n"
            "    'DCS'   → DCS point\n"
            "    'FIELD' → field instrument\n"
            "    null    → unknown\n"
            "\n"
            "Return ONLY a JSON array:\n"
            '[{"tagNo":"FIC-10101","equipmentName":"...","equipmentNameKo":"...",\n'
            '  "instrumentType":"FIC","instrumentLocation":"DCS","layer":"INSTRUMENT",\n'
            '  "lineNumber":"L-101","pidDrawingNo":"P-9100","clusterId":"p1c3",\n'
            '  "confidence":0.95},...]\n'
            "\n"
            "OCR 오인식 보정 (PDF source인 경우):\n"
            "    'O' ↔ '0',  'I' ↔ '1' ↔ 'l',  'S' ↔ '5',  'B' ↔ '8'\n"
            "    보정 시 confidence 0.6 이하.\n"
            "\n"
            "- Skip clusters with only descriptive text and no tag pattern.\n"
            "- Skip pipe line numbers (4\"-CS-1234-A1A) unless clearly an instrument.\n"
            "- Skip title block / drawing metadata (DWG NO., SCALE, dates, names).\n"
            "- If no tags found, return [].\n"
            "- Do NOT include explanation, only JSON array."
        )
        truncated = text[:30000] if len(text) > 30000 else text
        user_content = f"Drawing clusters:\n{truncated}"
    else:
        # 기존 raw text 모드 (변경 없음)
        system = (
            "You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
            "Extract instrument and equipment tags from the provided text.\n"
            "Return ONLY a JSON array of objects with the following structure:\n"
            '[{"tagNo":"FT-101","equipmentName":"Flow Transmitter",'
            '"instrumentType":"FT","lineNumber":"L-101","pidDrawingNo":"P&ID-001",'
            '"confidence":0.95},...]\n'
            "Rules:\n"
            "- tagNo: Standard tag format\n"
            "- instrumentType: First 2-4 letters\n"
            "- equipmentName, lineNumber, pidDrawingNo: null if not present\n"
            "- confidence: 0.0~1.0\n"
            "- Empty array if no tags."
        )
        truncated = text[:12000] if len(text) > 12000 else text
        user_content = f"Source: {source_type}\n\nText:\n{truncated}"

    try:
        resp = _llm().chat.completions.create(
            model=VLLM_MODEL,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": user_content},
            ],
            max_tokens=4096,
            temperature=0.1,
        )

        raw = (resp.choices[0].message.content or "").strip()
        if raw.startswith("```"):
            lines = raw.splitlines()
            raw = "\n".join(
                lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]
            ).strip()

        import re
        match = re.search(r'\[.*\]', raw, re.DOTALL)
        if match:
            raw = match.group(0)

        import json as json_module
        data = json_module.loads(raw)

        return json_module.dumps({
            "success": True,
            "count": len(data),
            "tags": data,
        }, ensure_ascii=False, indent=2)

    except Exception as e:
        return json.dumps(
            {"success": False, "error": f"P&ID 태그 추출 실패: {e}"},
            ensure_ascii=False,
        )


def _extract_from_dxf_clusters(parsed: dict) -> str:
    """DXF/DWG cluster에서 pre_normalized_tag를 활용해 LLM 호출 없이 태그 추출."""
    import json as json_module
    import re as _re

    tags = []
    for page in parsed.get("pages", []):
        drawing_no = page.get("drawing_no")
        for c in page.get("clusters", []):
            tag_no = c.get("pre_normalized_tag")
            if not tag_no:
                continue

            # equipment 태그 패턴 (P-10101) vs instrument 태그 (FIC-10101) 구분
            m = _re.match(r'^([A-Z]+)-?(\d+[A-Z]?)$', tag_no)
            if not m:
                continue
            inst_type = m.group(1)

            # 한글 설명 텍스트 찾기
            ko_desc = None
            for t in c.get("texts", []):
                if any('\uac00' <= ch <= '\ud7a3' for ch in t):
                    ko_desc = t
                    break

            tags.append({
                "tagNo": tag_no,
                "equipmentName": None,  # LLM 보정 단계에서 채울 수 있음
                "equipmentNameKo": ko_desc,
                "instrumentType": inst_type,
                "instrumentLocation": c.get("balloon_type"),
                "layer": c.get("layer"),
                "lineNumber": None,
                "pidDrawingNo": drawing_no,
                "clusterId": c.get("id"),
                "confidence": c.get("ocr_confidence", 0.95),
            })

    return json_module.dumps({
        "success": True,
        "count": len(tags),
        "tags": tags,
        "method": "coordinate_matching",  # LLM 미사용 표시
    }, ensure_ascii=False, indent=2)

4.9. 검증 (Test)

test_parse_pid.py:

"""parse_pid_drawing 통합 테스트 (DXF/DWG/PDF)."""
import json
import sys
import time
sys.path.insert(0, ".")
from server import (
    parse_pid_drawing, parse_pid_dxf, parse_pid_dwg, parse_pid_pdf,
    extract_pid_tags,
)

# === DXF 테스트 ===
DXF_PATH = "test_data/sample.dxf"

print("[DXF] 파싱 시작...")
t0 = time.time()
dxf_json = parse_pid_dxf(DXF_PATH, instrument_layer="INSTRUMENT")
elapsed = time.time() - t0
result = json.loads(dxf_json)

assert result["success"], f"DXF 파싱 실패: {result.get('error')}"
print(f"[OK] {elapsed:.2f}초 (OCR 대비 100배+ 빠름)")
print(f"      통계: {result['stats']}")

stats = result["stats"]
assert stats["balloons_detected"] > 50, "balloon이 너무 적음"
match_rate = stats["balloons_with_tag"] / stats["balloons_detected"]
assert match_rate > 0.85, f"매칭률 부족: {match_rate:.1%}"
print(f"      ISA balloon 매칭률: {match_rate:.1%}")

# 태그 추출 (DXF는 LLM 거의 안 거침)
t0 = time.time()
tags_json = extract_pid_tags(dxf_json, "clusters")
print(f"[OK] 태그 추출 {time.time() - t0:.2f}초")
tags = json.loads(tags_json)
assert tags["success"]
assert tags.get("method") == "coordinate_matching", "DXF는 LLM 미사용 경로여야 함"
print(f"      추출 태그: {tags['count']}개")
for t in tags["tags"][:5]:
    print(f"      - {t['tagNo']:<15} ({t['instrumentType']}, "
          f"loc={t['instrumentLocation']}, conf={t['confidence']})")

# === PDF 테스트 (있으면) ===
import os
PDF_PATH = "test_data/sample.pdf"
if os.path.exists(PDF_PATH):
    print("\n[PDF] OCR 파싱 시작 (수십초 소요)...")
    t0 = time.time()
    pdf_json = parse_pid_pdf(PDF_PATH, dpi=300, page_range="1")
    print(f"[OK] OCR {time.time() - t0:.1f}초")
    pdf_result = json.loads(pdf_json)
    assert pdf_result["success"]
    assert pdf_result["source"] == "pdf"
    print(f"      OCR 박스: {pdf_result['pages'][0]['image_size']}")

# === DWG 테스트 (있으면) ===
DWG_PATH = "test_data/sample.dwg"
if os.path.exists(DWG_PATH):
    print("\n[DWG] ODA 변환 + 파싱 시작...")
    t0 = time.time()
    dwg_json = parse_pid_dwg(DWG_PATH)
    print(f"[OK] {time.time() - t0:.2f}초")
    dwg_result = json.loads(dwg_json)
    if not dwg_result["success"]:
        print(f"      ⚠️ DWG 파싱 실패 (ODA 미설치?): {dwg_result.get('error')[:200]}")
    else:
        assert dwg_result["source"] == "dwg"

# === 디스패처 테스트 ===
print("\n[디스패처] 확장자 자동 라우팅...")
disp_json = parse_pid_drawing(DXF_PATH)
disp_result = json.loads(disp_json)
assert disp_result["source"] == "dxf"
print(f"[OK] {DXF_PATH}{disp_result['source']} 라우팅")

print("\n✅ 모든 테스트 통과")

5. 인수 조건 (Acceptance Criteria)

# 조건 검증 방법
1 server.py에 4개 도구 추가됨 (parse_pid_dxf/dwg/pdf/drawing) grep
2 DXF 파싱이 1초 이내 완료된다 테스트 시간 측정
3 DXF에서 ISA balloon 매칭률 85% 이상 stats["balloons_with_tag"] / balloons_detected > 0.85
4 DXF cluster에 pre_normalized_tag, balloon_type, layer 필드 채워짐 JSON 검증
5 DCS/FIELD balloon이 구분된다 stats["dcs_balloons"] > 0 또는 사각형 미사용 도면이면 0
6 DXF 처리 시 LLM 호출 없음 (method=="coordinate_matching") 응답 필드 확인
7 DWG → DXF 변환이 ODA Converter로 동작 (또는 명확한 안내 메시지) 테스트 통과
8 PDF는 OCR 경로로 동작, source 필드가 "pdf" 테스트 통과
9 parse_pid_drawing 디스패처가 확장자별 라우팅 정확 테스트 통과
10 한글 텍스트가 깨지지 않음 (has_korean=true cluster 식별됨) JSON 검증
11 MTEXT 폰트 escape 코드 (`{\fMalgun Gothic ...; 한울}) → 한울` 청소됨
12 instrument_layer 파라미터로 다른 레이어명 도면 대응 다른 도면 테스트
13 기존 도구(search_codebase, nl2sql_query)가 정상 동작 회귀 테스트

6. 산출물 (Deliverables)

  1. 수정된 server.py
  2. test_parse_pid.py
  3. test_data/sample.dxf (필수, 검증용)
  4. test_data/sample.dwg (선택, ODA 검증용)
  5. test_data/sample.pdf (선택, OCR 검증용)
  6. requirements.txt 업데이트
  7. ODA File Converter 설치 가이드 (별도 README 또는 본 server.py 주석)
  8. 작업 요약: 변경 라인 수, 처리 시간 벤치마크 (DXF/DWG/PDF 각각), 매칭률

7. 주의사항 (Caveats)

7.1. 포맷별 정확도 차이

포맷 정확도 처리 시간 추가 정보
DXF 95%+ (좌표 매칭) <1초 레이어, 블록, 형상
DWG 95%+ (DXF 경유) 5~30초 (변환 포함) DXF와 동일
PDF (벡터, TTF) 90% (PyMuPDF) 1~3초 좌표만
PDF (SHX/스캔) 85% (OCR + LLM 보정) 5~90초 좌표만, 오인식 가능

가능하면 DWG/DXF를 받으세요. EPC사가 PDF만 주려고 하면 다음 카드 시도:

  1. "분석 자동화를 위해 DXF 추가 제공 가능한가요?"
  2. 협력사면 NDA 후 DWG 받기
  3. 안 되면 PDF + OCR로 fallback

7.2. DXF 처리

  • 레이어명은 회사마다 다름: INSTRUMENT / INST / INSTR / I-1 등. 첫 도면 분석 시 doc.layers 출력해서 ISA balloon이 있는 레이어 확인.
  • MTEXT 폰트 escape 코드: {\fMalgun Gothic|b0|i0|c129|p50;한울} 같은 형태로 들어옴. 본 코드에 정규식 청소 포함됨. 새로운 패턴 발견 시 추가.
  • 블록 참조: 일부 도면은 ISA balloon을 BLOCK + ATTRIB으로 정의. 현재 코드는 익명 블록(A$C...)을 무시. ATTRIB 처리 로직은 포함되어 있으나 실제 ATTRIB 기반 도면으로 추가 검증 필요.
  • 사각형 검출: LWPOLYLINE 4점 닫힌 형태로만 검출. 일부 도면은 LINE 4개로 사각형을 그릴 수 있음. 그 경우 추가 로직 필요 (phase 2).

7.3. DWG 처리

  • ODA File Converter 설치: 등록 + 다운로드 필요. 무료지만 자동화 안 됨. Docker 이미지로 만들어두면 배포 편함.
  • 변환 시간: 큰 DWG(50MB+)는 변환에 30초 이상 소요. timeout 120초 설정.
  • 버전 호환: ODA는 AutoCAD 2.5 ~ 2024 거의 모든 버전 지원. 최신 R2025+는 ODA 최신 버전 필요.
  • 상용 배포 시: ODA Teigha SDK(상용 라이선스) 또는 Aspose.CAD(상용) 검토.

7.4. PDF OCR 처리

  • (v2 작업지시서와 동일) 처리 시간, GPU 메모리, OCR 오인식 등.
  • DXF가 있으면 PDF 처리는 거의 안 쓸 것.

7.5. 운영

  • 임시 파일 정리: DWG 변환 시 tempfile.TemporaryDirectory로 자동 삭제됨. 중간에 프로세스가 죽으면 /tmp/pid_dwg_*에 남을 수 있음. 주기적 정리 cron 권장.
  • MCP HTTP 모드 timeout: DWG 변환 + 큰 도면은 30초 이상 걸릴 수 있음. C# McpClient 측 timeout을 180초로 설정.
  • 메모리: 큰 DXF(50MB+)는 ezdxf가 메모리 ~500MB 사용. 동시 처리 제한.

7.6. 라이선스

  • ezdxf: MIT — 상용 OK
  • PaddleOCR/PaddlePaddle: Apache 2.0 — 상용 OK
  • PyMuPDF: AGPL — 사내 한정 안전, 외부 배포 시 검토
  • ODA File Converter: 무료 사용 약관 (상용 SaaS 재배포 제한 있음)

8. 향후 작업 (Out of Scope)

  • (우선순위 높음) LEGEND 시트 자동 파싱: 약어 사전 추출 → in-context learning. DXF의 경우 LEGEND 시트도 그냥 텍스트가 다 들어있으므로 표 추출이 비교적 쉬움.
  • LEGEND 사전을 Qdrant pid-legend-{project_id} 컬렉션으로 캐시
  • 블록 + ATTRIB 기반 도면 지원: 현재 익명 블록 무시. ATTRIB 추출 로직 검증 + 활성화.
  • 사각형 검출 강화: LINE 4개로 그린 사각형도 검출 (Hough Transform 또는 polygon 결합)
  • 장비-인스트루먼트 연결 그래프: LWPOLYLINE 라인을 따라가며 어떤 장비에 어떤 instrument가 연결되어 있는지 그래프화
  • 대용량 DWG(100MB+) 비동기 처리
  • OCR 결과 캐싱 (PDF 재처리 시 OCR 스킵, 파일 hash 기반)
  • pid_tag_mapping PostgreSQL 테이블 + RAG 인덱싱
  • C# 클라이언트 측 호출 코드
  • 검수 UI: 추출 결과를 도면 위에 오버레이해서 사람이 검토/수정
  • 도면 변경 diff 추적: 같은 drawing_no의 새 버전 들어왔을 때 추가/삭제/변경 태그 식별

작성자: (작성자명) 작성일: 2026-05-01 버전: v3 (DWG/DXF/PDF 통합) 대상 에이전트: Claude Code / Cursor / Roo Code 등 코딩 에이전트 예상 소요: 6~12시간

  • 환경 셋업 (ODA Converter, PaddleOCR, ezdxf): 2~3시간
  • 코드 작성: 3~5시간
  • 테스트 데이터 준비 + 검증: 2~4시간

검증된 성능 지표 (사용자 제공 p-9100.dxf, 11MB):

  • 텍스트 엔티티: 3,925개 추출 (TEXT 3,562 + MTEXT 363)
  • ISA balloon 검출: 215개 (INSTRUMENT 레이어 CIRCLE)
  • 좌표 매칭으로 태그 정규화: 197개 (91.6%)
  • 처리 시간: <1초 (LLM 호출 없음)
  • 한글 라벨 추출: 59개 (깨짐 없음)
  • 레이어 99개 자동 분류