Files
HC900-Crawler/mcp-server/mcp-parallel-plan.md
windpacer 16fc7a2598 Initial commit: HC900 Crawler
Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL.
기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체.

- industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버)
- src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer)
- mcp-server: Python FastMCP (RAG/NL2SQL/P&ID)
- 다중 컨트롤러(N-Controller) 지원

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:28:14 +09:00

33 KiB

MCP 서버 병렬 아키텍처 설계 문서

1. 설계 개요

1.1 문제 인식

현재 MCP 서버는 단일 프로세스에서 모든 요청을 순차적으로 처리하는 구조로, 다음과 같은 문제점이 있음:

문제 설명 영향
단일 프로세스 모든 도구가 동일한 프로세스에서 실행 CPU 자원 미사용
순차 처리 긴 요청이 완료될 때까지 다른 요청 대기 응답 지연
LLM 병목 ask_iiot_llm, query_with_nl, extract_pid_tags 등 LLM 호출이 순차 실행 요청 간 차단
P&ID 파싱 병목 Phase 2 위상 빌더 O(n²) 복잡도 + Phase 3 LLM 매핑 수분 이상 소요

1.2 설계 목표

  1. 하드웨어 자원 최적화: 멀티프로세스를 활용하여 CPU 코어 전체 사용
  2. 병렬 처리: 독립적인 요청을 동시에 처리하여 대기 시간 최소화
  3. 확장성: 새로운 도구 추가 시 메인 서버 수정 없이 서브 프로세스로 추가 가능
  4. 격리: 각 서브 프로세스는 독립적인 메모리 공간을 가지므로 하나의 프로세스 실패가 전체 시스템에 영향 없음

2. 아키텍처 개요

┌─────────────────────────────────────────────────────────────────────────────┐
│                              메인 서버 (server.py)                          │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │  FastMCP (HTTP/stdio)                                                 │  │
│  │  - 요청 수신                                                            │  │
│  │  - 요청 분류 (tool name 기반)                                          │  │
│  │  - 서브 프로세스 관리 (PID, 상태, 리소스)                             │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                    │                                         │
│        ┌───────────────────────────┼───────────────────────────┐           │
│        ▼                           ▼                           ▼           │
│  ┌───────────────┐         ┌───────────────┐         ┌───────────────┐     │
│  │  RAG 서브      │         │   NL2SQL 서브   │         │   P&ID 서브     │     │
│  │  (rag_worker.py)│        │  (nl2sql_worker.py)│    │  (pid_worker.py)│    │
│  └───────────────┘         └───────────────┘         └───────────────┘     │
│        │                           │                           │           │
│        ▼                           ▼                           ▼           │
│  ┌───────────────┐         ┌───────────────┐         ┌───────────────┐     │
│  │  Qdrant 검색   │         │  PostgreSQL   │         │  ezdxf/pyMuPDF  │     │
│  │  + Ollama Embed│         │  + LLM SQL    │         │  + LLM 추출     │     │
│  │  + vLLM LLM    │         │  실행         │         │  + NetworkX     │     │
│  └───────────────┘         └───────────────┘         └───────────────┘     │
│                                                                              │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │  공유 리소스 (메모리, 디스크)                                           │  │
│  │  - Qdrant (외부 서비스)                                                 │  │
│  │  - Ollama (외부 서비스)                                                 │  │
│  │  - vLLM (외부 서비스)                                                   │  │
│  │  - PostgreSQL (외부 서비스)                                             │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

3. 서브 프로세스 구조

3.1 RAG 서브 프로세스 (rag_worker.py)

담당 도구:

  • search_codebase() - 소스코드 검색
  • search_r530_docs() - 공식 문서 검색
  • ask_iiot_llm() - LLM 질문 응답
  • rag_query() - 통합 RAG

특징:

  • Ollama Embedding + Qdrant 검색 + vLLM LLM 조합
  • 메모리: ~2GB (임베딩 모델 + LLM 로드 시)
  • 병렬 처리: 각 요청은 독립적인 LLM 호출 가능

