ExperionCrawler/mcp-server/worker/pid_worker_py.md
Please create pid_worker.py
● First, I'll check the current directory structure, then write the file.
Listed 1 directory (ctrl+o to expand)
● Bash(mkdir -p /home/windpacer/projects/ExperionCrawler/mcp-server/worker)
 Done
● Write(mcp-server/worker/pid_worker.py)
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
Create file
mcp-server/worker/pid_worker.py
╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
#!/usr/bin/env python3
"""Dedicated worker process for P&ID parsing.

Usage: python pid_worker.py <port>

Tools handled:
    extract_pid_tags, match_pid_tags,
    parse_pid_dxf, parse_pid_pdf, parse_pid_drawing,
    build_pid_graph_parallel, analyze_pid_impact
"""

from __future__ import annotations
import sys
import os

# Add the mcp-server directory to the Python path (for access to the pipeline package)
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import io
import json
import asyncio
import signal
import logging
import re
from functools import lru_cache

from fastapi import FastAPI, Request
import uvicorn

# ── Configuration ────────────────────────────────────────────────────────────

VLLM_BASE_URL = "http://localhost:8000/v1"
VLLM_MODEL = "Qwen/Qwen3-Coder-Next-FP8"
DB_CONNECTION_STRING = "postgresql://postgres:postgres@localhost:5432/iiot_platform"
DB_TIMEOUT = 10

_SERVER_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
STORAGE_DIR = os.path.join(_SERVER_DIR, "storage")

logging.basicConfig(
    level=logging.INFO,
    stream=sys.stderr,
    format="%(asctime)s [pid_worker] %(levelname)s %(message)s",
)

app = FastAPI()

# ── Singletons ───────────────────────────────────────────────────────────────

@lru_cache(maxsize=1)
def _llm():
    from openai import OpenAI
    return OpenAI(base_url=VLLM_BASE_URL, api_key="dummy")


@lru_cache(maxsize=1)
def _ocr():
    from paddleocr import PaddleOCR
    use_gpu = os.environ.get("PADDLE_USE_GPU", "true").lower() == "true"
    try:
        return PaddleOCR(use_angle_cls=True, lang="korean", use_gpu=use_gpu, show_log=False)
    except Exception:
        # If GPU initialization fails, retry once on CPU
        if use_gpu:
            os.environ["PADDLE_USE_GPU"] = "false"
            return _ocr()
        raise

# ── DB ───────────────────────────────────────────────────────────────────────

def _get_db_connection():
    import psycopg
    return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)

# ── Text extraction ──────────────────────────────────────────────────────────

def _extract_text_from_dxf(filepath: str) -> str:
    import ezdxf
    from ezdxf.tools.text import plain_mtext
    doc = ezdxf.readfile(filepath)
    msp = doc.modelspace()
    texts = []
    for entity in msp:
        if entity.dxftype() == "TEXT":
            texts.append(entity.dxf.text)
        elif entity.dxftype() == "MTEXT":
            try:
                # Strip MTEXT inline formatting codes
                plain = plain_mtext(entity.dxf.text)
                if plain.strip():
                    texts.append(plain)
            except Exception:
                pass
    return "\n".join(texts)


def _extract_text_from_pdf(filepath: str) -> str:
    import fitz
    doc = fitz.open(filepath)
    return "\n".join(page.get_text() for page in doc)


def _extract_text_from_pdf_ocr(filepath: str) -> str:
    import fitz
    from PIL import Image
    import numpy as np
    doc = fitz.open(filepath)
    all_texts = []
    for page in doc:
        # Render at 300 DPI. Both zoom factors are passed explicitly because
        # PyMuPDF treats a single-number Matrix as a rotation angle.
        zoom = 300 / 72
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat)
        img = Image.open(io.BytesIO(pix.tobytes("png")))
        result = _ocr().ocr(np.array(img), cls=True)
        if result and result[0]:
            all_texts.extend(line[1][0] for line in result[0])
    return "\n".join(all_texts)

# ── JSON array parsing utility ───────────────────────────────────────────────

def _parse_json_array(raw: str, finish_reason: str = "") -> list:
    """Extract a JSON array from LLM output, recovering from finish_reason=length truncation."""
    # Strip a surrounding markdown code fence, if present
    if raw.startswith("```"):
        lines = raw.splitlines()
        raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()

    # Truncated output: cut after the last complete object and close the array
    if finish_reason == "length":
        last_close = raw.rfind("}")
        if last_close != -1:
            raw = raw[:last_close + 1] + "]"

    # Extract the longest balanced [...] span
    depth = 0
    start = -1
    best = ""
    for i, c in enumerate(raw):
        if c == "[":
            if depth == 0:
                start = i
            depth += 1
        elif c == "]" and depth > 0:  # ignore a stray "]" so depth never goes negative
            depth -= 1
            if depth == 0 and start >= 0:
                cand = raw[start:i + 1]
                if len(cand) > len(best):
                    best = cand
    raw = best if best else "[]"

    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Fallback: salvage each flat {...} object individually
        data = []
        for obj in re.findall(r"\{[^{}]*\}", raw, re.DOTALL):
            try:
                data.append(json.loads(obj))
            except json.JSONDecodeError:
                pass
        return data

# ── Tag extraction/matching tools ────────────────────────────────────────────

def _extract_pid_tags(text: str, source_type: str) -> str:
    system = (
        "You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
        "Extract all instrument and equipment tags from the provided text.\n"
        "Return ONLY a valid JSON array. Each element must have exactly these fields:\n"
        '{"tagNo":"FCV-101","equipmentName":null,"instrumentType":"FCV",'
        '"lineNumber":null,"pidDrawingNo":null,"confidence":0.95}\n'
        "Rules:\n"
        "- tagNo: any token matching [LETTERS]-[DIGITS] or [LETTERS]-[DIGITS]-[SUFFIX]\n"
        " Examples: FCV-101, P-10101, T-10100, VG-6203-15A-F1A-n, BT-6200, DP-10101\n"
        "- instrumentType: leading letters of tagNo\n"
        "- equipmentName: descriptive name if present near tag, else null\n"
        "- lineNumber/pidDrawingNo: null unless explicitly associated\n"
        "- confidence: 0.95 for clear tags, lower for ambiguous\n"
        "- Output ONLY the JSON array, no markdown, no explanation.\n"
        "- If no tags found, return: []\n"
    )
    truncated = text[:100000]
    resp = _llm().chat.completions.create(
        model=VLLM_MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": f"Source: {source_type}\n\nText:\n{truncated}"},
        ],
        max_tokens=32768,
        temperature=0.1,
        extra_body={"chat_template_kwargs": {"enable_thinking": False}},
    )
    raw = (resp.choices[0].message.content or "").strip()
    data = _parse_json_array(raw, resp.choices[0].finish_reason)
    logging.info(f"extract_pid_tags source={source_type} count={len(data)}")
    return json.dumps({"success": True, "count": len(data), "tags": data},
                      ensure_ascii=False, indent=2)


def _match_pid_tags(pid_tags: list, experion_tags: list) -> str:
    system = (
        "You are a P&ID to Experion tag matching expert.\n"
        "Match P&ID tags to Experion tags based on similarity.\n"
        "Return ONLY a JSON array:\n"
        '[{"pidTag":"FT-101","experionTag":"ft-101.pv","confidence":0.92},...]\n'
        "- If no good match: confidence < 0.5, experionTag null\n"
        "- Output ONLY the JSON array.\n"
    )
    resp = _llm().chat.completions.create(
        model=VLLM_MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": (
                f"P&ID Tags:\n{chr(10).join(pid_tags)}\n\n"
                f"Experion Tags:\n{chr(10).join(experion_tags)}"
            )},
        ],
        max_tokens=16384,
        temperature=0.1,
        extra_body={"chat_template_kwargs": {"enable_thinking": False}},
    )
    raw = (resp.choices[0].message.content or "").strip()
    data = _parse_json_array(raw, resp.choices[0].finish_reason)
    return json.dumps({"success": True, "count": len(data), "mappings": data},
                      ensure_ascii=False, indent=2)

# ── Drawing parsing tools ────────────────────────────────────────────────────

_TAG_EXTRACT_SYSTEM = (
    "You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
    "Extract instrument and equipment tags from the provided text.\n"
    "Return ONLY a JSON array:\n"
    '[{"tagNo":"FIT-10115","equipmentName":"Flow Transmitter","instrumentType":"FIT",'
    '"lineNumber":"L-101","pidDrawingNo":"P&ID-001","confidence":0.95},...]\n'
    "Rules:\n"
    "- tagNo: Instrument [Function]-[Number], Equipment [Type]-[Number]\n"
    "- instrumentType: first 2-4 letters of tagNo\n"
    "- equipmentName/lineNumber/pidDrawingNo: null if not present\n"
    "- confidence: 0.0 to 1.0\n"
    "- Output ONLY the JSON array, no markdown.\n"
    "- If no tags found, return: []\n"
)


def _parse_pid_dxf(filepath: str) -> str:
    text = _extract_text_from_dxf(filepath)
    if not text.strip():
        return json.dumps({"success": True, "text": "", "count": 0, "tags": []},
                          ensure_ascii=False, indent=2)

    resp = _llm().chat.completions.create(
        model=VLLM_MODEL,
        messages=[
            {"role": "system", "content": _TAG_EXTRACT_SYSTEM},
            {"role": "user", "content": f"Source: dxf\n\nText:\n{text[:12000]}"},
        ],
        max_tokens=4096,
        temperature=0.1,
    )
    raw = (resp.choices[0].message.content or "").strip()
    data = _parse_json_array(raw, resp.choices[0].finish_reason)
    if not isinstance(data, list):
        data = []
    return json.dumps({"success": True, "text": text[:10000], "count": len(data), "tags": data},
                      ensure_ascii=False, indent=2)


def _parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
    text = _extract_text_from_pdf_ocr(filepath) if use_ocr else _extract_text_from_pdf(filepath)
    if not text.strip():
        return json.dumps({"success": True, "text": "", "count": 0, "tags": []},
                          ensure_ascii=False, indent=2)

    resp = _llm().chat.completions.create(
        model=VLLM_MODEL,
        messages=[
            {"role": "system", "content": _TAG_EXTRACT_SYSTEM},
            {"role": "user", "content": f"Source: pdf\n\nText:\n{text[:12000]}"},
        ],
        max_tokens=4096,
        temperature=0.1,
    )
    raw = (resp.choices[0].message.content or "").strip()
    data = _parse_json_array(raw, resp.choices[0].finish_reason)
    if not isinstance(data, list):
        data = []
    return json.dumps({"success": True, "text": text[:10000], "count": len(data), "tags": data},
                      ensure_ascii=False, indent=2)


def _parse_pid_drawing(filepath: str) -> str:
    # Route by file extension
    ext = os.path.splitext(filepath)[1].lower()
    if ext == ".dxf":
        return _parse_pid_dxf(filepath)
    elif ext == ".pdf":
        return _parse_pid_pdf(filepath)
    elif ext == ".dwg":
        # Message: "DWG files cannot be parsed directly. Convert to DXF first."
        return json.dumps({
            "success": False,
            "error": "DWG 파일은 직접 파싱할 수 없습니다. DXF로 변환 후 사용하세요.",
        }, ensure_ascii=False)
    else:
        # Message: "Unsupported format: {ext}. Supported: .dxf, .pdf"
        return json.dumps({
            "success": False,
            "error": f"지원하지 않는 형식: {ext}. 지원: .dxf, .pdf",
        }, ensure_ascii=False)

# ── Graph tools ──────────────────────────────────────────────────────────────

async def _build_pid_graph_parallel(filepath: str) -> str:
    from pipeline.extractor import PidGeometricExtractor
    from pipeline.topology import PidTopologyBuilder
    from pipeline.mapper import IntelligentMapper
    from openai import AsyncOpenAI

    os.makedirs(STORAGE_DIR, exist_ok=True)

    # Phase 1: geometry extraction
    extractor = PidGeometricExtractor(filepath)
    geo_data_path = os.path.join(STORAGE_DIR, os.path.basename(filepath) + "_geo.json")
    extractor.extract_and_save(geo_data_path)
    with open(geo_data_path, "r", encoding="utf-8") as f:
        geo_data = json.load(f)

    # Fetch system tags (the connection is closed when the with-block exits)
    system_tags: list[str] = []
    try:
        with _get_db_connection() as conn, conn.cursor() as cur:
            cur.execute("SELECT tagname FROM realtime_table")
            system_tags = [r[0] for r in cur.fetchall()]
    except Exception as e:
        # Message: "failed to fetch system tags"
        logging.warning(f"시스템 태그 조회 실패: {e}")

    # Phase 2: first-pass topology builder (graph for the mapper)
    builder = PidTopologyBuilder(geo_data)
    builder.build_graph()

    # Phase 3: parallel LLM mapping
    api_client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
    mapper = IntelligentMapper(builder.G, system_tags, api_client=api_client)

    transmitter_nodes = [
        n for n, d in builder.G.nodes(data=True)
        if d.get("value", "").upper() in {"FIT", "FT", "LT", "PT", "TE"}
    ]
    valve_nodes = [
        n for n, d in builder.G.nodes(data=True)
        if d.get("value", "").upper() in {"FCV", "LCV", "TCV", "PCV", "XV"}
    ]
    equipment_nodes = [
        n for n, d in builder.G.nodes(data=True)
        if d.get("type") not in {"TEXT", "LINE", "LWPOLYLINE"}
    ]

    extracted_results = await asyncio.gather(
        mapper.extract_transmitters(transmitter_nodes),
        mapper.extract_valves(valve_nodes),
        mapper.extract_equipment(equipment_nodes),
    )

    # Merge the mapping results
    all_mapped_tags = []
    for res_dict in extracted_results:
        for node_id, mapping in res_dict.items():
            if mapping.resolved_tag != "UNKNOWN":
                node_data = builder.G.nodes[node_id]
                all_mapped_tags.append({
                    "entity_id": node_id,
                    "tagName": mapping.resolved_tag,
                    "bbox": (
                        node_data["bbox"].bounds
                        if hasattr(node_data["bbox"], "bounds")
                        else node_data["bbox"]
                    ),
                    "clean_value": mapping.resolved_tag,
                })

    # Phase 4: final topology modeling + save
    final_builder = PidTopologyBuilder(geo_data, all_extracted_tags=all_mapped_tags)
    final_builder.build_graph()

    graph_id = os.path.basename(filepath).replace(".dxf", "_graph.json")
    graph_path = os.path.join(STORAGE_DIR, graph_id)
    final_builder.save_graph(graph_path)

    logging.info(f"build_pid_graph_parallel graph_id={graph_id} "
                 f"nodes={final_builder.G.number_of_nodes()} "
                 f"edges={final_builder.G.number_of_edges()}")
    return json.dumps({
        "success": True,
        "graph_id": graph_id,
        "graph_path": graph_path,
        "nodes": final_builder.G.number_of_nodes(),
        "edges": final_builder.G.number_of_edges(),
    }, ensure_ascii=False)


def _analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
    from pipeline.analyzer import PidAnalysisEngine

    graph_path = os.path.join(STORAGE_DIR, graph_id)
    mapping_path = graph_path.replace("_graph.json", "_mapping.json")
    analyzer = PidAnalysisEngine(graph_path, mapping_path)
    result = analyzer.analyze_impact(start_node_id)
    return json.dumps(result, ensure_ascii=False, indent=2)

# ── Request dispatcher ───────────────────────────────────────────────────────

async def _dispatch(tool: str, params: dict) -> str:
    try:
        match tool:
            case "extract_pid_tags":
                return _extract_pid_tags(**params)
            case "match_pid_tags":
                return _match_pid_tags(**params)
            case "parse_pid_dxf":
                return _parse_pid_dxf(**params)
            case "parse_pid_pdf":
                return _parse_pid_pdf(**params)
            case "parse_pid_drawing":
                return _parse_pid_drawing(**params)
            case "build_pid_graph_parallel":
                return await _build_pid_graph_parallel(**params)
            case "analyze_pid_impact":
                return _analyze_pid_impact(**params)
            case _:
                # Message: "Unknown tool: {tool}"
                return json.dumps({"success": False, "error": f"알 수 없는 도구: {tool}"},
                                  ensure_ascii=False)
    except Exception as e:
        logging.error(f"dispatch error tool={tool}: {e}", exc_info=True)
        return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)

# ── Scheduled shutdown ───────────────────────────────────────────────────────

def _schedule_shutdown():
    """Schedule process termination 0.5 s after the response has been sent."""
    async def _do():
        await asyncio.sleep(0.5)
        os.kill(os.getpid(), signal.SIGTERM)
    asyncio.create_task(_do())

# ── HTTP endpoints ───────────────────────────────────────────────────────────

@app.get("/health")
async def health():
    return {"status": "ok"}
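├─────┼───────────────────────────────┼────────┤
│ 9   │ atexit cleanup hook           │ Easy   │
└─────┴───────────────────────────────┴────────┘
Shall I write the actual worker files (worker/rag_worker.py, etc.), or wrap up by applying the revisions to the plan document?
✻ Brewed for 1m 12s
※ recap: Finished diagnosing the MCP server parallel-architecture design document and proposing fixes. The next step is to choose between writing the actual worker files and applying the revisions to the plan document. (disable recaps in /config)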
● Write(mcp-server/worker/pid_worker.py)
 Wrote 461 lines to mcp-server/worker/pid_worker.py
● Checking for syntax errors.
● Bash(cd /home/windpacer/projects/ExperionCrawler/mcp-server && .venv/bin/python -m py_compile worker/pid_worker.py &&
echo "OK")
 OK
● No syntax errors. Reviewing the completed file.
Read 1 file (ctrl+o to expand)
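The py_compile check above can also be run from Python rather than the shell. The following is a minimal sketch, not part of the session: the helper name `has_valid_syntax` is hypothetical, and the compiled output is written to a temporary directory so that nothing is left next to the source file.

```python
import py_compile
import tempfile
from pathlib import Path


def has_valid_syntax(path: str) -> bool:
    """Return True if the file byte-compiles, False if it has a syntax error."""
    with tempfile.TemporaryDirectory() as tmp:
        try:
            # doraise=True turns compile errors into PyCompileError
            py_compile.compile(path, cfile=str(Path(tmp) / "out.pyc"), doraise=True)
        except py_compile.PyCompileError:
            return False
    return True


if __name__ == "__main__":
    # Path taken from the transcript; adjust to the local checkout.
    print(has_valid_syntax("worker/pid_worker.py"))
```

Like `python -m py_compile`, this only catches syntax errors; missing imports or runtime failures would still need an actual run.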
● Created mcp-server/worker/pid_worker.py (461 lines). All fixes identified in the diagnosis have been applied.
---
Applied fixes
┌──────────┬───────────────────────────────────────────────────────────────────────────────────────────┐
│ Fix      │ Description                                                                               │
├──────────┼───────────────────────────────────────────────────────────────────────────────────────────┤
│ ✅ Fix 1 │ No RAG worker code mixed in; only P&ID tools are included                                 │
├──────────┼───────────────────────────────────────────────────────────────────────────────────────────┤
│ ✅ Fix 2 │ app = FastAPI() declared                                                                  │
├──────────┼───────────────────────────────────────────────────────────────────────────────────────────┤
│ ✅ Fix 3 │ _schedule_shutdown(): SIGTERM 0.5 s later via asyncio.create_task; response returns first │
├──────────┼───────────────────────────────────────────────────────────────────────────────────────────┤
│ ✅ Fix 4 │ Request body parsed with Request.json()                                                   │
├──────────┼───────────────────────────────────────────────────────────────────────────────────────────┤
│ ✅ Fix 7 │ /health endpoint added (for the main server's health-check loop)                          │
├──────────┼───────────────────────────────────────────────────────────────────────────────────────────┤
│ ✅ Fix 8 │ Worker does not manage stdout/stderr; the main server launches it with DEVNULL            │
└──────────┴───────────────────────────────────────────────────────────────────────────────────────────┘
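Structure summary
/execute → regular requests (worker stays alive)
/execute/one_shot → P&ID requests; worker exits automatically afterwards
/health → readiness check for the worker
The pipeline modules are picked up automatically from mcp-server/pipeline/ via the sys.path manipulation. Run: python worker/pid_worker.py 5004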
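The sys.path manipulation mentioned above boils down to putting the parent of the worker/ directory at the front of the import path. A minimal sketch of that step follows; the helper name `add_server_dir_to_path` and the example path are assumptions for illustration, not code from the file.

```python
import os
import sys


def add_server_dir_to_path(worker_file: str) -> str:
    """Put the directory two levels above `worker_file` on sys.path and return it."""
    # .../mcp-server/worker/pid_worker.py -> .../mcp-server
    server_dir = os.path.dirname(os.path.dirname(os.path.abspath(worker_file)))
    if server_dir not in sys.path:
        sys.path.insert(0, server_dir)
    return server_dir


if __name__ == "__main__":
    # Hypothetical location; inside the real worker this would be __file__.
    example = os.path.join(os.sep, "srv", "mcp-server", "worker", "pid_worker.py")
    print(add_server_dir_to_path(example))
```

With the server directory on the path, `from pipeline.extractor import ...` resolves regardless of the directory the worker is launched from.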