Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL. 기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체. - industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버) - src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer) - mcp-server: Python FastMCP (RAG/NL2SQL/P&ID) - 다중 컨트롤러(N-Controller) 지원 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
902 lines
33 KiB
Markdown
902 lines
33 KiB
Markdown
# MCP 서버 병렬 아키텍처 설계 문서
|
|
|
|
## 1. 설계 개요
|
|
|
|
### 1.1 문제 인식
|
|
|
|
현재 MCP 서버는 단일 프로세스에서 모든 요청을 순차적으로 처리하는 구조로, 다음과 같은 문제점이 있음:
|
|
|
|
| 문제 | 설명 | 영향 |
|
|
|------|------|------|
|
|
| **단일 프로세스** | 모든 도구가 동일한 프로세스에서 실행 | CPU 자원 미사용 |
|
|
| **순차 처리** | 긴 요청이 완료될 때까지 다른 요청 대기 | 응답 지연 |
|
|
| **LLM 병목** | `ask_iiot_llm`, `query_with_nl`, `extract_pid_tags` 등 LLM 호출이 순차 실행 | 요청 간 차단 |
|
|
| **P&ID 파싱 병목** | Phase 2 위상 빌더 O(n²) 복잡도 + Phase 3 LLM 매핑 | 수분 이상 소요 |
|
|
|
|
### 1.2 설계 목표
|
|
|
|
1. **하드웨어 자원 최적화**: 멀티프로세스를 활용하여 CPU 코어 전체 사용
|
|
2. **병렬 처리**: 독립적인 요청을 동시에 처리하여 대기 시간 최소화
|
|
3. **확장성**: 새로운 도구 추가 시 메인 서버 수정 없이 서브 프로세스로 추가 가능
|
|
4. **격리**: 각 서브 프로세스는 독립적인 메모리 공간을 가지므로 하나의 프로세스 실패가 전체 시스템에 영향 없음
|
|
|
|
---
|
|
|
|
## 2. 아키텍처 개요
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────────────────────┐
|
|
│ 메인 서버 (server.py) │
|
|
│ ┌───────────────────────────────────────────────────────────────────────┐ │
|
|
│ │ FastMCP (HTTP/stdio) │ │
|
|
│ │ - 요청 수신 │ │
|
|
│ │ - 요청 분류 (tool name 기반) │ │
|
|
│ │ - 서브 프로세스 관리 (PID, 상태, 리소스) │ │
|
|
│ └───────────────────────────────────────────────────────────────────────┘ │
|
|
│ │ │
|
|
│ ┌───────────────────────────┼───────────────────────────┐ │
|
|
│ ▼ ▼ ▼ │
|
|
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
|
|
│ │ RAG 서브 │ │ NL2SQL 서브 │ │ P&ID 서브 │ │
|
|
│ │ (rag_worker.py)│ │ (nl2sql_worker.py)│ │ (pid_worker.py)│ │
|
|
│ └───────────────┘ └───────────────┘ └───────────────┘ │
|
|
│ │ │ │ │
|
|
│ ▼ ▼ ▼ │
|
|
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
|
|
│ │ Qdrant 검색 │ │ PostgreSQL │ │ ezdxf/pyMuPDF │ │
|
|
│ │ + Ollama Embed│ │ + LLM SQL │ │ + LLM 추출 │ │
|
|
│ │ + vLLM LLM │ │ 실행 │ │ + NetworkX │ │
|
|
│ └───────────────┘ └───────────────┘ └───────────────┘ │
|
|
│ │
|
|
│ ┌───────────────────────────────────────────────────────────────────────┐ │
|
|
│ │ 공유 리소스 (메모리, 디스크) │ │
|
|
│ │ - Qdrant (외부 서비스) │ │
|
|
│ │ - Ollama (외부 서비스) │ │
|
|
│ │ - vLLM (외부 서비스) │ │
|
|
│ │ - PostgreSQL (외부 서비스) │ │
|
|
│ └───────────────────────────────────────────────────────────────────────┘ │
|
|
└─────────────────────────────────────────────────────────────────────────────┘
|
|
```
|
|
|
|
---
|
|
|
|
## 3. 서브 프로세스 구조
|
|
|
|
### 3.1 RAG 서브 프로세스 (`rag_worker.py`)
|
|
|
|
**담당 도구**:
|
|
- `search_codebase()` - 소스코드 검색
|
|
- `search_r530_docs()` - 공식 문서 검색
|
|
- `ask_iiot_llm()` - LLM 질문 응답
|
|
- `rag_query()` - 통합 RAG
|
|
|
|
**특징**:
|
|
- Ollama Embedding + Qdrant 검색 + vLLM LLM 조합
|
|
- 메모리: ~2GB (임베딩 모델 + LLM 로드 시)
|
|
- 병렬 처리: 각 요청은 독립적인 LLM 호출 가능
|
|
|
|
**구조**:
|
|
```python
|
|
# rag_worker.py
|
|
class RAGWorker:
|
|
def __init__(self):
|
|
self.embed_client = OllamaEmbedClient()
|
|
self.qdrant_client = QdrantClient()
|
|
self.llm_client = VLLMClient()
|
|
|
|
async def handle_request(self, tool_name: str, params: dict) -> str:
|
|
if tool_name == "search_codebase":
|
|
return await self._search_codebase(params)
|
|
elif tool_name == "ask_iiot_llm":
|
|
return await self._ask_llm(params)
|
|
# ...
|
|
```
|
|
|
|
### 3.2 NL2SQL 서브 프로세스 (`nl2sql_worker.py`)
|
|
|
|
**담당 도구**:
|
|
- `run_sql()` - SQL 실행
|
|
- `query_pv_history()` - 히스토리 조회
|
|
- `get_tag_metadata()` - 태그 메타데이터
|
|
- `list_drawings()` - 도면 목록
|
|
- `query_with_nl()` - 자연어 → SQL
|
|
|
|
**특징**:
|
|
- PostgreSQL 직접 연결
|
|
- LLM SQL 생성 + DB 실행 분리
|
|
- 메모리: ~1GB (SQL 생성용 LLM)
|
|
|
|
**구조**:
|
|
```python
|
|
# nl2sql_worker.py
|
|
class NL2SQLWorker:
|
|
def __init__(self):
|
|
self.db_pool = create_db_pool()
|
|
self.llm_client = VLLMClient()
|
|
|
|
async def handle_request(self, tool_name: str, params: dict) -> str:
|
|
if tool_name == "run_sql":
|
|
return await self._run_sql(params["sql"])
|
|
elif tool_name == "query_with_nl":
|
|
return await self._query_with_nl(params["question"])
|
|
# ...
|
|
```
|
|
|
|
### 3.3 P&ID 서브 프로세스 (`pid_worker.py`)
|
|
|
|
**담당 도구**:
|
|
- `extract_pid_tags()` - 텍스트에서 태그 추출
|
|
- `match_pid_tags()` - 태그 매핑
|
|
- `parse_pid_dxf()` - DXF 파싱
|
|
- `parse_pid_pdf()` - PDF 파싱
|
|
- `parse_pid_drawing()` - 확장자 자동 감지
|
|
- `build_pid_graph_parallel()` - 그래프 생성
|
|
- `analyze_pid_impact()` - 영향도 분석
|
|
|
|
**특징**:
|
|
- ezdxf, PyMuPDF, PaddleOCR 로드 (메모리 ~3GB)
|
|
- NetworkX 그래프 처리
|
|
- LLM 기반 태그 추출/매핑
|
|
- **가장 무거운 프로세스**
|
|
- **요청 후 종료**: 연간 1-2회 사용 예상으로, 요청 완료 후 프로세스 종료
|
|
|
|
**구조**:
|
|
```python
|
|
# pid_worker.py
|
|
class PIDWorker:
|
|
def __init__(self):
|
|
self.extractor = PidGeometricExtractor()
|
|
self.topology_builder = PidTopologyBuilder()
|
|
self.mapper = IntelligentMapper()
|
|
self.analyzer = PidAnalysisEngine()
|
|
|
|
async def handle_request(self, tool_name: str, params: dict) -> str:
|
|
if tool_name == "parse_pid_dxf":
|
|
return await self._parse_dxf(params["filepath"])
|
|
elif tool_name == "build_pid_graph_parallel":
|
|
return await self._build_graph(params["filepath"])
|
|
# ...
|
|
```
|
|
|
|
---
|
|
|
|
## 4. 메인 서버 구현
|
|
|
|
### 4.1 프로세스 관리
|
|
|
|
```python
|
|
# server.py (메인)
|
|
import subprocess
|
|
import asyncio
|
|
from typing import Dict, Optional
|
|
from dataclasses import dataclass
|
|
|
|
@dataclass
|
|
class WorkerProcess:
|
|
process: subprocess.Popen
|
|
port: int
|
|
status: str # "running", "stopped", "error"
|
|
one_shot: bool = False # 요청 후 프로세스 종료 여부 (P&ID 워커용)
|
|
|
|
class ProcessManager:
|
|
def __init__(self):
|
|
self.workers: Dict[str, WorkerProcess] = {}
|
|
|
|
async def start_worker(self, worker_type: str, one_shot: bool = False) -> WorkerProcess:
|
|
"""서브 프로세스 시작
|
|
|
|
Args:
|
|
worker_type: 워커 타입 (rag, nl2sql, pid)
|
|
one_shot: True일 경우 요청 후 프로세스 종료 (P&ID 워커용)
|
|
"""
|
|
port = self._get_available_port()
|
|
cmd = [
|
|
sys.executable,
|
|
f"mcp-server/worker/{worker_type}_worker.py",
|
|
str(port)
|
|
]
|
|
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
|
# 프로세스 시작 대기
|
|
await asyncio.sleep(1)
|
|
|
|
worker = WorkerProcess(
|
|
process=proc,
|
|
port=port,
|
|
status="running",
|
|
one_shot=one_shot
|
|
)
|
|
self.workers[worker_type] = worker
|
|
return worker
|
|
|
|
async def stop_worker(self, worker_type: str):
|
|
"""서브 프로세스 종료"""
|
|
if worker_type in self.workers:
|
|
proc = self.workers[worker_type].process
|
|
proc.terminate()
|
|
await asyncio.sleep(0.5)
|
|
if proc.poll() is None:
|
|
proc.kill()
|
|
del self.workers[worker_type]
|
|
|
|
async def get_worker(self, tool_name: str, one_shot: bool = False) -> WorkerProcess:
|
|
"""도구 이름에 해당하는 워커 프로세스 반환 (자동 시작)
|
|
|
|
Args:
|
|
tool_name: 도구 이름
|
|
one_shot: True일 경우 요청 후 프로세스 종료 (P&ID 워커용)
|
|
"""
|
|
worker_type = self._classify_tool(tool_name)
|
|
|
|
if worker_type not in self.workers:
|
|
# 자동 시작
|
|
worker = await self.start_worker(worker_type, one_shot)
|
|
return worker
|
|
|
|
# 프로세스 상태 확인
|
|
proc = self.workers[worker_type].process
|
|
if proc.poll() is not None:
|
|
# 프로세스 종료됨 - 재시작
|
|
worker = await self.start_worker(worker_type, one_shot)
|
|
return worker
|
|
|
|
return self.workers[worker_type]
|
|
|
|
def _classify_tool(self, tool_name: str) -> str:
|
|
"""도구 이름을 워커 타입으로 분류"""
|
|
rag_tools = {"search_codebase", "search_r530_docs", "ask_iiot_llm", "rag_query"}
|
|
nl2sql_tools = {"run_sql", "query_pv_history", "get_tag_metadata", "list_drawings", "query_with_nl"}
|
|
pid_tools = {
|
|
"extract_pid_tags", "match_pid_tags", "parse_pid_dxf", "parse_pid_pdf",
|
|
"parse_pid_drawing", "build_pid_graph_parallel", "analyze_pid_impact"
|
|
}
|
|
|
|
if tool_name in rag_tools:
|
|
return "rag"
|
|
elif tool_name in nl2sql_tools:
|
|
return "nl2sql"
|
|
elif tool_name in pid_tools:
|
|
return "pid"
|
|
else:
|
|
return "default" # fallback
|
|
```
|
|
|
|
### 4.2 요청 라우팅
|
|
|
|
```python
|
|
# server.py (메인)
|
|
from fastmcp import FastMCP
|
|
|
|
mcp = FastMCP("iiot-rag-main", port=5001, json_response=True, stateless_http=True)
|
|
|
|
# 프로세스 매니저 초기화
|
|
process_manager = ProcessManager()
|
|
|
|
@mcp.tool()
|
|
async def search_codebase(query: str, top_k: int = 6) -> str:
|
|
"""RAG 워커로 요청 전달"""
|
|
worker = await process_manager.get_worker("search_codebase")
|
|
return await _forward_request(worker.port, "search_codebase", {
|
|
"query": query,
|
|
"top_k": top_k
|
|
})
|
|
|
|
@mcp.tool()
|
|
async def run_sql(sql: str) -> str:
|
|
"""NL2SQL 워커로 요청 전달"""
|
|
worker = await process_manager.get_worker("run_sql")
|
|
return await _forward_request(worker.port, "run_sql", {"sql": sql})
|
|
|
|
@mcp.tool()
|
|
async def parse_pid_dxf(filepath: str) -> str:
|
|
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
|
|
worker = await process_manager.get_worker("parse_pid_dxf", one_shot=True)
|
|
result = await _forward_request(worker.port, "parse_pid_dxf", {"filepath": filepath}, one_shot=True)
|
|
return result
|
|
|
|
@mcp.tool()
|
|
async def parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
|
|
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
|
|
worker = await process_manager.get_worker("parse_pid_pdf", one_shot=True)
|
|
result = await _forward_request(worker.port, "parse_pid_pdf", {"filepath": filepath, "use_ocr": use_ocr}, one_shot=True)
|
|
return result
|
|
|
|
@mcp.tool()
|
|
async def parse_pid_drawing(filepath: str) -> str:
|
|
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
|
|
worker = await process_manager.get_worker("parse_pid_drawing", one_shot=True)
|
|
result = await _forward_request(worker.port, "parse_pid_drawing", {"filepath": filepath}, one_shot=True)
|
|
return result
|
|
|
|
@mcp.tool()
|
|
async def extract_pid_tags(text: str, source_type: str) -> str:
|
|
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
|
|
worker = await process_manager.get_worker("extract_pid_tags", one_shot=True)
|
|
result = await _forward_request(worker.port, "extract_pid_tags", {"text": text, "source_type": source_type}, one_shot=True)
|
|
return result
|
|
|
|
@mcp.tool()
|
|
async def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str:
|
|
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
|
|
worker = await process_manager.get_worker("match_pid_tags", one_shot=True)
|
|
result = await _forward_request(worker.port, "match_pid_tags", {"pid_tags": pid_tags, "experion_tags": experion_tags}, one_shot=True)
|
|
return result
|
|
|
|
@mcp.tool()
|
|
async def build_pid_graph_parallel(filepath: str) -> str:
|
|
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
|
|
worker = await process_manager.get_worker("build_pid_graph_parallel", one_shot=True)
|
|
result = await _forward_request(worker.port, "build_pid_graph_parallel", {"filepath": filepath}, one_shot=True)
|
|
return result
|
|
|
|
@mcp.tool()
|
|
async def analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
|
|
"""P&ID 워커로 요청 전달 (one_shot: 요청 후 종료)"""
|
|
worker = await process_manager.get_worker("analyze_pid_impact", one_shot=True)
|
|
result = await _forward_request(worker.port, "analyze_pid_impact", {"graph_id": graph_id, "start_node_id": start_node_id}, one_shot=True)
|
|
return result
|
|
|
|
async def _forward_request(port: int, tool_name: str, params: dict, one_shot: bool = False) -> str:
|
|
"""HTTP를 통해 워커 프로세스로 요청 전달
|
|
|
|
Args:
|
|
port: 워커 포트
|
|
tool_name: 도구 이름
|
|
params: 요청 파라미터
|
|
one_shot: True일 경우 요청 완료 후 워커 종료
|
|
"""
|
|
async with httpx.AsyncClient(timeout=300) as client: # 5분 타임아웃
|
|
endpoint = "/execute/one_shot" if one_shot else "/execute"
|
|
response = await client.post(
|
|
f"http://localhost:{port}{endpoint}",
|
|
json={"tool": tool_name, "params": params}
|
|
)
|
|
response.raise_for_status()
|
|
return response.text
|
|
```
|
|
|
|
---
|
|
|
|
## 5. 서브 프로세스 구현
|
|
|
|
### 5.1 RAG 워커 (`worker/rag_worker.py`)
|
|
|
|
```python
|
|
#!/usr/bin/env python3
|
|
"""RAG 전용 워커 프로세스"""
|
|
|
|
import sys
|
|
import json
|
|
import httpx
|
|
from fastmcp import FastMCP
|
|
|
|
# 설정
|
|
OLLAMA_URL = "http://localhost:11434"
|
|
QDRANT_URL = "http://localhost:6333"
|
|
VLLM_BASE_URL = "http://localhost:8000/v1"
|
|
|
|
# FastMCP 서버 (HTTP 전용)
|
|
mcp = FastMCP("rag-worker", port=int(sys.argv[1]), json_response=True, stateless_http=True)
|
|
|
|
# 도구 구현
|
|
@mcp.tool()
|
|
async def search_codebase(query: str, top_k: int = 6) -> str:
|
|
# ... 기존 구현 ...
|
|
|
|
@mcp.tool()
|
|
async def search_r530_docs(query: str, top_k: int = 5) -> str:
|
|
# ... 기존 구현 ...
|
|
|
|
@mcp.tool()
|
|
async def ask_iiot_llm(question: str, context: str = "") -> str:
|
|
# ... 기존 구현 ...
|
|
|
|
@mcp.tool()
|
|
async def rag_query(question: str, search_code: bool = False, search_docs: bool = True) -> str:
|
|
# ... 기존 구현 ...
|
|
|
|
# HTTP 엔드포인트 (FastMCP 대신 직접 구현)
|
|
from fastapi import FastAPI
|
|
import uvicorn
|
|
|
|
app = FastAPI()
|
|
|
|
@app.post("/execute")
|
|
async def execute(request: dict):
|
|
tool = request["tool"]
|
|
params = request["params"]
|
|
|
|
# 도구 호출
|
|
if tool == "search_codebase":
|
|
result = await search_codebase(**params)
|
|
elif tool == "ask_iiot_llm":
|
|
result = await ask_iiot_llm(**params)
|
|
# ...
|
|
|
|
return result
|
|
|
|
# P&ID 워커 전용: 요청 완료 후 종료
|
|
@app.post("/execute/one_shot")
|
|
async def execute_one_shot(request: dict):
|
|
"""one_shot 모드 - 요청 완료 후 프로세스 종료"""
|
|
tool = request["tool"]
|
|
params = request["params"]
|
|
|
|
# 도구 호출
|
|
if tool == "parse_pid_dxf":
|
|
result = await parse_pid_dxf(**params)
|
|
elif tool == "parse_pid_pdf":
|
|
result = await parse_pid_pdf(**params)
|
|
elif tool == "extract_pid_tags":
|
|
result = await extract_pid_tags(**params)
|
|
# ...
|
|
|
|
# 프로세스 종료 (graceful shutdown)
|
|
import os
|
|
import signal
|
|
os.kill(os.getpid(), signal.SIGTERM)
|
|
|
|
return result
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))
|
|
```
|
|
|
|
### 5.2 NL2SQL 워커 (`worker/nl2sql_worker.py`)
|
|
|
|
```python
|
|
#!/usr/bin/env python3
|
|
"""NL2SQL 전용 워커 프로세스"""
|
|
|
|
import sys
|
|
import json
|
|
import psycopg
|
|
from fastmcp import FastMCP
|
|
|
|
DB_CONNECTION_STRING = "postgresql://postgres:postgres@localhost:5432/iiot_platform"
|
|
|
|
mcp = FastMCP("nl2sql-worker", port=int(sys.argv[1]), json_response=True, stateless_http=True)
|
|
|
|
@mcp.tool()
|
|
async def run_sql(sql: str) -> str:
|
|
# ... 기존 구현 ...
|
|
|
|
@mcp.tool()
|
|
async def query_pv_history(tag_names: list[str], time_from: str, time_to: str, limit: int = 100) -> str:
|
|
# ... 기존 구현 ...
|
|
|
|
# ... 나머지 도구 ...
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))
|
|
```
|
|
|
|
### 5.3 P&ID 워커 (`worker/pid_worker.py`)
|
|
|
|
```python
|
|
#!/usr/bin/env python3
|
|
"""P&ID 파싱 전용 워커 프로세스"""
|
|
|
|
import sys
|
|
import json
|
|
from fastmcp import FastMCP
|
|
|
|
mcp = FastMCP("pid-worker", port=int(sys.argv[1]), json_response=True, stateless_http=True)
|
|
|
|
# Pipeline imports
|
|
from pipeline.extractor import PidGeometricExtractor
|
|
from pipeline.topology import PidTopologyBuilder
|
|
from pipeline.mapper import IntelligentMapper
|
|
from pipeline.analyzer import PidAnalysisEngine
|
|
|
|
@mcp.tool()
|
|
async def extract_pid_tags(text: str, source_type: str) -> str:
|
|
# ... 기존 구현 ...
|
|
|
|
@mcp.tool()
|
|
async def parse_pid_dxf(filepath: str) -> str:
|
|
# ... 기존 구현 ...
|
|
|
|
@mcp.tool()
|
|
async def build_pid_graph_parallel(filepath: str) -> str:
|
|
# ... 기존 구현 ...
|
|
|
|
# ... 나머지 도구 ...
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))
|
|
```
|
|
|
|
---
|
|
|
|
## 6. 실행 및 배포
|
|
|
|
### 6.1 실행 순서
|
|
|
|
```bash
|
|
# 1. 메인 서버 실행
|
|
cd mcp-server
|
|
python server.py --http
|
|
|
|
# 2. 메인 서버가 자동으로 워커 프로세스 시작
|
|
# - RAG 워커 (port 5002) - 메인 서버 종료 시까지 유지
|
|
# - NL2SQL 워커 (port 5003) - 메인 서버 종료 시까지 유지
|
|
# - P&ID 워커 (port 5004) - 요청 후 즉시 종료 (연간 1-2회 사용)
|
|
|
|
# 3. P&ID 요청 시:
|
|
# - 워커 자동 시작 → 요청 처리 → 요청 완료 후 즉시 종료
|
|
# - 다음 요청 시 다시 시작
|
|
```
|
|
|
|
### 6.2 리소스 관리
|
|
|
|
| 워커 | 메모리 | CPU | 포트 | 자동 시작 | 생명주기 |
|
|
|------|--------|-----|------|-----------|----------|
|
|
| RAG | ~2GB | 2 cores | 5002 | 요청 시 | 메인 서버 종료 시까지 유지 |
|
|
| NL2SQL | ~1GB | 1 core | 5003 | 요청 시 | 메인 서버 종료 시까지 유지 |
|
|
| P&ID | ~3GB | 2 cores | 5004 | 요청 시 | **요청 후 즉시 종료** (연간 1-2회 사용) |
|
|
|
|
**메모리 최적화**:
|
|
- P&ID 워커는 요청 완료 후 즉시 종료되어 메모리 해제
|
|
- RAG/NL2SQL 워커는 자주 사용되므로 메인 서버와 함께 유지
|
|
|
|
### 6.3 프로세스 상태 모니터링
|
|
|
|
```python
|
|
# server.py에 추가
|
|
@mcp.tool()
|
|
def get_worker_status() -> str:
|
|
"""모든 워커 프로세스 상태 조회"""
|
|
status = {}
|
|
for name, worker in process_manager.workers.items():
|
|
status[name] = {
|
|
"pid": worker.process.pid,
|
|
"status": worker.status,
|
|
"port": worker.port
|
|
}
|
|
return json.dumps(status, ensure_ascii=False, indent=2)
|
|
```
|
|
|
|
---
|
|
|
|
## 7. 장점 및 단점
|
|
|
|
### 7.1 장점
|
|
|
|
| 항목 | 설명 |
|
|
|------|------|
|
|
| **병렬 처리** | 요청 간 차단 없이 동시에 처리 가능 |
|
|
| **확장성** | 새로운 도구 추가 시 워커 프로세스만 추가 |
|
|
| **격리** | 하나의 워커 실패가 전체 시스템에 영향 없음 |
|
|
| **리소스 최적화** | CPU 코어 수에 따라 워커 수 조정 가능 |
|
|
| **유지보수** | 각 워커는 독립적으로 개발/테스트 가능 |
|
|
|
|
### 7.2 단점
|
|
|
|
| 항목 | 설명 | 완화 방안 |
|
|
|------|------|-----------|
|
|
| **메모리 사용량 증가** | 워커당 1-3GB, 총 6GB+ | 워커 수 제한, LRU 캐시 |
|
|
| **프로세스 관리 복잡도** | 프로세스 시작/종료/재시작 로직 | 자동 관리, 상태 모니터링 |
|
|
| **네트워크 오버헤드** | HTTP 통신 추가 | 로컬 통신, 커넥션 풀 |
|
|
|
|
---
|
|
|
|
## 8. P&ID 워커 생명주기 (요청 후 종료)
|
|
|
|
**설계 원칙**: 연간 1-2회 사용 예정인 P&ID 파싱은 요청 후 즉시 종료
|
|
|
|
### 8.1 동작 방식
|
|
|
|
```
|
|
1. 사용자가 P&ID 파싱 요청 → 메인 서버가 P&ID 워커 시작 (port 5004)
|
|
2. 워커가 요청 처리 (ezdxf + LLM 추출 + NetworkX)
|
|
3. 요청 완료 → /execute/one_shot 엔드포인트가 SIGTERM 전달
|
|
4. 워커 프로세스 종료 → 메모리 해제
|
|
5. 다음 요청 시 → 다시 시작 (1~2초 소요)
|
|
```
|
|
|
|
### 8.2 구현 로직
|
|
|
|
**메인 서버** (`server.py`):
|
|
```python
|
|
@mcp.tool()
|
|
async def parse_pid_dxf(filepath: str) -> str:
|
|
worker = await process_manager.get_worker("parse_pid_dxf", one_shot=True)
|
|
result = await _forward_request(worker.port, "parse_pid_dxf", {"filepath": filepath}, one_shot=True)
|
|
return result # 워커는 요청 완료 후 자동 종료됨
|
|
```
|
|
|
|
**워커** (`pid_worker.py`):
|
|
```python
|
|
@app.post("/execute/one_shot")
|
|
async def execute_one_shot(request: dict):
|
|
tool = request["tool"]
|
|
params = request["params"]
|
|
|
|
# 도구 호출
|
|
if tool == "parse_pid_dxf":
|
|
result = await parse_pid_dxf(**params)
|
|
|
|
# 프로세스 종료
|
|
os.kill(os.getpid(), signal.SIGTERM)
|
|
return result
|
|
```
|
|
|
|
### 8.3 장점
|
|
|
|
| 항목 | 설명 |
|
|
|------|------|
|
|
| **메모리 절약** | 요청 완료 후 3GB 메모리 즉시 해제 |
|
|
| **리소스 효율** | 다른 프로세스가 메모리 사용 가능 |
|
|
| **단순한 구조** | 별도의 타이머/스케줄러 불필요 |
|
|
|
|
### 8.4 단점
|
|
|
|
| 항목 | 설명 | 완화 방안 |
|
|
|------|------|-----------|
|
|
| **시작 오버헤드** | 매 요청 시 1~2초 소요 | 연간 1-2회 사용이므로 허용 가능 |
|
|
| **모델 로딩 시간** | ezdxf, PyMuPDF, PaddleOCR 매번 로드 | 향후 워커 풀로 개선 가능 |
|
|
|
|
---
|
|
|
|
## 9. 향후 개선 방향
|
|
|
|
1. **워커 풀**: 각 워커 타입당 여러 프로세스 실행 (예: P&ID 워커 3개)
|
|
2. **자동 스케일링**: 요청량에 따라 워커 수 자동 조정
|
|
3. **공유 메모리**: 대용량 데이터 전달 시 메모리 공유 (shared memory)
|
|
4. **그룹화**: 유사 도구를 하나의 워커에 통합 (예: RAG 도구 4개 → 1 워커)
|
|
|
|
---
|
|
|
|
## 10. 구현 체크리스트
|
|
|
|
- [ ] `mcp-server/worker/` 디렉토리 생성
|
|
- [ ] `rag_worker.py` 구현
|
|
- [ ] `nl2sql_worker.py` 구현
|
|
- [ ] `pid_worker.py` 구현
|
|
- [ ] 메인 서버에 `ProcessManager` 클래스 추가
|
|
- [ ] 요청 라우팅 로직 구현
|
|
- [ ] 프로세스 상태 모니터링 도구 추가
|
|
- [ ] 테스트: 각 워커 독립 실행
|
|
- [ ] 테스트: 병렬 요청 처리
|
|
- [ ] 문서 업데이트
|
|
|
|
---
|
|
|
|
**문서 버전**: 1.1
|
|
**작성일**: 2026-05-02
|
|
**수정일**: 2026-05-02 (P&ID 워커 생명주기 추가)
|
|
**작성자**: AI Assistant
|
|
**수정자**: AI Assistant
|
|
**수정 내용**: 연간 1-2회 사용 예정인 P&ID 워커에 대해 요청 후 즉시 종료하는 one_shot 모드 추가
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 위 설계의 문제점 진단 및 수정 권고사항 반영
|
|
|
|
> **진단 개요**: 계획서의 구현 코드 예제에 4개의 치명적 오류와 6개의 심각한 설계 결함, 3개의 경미한 문제가 발견되었습니다.
|
|
> 현재 단일 모놀리스 server.py는 잘 동작 중이므로, 병렬 아키텍처 구현 시 반드시 아래 수정사항을 반영해야 합니다.
|
|
|
|
---
|
|
|
|
### 🔴 치명적 오류 (런타임 즉시 실패)
|
|
|
|
| # | 문제 | 위치 | 설명 | 수정 방안 |
|
|
|---|------|------|------|-----------|
|
|
| 1 | RAG 워커에 P&ID 도구 핸들러 혼입 | §5.1, L419-438 | `rag_worker.py`의 `/execute/one_shot` 엔드포인트에 `parse_pid_dxf` 등 P&ID 도구 핸들러가 존재 | `/execute/one_shot` 엔드포인트 자체를 **제거**. RAG 워커는 one_shot 모드 없음 |
|
|
| 2 | NL2SQL·P&ID 워커에서 `app` 미정의 | §5.2 L471, §5.3 L507 | `uvicorn.run(app, ...)` 호출 시 `app = FastAPI()` 선언이 없음 | 각 워커 파일 상단에 `app = FastAPI()` 추가 |
|
|
| 3 | SIGTERM이 HTTP 응답보다 먼저 실행됨 | §8.2, L621-622 | `os.kill(os.getpid(), signal.SIGTERM)` 직후 `return result` → uvicorn이 즉시 종료 시작 | `BackgroundTask`로 종료를 500ms 지연 |
|
|
| 4 | FastAPI가 dict 타입 요청 바디를 거부 | §5.1, L404-416 | `async def execute(request: dict)`는 FastAPI에서 지원하지 않음 | `request: Request`로 변경 후 `await request.json()` 사용 |
|
|
|
|
---
|
|
|
|
### 🟠 심각한 설계 결함
|
|
|
|
| # | 문제 | 위치 | 설명 | 수정 방안 |
|
|
|---|------|------|------|-----------|
|
|
| 5 | `get_worker()` Race Condition | §4.1, L222-243 | 두 요청이 동시에 진입하면 워커 프로세스 2개가 시작됨 | `asyncio.Lock` per worker_type 사용 |
|
|
| 6 | one_shot + 동시 요청 시 강제 종료 | §8 | 요청 A 완료 시 SIGTERM → 요청 B 처리 중 강제 종료 | `asyncio.Semaphore(1)`로 P&ID 요청 직렬화 |
|
|
| 7 | 워커 준비 완료 판단이 `sleep(1)` 고정 | §4.1, L200-201 | P&ID 워커는 임포트만 수초 이상 걸림 | 헬스체크 루프로 대체 (최대 15초 대기) |
|
|
| 8 | `subprocess.Popen(stdout=PIPE)` 데드락 위험 | §4.1, L198 | 대량 출력 시 파이프 버퍼 가득 참 → 데드락 | `stdout=subprocess.DEVNULL` 또는 파일 리다이렉션 |
|
|
| 9 | `analyze_pid_impact` 그래프 상태 유실 | §3.3 | 워커 종료 시 메모리 내 그래프 사라짐 | `build_pid_graph_parallel`이 `mcp-server/storage/{graph_id}.json`에 저장 (이미 구현됨) |
|
|
| 10 | 메인 서버 종료 시 워커 정리 훅 없음 | §4.1 | `atexit` 또는 `__del__` 등록 없음 | `atexit.register(self._cleanup)` 및 `signal.signal` 등록 |
|
|
|
|
---
|
|
|
|
### 🟡 경미한 문제
|
|
|
|
| # | 문제 | 위치 | 설명 | 수정 방안 |
|
|
|---|------|------|------|-----------|
|
|
| 11 | 메모리 추정 오류 | §6.2 | "RAG: ~2GB"는 vLLM 외부 서비스 포함 추정. 워커 자체는 ~200MB | 주석 추가: "vLLM 외부 서비스 사용 시 워커 자체는 ~200MB" |
|
|
| 12 | one_shot 플래그가 workers 딕셔너리에 남음 | §4.1, L237-243 | 워커 종료 후 상태 불일치 간격 발생 | `del self.workers[worker_type]` 즉시 실행 |
|
|
| 13 | P&ID 5분 타임아웃 불충분 가능성 | §4.2, L348 | 대형 도면 시 5분 부족 | `timeout=600`(10분) 또는 설정 가능하도록 변경 |
|
|
|
|
---
|
|
|
|
### ✅ 수정 완료된 설계 예제
|
|
|
|
#### 🔴 Fix 1 — RAG 워커의 one_shot 엔드포인트 제거
|
|
|
|
```python
|
|
# rag_worker.py
|
|
# /execute/one_shot 엔드포인트 자체를 제거
|
|
# RAG 워커는 one_shot 모드 없음 — pid_worker.py에만 존재
|
|
```
|
|
|
|
#### 🔴 Fix 2 & 4 — `app = FastAPI()` 추가 + Request.json()
|
|
|
|
```python
|
|
# nl2sql_worker.py, pid_worker.py 공통
|
|
from fastapi import FastAPI, Request
|
|
import uvicorn
|
|
|
|
app = FastAPI()
|
|
|
|
@app.post("/execute")
|
|
async def execute(request: Request):
|
|
body = await request.json()
|
|
tool = body["tool"]
|
|
params = body["params"]
|
|
|
|
if tool == "run_sql":
|
|
result = await run_sql(**params)
|
|
# ...
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run(app, host="0.0.0.0", port=int(sys.argv[1]))
|
|
```
|
|
|
|
#### 🔴 Fix 3 — SIGTERM → BackgroundTask 지연
|
|
|
|
```python
|
|
from fastapi import BackgroundTask
|
|
|
|
def _shutdown_later():
|
|
async def _do():
|
|
await asyncio.sleep(0.5)
|
|
os.kill(os.getpid(), signal.SIGTERM)
|
|
asyncio.create_task(_do())
|
|
|
|
@app.post("/execute/one_shot")
|
|
async def execute_one_shot(request: Request):
|
|
body = await request.json()
|
|
result = await _dispatch(body["tool"], body["params"])
|
|
_shutdown_later() # 응답 반환 후 종료 예약
|
|
return result
|
|
```
|
|
|
|
#### 🟠 Fix 5 — `get_worker()` Race Condition → asyncio.Lock
|
|
|
|
```python
|
|
class ProcessManager:
|
|
def __init__(self):
|
|
self.workers: Dict[str, WorkerProcess] = {}
|
|
self._locks: Dict[str, asyncio.Lock] = {}
|
|
|
|
async def get_worker(self, tool_name: str, one_shot: bool = False) -> WorkerProcess:
|
|
worker_type = self._classify_tool(tool_name)
|
|
|
|
if worker_type not in self._locks:
|
|
self._locks[worker_type] = asyncio.Lock()
|
|
|
|
async with self._locks[worker_type]: # 동시 진입 차단
|
|
if worker_type not in self.workers:
|
|
return await self.start_worker(worker_type, one_shot)
|
|
proc = self.workers[worker_type].process
|
|
if proc.poll() is not None:
|
|
del self.workers[worker_type]
|
|
return await self.start_worker(worker_type, one_shot)
|
|
return self.workers[worker_type]
|
|
```
|
|
|
|
#### 🟠 Fix 6 — P&ID one_shot + 동시 요청 충돌 → 세마포어
|
|
|
|
```python
|
|
class ProcessManager:
|
|
def __init__(self):
|
|
...
|
|
self._pid_sem = asyncio.Semaphore(1) # P&ID는 1개 동시 실행만 허용
|
|
|
|
# 메인 서버의 P&ID 도구들
|
|
@mcp.tool()
|
|
async def parse_pid_dxf(filepath: str) -> str:
|
|
async with process_manager._pid_sem: # 동시 P&ID 요청 직렬화
|
|
worker = await process_manager.get_worker("parse_pid_dxf", one_shot=True)
|
|
return await _forward_request(worker.port, "parse_pid_dxf", {"filepath": filepath}, one_shot=True)
|
|
```
|
|
|
|
#### 🟠 Fix 7 — 헬스체크 루프
|
|
|
|
```python
|
|
async def start_worker(self, worker_type: str, one_shot: bool = False) -> WorkerProcess:
|
|
port = self._get_available_port()
|
|
proc = subprocess.Popen(
|
|
[sys.executable, f"mcp-server/worker/{worker_type}_worker.py", str(port)],
|
|
stdout=subprocess.DEVNULL, # Fix 8 포함
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
|
|
# sleep(1) 대신 실제 헬스체크
|
|
for _ in range(30): # 최대 15초 대기
|
|
await asyncio.sleep(0.5)
|
|
if proc.poll() is not None:
|
|
raise RuntimeError(f"{worker_type} 워커가 시작 직후 종료됨")
|
|
try:
|
|
async with httpx.AsyncClient(timeout=1) as c:
|
|
await c.get(f"http://localhost:{port}/health")
|
|
break # 헬스체크 성공
|
|
except Exception:
|
|
continue
|
|
else:
|
|
proc.kill()
|
|
raise RuntimeError(f"{worker_type} 워커 시작 타임아웃")
|
|
|
|
worker = WorkerProcess(process=proc, port=port, status="running", one_shot=one_shot)
|
|
self.workers[worker_type] = worker
|
|
return worker
|
|
```
|
|
|
|
각 워커에 `/health` 엔드포인트 추가:
|
|
|
|
```python
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"status": "ok"}
|
|
```
|
|
|
|
#### 🟠 Fix 8 — PIPE → DEVNULL/파일
|
|
|
|
```python
|
|
# 로그를 남기려면 파일로
|
|
log_file = open(f"logs/{worker_type}_worker.log", "a")
|
|
proc = subprocess.Popen(cmd, stdout=log_file, stderr=log_file)
|
|
```
|
|
|
|
#### 🟠 Fix 9 — 메인 서버 종료 시 고아 프로세스 방지
|
|
|
|
```python
|
|
import atexit, signal
|
|
|
|
class ProcessManager:
|
|
def __init__(self):
|
|
...
|
|
atexit.register(self._cleanup)
|
|
signal.signal(signal.SIGTERM, lambda *_: self._cleanup())
|
|
|
|
def _cleanup(self):
|
|
for wtype, worker in list(self.workers.items()):
|
|
try:
|
|
worker.process.terminate()
|
|
except Exception:
|
|
pass
|
|
self.workers.clear()
|
|
```
|
|
|
|
---
|
|
|
|
### 📋 요약 체크리스트
|
|
|
|
| # | 수정 항목 | 난이도 | 상태 |
|
|
|---|-----------|--------|------|
|
|
| 1 | RAG 워커 P&ID 핸들러 제거 | 쉬움 | |
|
|
| 2 | `app = FastAPI()` 추가 | 쉬움 | |
|
|
| 3 | SIGTERM → BackgroundTask 지연 | 보통 | |
|
|
| 4 | dict → Request.json() | 쉬움 | |
|
|
| 5 | `asyncio.Lock` per worker | 보통 | |
|
|
| 6 | P&ID 세마포어 직렬화 | 보통 | |
|
|
| 7 | `sleep(1)` → 헬스체크 루프 | 보통 | |
|
|
| 8 | PIPE → DEVNULL/파일 | 쉬움 | |
|
|
| 9 | atexit 정리 훅 | 쉬움 | |
|
|
|
|
---
|
|
|
|
### 📝 참고: `analyze_pid_impact` 상태 영속화
|
|
|
|
`build_pid_graph_parallel`이 결과를 `mcp-server/storage/{graph_id}.json` 파일로 저장하므로,
|
|
P&ID 워커가 종료되더라도 다음 `analyze_pid_impact` 호출은 파일을 읽어 그래프를 복원할 수 있습니다.
|
|
(현재 구현 기준 이미 해결됨)
|