구조:

# rag_worker.py
class RAGWorker:
    def __init__(self):
        self.embed_client = OllamaEmbedClient()
        self.qdrant_client = QdrantClient()
        self.llm_client = VLLMClient()
    
    async def handle_request(self, tool_name: str, params: dict) -> str:
        if tool_name == "search_codebase":
            return await self._search_codebase(params)
        elif tool_name == "ask_iiot_llm":
            return await self._ask_llm(params)
        # ...

3.2 NL2SQL 서브 프로세스 (nl2sql_worker.py)

담당 도구:

  • run_sql() - SQL 실행
  • query_pv_history() - 히스토리 조회
  • get_tag_metadata() - 태그 메타데이터
  • list_drawings() - 도면 목록
  • query_with_nl() - 자연어 → SQL

특징:

  • PostgreSQL 직접 연결
  • LLM SQL 생성 + DB 실행 분리
  • 메모리: ~1GB (SQL 생성용 LLM)

구조:

# nl2sql_worker.py
class NL2SQLWorker:
    def __init__(self):
        self.db_pool = create_db_pool()
        self.llm_client = VLLMClient()
    
    async def handle_request(self, tool_name: str, params: dict) -> str:
        if tool_name == "run_sql":
            return await self._run_sql(params["sql"])
        elif tool_name == "query_with_nl":
            return await self._query_with_nl(params["question"])
        # ...

3.3 P&ID 서브 프로세스 (pid_worker.py)

담당 도구:

  • extract_pid_tags() - 텍스트에서 태그 추출
  • match_pid_tags() - 태그 매핑
  • parse_pid_dxf() - DXF 파싱
  • parse_pid_pdf() - PDF 파싱
  • parse_pid_drawing() - 확장자 자동 감지
  • build_pid_graph_parallel() - 그래프 생성
  • analyze_pid_impact() - 영향도 분석

특징:

  • ezdxf, PyMuPDF, PaddleOCR 로드 (메모리 ~3GB)
  • NetworkX 그래프 처리
  • LLM 기반 태그 추출/매핑
  • 가장 무거운 프로세스
  • 요청 후 종료: 연간 1-2회 사용 예상으로, 요청 완료 후 프로세스 종료

구조:

# pid_worker.py
class PIDWorker:
    def __init__(self):
        self.extractor = PidGeometricExtractor()
        self.topology_builder = PidTopologyBuilder()
        self.mapper = IntelligentMapper()
        self.analyzer = PidAnalysisEngine()
    
    async def handle_request(self, tool_name: str, params: dict) -> str:
        if tool_name == "parse_pid_dxf":
            return await self._parse_dxf(params["filepath"])
        elif tool_name == "build_pid_graph_parallel":
            return await self._build_graph(params["filepath"])
        # ...

4. 메인 서버 구현

4.1 프로세스 관리

# server.py (메인)
import subprocess
import asyncio
from typing import Dict, Optional
from dataclasses import dataclass

@dataclass
class WorkerProcess:
    process: subprocess.Popen
    port: int
    status: str  # "running", "stopped", "error"
    one_shot: bool = False  # 요청 후 프로세스 종료 여부 (P&ID 워커용)

