Files
ExperionCrawler/mcp-server/mcp-parallel-plan.md

902 lines
33 KiB
Markdown

# MCP 서버 병렬 아키텍처 설계 문서
## 1. 설계 개요
### 1.1 문제 인식
현재 MCP 서버는 단일 프로세스에서 모든 요청을 순차적으로 처리하는 구조로, 다음과 같은 문제점이 있음:
| 문제 | 설명 | 영향 |
|------|------|------|
| **단일 프로세스** | 모든 도구가 동일한 프로세스에서 실행 | CPU 자원 미사용 |
| **순차 처리** | 긴 요청이 완료될 때까지 다른 요청 대기 | 응답 지연 |
| **LLM 병목** | `ask_iiot_llm`, `query_with_nl`, `extract_pid_tags` 등 LLM 호출이 순차 실행 | 요청 간 차단 |
| **P&ID 파싱 병목** | Phase 2 위상 빌더 O(n²) 복잡도 + Phase 3 LLM 매핑 | 수분 이상 소요 |
### 1.2 설계 목표
1. **하드웨어 자원 최적화**: 멀티프로세스를 활용하여 CPU 코어 전체 사용
2. **병렬 처리**: 독립적인 요청을 동시에 처리하여 대기 시간 최소화
3. **확장성**: 새로운 도구 추가 시 메인 서버 수정 없이 서브 프로세스로 추가 가능
4. **격리**: 각 서브 프로세스는 독립적인 메모리 공간을 가지므로 하나의 프로세스 실패가 전체 시스템에 영향 없음
---
## 2. 아키텍처 개요
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ 메인 서버 (server.py) │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ FastMCP (HTTP/stdio) │ │
│ │ - 요청 수신 │ │
│ │ - 요청 분류 (tool name 기반) │ │
│ │ - 서브 프로세스 관리 (PID, 상태, 리소스) │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────┼───────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ RAG 서브 │ │ NL2SQL 서브 │ │ P&ID 서브 │ │
│ │ (rag_worker.py)│ │ (nl2sql_worker.py)│ │ (pid_worker.py)│ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Qdrant 검색 │ │ PostgreSQL │ │ ezdxf/pyMuPDF │ │
│ │ + Ollama Embed│ │ + LLM SQL │ │ + LLM 추출 │ │
│ │ + vLLM LLM │ │ 실행 │ │ + NetworkX │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ 공유 리소스 (메모리, 디스크) │ │
│ │ - Qdrant (외부 서비스) │ │
│ │ - Ollama (외부 서비스) │ │
│ │ - vLLM (외부 서비스) │ │
│ │ - PostgreSQL (외부 서비스) │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
```
---
## 3. 서브 프로세스 구조
### 3.1 RAG 서브 프로세스 (`rag_worker.py`)
**담당 도구**:
- `search_codebase()` - 소스코드 검색
- `search_r530_docs()` - 공식 문서 검색
- `ask_iiot_llm()` - LLM 질문 응답
- `rag_query()` - 통합 RAG
**특징**:
- Ollama Embedding + Qdrant 검색 + vLLM LLM 조합
- 메모리: ~2GB (임베딩 모델 + LLM 로드 시)
- 병렬 처리: 각 요청은 독립적인 LLM 호출 가능
**구조**:
```python
# rag_worker.py
class RAGWorker:
def __init__(self):
self.embed_client = OllamaEmbedClient()
self.qdrant_client = QdrantClient()
self.llm_client = VLLMClient()
async def handle_request(self, tool_name: str, params: dict) -> str:
if tool_name == "search_codebase":
return await self._search_codebase(params)
elif tool_name == "ask_iiot_llm":
return await self._ask_llm(params)
# ...
```
### 3.2 NL2SQL 서브 프로세스 (`nl2sql_worker.py`)
**담당 도구**:
- `run_sql()` - SQL 실행
- `query_pv_history()` - 히스토리 조회
- `get_tag_metadata()` - 태그 메타데이터
- `list_drawings()` - 도면 목록
- `query_with_nl()` - 자연어 → SQL
**특징**:
- PostgreSQL 직접 연결
- LLM SQL 생성 + DB 실행 분리
- 메모리: ~1GB (SQL 생성용 LLM)
**구조**:
```python
# nl2sql_worker.py
class NL2SQLWorker:
def __init__(self):
self.db_pool = create_db_pool()
self.llm_client = VLLMClient()
async def handle_request(self, tool_name: str, params: dict) -> str:
if tool_name == "run_sql":
return await self._run_sql(params["sql"])
elif tool_name == "query_with_nl":
return await self._query_with_nl(params["question"])
# ...
```
### 3.3 P&ID 서브 프로세스 (`pid_worker.py`)
**담당 도구**:
- `extract_pid_tags()` - 텍스트에서 태그 추출
- `match_pid_tags()` - 태그 매핑
- `parse_pid_dxf()` - DXF 파싱
- `parse_pid_pdf()` - PDF 파싱
- `parse_pid_drawing()` - 확장자 자동 감지
- `build_pid_graph_parallel()` - 그래프 생성
- `analyze_pid_impact()` - 영향도 분석
**특징**:
- ezdxf, PyMuPDF, PaddleOCR 로드 (메모리 ~3GB)
- NetworkX 그래프 처리
- LLM 기반 태그 추출/매핑
- **가장 무거운 프로세스**
- **요청 후 종료**: 연간 1-2회 사용 예상으로, 요청 완료 후 프로세스 종료
**구조**:
```python
# pid_worker.py
class PIDWorker:
def __init__(self):
self.extractor = PidGeometricExtractor()
self.topology_builder = PidTopologyBuilder()
self.mapper = IntelligentMapper()
self.analyzer = PidAnalysisEngine()
async def handle_request(self, tool_name: str, params: dict) -> str:
if tool_name == "parse_pid_dxf":
return await self._parse_dxf(params["filepath"])
elif tool_name == "build_pid_graph_parallel":
return await self._build_graph(params["filepath"])
# ...
```
---
## 4. 메인 서버 구현
### 4.1 프로세스 관리
```python
# server.py (메인)
import subprocess
import asyncio
from typing import Dict, Optional
from dataclasses import dataclass
@dataclass
class WorkerProcess:
process: subprocess.Popen
port: int
status: str # "running", "stopped", "error"
one_shot: bool = False # 요청 후 프로세스 종료 여부 (P&ID 워커용)
class ProcessManager:
def __init__(self):
self.workers: Dict[str, WorkerProcess] = {}
async def start_worker(self, worker_type: str, one_shot: bool = False) -> WorkerProcess:
"""서브 프로세스 시작
Args:
worker_type: 워커 타입 (rag, nl2sql, pid)
one_shot: True일 경우 요청 후 프로세스 종료 (P&ID 워커용)
"""
port = self._get_available_port()
cmd = [
sys.executable,
f"mcp-server/worker/{worker_type}_worker.py",
str(port)
]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 프로세스 시작 대기
await asyncio.sleep(1)
worker = WorkerProcess(
process=proc,
port=port,
status="running",
one_shot=one_shot
)
self.workers[worker_type] = worker
return worker
async def stop_worker(self, worker_type: str):
"""서브 프로세스 종료"""
if worker_type in self.workers:
proc = self.workers[worker_type].process
proc.terminate()
await asyncio.sleep(0.5)
if proc.poll() is None:
proc.kill()
del self.workers[worker_type]
async def get_worker(self, tool_name: str, one_shot: bool = False) -> WorkerProcess:
"""도구 이름에 해당하는 워커 프로세스 반환 (자동 시작)
Args:
tool_name: 도구 이름
one_shot: True일 경우 요청 후 프로세스 종료 (P&ID 워커용)
"""
worker_type = self._classify_tool(tool_name)
if worker_type not in self.workers:
# 자동 시작
worker = await self.start_worker(worker_type, one_shot)
return worker
# 프로세스 상태 확인
proc = self.workers[worker_type].process
if proc.poll() is not None:
# 프로세스 종료됨 - 재시작
worker = await self.start_worker(worker_type, one_shot)
return worker
return self.workers[worker_type]
def _classify_tool(self, tool_name: str) -> str:
"""도구 이름을 워커 타입으로 분류"""
rag_tools = {"search_codebase", "search_r530_docs", "ask_iiot_llm", "rag_query"}
nl2sql_tools = {"run_sql", "query_pv_history", "get_tag_metadata", "list_drawings", "query_with_nl"}
pid_tools = {
"extract_pid_tags", "match_pid_tags", "parse_pid_dxf", "parse_pid_pdf",
"parse_pid_drawing", "build_pid_graph_parallel", "analyze_pid_impact"
}
if tool_name in rag_tools:
return "rag"
elif tool_name in nl2sql_tools:
return "nl2sql"
elif tool_name in pid_tools:
return "pid"
else:
return "default" # fallback
```
### 4.2 요청 라우팅
```python
# server.py (메인)
from fastmcp import FastMCP
mcp = FastMCP("iiot-rag-main", port=5001, json_response=True, stateless_http=True)
# 프로세스 매니저 초기화
process_manager = ProcessManager()
@mcp.tool()
async def search_codebase(query: str, top_k: int = 6) -> str:
"""RAG 워커로 요청 전달"""
worker = await process_manager.get_worker("search_codebase")
return await _forward_request(worker.port, "search_codebase", {
"query": query,
"top_k": top_k
})
@mcp.tool()
async def run_sql(sql: str) -> str:
"""NL2SQL 워커로 요청 전달"""
worker = await process_manager.get_worker("run_sql")
return await _forward_request(worker.port, "run_sql", {"sql": sql})
@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
worker = await process_manager.get_worker("parse_pid_dxf", one_shot=True)
result = await _forward_request(worker.port, "parse_pid_dxf", {"filepath": filepath}, one_shot=True)
return result
@mcp.tool()
async def parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
worker = await process_manager.get_worker("parse_pid_pdf", one_shot=True)
result = await _forward_request(worker.port, "parse_pid_pdf", {"filepath": filepath, "use_ocr": use_ocr}, one_shot=True)
return result
@mcp.tool()
async def parse_pid_drawing(filepath: str) -> str:
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
worker = await process_manager.get_worker("parse_pid_drawing", one_shot=True)
result = await _forward_request(worker.port, "parse_pid_drawing", {"filepath": filepath}, one_shot=True)
return result
@mcp.tool()
async def extract_pid_tags(text: str, source_type: str) -> str:
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
worker = await process_manager.get_worker("extract_pid_tags", one_shot=True)
result = await _forward_request(worker.port, "extract_pid_tags", {"text": text, "source_type": source_type}, one_shot=True)
return result
@mcp.tool()
async def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str:
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
worker = await process_manager.get_worker("match_pid_tags", one_shot=True)
result = await _forward_request(worker.port, "match_pid_tags", {"pid_tags": pid_tags, "experion_tags": experion_tags}, one_shot=True)
return result
@mcp.tool()
async def build_pid_graph_parallel(filepath: str) -> str:
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
worker = await process_manager.get_worker("build_pid_graph_parallel", one_shot=True)
result = await _forward_request(worker.port, "build_pid_graph_parallel", {"filepath": filepath}, one_shot=True)
return result
@mcp.tool()
async def analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
worker = await process_manager.get_worker("analyze_pid_impact", one_shot=True)
result = await _forward_request(worker.port, "analyze_pid_impact", {"graph_id": graph_id, "start_node_id": start_node_id}, one_shot=True)
return result
async def _forward_request(port: int, tool_name: str, params: dict, one_shot: bool = False) -> str:
"""HTTP를 통해 워커 프로세스로 요청 전달
Args:
port: 워커 포트
tool_name: 도구 이름
params: 요청 파라미터
one_shot: True일 경우 요청 완료 후 워커 종료
"""
async with httpx.AsyncClient(timeout=300) as client: # 5분 타임아웃
endpoint = "/execute/one_shot" if one_shot else "/execute"
response = await client.post(
f"http://localhost:{port}{endpoint}",
json={"tool": tool_name, "params": params}
)
response.raise_for_status()
return response.text
```
---
## 5. 서브 프로세스 구현
### 5.1 RAG 워커 (`worker/rag_worker.py`)
```python
#!/usr/bin/env python3
"""RAG 전용 워커 프로세스"""
import sys
import json
import httpx
from fastmcp import FastMCP
# 설정
OLLAMA_URL = "http://localhost:11434"
QDRANT_URL = "http://localhost:6333"
VLLM_BASE_URL = "http://localhost:8000/v1"
# FastMCP 서버 (HTTP 전용)
mcp = FastMCP("rag-worker", port=int(sys.argv[1]), json_response=True, stateless_http=True)
# 도구 구현
@mcp.tool()
async def search_codebase(query: str, top_k: int = 6) -> str:
# ... 기존 구현 ...
@mcp.tool()
async def search_r530_docs(query: str, top_k: int = 5) -> str:
# ... 기존 구현 ...
@mcp.tool()
async def ask_iiot_llm(question: str, context: str = "") -> str:
# ... 기존 구현 ...
@mcp.tool()
async def rag_query(question: str, search_code: bool = False, search_docs: bool = True) -> str:
# ... 기존 구현 ...
# HTTP 엔드포인트 (FastMCP 대신 직접 구현)
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.post("/execute")
async def execute(request: dict):
tool = request["tool"]
params = request["params"]
# 도구 호출
if tool == "search_codebase":
result = await search_codebase(**params)
elif tool == "ask_iiot_llm":
result = await ask_iiot_llm(**params)
# ...
return result
# P&ID 워커 전용: 요청 완료 후 종료
@app.post("/execute/one_shot")
async def execute_one_shot(request: dict):
"""one_shot 모드 - 요청 완료 후 프로세스 종료"""
tool = request["tool"]
params = request["params"]
# 도구 호출
if tool == "parse_pid_dxf":
result = await parse_pid_dxf(**params)
elif tool == "parse_pid_pdf":
result = await parse_pid_pdf(**params)
elif tool == "extract_pid_tags":
result = await extract_pid_tags(**params)
# ...
# 프로세스 종료 (graceful shutdown)
import os
import signal
os.kill(os.getpid(), signal.SIGTERM)
return result
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))
```
### 5.2 NL2SQL 워커 (`worker/nl2sql_worker.py`)
```python
#!/usr/bin/env python3
"""NL2SQL 전용 워커 프로세스"""
import sys
import json
import psycopg
from fastmcp import FastMCP
DB_CONNECTION_STRING = "postgresql://postgres:postgres@localhost:5432/iiot_platform"
mcp = FastMCP("nl2sql-worker", port=int(sys.argv[1]), json_response=True, stateless_http=True)
@mcp.tool()
async def run_sql(sql: str) -> str:
# ... 기존 구현 ...
@mcp.tool()
async def query_pv_history(tag_names: list[str], time_from: str, time_to: str, limit: int = 100) -> str:
# ... 기존 구현 ...
# ... 나머지 도구 ...
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))
```
### 5.3 P&ID 워커 (`worker/pid_worker.py`)
```python
#!/usr/bin/env python3
"""P&ID 파싱 전용 워커 프로세스"""
import sys
import json
from fastmcp import FastMCP
mcp = FastMCP("pid-worker", port=int(sys.argv[1]), json_response=True, stateless_http=True)
# Pipeline imports
from pipeline.extractor import PidGeometricExtractor
from pipeline.topology import PidTopologyBuilder
from pipeline.mapper import IntelligentMapper
from pipeline.analyzer import PidAnalysisEngine
@mcp.tool()
async def extract_pid_tags(text: str, source_type: str) -> str:
# ... 기존 구현 ...
@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
# ... 기존 구현 ...
@mcp.tool()
async def build_pid_graph_parallel(filepath: str) -> str:
# ... 기존 구현 ...
# ... 나머지 도구 ...
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))
```
---
## 6. 실행 및 배포
### 6.1 실행 순서
```bash
# 1. 메인 서버 실행
cd mcp-server
python server.py --http
# 2. 메인 서버가 자동으로 워커 프로세스 시작
# - RAG 워커 (port 5002) - 메인 서버 종료 시까지 유지
# - NL2SQL 워커 (port 5003) - 메인 서버 종료 시까지 유지
# - P&ID 워커 (port 5004) - 요청 후 즉시 종료 (연간 1-2회 사용)
# 3. P&ID 요청 시:
# - 워커 자동 시작 → 요청 처리 → 요청 완료 후 즉시 종료
# - 다음 요청 시 다시 시작
```
### 6.2 리소스 관리
| 워커 | 메모리 | CPU | 포트 | 자동 시작 | 생명주기 |
|------|--------|-----|------|-----------|----------|
| RAG | ~2GB | 2 cores | 5002 | 요청 시 | 메인 서버 종료 시까지 유지 |
| NL2SQL | ~1GB | 1 core | 5003 | 요청 시 | 메인 서버 종료 시까지 유지 |
| P&ID | ~3GB | 2 cores | 5004 | 요청 시 | **요청 후 즉시 종료** (연간 1-2회 사용) |
**메모리 최적화**:
- P&ID 워커는 요청 완료 후 즉시 종료되어 메모리 해제
- RAG/NL2SQL 워커는 자주 사용되므로 메인 서버와 함께 유지
### 6.3 프로세스 상태 모니터링
```python
# server.py에 추가
@mcp.tool()
def get_worker_status() -> str:
"""모든 워커 프로세스 상태 조회"""
status = {}
for name, worker in process_manager.workers.items():
status[name] = {
"pid": worker.process.pid,
"status": worker.status,
"port": worker.port
}
return json.dumps(status, ensure_ascii=False, indent=2)
```
---
## 7. 장점 및 단점
### 7.1 장점
| 항목 | 설명 |
|------|------|
| **병렬 처리** | 요청 간 차단 없이 동시에 처리 가능 |
| **확장성** | 새로운 도구 추가 시 워커 프로세스만 추가 |
| **격리** | 하나의 워커 실패가 전체 시스템에 영향 없음 |
| **리소스 최적화** | CPU 코어 수에 따라 워커 수 조정 가능 |
| **유지보수** | 각 워커는 독립적으로 개발/테스트 가능 |
### 7.2 단점
| 항목 | 설명 | 완화 방안 |
|------|------|-----------|
| **메모리 사용량 증가** | 워커당 1-3GB, 총 6GB+ | 워커 수 제한, LRU 캐시 |
| **프로세스 관리 복잡도** | 프로세스 시작/종료/재시작 로직 | 자동 관리, 상태 모니터링 |
| **네트워크 오버헤드** | HTTP 통신 추가 | 로컬 통신, 커넥션 풀 |
---
## 8. P&ID 워커 생명주기 (요청 후 종료)
**설계 원칙**: 연간 1-2회 사용 예정인 P&ID 파싱은 요청 후 즉시 종료
### 8.1 동작 방식
```
1. 사용자가 P&ID 파싱 요청 → 메인 서버가 P&ID 워커 시작 (port 5004)
2. 워커가 요청 처리 (ezdxf + LLM 추출 + NetworkX)
3. 요청 완료 → /execute/one_shot 엔드포인트가 SIGTERM 전달
4. 워커 프로세스 종료 → 메모리 해제
5. 다음 요청 시 → 다시 시작 (1~2초 소요)
```
### 8.2 구현 로직
**메인 서버** (`server.py`):
```python
@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
worker = await process_manager.get_worker("parse_pid_dxf", one_shot=True)
result = await _forward_request(worker.port, "parse_pid_dxf", {"filepath": filepath}, one_shot=True)
return result # 워커는 요청 완료 후 자동 종료됨
```
**워커** (`pid_worker.py`):
```python
@app.post("/execute/one_shot")
async def execute_one_shot(request: dict):
tool = request["tool"]
params = request["params"]
# 도구 호출
if tool == "parse_pid_dxf":
result = await parse_pid_dxf(**params)
# 프로세스 종료
os.kill(os.getpid(), signal.SIGTERM)
return result
```
### 8.3 장점
| 항목 | 설명 |
|------|------|
| **메모리 절약** | 요청 완료 후 3GB 메모리 즉시 해제 |
| **리소스 효율** | 다른 프로세스가 메모리 사용 가능 |
| **단순한 구조** | 별도의 타이머/스케줄러 불필요 |
### 8.4 단점
| 항목 | 설명 | 완화 방안 |
|------|------|-----------|
| **시작 오버헤드** | 매 요청 시 1~2초 소요 | 연간 1-2회 사용이므로 허용 가능 |
| **모델 로딩 시간** | ezdxf, PyMuPDF, PaddleOCR 매번 로드 | 향후 워커 풀로 개선 가능 |
---
## 9. 향후 개선 방향
1. **워커 풀**: 각 워커 타입당 여러 프로세스 실행 (예: P&ID 워커 3개)
2. **자동 스케일링**: 요청량에 따라 워커 수 자동 조정
3. **공유 메모리**: 대용량 데이터 전달 시 메모리 공유 (shared memory)
4. **그룹화**: 유사 도구를 하나의 워커에 통합 (예: RAG 도구 4개 → 1 워커)
---
## 10. 구현 체크리스트
- [ ] `mcp-server/worker/` 디렉토리 생성
- [ ] `rag_worker.py` 구현
- [ ] `nl2sql_worker.py` 구현
- [ ] `pid_worker.py` 구현
- [ ] 메인 서버에 `ProcessManager` 클래스 추가
- [ ] 요청 라우팅 로직 구현
- [ ] 프로세스 상태 모니터링 도구 추가
- [ ] 테스트: 각 워커 독립 실행
- [ ] 테스트: 병렬 요청 처리
- [ ] 문서 업데이트
---
**문서 버전**: 1.1
**작성일**: 2026-05-02
**수정일**: 2026-05-02 (P&ID 워커 생명주기 추가)
**작성자**: AI Assistant
**수정자**: AI Assistant
**수정 내용**: 연간 1-2회 사용 예정인 P&ID 워커에 대해 요청 후 즉시 종료하는 one_shot 모드 추가
#### 위 설계의 문제점 진단 및 수정 권고사항 반영
> **진단 개요**: 계획서의 구현 코드 예제에 4개의 치명적 오류와 6개의 심각한 설계 결함, 3개의 경미한 문제가 발견되었습니다.
> 현재 단일 모놀리스 server.py는 잘 동작 중이므로, 병렬 아키텍처 구현 시 반드시 아래 수정사항을 반영해야 합니다.
---
### 🔴 치명적 오류 (런타임 즉시 실패)
| # | 문제 | 위치 | 설명 | 수정 방안 |
|---|------|------|------|-----------|
| 1 | RAG 워커에 P&ID 도구 핸들러 혼입 | §5.1, L419-438 | `rag_worker.py``/execute/one_shot` 엔드포인트에 `parse_pid_dxf` 등 P&ID 도구 핸들러가 존재 | `/execute/one_shot` 엔드포인트 자체를 **제거**. RAG 워커는 one_shot 모드 없음 |
| 2 | NL2SQL·P&ID 워커에서 `app` 미정의 | §5.2 L471, §5.3 L507 | `uvicorn.run(app, ...)` 호출 시 `app = FastAPI()` 선언이 없음 | 각 워커 파일 상단에 `app = FastAPI()` 추가 |
| 3 | SIGTERM이 HTTP 응답보다 먼저 실행됨 | §8.2, L621-622 | `os.kill(os.getpid(), signal.SIGTERM)` 직후 `return result` → uvicorn이 즉시 종료 시작 | `BackgroundTask`로 종료를 500ms 지연 |
| 4 | FastAPI가 dict 타입 요청 바디를 거부 | §5.1, L404-416 | `async def execute(request: dict)`는 FastAPI에서 지원하지 않음 | `request: Request`로 변경 후 `await request.json()` 사용 |
---
### 🟠 심각한 설계 결함
| # | 문제 | 위치 | 설명 | 수정 방안 |
|---|------|------|------|-----------|
| 5 | `get_worker()` Race Condition | §4.1, L222-243 | 두 요청이 동시에 진입하면 워커 프로세스 2개가 시작됨 | `asyncio.Lock` per worker_type 사용 |
| 6 | one_shot + 동시 요청 시 강제 종료 | §8 | 요청 A 완료 시 SIGTERM → 요청 B 처리 중 강제 종료 | `asyncio.Semaphore(1)`로 P&ID 요청 직렬화 |
| 7 | 워커 준비 완료 판단이 `sleep(1)` 고정 | §4.1, L200-201 | P&ID 워커는 임포트만 수초 이상 걸림 | 헬스체크 루프로 대체 (최대 15초 대기) |
| 8 | `subprocess.Popen(stdout=PIPE)` 데드락 위험 | §4.1, L198 | 대량 출력 시 파이프 버퍼 가득 참 → 데드락 | `stdout=subprocess.DEVNULL` 또는 파일 리다이렉션 |
| 9 | `analyze_pid_impact` 그래프 상태 유실 | §3.3 | 워커 종료 시 메모리 내 그래프 사라짐 | `build_pid_graph_parallel``mcp-server/storage/{graph_id}.json`에 저장 (이미 구현됨) |
| 10 | 메인 서버 종료 시 워커 정리 훅 없음 | §4.1 | `atexit` 또는 `__del__` 등록 없음 | `atexit.register(self._cleanup)``signal.signal` 등록 |
---
### 🟡 경미한 문제
| # | 문제 | 위치 | 설명 | 수정 방안 |
|---|------|------|------|-----------|
| 11 | 메모리 추정 오류 | §6.2 | "RAG: ~2GB"는 vLLM 외부 서비스 포함 추정. 워커 자체는 ~200MB | 주석 추가: "vLLM 외부 서비스 사용 시 워커 자체는 ~200MB" |
| 12 | one_shot 플래그가 workers 딕셔너리에 남음 | §4.1, L237-243 | 워커 종료 후 상태 불일치 간격 발생 | `del self.workers[worker_type]` 즉시 실행 |
| 13 | P&ID 5분 타임아웃 불충분 가능성 | §4.2, L348 | 대형 도면 시 5분 부족 | `timeout=600`(10분) 또는 설정 가능하도록 변경 |
---
### ✅ 수정 완료된 설계 예제
#### 🔴 Fix 1 — RAG 워커의 one_shot 엔드포인트 제거
```python
# rag_worker.py
# /execute/one_shot 엔드포인트 자체를 제거
# RAG 워커는 one_shot 모드 없음 — pid_worker.py에만 존재
```
#### 🔴 Fix 2 & 4 — `app = FastAPI()` 추가 + Request.json()
```python
# nl2sql_worker.py, pid_worker.py 공통
from fastapi import FastAPI, Request
import uvicorn
app = FastAPI()
@app.post("/execute")
async def execute(request: Request):
body = await request.json()
tool = body["tool"]
params = body["params"]
if tool == "run_sql":
result = await run_sql(**params)
# ...
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))
```
#### 🔴 Fix 3 — SIGTERM → BackgroundTask 지연
```python
from fastapi import BackgroundTask
def _shutdown_later():
async def _do():
await asyncio.sleep(0.5)
os.kill(os.getpid(), signal.SIGTERM)
asyncio.create_task(_do())
@app.post("/execute/one_shot")
async def execute_one_shot(request: Request):
body = await request.json()
result = await _dispatch(body["tool"], body["params"])
_shutdown_later() # 응답 반환 후 종료 예약
return result
```
#### 🟠 Fix 5 — `get_worker()` Race Condition → asyncio.Lock
```python
class ProcessManager:
def __init__(self):
self.workers: Dict[str, WorkerProcess] = {}
self._locks: Dict[str, asyncio.Lock] = {}
async def get_worker(self, tool_name: str, one_shot: bool = False) -> WorkerProcess:
worker_type = self._classify_tool(tool_name)
if worker_type not in self._locks:
self._locks[worker_type] = asyncio.Lock()
async with self._locks[worker_type]: # 동시 진입 차단
if worker_type not in self.workers:
return await self.start_worker(worker_type, one_shot)
proc = self.workers[worker_type].process
if proc.poll() is not None:
del self.workers[worker_type]
return await self.start_worker(worker_type, one_shot)
return self.workers[worker_type]
```
#### 🟠 Fix 6 — P&ID one_shot + 동시 요청 충돌 → 세마포어
```python
class ProcessManager:
def __init__(self):
...
self._pid_sem = asyncio.Semaphore(1) # P&ID는 1개 동시 실행만 허용
# 메인 서버의 P&ID 도구들
@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
async with process_manager._pid_sem: # 동시 P&ID 요청 직렬화
worker = await process_manager.get_worker("parse_pid_dxf", one_shot=True)
return await _forward_request(worker.port, "parse_pid_dxf", {"filepath": filepath}, one_shot=True)
```
#### 🟠 Fix 7 — 헬스체크 루프
```python
async def start_worker(self, worker_type: str, one_shot: bool = False) -> WorkerProcess:
port = self._get_available_port()
proc = subprocess.Popen(
[sys.executable, f"mcp-server/worker/{worker_type}_worker.py", str(port)],
stdout=subprocess.DEVNULL, # Fix 8 포함
stderr=subprocess.DEVNULL,
)
# sleep(1) 대신 실제 헬스체크
for _ in range(30): # 최대 15초 대기
await asyncio.sleep(0.5)
if proc.poll() is not None:
raise RuntimeError(f"{worker_type} 워커가 시작 직후 종료됨")
try:
async with httpx.AsyncClient(timeout=1) as c:
await c.get(f"http://localhost:{port}/health")
break # 헬스체크 성공
except Exception:
continue
else:
proc.kill()
raise RuntimeError(f"{worker_type} 워커 시작 타임아웃")
worker = WorkerProcess(process=proc, port=port, status="running", one_shot=one_shot)
self.workers[worker_type] = worker
return worker
```
각 워커에 `/health` 엔드포인트 추가:
```python
@app.get("/health")
async def health():
return {"status": "ok"}
```
#### 🟠 Fix 8 — PIPE → DEVNULL/파일
```python
# 로그를 남기려면 파일로
log_file = open(f"logs/{worker_type}_worker.log", "a")
proc = subprocess.Popen(cmd, stdout=log_file, stderr=log_file)
```
#### 🟠 Fix 9 — 메인 서버 종료 시 고아 프로세스 방지
```python
import atexit, signal
class ProcessManager:
def __init__(self):
...
atexit.register(self._cleanup)
signal.signal(signal.SIGTERM, lambda *_: self._cleanup())
def _cleanup(self):
for wtype, worker in list(self.workers.items()):
try:
worker.process.terminate()
except Exception:
pass
self.workers.clear()
```
---
### 📋 요약 체크리스트
| # | 수정 항목 | 난이도 | 상태 |
|---|-----------|--------|------|
| 1 | RAG 워커 P&ID 핸들러 제거 | 쉬움 | |
| 2 | `app = FastAPI()` 추가 | 쉬움 | |
| 3 | SIGTERM → BackgroundTask 지연 | 보통 | |
| 4 | dict → Request.json() | 쉬움 | |
| 5 | `asyncio.Lock` per worker | 보통 | |
| 6 | P&ID 세마포어 직렬화 | 보통 | |
| 7 | `sleep(1)` → 헬스체크 루프 | 보통 | |
| 8 | PIPE → DEVNULL/파일 | 쉬움 | |
| 9 | atexit 정리 훅 | 쉬움 | |
---
### 📝 참고: `analyze_pid_impact` 상태 영속화
`build_pid_graph_parallel`이 결과를 `mcp-server/storage/{graph_id}.json` 파일로 저장하므로,
P&ID 워커가 종료되더라도 다음 `analyze_pid_impact` 호출은 파일을 읽어 그래프를 복원할 수 있습니다.
(현재 구현 기준 이미 해결됨)