Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL. 기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체. - industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버) - src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer) - mcp-server: Python FastMCP (RAG/NL2SQL/P&ID) - 다중 컨트롤러(N-Controller) 지원 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
11 KiB
pid_worker.py 진단 보고서 (수정본)
작성일: 2026-05-03
파일 경로: mcp-server/worker/pid_worker.py
총 라인 수: 609줄
최종 검토자: Roo
검토일: 2026-05-03
1. 파일 개요
| 항목 | 내용 |
|---|---|
| 용도 | P&ID 도면 파싱 전용 MCP 워커 프로세스 (FastAPI 기반) |
| 진입점 | if __name__ == "__main__" (560줄) |
| 포트 | 기본 5004 (명령줄 인자로 변경 가능) |
2. 주요 기능 (7개 도구)
| 도구 | 설명 | 동기/비동기 | 라인 |
|---|---|---|---|
extract_pid_tags() |
텍스트에서 P&ID 태그 추출 | 동기 | 157 |
match_pid_tags() |
P&ID 태그 ↔ Experion 태그 매핑 | 동기 | 192 |
parse_pid_dxf() |
DXF 파일 파싱 | 동기 | 237 |
parse_pid_pdf() |
PDF 파일 파싱 (OCR 포함) | 동기 | 260 |
parse_pid_drawing() |
DXF/PDF 자동 분기 | 동기 | 283 |
build_pid_graph_parallel() |
병렬 P&ID 그래프 구축 | 비동기 | 302 |
analyze_pid_impact() |
그래프 영향도 분석 | 동기 | 391 |
3. 상세 기능 분석
3.1 extract_pid_tags()
def _extract_pid_tags(text: str, source_type: str) -> str:
- 기능: LLM을 사용하여 입력 텍스트에서 P&ID 태그(FCV-101, P-10101 등) 추출
- LLM 호출:
max_tokens=32768,temperature=0.1 - 반환 형식:
{"success": true, "count": N, "tags": [...]}
3.2 match_pid_tags()
def _match_pid_tags(pid_tags: list, experion_tags: list) -> str:
- 기능: P&ID 태그 목록과 Experion 태그 목록을 유사도 기반으로 매핑
- LLM 호출:
max_tokens=16384,temperature=0.1 - 반환 형식:
{"success": true, "count": N, "mappings": [...]}
3.3 parse_pid_dxf()
def _parse_pid_dxf(filepath: str) -> str:
- 기능: DXF 파일에서 TEXT/MTEXT 엔티티 추출 → LLM으로 태그 파싱
- 사용 라이브러리:
ezdxf(DXF 파싱),openai(LLM 호출) - 반환 형식:
{"success": true, "text": "...", "count": N, "tags": [...]}
3.4 parse_pid_pdf()
def _parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
- 기능: PDF 파일에서 텍스트 추출 (OCR 포함) → LLM으로 태그 파싱
- 사용 라이브러리:
fitz(PyMuPDF),paddleocr,PIL,numpy - OCR 옵션:
use_ocr=True이면 300 DPI로 이미지 변환 후 OCR
3.5 parse_pid_drawing()
def _parse_pid_drawing(filepath: str) -> str:
- 기능: 확장자에 따라
_parse_pid_dxf()또는_parse_pid_pdf()호출 - 지원 형식:
.dxf,.pdf - 거부 형식:
.dwg(직접 파싱 불가)
3.6 build_pid_graph_parallel()
async def _build_pid_graph_parallel(filepath: str) -> str:
- 기능: 4단계 파이프라인으로 P&ID 그래프 구축
- Phase 1:
PidGeometricExtractor로 기하 정보 추출 - Phase 2:
PidTopologyBuilder로 1차 그래프 구축 - Phase 3:
IntelligentMapper로 병렬 LLM 매핑 (3개 gather) - Phase 4: 최종 그래프 저장
- Phase 1:
- 병렬 처리:
asyncio.gather()로 transmitters/valves/equipment 3개 LLM 호출 병렬 실행 - 반환 형식:
{"success": true, "graph_id": "...", "nodes": N, "edges": N}
3.7 analyze_pid_impact()
def _analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
- 기능: 그래프에서 특정 노드 장애 시 영향도 분석
- 사용 라이브러리:
pipeline.analyzer.PidAnalysisEngine - 반환 형식: 분석 결과 JSON
4. 주요 문제점
4.1 이벤트 루프 블로킹 문제 (HIGH) — ❌ 이미 해결됨
보고서 오류: 보고서가 인용한 코드는 구버전 기준입니다.
현재 코드 (406-417줄):
async def _dispatch(tool: str, params: dict) -> str:
try:
match tool:
case "extract_pid_tags":
return await asyncio.to_thread(_extract_pid_tags, **params) # ✅
case "match_pid_tags":
return await asyncio.to_thread(_match_pid_tags, **params) # ✅
case "parse_pid_dxf":
return await asyncio.to_thread(_parse_pid_dxf, **params) # ✅
case "parse_pid_pdf":
return await asyncio.to_thread(_parse_pid_pdf, **params) # ✅
case "parse_pid_drawing":
return await asyncio.to_thread(_parse_pid_drawing, **params) # ✅
case "analyze_pid_impact":
return await asyncio.to_thread(_analyze_pid_impact, **params) # ✅
case "build_pid_graph_parallel":
return await _build_pid_graph_parallel(**params) # ✅ (이미 async)
판정: ✅ 문제 없음 — asyncio.to_thread()로 이미 수정됨
4.2 에러 핸들링 불균형 (MED) — ❌ 진단 오류
보고서 오류: 보고서는 _extract_pid_tags()에 try-catch가 없다고 지적했지만, _dispatch()가 이미 전체를 감싸고 있습니다.
현재 코드 (402-426줄):
async def _dispatch(tool: str, params: dict) -> str:
try:
match tool:
# ... 모든 도구 호출
except Exception as e:
logging.error(f"dispatch error tool={tool}: {e}", exc_info=True)
return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)
설계 의도:
- 통합 예외 처리 패턴 — 개별 함수에 try-catch를 중복하면 예외가 두 번 잡혀 로그가 이중으로 찍히고, 에러 응답 형식도 제각각이 됩니다.
_dispatch()에서 통합 핸들링하는 것이 의도적인 올바른 설계입니다.
판정: ❌ 진단 오류 — 의도적인 계층 분리 패턴을 결함으로 오판
4.3 메모리 고정 문제 (MED) — ❌ 설계 오류 아님
보고서 오류: @lru_cache로 _llm()과 _ocr()이 고정되어 메모리 증가한다고 지적했지만, 이는 의도적인 올바른 설계입니다.
현재 코드 (50-66줄):
@lru_cache(maxsize=1)
def _llm():
from openai import OpenAI
return OpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
@lru_cache(maxsize=1)
def _ocr():
from paddleocr import PaddleOCR
use_gpu = os.environ.get("PADDLE_USE_GPU", "true").lower() == "true"
try:
return PaddleOCR(use_angle_cls=True, lang="korean", use_gpu=use_gpu, show_log=False)
except Exception:
if use_gpu:
os.environ["PADDLE_USE_GPU"] = "false"
return _ocr()
raise
설계 의도:
- OpenAI 클라이언트는 내부에
httpx커넥션 풀을 가지므로, 요청마다 새로 생성하면 오히려 비효율입니다. - 워커는 one-shot 모드이므로 실질적 영향은 적습니다.
@lru_cache는 의도적인 올바른 설계입니다.
판정: ❌ 진단 오류 — @lru_cache가 올바른 패턴
4.4 중복 코드 (LOW) — ⚠️ 부분 오류
보고서 오류: _extract_pid_tags()와 _parse_pid_dxf()가 LLM 호출 패턴을 반복한다고 지적했지만, 이는 의도적인 파라미터 차이입니다.
실제 차이점:
| 항목 | _extract_pid_tags() |
_parse_pid_dxf() |
|---|---|---|
max_tokens |
32768 | 4096 |
extra_body |
{"chat_template_kwargs": {"enable_thinking": False}} |
없음 |
| 입력 크기 | 최대 100,000자 | 최대 12,000자 |
설명:
_extract_pid_tags()는 긴 텍스트(100k)를 처리하고enable_thinking=False로 최적화_parse_pid_dxf()는 짧은 텍스트(12k)를 처리하고 기본 설정 사용- 이 차이를 공통 함수로 묶으면 파라미터 분기 로직이 오히려 복잡해집니다.
판정: ⚠️ 부분 오류 — 유사해 보여도 다른 목적의 호출
4.5 설정 하드코딩 (LOW) — ✅ 유효한 지적
현재 코드 (32-35줄):
VLLM_BASE_URL = "http://localhost:8000/v1"
VLLM_MODEL = "Qwen/Qwen3-Coder-Next-FP8"
DB_CONNECTION_STRING = "postgresql://postgres:postgres@localhost:5432/iiot_platform"
DB_TIMEOUT = 10
권장: 환경 변수 또는 설정 파일로 분리
import os
VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8000/v1")
VLLM_MODEL = os.environ.get("VLLM_MODEL", "Qwen/Qwen3-Coder-Next-FP8")
DB_CONNECTION_STRING = os.environ.get("DB_CONNECTION_STRING", "postgresql://postgres:postgres@localhost:5432/iiot_platform")
판정: ✅ 유효한 지적
5. 양호한 점
| 항목 | 설명 |
|---|---|
| ✅ 병렬 처리 구현 | build_pid_graph_parallel()에서 asyncio.gather()로 3개 LLM 호출 병렬 실행 |
| ✅ 일관된 JSON 응답 | 모든 도구가 {"success": bool, ...} 형식으로 응답 |
| ✅ 한국어 OCR 지원 | PaddleOCR를 lang="korean"으로 설정 |
| ✅ 자동 폴더 생성 | os.makedirs(STORAGE_DIR, exist_ok=True)로 디렉토리 자동 생성 |
| ✅ 종료 신호 처리 | _schedule_shutdown()으로 응답 후 0.5초 뒤 SIGTERM 전송 |
| ✅ 이벤트 루프 비블로킹 | _dispatch()에서 asyncio.to_thread()로 동기 함수 오프로드 |
| ✅ 통합 예외 처리 | _dispatch()에서 전체 예외를 통합 처리 |
6. 최종 판정 요약
| 항목 | 보고서 심각도 | 실제 판정 | 설명 |
|---|---|---|---|
| 4.1 이벤트 루프 블로킹 | 🔴 HIGH | ✅ 이미 해결됨 | asyncio.to_thread() 적용됨 |
| 4.2 에러 핸들링 | 🟡 MED | ❌ 진단 오류 | 통합 예외 처리가 의도적인 설계 |
| 4.3 메모리 고정 | 🟡 MED | ❌ 설계 오류 아님 | @lru_cache가 올바른 패턴 |
| 4.4 중복 코드 | 🟢 LOW | ⚠️ 부분 오류 | 의도적인 파라미터 차이 |
| 4.5 설정 하드코딩 | 🟢 LOW | ✅ 유효 | 환경 변수로 분리 권장 |
총 결론: 보고서의 5개 지적 중 4개는 틀렸거나 이미 해결된 상태입니다. 유일하게 유효한 지적은 설정 하드코딩(4.5) 뿐입니다.
7. 최종 개선 우선순위
| 우선순위 | 항목 | 작업 |
|---|---|---|
| 🟢 LOW | 설정 외부화 | 환경 변수 또는 설정 파일 사용 |
8. Roo의 최종 평가
"보고서의 5개 지적 중 4개는 틀렸거나 이미 해결된 상태입니다. 유일하게 유효한 지적은 설정 하드코딩(4.5)뿐입니다."
pid_worker.py는 이미 올바르게 설계되어 있으며, 추가 수정이 필요하지 않습니다.