ExperionCrawler/mcp-server/worker/pid_worker_py.md


Create pid_worker.py.

● First, I'll check the current directory structure, then write the file.

Listed 1 directory

● Bash(mkdir -p /home/windpacer/projects/ExperionCrawler/mcp-server/worker) ⎿  Done

● Write(mcp-server/worker/pid_worker.py)

Create file mcp-server/worker/pid_worker.py

```python
#!/usr/bin/env python3
"""Dedicated worker process for P&ID parsing.

Usage: python pid_worker.py

Tools handled:
    extract_pid_tags, match_pid_tags,
    parse_pid_dxf, parse_pid_pdf, parse_pid_drawing,
    build_pid_graph_parallel, analyze_pid_impact
"""

from __future__ import annotations
import sys
import os

# Add the mcp-server directory to the Python path (gives access to the pipeline package)
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import io
import json
import asyncio
import signal
import logging
import re
from functools import lru_cache

from fastapi import FastAPI, Request
import uvicorn

# ── Settings ──────────────────────────────────────────────────────────────────

VLLM_BASE_URL = "http://localhost:8000/v1"
VLLM_MODEL = "Qwen/Qwen3-Coder-Next-FP8"
DB_CONNECTION_STRING = "postgresql://postgres:postgres@localhost:5432/iiot_platform"
DB_TIMEOUT = 10

_SERVER_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
STORAGE_DIR = os.path.join(_SERVER_DIR, "storage")

logging.basicConfig(
    level=logging.INFO,
    stream=sys.stderr,
    format="%(asctime)s [pid_worker] %(levelname)s %(message)s",
)

app = FastAPI()

# ── Singletons ────────────────────────────────────────────────────────────────

@lru_cache(maxsize=1)
def _llm():
    from openai import OpenAI
    return OpenAI(base_url=VLLM_BASE_URL, api_key="dummy")


@lru_cache(maxsize=1)
def _ocr():
    from paddleocr import PaddleOCR
    use_gpu = os.environ.get("PADDLE_USE_GPU", "true").lower() == "true"
    try:
        return PaddleOCR(use_angle_cls=True, lang="korean", use_gpu=use_gpu, show_log=False)
    except Exception:
        if use_gpu:
            # GPU initialization failed: retry once on CPU
            os.environ["PADDLE_USE_GPU"] = "false"
            return _ocr()
        raise

# ── DB ────────────────────────────────────────────────────────────────────────

def _get_db_connection():
    import psycopg
    return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)

# ── Text extraction ───────────────────────────────────────────────────────────

def _extract_text_from_dxf(filepath: str) -> str:
    import ezdxf
    from ezdxf.tools.text import plain_mtext
    doc = ezdxf.readfile(filepath)
    msp = doc.modelspace()
    texts = []
    for entity in msp:
        if entity.dxftype() == "TEXT":
            texts.append(entity.dxf.text)
        elif entity.dxftype() == "MTEXT":
            try:
                plain = plain_mtext(entity.dxf.text)
                if plain.strip():
                    texts.append(plain)
            except Exception:
                pass  # skip MTEXT entities whose formatting cannot be stripped
    return "\n".join(texts)


def _extract_text_from_pdf(filepath: str) -> str:
    import fitz
    doc = fitz.open(filepath)
    return "\n".join(page.get_text() for page in doc)


def _extract_text_from_pdf_ocr(filepath: str) -> str:
    import fitz
    from PIL import Image
    import numpy as np
    doc = fitz.open(filepath)
    # Render at 300 DPI (PDF units are 72 per inch). A single-argument
    # fitz.Matrix(x) is a rotation by x degrees, so pass the zoom for both axes.
    zoom = 300 / 72
    mat = fitz.Matrix(zoom, zoom)
    all_texts = []
    for page in doc:
        pix = page.get_pixmap(matrix=mat)
        img = Image.open(io.BytesIO(pix.tobytes("png")))
        result = _ocr().ocr(np.array(img), cls=True)
        if result and result[0]:
            all_texts.extend(line[1][0] for line in result[0])
    return "\n".join(all_texts)

# ── JSON array parsing utilities ──────────────────────────────────────────────

def _parse_json_array(raw: str, finish_reason: str = "") -> list:
    """Extract a JSON array from LLM output.

    Strips a surrounding ``` fence and, when finish_reason == "length",
    recovers from truncation by cutting back to the last complete object.
    """
    if raw.startswith("```"):
        lines = raw.splitlines()
        raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()

    if finish_reason == "length":
        last_close = raw.rfind("}")
        if last_close != -1:
            raw = raw[:last_close + 1] + "]"

    # Extract the longest balanced [...] span
    depth = 0
    start = -1
    best = ""
    for i, c in enumerate(raw):
        if c == "[":
            if depth == 0:
                start = i
            depth += 1
        elif c == "]":
            depth -= 1
            if depth == 0 and start >= 0:
                cand = raw[start:i + 1]
                if len(cand) > len(best):
                    best = cand
    raw = best if best else "[]"

    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Fall back to parsing flat {...} objects one by one
        data = []
        for obj in re.findall(r"{[^{}]*}", raw, re.DOTALL):
            try:
                data.append(json.loads(obj))
            except json.JSONDecodeError:
                pass
        return data

# ── Tag extraction / mapping tools ────────────────────────────────────────────

def _extract_pid_tags(text: str, source_type: str) -> str:
    system = (
        "You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
        "Extract all instrument and equipment tags from the provided text.\n"
        "Return ONLY a valid JSON array. Each element must have exactly these fields:\n"
        '{"tagNo":"FCV-101","equipmentName":null,"instrumentType":"FCV",'
        '"lineNumber":null,"pidDrawingNo":null,"confidence":0.95}\n'
        "Rules:\n"
        "- tagNo: any token matching [LETTERS]-[DIGITS] or [LETTERS]-[DIGITS]-[SUFFIX]\n"
        "  Examples: FCV-101, P-10101, T-10100, VG-6203-15A-F1A-n, BT-6200, DP-10101\n"
        "- instrumentType: leading letters of tagNo\n"
        "- equipmentName: descriptive name if present near tag, else null\n"
        "- lineNumber/pidDrawingNo: null unless explicitly associated\n"
        "- confidence: 0.95 for clear tags, lower for ambiguous\n"
        "- Output ONLY the JSON array, no markdown, no explanation.\n"
        "- If no tags found, return: []\n"
    )
    truncated = text[:100000]
    resp = _llm().chat.completions.create(
        model=VLLM_MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": f"Source: {source_type}\n\nText:\n{truncated}"},
        ],
        max_tokens=32768,
        temperature=0.1,
        extra_body={"chat_template_kwargs": {"enable_thinking": False}},
    )
    raw = (resp.choices[0].message.content or "").strip()
    data = _parse_json_array(raw, resp.choices[0].finish_reason)
    logging.info(f"extract_pid_tags source={source_type} count={len(data)}")
    return json.dumps({"success": True, "count": len(data), "tags": data},
                      ensure_ascii=False, indent=2)


def _match_pid_tags(pid_tags: list, experion_tags: list) -> str:
    system = (
        "You are a P&ID to Experion tag matching expert.\n"
        "Match P&ID tags to Experion tags based on similarity.\n"
        "Return ONLY a JSON array:\n"
        '[{"pidTag":"FT-101","experionTag":"ft-101.pv","confidence":0.92},...]\n'
        "- If no good match: confidence < 0.5, experionTag null\n"
        "- Output ONLY the JSON array.\n"
    )
    resp = _llm().chat.completions.create(
        model=VLLM_MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": (
                f"P&ID Tags:\n{chr(10).join(pid_tags)}\n\n"
                f"Experion Tags:\n{chr(10).join(experion_tags)}"
            )},
        ],
        max_tokens=16384,
        temperature=0.1,
        extra_body={"chat_template_kwargs": {"enable_thinking": False}},
    )
    raw = (resp.choices[0].message.content or "").strip()
    data = _parse_json_array(raw, resp.choices[0].finish_reason)
    return json.dumps({"success": True, "count": len(data), "mappings": data},
                      ensure_ascii=False, indent=2)

# ── Drawing parsing tools ─────────────────────────────────────────────────────

_TAG_EXTRACT_SYSTEM = (
    "You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
    "Extract instrument and equipment tags from the provided text.\n"
    "Return ONLY a JSON array:\n"
    '[{"tagNo":"FIT-10115","equipmentName":"Flow Transmitter","instrumentType":"FIT",'
    '"lineNumber":"L-101","pidDrawingNo":"P&ID-001","confidence":0.95},...]\n'
    "Rules:\n"
    "- tagNo: Instrument [Function]-[Number], Equipment [Type]-[Number]\n"
    "- instrumentType: first 2-4 letters of tagNo\n"
    "- equipmentName/lineNumber/pidDrawingNo: null if not present\n"
    "- confidence: 0.0 to 1.0\n"
    "- Output ONLY the JSON array, no markdown.\n"
    "- If no tags found, return: []\n"
)


def _parse_pid_dxf(filepath: str) -> str:
    text = _extract_text_from_dxf(filepath)
    if not text.strip():
        return json.dumps({"success": True, "text": "", "count": 0, "tags": []},
                          ensure_ascii=False, indent=2)

    resp = _llm().chat.completions.create(
        model=VLLM_MODEL,
        messages=[
            {"role": "system", "content": _TAG_EXTRACT_SYSTEM},
            {"role": "user", "content": f"Source: dxf\n\nText:\n{text[:12000]}"},
        ],
        max_tokens=4096,
        temperature=0.1,
    )
    raw = (resp.choices[0].message.content or "").strip()
    data = _parse_json_array(raw, resp.choices[0].finish_reason)
    if not isinstance(data, list):
        data = []
    return json.dumps({"success": True, "text": text[:10000], "count": len(data), "tags": data},
                      ensure_ascii=False, indent=2)


def _parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
    text = _extract_text_from_pdf_ocr(filepath) if use_ocr else _extract_text_from_pdf(filepath)
    if not text.strip():
        return json.dumps({"success": True, "text": "", "count": 0, "tags": []},
                          ensure_ascii=False, indent=2)

    resp = _llm().chat.completions.create(
        model=VLLM_MODEL,
        messages=[
            {"role": "system", "content": _TAG_EXTRACT_SYSTEM},
            {"role": "user", "content": f"Source: pdf\n\nText:\n{text[:12000]}"},
        ],
        max_tokens=4096,
        temperature=0.1,
    )
    raw = (resp.choices[0].message.content or "").strip()
    data = _parse_json_array(raw, resp.choices[0].finish_reason)
    if not isinstance(data, list):
        data = []
    return json.dumps({"success": True, "text": text[:10000], "count": len(data), "tags": data},
                      ensure_ascii=False, indent=2)


def _parse_pid_drawing(filepath: str) -> str:
    ext = os.path.splitext(filepath)[1].lower()
    if ext == ".dxf":
        return _parse_pid_dxf(filepath)
    elif ext == ".pdf":
        return _parse_pid_pdf(filepath)
    elif ext == ".dwg":
        return json.dumps({
            "success": False,
            "error": "DWG files cannot be parsed directly. Convert to DXF first.",
        }, ensure_ascii=False)
    else:
        return json.dumps({
            "success": False,
            "error": f"Unsupported format: {ext}. Supported: .dxf, .pdf",
        }, ensure_ascii=False)

# ── Graph tools ───────────────────────────────────────────────────────────────

async def _build_pid_graph_parallel(filepath: str) -> str:
    from pipeline.extractor import PidGeometricExtractor
    from pipeline.topology import PidTopologyBuilder
    from pipeline.mapper import IntelligentMapper
    from openai import AsyncOpenAI

    os.makedirs(STORAGE_DIR, exist_ok=True)

    # Phase 1: geometric extraction
    extractor = PidGeometricExtractor(filepath)
    geo_data_path = os.path.join(STORAGE_DIR, os.path.basename(filepath) + "_geo.json")
    extractor.extract_and_save(geo_data_path)
    with open(geo_data_path, "r", encoding="utf-8") as f:
        geo_data = json.load(f)

    # Look up system tags (the connection is closed when the block exits)
    system_tags: list[str] = []
    try:
        with _get_db_connection() as conn, conn.cursor() as cur:
            cur.execute("SELECT tagname FROM realtime_table")
            system_tags = [r[0] for r in cur.fetchall()]
    except Exception as e:
        logging.warning(f"Failed to query system tags: {e}")

    # Phase 2: first-pass topology builder (graph used by the mapper)
    builder = PidTopologyBuilder(geo_data)
    builder.build_graph()

    # Phase 3: parallel LLM mapping
    api_client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
    mapper = IntelligentMapper(builder.G, system_tags, api_client=api_client)

    transmitter_nodes = [
        n for n, d in builder.G.nodes(data=True)
        if d.get("value", "").upper() in {"FIT", "FT", "LT", "PT", "TE"}
    ]
    valve_nodes = [
        n for n, d in builder.G.nodes(data=True)
        if d.get("value", "").upper() in {"FCV", "LCV", "TCV", "PCV", "XV"}
    ]
    equipment_nodes = [
        n for n, d in builder.G.nodes(data=True)
        if d.get("type") not in {"TEXT", "LINE", "LWPOLYLINE"}
    ]

    extracted_results = await asyncio.gather(
        mapper.extract_transmitters(transmitter_nodes),
        mapper.extract_valves(valve_nodes),
        mapper.extract_equipment(equipment_nodes),
    )

    # Merge the mapping results
    all_mapped_tags = []
    for res_dict in extracted_results:
        for node_id, mapping in res_dict.items():
            if mapping.resolved_tag != "UNKNOWN":
                node_data = builder.G.nodes[node_id]
                all_mapped_tags.append({
                    "entity_id": node_id,
                    "tagName": mapping.resolved_tag,
                    "bbox": (
                        node_data["bbox"].bounds
                        if hasattr(node_data["bbox"], "bounds")
                        else node_data["bbox"]
                    ),
                    "clean_value": mapping.resolved_tag,
                })

    # Phase 4: final topology modeling + save
    final_builder = PidTopologyBuilder(geo_data, all_extracted_tags=all_mapped_tags)
    final_builder.build_graph()

    # Replace only the extension (str.replace would also hit ".dxf" elsewhere in the name)
    graph_id = os.path.splitext(os.path.basename(filepath))[0] + "_graph.json"
    graph_path = os.path.join(STORAGE_DIR, graph_id)
    final_builder.save_graph(graph_path)

    logging.info(f"build_pid_graph_parallel graph_id={graph_id} "
                 f"nodes={final_builder.G.number_of_nodes()} "
                 f"edges={final_builder.G.number_of_edges()}")
    return json.dumps({
        "success": True,
        "graph_id": graph_id,
        "graph_path": graph_path,
        "nodes": final_builder.G.number_of_nodes(),
        "edges": final_builder.G.number_of_edges(),
    }, ensure_ascii=False)


def _analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
    from pipeline.analyzer import PidAnalysisEngine

    graph_path = os.path.join(STORAGE_DIR, graph_id)
    mapping_path = graph_path.replace("_graph.json", "_mapping.json")
    analyzer = PidAnalysisEngine(graph_path, mapping_path)
    result = analyzer.analyze_impact(start_node_id)
    return json.dumps(result, ensure_ascii=False, indent=2)

# ── Request dispatcher ────────────────────────────────────────────────────────

async def _dispatch(tool: str, params: dict) -> str:
    try:
        match tool:
            case "extract_pid_tags":
                return _extract_pid_tags(**params)
            case "match_pid_tags":
                return _match_pid_tags(**params)
            case "parse_pid_dxf":
                return _parse_pid_dxf(**params)
            case "parse_pid_pdf":
                return _parse_pid_pdf(**params)
            case "parse_pid_drawing":
                return _parse_pid_drawing(**params)
            case "build_pid_graph_parallel":
                return await _build_pid_graph_parallel(**params)
            case "analyze_pid_impact":
                return _analyze_pid_impact(**params)
            case _:
                return json.dumps({"success": False, "error": f"Unknown tool: {tool}"},
                                  ensure_ascii=False)
    except Exception as e:
        logging.error(f"dispatch error tool={tool}: {e}", exc_info=True)
        return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)

# ── Scheduled shutdown ────────────────────────────────────────────────────────

def _schedule_shutdown():
    """Send SIGTERM to this process 0.5 s from now, so the pending response goes out first."""
    async def _do():
        await asyncio.sleep(0.5)
        os.kill(os.getpid(), signal.SIGTERM)
    asyncio.create_task(_do())

# ── HTTP endpoints ────────────────────────────────────────────────────────────

@app.get("/health")
async def health():
    return {"status": "ok"}
```

│ 9 │ atexit cleanup hook │ Easy │
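The recovery steps in the worker's `_parse_json_array` can be exercised without an LLM. Below is a simplified, dependency-free restatement as a sketch; the name `extract_json_array` is hypothetical and not part of the worker. It strips a ``` fence, cuts a `length`-truncated reply back to its last complete object, keeps the longest balanced `[...]` span, and finally falls back to parsing flat objects one by one.

```python
import json
import re


def extract_json_array(raw: str, finish_reason: str = "") -> list:
    """Hypothetical standalone sketch of the worker's JSON-array recovery."""
    raw = raw.strip()
    # 1. Strip a surrounding markdown fence such as ```json ... ```
    if raw.startswith("```"):
        lines = raw.splitlines()
        body = lines[1:-1] if lines[-1].strip() == "```" else lines[1:]
        raw = "\n".join(body).strip()

    # 2. Cut off by max_tokens: keep up to the last complete "}" and close the array
    if finish_reason == "length":
        last_close = raw.rfind("}")
        if last_close != -1:
            raw = raw[: last_close + 1] + "]"

    # 3. Keep the longest balanced [...] span
    depth, start, best = 0, -1, ""
    for i, ch in enumerate(raw):
        if ch == "[":
            if depth == 0:
                start = i
            depth += 1
        elif ch == "]" and depth > 0:  # ignore a stray "]" before any "["
            depth -= 1
            if depth == 0 and len(raw[start : i + 1]) > len(best):
                best = raw[start : i + 1]
    raw = best or "[]"

    # 4. If the span still is not valid JSON, salvage flat {...} objects individually
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        items = []
        for obj in re.findall(r"\{[^{}]*\}", raw):
            try:
                items.append(json.loads(obj))
            except json.JSONDecodeError:
                pass
        return items
```

For example, a reply truncated in the middle of its third object, `[{"tagNo":"FCV-101"},{"tagNo":"P-10101"},{"tagNo":"T-1`, with `finish_reason="length"` yields the two complete tags. The `depth > 0` guard is a small deviation from the worker code, added so a stray `]` cannot push the depth negative.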

Shall I write the actual worker files (worker/rag_worker.py, etc.), or wrap up by applying the fixes to the plan document?


※ recap: Finished diagnosing the MCP server parallel-architecture design document and proposing fixes. Next, choose between writing the actual worker files and applying the fixes to the plan.

Create pid_worker.py.

● First, I'll check the current directory structure, then write the file.

Listed 1 directory

● Bash(mkdir -p /home/windpacer/projects/ExperionCrawler/mcp-server/worker) ⎿  Done

● Write(mcp-server/worker/pid_worker.py)
  ⎿ Wrote 461 lines to mcp-server/worker/pid_worker.py

```python
#!/usr/bin/env python3
"""Dedicated worker process for P&ID parsing.

Usage: python pid_worker.py

Tools handled:
    extract_pid_tags, match_pid_tags,
    parse_pid_dxf, parse_pid_pdf, parse_pid_drawing,
    build_pid_graph_parallel, analyze_pid_impact
"""
```

… +451 lines

● Checking for syntax errors.

● Bash(cd /home/windpacer/projects/ExperionCrawler/mcp-server && .venv/bin/python -m py_compile worker/pid_worker.py && echo "OK") ⎿  OK
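The `python -m py_compile` check above verifies syntax only; it does not import `fastapi`, `ezdxf`, or the `pipeline` package, so missing dependencies are not caught. A minimal sketch of the same check from Python, using the standard-library `py_compile.compile(..., doraise=True)`; the helper name `syntax_ok` is hypothetical:

```python
import py_compile
import sys


def syntax_ok(path: str) -> bool:
    """Return True if `path` byte-compiles, mirroring `python -m py_compile`.

    Only syntax errors are reported as False; a missing file still raises OSError.
    """
    try:
        py_compile.compile(path, doraise=True)  # also writes a .pyc into __pycache__
        return True
    except py_compile.PyCompileError as exc:
        # exc.msg carries the SyntaxError text, including file name and line
        print(exc.msg, file=sys.stderr)
        return False
```

Calling `syntax_ok("worker/pid_worker.py")` from the `mcp-server` directory corresponds to the shell command above.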

● No syntax errors. Reviewing the finished file.

Read 1 file

● Created mcp-server/worker/pid_worker.py (461 lines). All of the fixes identified in the diagnosis have been applied.


Applied fixes

| Fix | Description |
|---|---|
| Fix 1 | No RAG worker code mixed in; only the P&ID tools are included |
| Fix 2 | `app = FastAPI()` declared |
| Fix 3 | `_schedule_shutdown()`: `asyncio.create_task` sends SIGTERM after a 0.5 s delay, so the response is returned first |
| Fix 4 | Request body parsed with `Request.json()` |
| Fix 7 | `/health` endpoint added (for the main server's health-check loop) |
| Fix 8 | The worker does not redirect its own stdout/stderr; the main server launches it with DEVNULL |
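Fix 3 relies on `asyncio.create_task` returning control to the handler immediately, so the HTTP response is sent before the delayed SIGTERM fires. The sketch below shows that ordering with the signal replaced by an injected callback so it can be observed; `schedule_shutdown`, `one_shot_handler` and `demo` are hypothetical names, not the worker's. One deliberate difference from `_schedule_shutdown()`: the asyncio documentation advises keeping a reference to tasks created with `create_task`, because the event loop holds only weak references, so the sketch stores them in a set.

```python
import asyncio
from typing import Callable

# Strong references to pending tasks (the event loop keeps only weak ones)
_background_tasks: set[asyncio.Task] = set()


def schedule_shutdown(on_shutdown: Callable[[], None], delay: float = 0.5) -> asyncio.Task:
    """Run `on_shutdown` after `delay` seconds without blocking the caller.

    In pid_worker.py the callback would be os.kill(os.getpid(), signal.SIGTERM).
    Must be called from inside a running event loop (e.g. a FastAPI handler).
    """
    async def _do() -> None:
        await asyncio.sleep(delay)
        on_shutdown()

    task = asyncio.create_task(_do())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def one_shot_handler(events: list[str]) -> tuple[dict, asyncio.Task]:
    """Hypothetical one-shot endpoint: schedule the exit, then return the response."""
    task = schedule_shutdown(lambda: events.append("shutdown"), delay=0.05)
    events.append("response")  # the handler finishes before the callback fires
    return {"success": True}, task


async def demo() -> list[str]:
    events: list[str] = []
    response, task = await one_shot_handler(events)
    await task  # the real worker keeps serving until SIGTERM arrives instead
    return events
```

`asyncio.run(demo())` records `"response"` before `"shutdown"`. The fixed delay is a heuristic: it assumes the response is flushed within 0.5 s.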

Structure summary

- `/execute` → regular requests (the worker stays running)
- `/execute/one_shot` → P&ID requests; the worker exits automatically afterward
- `/health` → worker readiness check
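A client-side sketch of calling these endpoints from the main server, using only the standard library. The request body shape `{"tool": ..., "params": {...}}` is an assumption inferred from `_dispatch(tool, params)`, since the endpoint code is not shown; `build_worker_request` and `call_worker` are hypothetical helpers. The extra decode step covers the case where the endpoint returns `_dispatch`'s JSON string unchanged, which FastAPI would encode as a JSON string literal.

```python
import json
import urllib.request

# Bypass any HTTP proxy settings: the worker listens on localhost
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def build_worker_request(port: int, tool: str, params: dict,
                         one_shot: bool = False) -> urllib.request.Request:
    """Build (but do not send) a POST to /execute or /execute/one_shot.

    Assumption: the body is {"tool": ..., "params": {...}}.
    """
    path = "/execute/one_shot" if one_shot else "/execute"
    body = json.dumps({"tool": tool, "params": params}, ensure_ascii=False).encode("utf-8")
    return urllib.request.Request(
        f"http://127.0.0.1:{port}{path}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call_worker(req: urllib.request.Request, timeout: float = 600.0) -> dict:
    """Send the request and return the decoded result dict."""
    with _opener.open(req, timeout=timeout) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    # A JSON string returned as-is by the endpoint arrives double-encoded
    return json.loads(payload) if isinstance(payload, str) else payload
```

For a drawing parse that should also stop the worker, one would call `build_worker_request(5004, "parse_pid_drawing", {"filepath": "/path/to/drawing.dxf"}, one_shot=True)`, where the path is illustrative.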

The pipeline package in mcp-server/pipeline/ is found automatically through the sys.path insertion at the top of the file.

Run: `python worker/pid_worker.py 5004`
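On the main-server side, Fix 7 and Fix 8 combine into "launch with DEVNULL, then poll `/health`". A minimal sketch under that assumption; `launch_worker` and `wait_for_health` are hypothetical names, and passing the port as the first argument follows the run command above. Note the trade-off of DEVNULL: since the worker logs to stderr, its log output is discarded unless stderr is redirected to a file instead.

```python
import subprocess
import sys
import time
import urllib.request

# Bypass any HTTP proxy settings for localhost health checks
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def launch_worker(script: str, port: int) -> subprocess.Popen:
    """Start a worker with stdin/stdout/stderr attached to DEVNULL (as in Fix 8)."""
    return subprocess.Popen(
        [sys.executable, script, str(port)],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def wait_for_health(port: int, timeout: float = 30.0, interval: float = 0.2) -> bool:
    """Poll GET /health until it answers 200 or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    url = f"http://127.0.0.1:{port}/health"
    while time.monotonic() < deadline:
        try:
            with _opener.open(url, timeout=interval) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            pass  # URLError, refused connection or timeout: not ready yet
        time.sleep(interval)
    return False
```

Typical use would be `proc = launch_worker("worker/pid_worker.py", 5004)` followed by `wait_for_health(5004)` before sending the first `/execute` request.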