class ProcessManager:
    def __init__(self):
        self.workers: Dict[str, WorkerProcess] = {}
    
    async def start_worker(self, worker_type: str, one_shot: bool = False) -> WorkerProcess:
        """서브 프로세스 시작
        
        Args:
            worker_type: 워커 타입 (rag, nl2sql, pid)
            one_shot: True일 경우 요청 후 프로세스 종료 (P&ID 워커용)
        """
        port = self._get_available_port()
        cmd = [
            sys.executable,
            f"mcp-server/worker/{worker_type}_worker.py",
            str(port)
        ]
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        
        # 프로세스 시작 대기
        await asyncio.sleep(1)
        
        worker = WorkerProcess(
            process=proc,
            port=port,
            status="running",
            one_shot=one_shot
        )
        self.workers[worker_type] = worker
        return worker
    
    async def stop_worker(self, worker_type: str):
        """서브 프로세스 종료"""
        if worker_type in self.workers:
            proc = self.workers[worker_type].process
            proc.terminate()
            await asyncio.sleep(0.5)
            if proc.poll() is None:
                proc.kill()
            del self.workers[worker_type]
    
    async def get_worker(self, tool_name: str, one_shot: bool = False) -> WorkerProcess:
        """도구 이름에 해당하는 워커 프로세스 반환 (자동 시작)
        
        Args:
            tool_name: 도구 이름
            one_shot: True일 경우 요청 후 프로세스 종료 (P&ID 워커용)
        """
        worker_type = self._classify_tool(tool_name)
        
        if worker_type not in self.workers:
            # 자동 시작
            worker = await self.start_worker(worker_type, one_shot)
            return worker
        
        # 프로세스 상태 확인
        proc = self.workers[worker_type].process
        if proc.poll() is not None:
            # 프로세스 종료됨 - 재시작
            worker = await self.start_worker(worker_type, one_shot)
            return worker
        
        return self.workers[worker_type]
    
    def _classify_tool(self, tool_name: str) -> str:
        """도구 이름을 워커 타입으로 분류"""
        rag_tools = {"search_codebase", "search_r530_docs", "ask_iiot_llm", "rag_query"}
        nl2sql_tools = {"run_sql", "query_pv_history", "get_tag_metadata", "list_drawings", "query_with_nl"}
        pid_tools = {
            "extract_pid_tags", "match_pid_tags", "parse_pid_dxf", "parse_pid_pdf",
            "parse_pid_drawing", "build_pid_graph_parallel", "analyze_pid_impact"
        }
        
        if tool_name in rag_tools:
            return "rag"
        elif tool_name in nl2sql_tools:
            return "nl2sql"
        elif tool_name in pid_tools:
            return "pid"
        else:
            return "default"  # fallback

4.2 요청 라우팅

# server.py (메인)
from fastmcp import FastMCP

mcp = FastMCP("iiot-rag-main", port=5001, json_response=True, stateless_http=True)

# 프로세스 매니저 초기화
process_manager = ProcessManager()

@mcp.tool()
async def search_codebase(query: str, top_k: int = 6) -> str:
    """RAG 워커로 요청 전달"""
    worker = await process_manager.get_worker("search_codebase")
    return await _forward_request(worker.port, "search_codebase", {
        "query": query,
        "top_k": top_k
    })

@mcp.tool()
async def run_sql(sql: str) -> str:
    """NL2SQL 워커로 요청 전달"""
    worker = await process_manager.get_worker("run_sql")
    return await _forward_request(worker.port, "run_sql", {"sql": sql})

@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
    """P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
    worker = await process_manager.get_worker("parse_pid_dxf", one_shot=True)
    result = await _forward_request(worker.port, "parse_pid_dxf", {"filepath": filepath}, one_shot=True)
    return result

@mcp.tool()
async def parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
    """P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
    worker = await process_manager.get_worker("parse_pid_pdf", one_shot=True)
    result = await _forward_request(worker.port, "parse_pid_pdf", {"filepath": filepath, "use_ocr": use_ocr}, one_shot=True)
    return result

@mcp.tool()
async def parse_pid_drawing(filepath: str) -> str:
    """P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
    worker = await process_manager.get_worker("parse_pid_drawing", one_shot=True)
    result = await _forward_request(worker.port, "parse_pid_drawing", {"filepath": filepath}, one_shot=True)
    return result

@mcp.tool()
async def extract_pid_tags(text: str, source_type: str) -> str:
    """P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
    worker = await process_manager.get_worker("extract_pid_tags", one_shot=True)
    result = await _forward_request(worker.port, "extract_pid_tags", {"text": text, "source_type": source_type}, one_shot=True)
    return result

@mcp.tool()
async def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str:
    """P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
    worker = await process_manager.get_worker("match_pid_tags", one_shot=True)
    result = await _forward_request(worker.port, "match_pid_tags", {"pid_tags": pid_tags, "experion_tags": experion_tags}, one_shot=True)
    return result

@mcp.tool()
async def build_pid_graph_parallel(filepath: str) -> str:
    """P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
    worker = await process_manager.get_worker("build_pid_graph_parallel", one_shot=True)
    result = await _forward_request(worker.port, "build_pid_graph_parallel", {"filepath": filepath}, one_shot=True)
    return result

@mcp.tool()
async def analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
    """P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
    worker = await process_manager.get_worker("analyze_pid_impact", one_shot=True)
    result = await _forward_request(worker.port, "analyze_pid_impact", {"graph_id": graph_id, "start_node_id": start_node_id}, one_shot=True)
    return result

async def _forward_request(port: int, tool_name: str, params: dict, one_shot: bool = False) -> str:
    """HTTP를 통해 워커 프로세스로 요청 전달
    
    Args:
        port: 워커 포트
        tool_name: 도구 이름
        params: 요청 파라미터
        one_shot: True일 경우 요청 완료 후 워커 종료
    """
    async with httpx.AsyncClient(timeout=300) as client:  # 5분 타임아웃
        endpoint = "/execute/one_shot" if one_shot else "/execute"
        response = await client.post(
            f"http://localhost:{port}{endpoint}",
            json={"tool": tool_name, "params": params}
        )
        response.raise_for_status()
        return response.text

5. 서브 프로세스 구현

5.1 RAG 워커 (worker/rag_worker.py)

#!/usr/bin/env python3
"""RAG 전용 워커 프로세스"""

import sys
import json
import httpx
from fastmcp import FastMCP

# 설정
OLLAMA_URL = "http://localhost:11434"
QDRANT_URL = "http://localhost:6333"
VLLM_BASE_URL = "http://localhost:8000/v1"

# FastMCP 서버 (HTTP 전용)
mcp = FastMCP("rag-worker", port=int(sys.argv[1]), json_response=True, stateless_http=True)

# 도구 구현
@mcp.tool()
async def search_codebase(query: str, top_k: int = 6) -> str:
    # ... 기존 구현 ...

@mcp.tool()
async def search_r530_docs(query: str, top_k: int = 5) -> str:
    # ... 기존 구현 ...

@mcp.tool()
async def ask_iiot_llm(question: str, context: str = "") -> str:
    # ... 기존 구현 ...

@mcp.tool()
async def rag_query(question: str, search_code: bool = False, search_docs: bool = True) -> str:
    # ... 기존 구현 ...

# HTTP 엔드포인트 (FastMCP 대신 직접 구현)
from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.post("/execute")
async def execute(request: dict):
    tool = request["tool"]
    params = request["params"]
    
    # 도구 호출
    if tool == "search_codebase":
        result = await search_codebase(**params)
    elif tool == "ask_iiot_llm":
        result = await ask_iiot_llm(**params)
    # ...
    
    return result

# P&ID 워커 전용: 요청 완료 후 종료
@app.post("/execute/one_shot")
async def execute_one_shot(request: dict):
    """one_shot 모드 - 요청 완료 후 프로세스 종료"""
    tool = request["tool"]
    params = request["params"]
    
    # 도구 호출
    if tool == "parse_pid_dxf":
        result = await parse_pid_dxf(**params)
    elif tool == "parse_pid_pdf":
        result = await parse_pid_pdf(**params)
    elif tool == "extract_pid_tags":
        result = await extract_pid_tags(**params)
    # ...
    
    # 프로세스 종료 (graceful shutdown)
    import os
    import signal
    os.kill(os.getpid(), signal.SIGTERM)
    
    return result

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))

5.2 NL2SQL 워커 (worker/nl2sql_worker.py)

#!/usr/bin/env python3
"""NL2SQL 전용 워커 프로세스"""

import sys
import json
import psycopg
from fastmcp import FastMCP

DB_CONNECTION_STRING = "postgresql://postgres:postgres@localhost:5432/iiot_platform"

mcp = FastMCP("nl2sql-worker", port=int(sys.argv[1]), json_response=True, stateless_http=True)

@mcp.tool()
async def run_sql(sql: str) -> str:
    # ... 기존 구현 ...

@mcp.tool()
async def query_pv_history(tag_names: list[str], time_from: str, time_to: str, limit: int = 100) -> str:
    # ... 기존 구현 ...

# ... 나머지 도구 ...

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))

5.3 P&ID 워커 (worker/pid_worker.py)

#!/usr/bin/env python3
"""P&ID 파싱 전용 워커 프로세스"""

import sys
import json
from fastmcp import FastMCP

mcp = FastMCP("pid-worker", port=int(sys.argv[1]), json_response=True, stateless_http=True)

# Pipeline imports
from pipeline.extractor import PidGeometricExtractor
from pipeline.topology import PidTopologyBuilder
from pipeline.mapper import IntelligentMapper
from pipeline.analyzer import PidAnalysisEngine

@mcp.tool()
async def extract_pid_tags(text: str, source_type: str) -> str:
    # ... 기존 구현 ...

@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
    # ... 기존 구현 ...

@mcp.tool()
async def build_pid_graph_parallel(filepath: str) -> str:
    # ... 기존 구현 ...

# ... 나머지 도구 ...

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))

6. 실행 및 배포

6.1 실행 순서

# 1. 메인 서버 실행
cd mcp-server
python server.py --http

# 2. 메인 서버가 자동으로 워커 프로세스 시작
#    - RAG 워커 (port 5002) - 메인 서버 종료 시까지 유지
#    - NL2SQL 워커 (port 5003) - 메인 서버 종료 시까지 유지
#    - P&ID 워커 (port 5004) - 요청 후 즉시 종료 (연간 1-2회 사용)

# 3. P&ID 요청 시:
#    - 워커 자동 시작 → 요청 처리 → 요청 완료 후 즉시 종료
#    - 다음 요청 시 다시 시작

6.2 리소스 관리

워커 메모리 CPU 포트 자동 시작 생명주기
RAG ~2GB 2 cores 5002 요청 시 메인 서버 종료 시까지 유지
NL2SQL ~1GB 1 core 5003 요청 시 메인 서버 종료 시까지 유지
P&ID ~3GB 2 cores 5004 요청 시 요청 후 즉시 종료 (연간 1-2회 사용)

메모리 최적화:

  • P&ID 워커는 요청 완료 후 즉시 종료되어 메모리 해제
  • RAG/NL2SQL 워커는 자주 사용되므로 메인 서버와 함께 유지

6.3 프로세스 상태 모니터링

# server.py에 추가
@mcp.tool()
def get_worker_status() -> str:
    """모든 워커 프로세스 상태 조회"""
    status = {}
    for name, worker in process_manager.workers.items():
        status[name] = {
            "pid": worker.process.pid,
            "status": worker.status,
            "port": worker.port
        }
    return json.dumps(status, ensure_ascii=False, indent=2)

7. 장점 및 단점

7.1 장점

항목 설명
병렬 처리 요청 간 차단 없이 동시에 처리 가능
확장성 새로운 도구 추가 시 워커 프로세스만 추가
격리 하나의 워커 실패가 전체 시스템에 영향 없음
리소스 최적화 CPU 코어 수에 따라 워커 수 조정 가능
유지보수 각 워커는 독립적으로 개발/테스트 가능

7.2 단점

항목 설명 완화 방안
메모리 사용량 증가 워커당 1-3GB, 총 6GB+ 워커 수 제한, LRU 캐시
프로세스 관리 복잡도 프로세스 시작/종료/재시작 로직 자동 관리, 상태 모니터링
네트워크 오버헤드 HTTP 통신 추가 로컬 통신, 커넥션 풀

8. P&ID 워커 생명주기 (요청 후 종료)

설계 원칙: 연간 1-2회 사용 예정인 P&ID 파싱은 요청 후 즉시 종료

8.1 동작 방식

1. 사용자가 P&ID 파싱 요청 → 메인 서버가 P&ID 워커 시작 (port 5004)
2. 워커가 요청 처리 (ezdxf + LLM 추출 + NetworkX)
3. 요청 완료 → /execute/one_shot 엔드포인트가 SIGTERM 전달
4. 워커 프로세스 종료 → 메모리 해제
5. 다음 요청 시 → 다시 시작 (1~2초 소요)

8.2 구현 로직

메인 서버 (server.py):

@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
    worker = await process_manager.get_worker("parse_pid_dxf", one_shot=True)
    result = await _forward_request(worker.port, "parse_pid_dxf", {"filepath": filepath}, one_shot=True)
    return result  # 워커는 요청 완료 후 자동 종료됨

워커 (pid_worker.py):

@app.post("/execute/one_shot")
async def execute_one_shot(request: dict):
    tool = request["tool"]
    params = request["params"]
    
    # 도구 호출
    if tool == "parse_pid_dxf":
        result = await parse_pid_dxf(**params)
    
    # 프로세스 종료
    os.kill(os.getpid(), signal.SIGTERM)
    return result

8.3 장점

항목 설명
메모리 절약 요청 완료 후 3GB 메모리 즉시 해제
리소스 효율 다른 프로세스가 메모리 사용 가능
단순한 구조 별도의 타이머/스케줄러 불필요

8.4 단점

항목 설명 완화 방안
시작 오버헤드 매 요청 시 1~2초 소요 연간 1-2회 사용이므로 허용 가능
모델 로딩 시간 ezdxf, PyMuPDF, PaddleOCR 매번 로드 향후 워커 풀로 개선 가능

9. 향후 개선 방향

  1. 워커 풀: 각 워커 타입당 여러 프로세스 실행 (예: P&ID 워커 3개)
  2. 자동 스케일링: 요청량에 따라 워커 수 자동 조정
  3. 공유 메모리: 대용량 데이터 전달 시 메모리 공유 (shared memory)
  4. 그룹화: 유사 도구를 하나의 워커에 통합 (예: RAG 도구 4개 → 1 워커)

10. 구현 체크리스트

  • mcp-server/worker/ 디렉토리 생성
  • rag_worker.py 구현
  • nl2sql_worker.py 구현
  • pid_worker.py 구현
  • 메인 서버에 ProcessManager 클래스 추가
  • 요청 라우팅 로직 구현
  • 프로세스 상태 모니터링 도구 추가
  • 테스트: 각 워커 독립 실행
  • 테스트: 병렬 요청 처리
  • 문서 업데이트

문서 버전: 1.1 작성일: 2026-05-02 수정일: 2026-05-02 (P&ID 워커 생명주기 추가) 작성자: AI Assistant 수정자: AI Assistant 수정 내용: 연간 1-2회 사용 예정인 P&ID 워커에 대해 요청 후 즉시 종료하는 one_shot 모드 추가

위 설계의 문제점 진단 및 수정 권고사항 반영

진단 개요: 계획서의 구현 코드 예제에 4개의 치명적 오류와 6개의 심각한 설계 결함, 3개의 경미한 문제가 발견되었습니다. 현재 단일 모놀리스 server.py는 잘 동작 중이므로, 병렬 아키텍처 구현 시 반드시 아래 수정사항을 반영해야 합니다.


🔴 치명적 오류 (런타임 즉시 실패)

# 문제 위치 설명 수정 방안
1 RAG 워커에 P&ID 도구 핸들러 혼입 §5.1, L419-438 rag_worker.py/execute/one_shot 엔드포인트에 parse_pid_dxf 등 P&ID 도구 핸들러가 존재 /execute/one_shot 엔드포인트 자체를 제거. RAG 워커는 one_shot 모드 없음
2 NL2SQL·P&ID 워커에서 app 미정의 §5.2 L471, §5.3 L507 uvicorn.run(app, ...) 호출 시 app = FastAPI() 선언이 없음 각 워커 파일 상단에 app = FastAPI() 추가
3 SIGTERM이 HTTP 응답보다 먼저 실행됨 §8.2, L621-622 os.kill(os.getpid(), signal.SIGTERM) 직후 return result → uvicorn이 즉시 종료 시작 BackgroundTask로 종료를 500ms 지연
4 FastAPI가 dict 타입 요청 바디를 거부 §5.1, L404-416 async def execute(request: dict)는 FastAPI에서 지원하지 않음 request: Request로 변경 후 await request.json() 사용

🟠 심각한 설계 결함

# 문제 위치 설명 수정 방안
5 get_worker() Race Condition §4.1, L222-243 두 요청이 동시에 진입하면 워커 프로세스 2개가 시작됨 asyncio.Lock per worker_type 사용
6 one_shot + 동시 요청 시 강제 종료 §8 요청 A 완료 시 SIGTERM → 요청 B 처리 중 강제 종료 asyncio.Semaphore(1)로 P&ID 요청 직렬화
7 워커 준비 완료 판단이 sleep(1) 고정 §4.1, L200-201 P&ID 워커는 임포트만 수초 이상 걸림 헬스체크 루프로 대체 (최대 15초 대기)
8 subprocess.Popen(stdout=PIPE) 데드락 위험 §4.1, L198 대량 출력 시 파이프 버퍼 가득 참 → 데드락 stdout=subprocess.DEVNULL 또는 파일 리다이렉션
9 analyze_pid_impact 그래프 상태 유실 §3.3 워커 종료 시 메모리 내 그래프 사라짐 build_pid_graph_parallelmcp-server/storage/{graph_id}.json에 저장 (이미 구현됨)
10 메인 서버 종료 시 워커 정리 훅 없음 §4.1 atexit 또는 __del__ 등록 없음 atexit.register(self._cleanup)signal.signal 등록

🟡 경미한 문제

# 문제 위치 설명 수정 방안
11 메모리 추정 오류 §6.2 "RAG: ~2GB"는 vLLM 외부 서비스 포함 추정. 워커 자체는 ~200MB 주석 추가: "vLLM 외부 서비스 사용 시 워커 자체는 ~200MB"
12 one_shot 플래그가 workers 딕셔너리에 남음 §4.1, L237-243 워커 종료 후 상태 불일치 간격 발생 del self.workers[worker_type] 즉시 실행
13 P&ID 5분 타임아웃 불충분 가능성 §4.2, L348 대형 도면 시 5분 부족 timeout=600(10분) 또는 설정 가능하도록 변경

수정 완료된 설계 예제

🔴 Fix 1 — RAG 워커의 one_shot 엔드포인트 제거

# rag_worker.py
# /execute/one_shot 엔드포인트 자체를 제거
# RAG 워커는 one_shot 모드 없음 — pid_worker.py에만 존재

🔴 Fix 2 & 4 — app = FastAPI() 추가 + Request.json()

# nl2sql_worker.py, pid_worker.py 공통
from fastapi import FastAPI, Request
import uvicorn

app = FastAPI()

@app.post("/execute")
async def execute(request: Request):
    body = await request.json()
    tool = body["tool"]
    params = body["params"]
    
    if tool == "run_sql":
        result = await run_sql(**params)
    # ...

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))

🔴 Fix 3 — SIGTERM → BackgroundTask 지연

from fastapi import BackgroundTask

def _shutdown_later():
    async def _do():
        await asyncio.sleep(0.5)
        os.kill(os.getpid(), signal.SIGTERM)
    asyncio.create_task(_do())

@app.post("/execute/one_shot")
async def execute_one_shot(request: Request):
    body = await request.json()
    result = await _dispatch(body["tool"], body["params"])
    _shutdown_later()  # 응답 반환 후 종료 예약
    return result

🟠 Fix 5 — get_worker() Race Condition → asyncio.Lock

class ProcessManager:
    def __init__(self):
        self.workers: Dict[str, WorkerProcess] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    async def get_worker(self, tool_name: str, one_shot: bool = False) -> WorkerProcess:
        worker_type = self._classify_tool(tool_name)

        if worker_type not in self._locks:
            self._locks[worker_type] = asyncio.Lock()

        async with self._locks[worker_type]:  # 동시 진입 차단
            if worker_type not in self.workers:
                return await self.start_worker(worker_type, one_shot)
            proc = self.workers[worker_type].process
            if proc.poll() is not None:
                del self.workers[worker_type]
                return await self.start_worker(worker_type, one_shot)
            return self.workers[worker_type]

🟠 Fix 6 — P&ID one_shot + 동시 요청 충돌 → 세마포어

class ProcessManager:
    def __init__(self):
        ...
        self._pid_sem = asyncio.Semaphore(1)  # P&ID는 1개 동시 실행만 허용

# 메인 서버의 P&ID 도구들
@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
    async with process_manager._pid_sem:  # 동시 P&ID 요청 직렬화
        worker = await process_manager.get_worker("parse_pid_dxf", one_shot=True)
        return await _forward_request(worker.port, "parse_pid_dxf", {"filepath": filepath}, one_shot=True)

🟠 Fix 7 — 헬스체크 루프

async def start_worker(self, worker_type: str, one_shot: bool = False) -> WorkerProcess:
    port = self._get_available_port()
    proc = subprocess.Popen(
        [sys.executable, f"mcp-server/worker/{worker_type}_worker.py", str(port)],
        stdout=subprocess.DEVNULL,  # Fix 8 포함
        stderr=subprocess.DEVNULL,
    )

    # sleep(1) 대신 실제 헬스체크
    for _ in range(30):  # 최대 15초 대기
        await asyncio.sleep(0.5)
        if proc.poll() is not None:
            raise RuntimeError(f"{worker_type} 워커가 시작 직후 종료됨")
        try:
            async with httpx.AsyncClient(timeout=1) as c:
                await c.get(f"http://localhost:{port}/health")
            break  # 헬스체크 성공
        except Exception:
            continue
    else:
        proc.kill()
        raise RuntimeError(f"{worker_type} 워커 시작 타임아웃")

    worker = WorkerProcess(process=proc, port=port, status="running", one_shot=one_shot)
    self.workers[worker_type] = worker
    return worker

각 워커에 /health 엔드포인트 추가:

@app.get("/health")
async def health():
    return {"status": "ok"}

🟠 Fix 8 — PIPE → DEVNULL/파일

# 로그를 남기려면 파일로
log_file = open(f"logs/{worker_type}_worker.log", "a")
proc = subprocess.Popen(cmd, stdout=log_file, stderr=log_file)

🟠 Fix 9 — 메인 서버 종료 시 고아 프로세스 방지

import atexit, signal

class ProcessManager:
    def __init__(self):
        ...
        atexit.register(self._cleanup)
        signal.signal(signal.SIGTERM, lambda *_: self._cleanup())

    def _cleanup(self):
        for wtype, worker in list(self.workers.items()):
            try:
                worker.process.terminate()
            except Exception:
                pass
        self.workers.clear()

📋 요약 체크리스트

# 수정 항목 난이도 상태
1 RAG 워커 P&ID 핸들러 제거 쉬움
2 app = FastAPI() 추가 쉬움
3 SIGTERM → BackgroundTask 지연 보통
4 dict → Request.json() 쉬움
5 asyncio.Lock per worker 보통
6 P&ID 세마포어 직렬화 보통
7 sleep(1) → 헬스체크 루프 보통
8 PIPE → DEVNULL/파일 쉬움
9 atexit 정리 훅 쉬움

📝 참고: analyze_pid_impact 상태 영속화

build_pid_graph_parallel이 결과를 mcp-server/storage/{graph_id}.json 파일로 저장하므로, P&ID 워커가 종료되더라도 다음 analyze_pid_impact 호출은 파일을 읽어 그래프를 복원할 수 있습니다. (현재 구현 기준 이미 해결됨)