Files
ExperionCrawler/mcp-server/server.py
windpacer d09ef95869 feat: Phase 6 보강 도구 5종 (find_tags, query_events, active_alarms, summarize_events, generate_status_report)
이벤트 중심 도구와 LLM 요약/보고서 도구를 추가해 채팅에서
"활성 알람", "교대 보고서", "이벤트 요약" 같은 운전원 요청을 처리.

신규 MCP 도구 (mcp-server/server.py):
- find_tags(query, area?, top_k):
    v_tag_summary 뷰 기반. base_tag 또는 description ILIKE 매칭.
    PV/SP/OP/설명/area 함께 반환.
- query_events(tag_name?, event_type?, area?, since?, until?, limit):
    event_history_table 필터 조회. since/until 미지정 시 최근 24h.
    event_type은 ALARM/TRIP/NORMAL/RUN/CHANGE 5종.
- active_alarms(area?, limit):
    DISTINCT ON (tagname)으로 태그별 최신 이벤트 추출 후
    ALARM/TRIP만 반환 (NORMAL 들어왔으면 자동 해제).
- summarize_events(since?, area?, event_type?, max_events, focus?):
    query_events 결과를 LLM에 넣어 한국어 6~10줄 구조화 요약
    (현황/알람/패턴/권고) + by_type/by_area 통계.
- generate_status_report(area?, hours):
    활성 알람 + 최근 이벤트 통계/표본을 LLM에 넘겨
    교대 보고서 형식(요약/알람/이벤트분석/권고) 마크다운 생성.
    윈도우 1~168시간, 기본 24시간.

공통:
- prepared statement(파라미터 바인딩)로 SQL 인젝션 방지
- SET statement_timeout = SQL_STATEMENT_TIMEOUT_MS 적용
- _DB_SCHEMA에 event_history_table 정의 추가 (NL2SQL 인지용)

시스템 프롬프트 (OllamaController):
- ToolGuideKo에 신규 5종 + search_kb + event_type 5종 명시
- run_sql 자동 가드(LIMIT/timeout) 안내 추가

검증:
- dotnet build: 경고 0건, 에러 0건
- python3 -m py_compile: OK
- import server 후 5개 도구 attribute 확인

런타임:
- mcp-server 재시작 시 신규 도구 자동 인식
- 클라이언트는 ListToolsAsync로 동적 수집 — 추가 작업 불필요

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-14 05:24:36 +09:00

1928 lines
76 KiB
Python

#!/usr/bin/env python3
"""
ExperionCrawler Unified MCP Server
- RAG: Qdrant + Ollama nomic-embed-text + vLLM (llm-model.json)
- NL2SQL: 자연어 → LLM SQL 생성 → PostgreSQL 실행
- 사용처:
stdio 모드 (기본): Claude Code MCP / Roo Code MCP
HTTP 모드 (--http): C# McpClient (localhost:5001)
"""
from __future__ import annotations
import sys
import os
import re
import json
import logging
import httpx
from functools import lru_cache
from mcp.server.fastmcp import FastMCP
logging.basicConfig(level=logging.WARNING, stream=sys.stderr)
# ── 설정 ──────────────────────────────────────────────────────────────────────
QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
EMBED_MODEL = os.environ.get("EMBED_MODEL", "nomic-embed-text")
VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8000/v1")
from config import get_vllm_model
VLLM_MODEL = get_vllm_model()
# Qdrant 컬렉션
COL_CODEBASE = "ws-65f457145aee80b2" # ExperionCrawler 소스코드
COL_OPC_DOCS = "experion-opc-docs" # Experion HS R530 OPC UA 공식 문서 (266 chunks)
# 사용자 KB 컬렉션 (kb_collections 시드 5종과 일치)
KB_COLLECTIONS = {
"system_instrument": "kb_system_instrument",
"plant_operation": "kb_plant_operation",
"procedure": "kb_procedure",
"report": "kb_report",
"vendor_doc": "kb_vendor_doc",
}
# PostgreSQL 연결
DB_CONNECTION_STRING = os.environ.get("DB_CONNECTION_STRING", "postgresql://postgres:postgres@localhost:5432/iiot_platform")
DB_TIMEOUT = int(os.environ.get("DB_TIMEOUT", "10"))
# C# McpClient(localhost:5001)와 통신: json_response+stateless로 단순 POST→JSON 방식
mcp = FastMCP(
"iiot-rag",
port=5001,
json_response=True,
stateless_http=True,
)
# Pipeline Imports
from pipeline.extractor import PidGeometricExtractor
from pipeline.topology import PidTopologyBuilder
from pipeline.mapper import IntelligentMapper
from pipeline.analyzer import PidAnalysisEngine
import networkx as nx
import asyncio
# ── 임베딩 (Ollama) ───────────────────────────────────────────────────────────
async def _embed(text: str) -> list[float]:
"""Ollama nomic-embed-text로 768-dim 벡터 생성."""
import asyncio
def _call_embed():
with httpx.Client(timeout=30) as client:
resp = client.post(
f"{OLLAMA_URL}/api/embeddings",
json={"model": EMBED_MODEL, "prompt": text},
)
resp.raise_for_status()
return resp.json()["embedding"]
return await asyncio.to_thread(_call_embed)
# ── LLM (vLLM) ──────────────────────────────────────────────────────
@lru_cache(maxsize=1)
def _llm():
from openai import OpenAI
return OpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
# ── PaddleOCR 싱글톤 (PDF fallback용) ──────────────────────────────────────────
@lru_cache(maxsize=1)
def _ocr():
"""PaddleOCR 인스턴스 (한/영, GPU). 첫 호출 시 ~50MB 모델 다운로드."""
from paddleocr import PaddleOCR
import os
use_gpu = os.environ.get("PADDLE_USE_GPU", "true").lower() == "true"
try:
ocr = PaddleOCR(
use_angle_cls=True,
lang="korean",
use_gpu=use_gpu,
show_log=False,
)
return ocr
except Exception as e:
# GPU 실패 시 CPU 폴백
if use_gpu:
os.environ["PADDLE_USE_GPU"] = "false"
return _ocr()
raise e
# ── DXF/PDF 텍스트 추출 헬퍼 ───────────────────────────────────────────────────
async def _extract_text_from_dxf(filepath: str) -> str:
"""ezdxf로 DXF 파일에서 텍스트 추출 (MTEXT 포맷 코드 제거)."""
import asyncio
import ezdxf
from ezdxf.tools.text import plain_mtext
def _extract():
doc = ezdxf.readfile(filepath)
msp = doc.modelspace()
texts = []
for entity in msp:
if entity.dxftype() == "TEXT":
texts.append(entity.dxf.text)
elif entity.dxftype() == "MTEXT":
try:
plain = plain_mtext(entity.dxf.text)
if plain.strip():
texts.append(plain)
except Exception:
pass
return "\n".join(texts)
return await asyncio.to_thread(_extract)
async def _extract_text_from_pdf(filepath: str) -> str:
"""PyMuPDF로 PDF 파일에서 텍스트 추출."""
import asyncio
import fitz # pymupdf
def _extract():
doc = fitz.open(filepath)
texts = []
for page in doc:
texts.append(page.get_text())
return "\n".join(texts)
return await asyncio.to_thread(_extract)
async def _extract_text_from_pdf_ocr(filepath: str) -> str:
"""PaddleOCR로 PDF에서 이미지 추출 후 OCR (고정밀도)."""
import asyncio
import fitz # pymupdf
from PIL import Image
import numpy as np
def _extract():
doc = fitz.open(filepath)
all_texts = []
for page_idx, page in enumerate(doc):
# 페이지를 이미지로 변환
mat = fitz.Matrix(300 / 72) # 300 DPI
pix = page.get_pixmap(matrix=mat)
img_data = pix.tobytes("png")
img = Image.open(__import__("io").BytesIO(img_data))
# OCR 실행
result = _ocr().ocr(np.array(img), cls=True)
if result[0]:
for line in result[0]:
all_texts.append(line[1][0])
return "\n".join(all_texts)
return await asyncio.to_thread(_extract)
async def _convert_dwg_to_dxf_dxflib(filepath: str) -> str:
"""libreoffice로 DWG를 DXF로 변환."""
import asyncio
import subprocess
import os
dxf_path = filepath.replace(".dwg", ".dxf")
def _convert():
try:
# LibreOffice로 변환
result = subprocess.run(
[
"libreoffice",
"--headless",
"--convert-to", "dxf:AutoCAD DXF",
"--outdir", os.path.dirname(filepath) or ".",
filepath
],
check=True,
timeout=120,
capture_output=True,
text=True
)
if os.path.exists(dxf_path):
return dxf_path
else:
raise FileNotFoundError("DXF 변환 파일이 생성되지 않았습니다.")
except subprocess.CalledProcessError as e:
raise Exception(f"LibreOffice 변환 실패: {e.stderr}")
return await asyncio.to_thread(_convert)
# ── Qdrant 검색 헬퍼 ──────────────────────────────────────────────────────────
async def _search(collection: str, query: str, top_k: int, threshold: float = 0.25) -> str:
import asyncio
def _call_embed():
return _embed(query)
vec = await _call_embed()
def _call_search():
with httpx.Client(timeout=20) as client:
resp = client.post(
f"{QDRANT_URL}/collections/{collection}/points/search",
json={
"vector": vec,
"limit": top_k,
"with_payload": True,
"score_threshold": threshold,
},
)
resp.raise_for_status()
return resp.json().get("result", [])
hits = await asyncio.to_thread(_call_search)
if not hits:
return "관련 결과 없음."
parts = []
for h in hits:
p = h.get("payload", {})
file_path = p.get("filePath", p.get("path", "unknown"))
chunk = p.get("codeChunk", p.get("content", p.get("text", "")))
start_line = p.get("startLine", "")
loc = f"{file_path}:{start_line}" if start_line else file_path
parts.append(f"[score={h['score']:.3f}] {loc}\n```\n{chunk[:700]}\n```")
return "\n\n---\n\n".join(parts)
async def _search_kb_collection(
qdrant_name: str,
vec: list[float],
top_k: int,
tags: list[str] | None = None,
) -> list[dict]:
"""KB 컬렉션 1개에 대해 의미 검색. 결과를 정규화된 dict 리스트로 반환."""
must = []
if tags:
must.append({"key": "tags", "match": {"any": tags}})
body: dict = {
"vector": vec,
"limit": top_k,
"with_payload": True,
"score_threshold": 0.20,
}
if must:
body["filter"] = {"must": must}
def _call():
with httpx.Client(timeout=20) as client:
resp = client.post(f"{QDRANT_URL}/collections/{qdrant_name}/points/search", json=body)
if resp.status_code == 404:
return []
resp.raise_for_status()
return resp.json().get("result", [])
try:
return await asyncio.to_thread(_call)
except Exception as e:
logging.warning(f"[search_kb] {qdrant_name} 검색 실패: {e}")
return []
def _recency_factor(uploaded_at_iso: str | None) -> float:
"""uploaded_at 기준 최신 가중치. 최근 7일 +10%, 30일 +5%, 90일 +2%, 그 외 1.0."""
if not uploaded_at_iso:
return 1.0
try:
from datetime import datetime, timezone
ts = datetime.fromisoformat(uploaded_at_iso.replace("Z", "+00:00"))
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - ts).total_seconds() / 86400.0
if age < 7: return 1.10
if age < 30: return 1.05
if age < 90: return 1.02
return 1.0
except Exception:
return 1.0
# ── DB 헬퍼 ──────────────────────────────────────────────────────────────────
async def _get_db_connection():
"""PostgreSQL DB 연결 획득."""
import asyncio
import psycopg
def _connect():
return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
return await asyncio.to_thread(_connect)
def _validate_sql(sql: str) -> tuple[bool, str]:
"""SQL 안전 검증 — SELECT/WITH만 허용, 위험 키워드 차단."""
if len(sql) > 2000:
return False, "쿼리 길이 2000자를 초과했습니다."
dangerous = ['EXEC', 'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'GRANT', 'REVOKE', 'TRUNCATE', 'COPY']
sql_upper = sql.upper()
for kw in dangerous:
if re.search(rf"\b{kw}\b", sql_upper):
return False, f"허용되지 않은 키워드 '{kw}'를 사용했습니다."
head = sql_upper.lstrip().lstrip('(').lstrip()
if not (head.startswith('SELECT') or head.startswith('WITH')):
return False, "SELECT 또는 WITH 쿼리만 허용됩니다."
if '..' in sql or '~' in sql:
return False, "파일 경로 표현은 허용되지 않습니다."
if ';' in sql.rstrip().rstrip(';'):
return False, "다중 문장(세미콜론)은 허용되지 않습니다."
return True, ""
# SQL 가드 — auto-LIMIT + statement_timeout
SQL_MAX_ROWS = int(os.environ.get("SQL_MAX_ROWS", "1000"))
SQL_STATEMENT_TIMEOUT_MS = int(os.environ.get("SQL_STATEMENT_TIMEOUT_MS", "30000"))
_RE_LIMIT_TAIL = re.compile(r"\bLIMIT\b\s+\d+(\s+OFFSET\s+\d+)?\s*$", re.IGNORECASE)
def _apply_sql_guards(sql: str, max_rows: int = SQL_MAX_ROWS) -> str:
"""LIMIT가 없으면 서브쿼리로 감싸 강제 부착. 이미 끝부분에 LIMIT가 있으면 그대로."""
s = sql.strip().rstrip(';').strip()
if _RE_LIMIT_TAIL.search(s):
return s
return f"SELECT * FROM ({s}) _capped LIMIT {max_rows}"
# DB 스키마 — LLM SQL 생성 시 컨텍스트로 사용
_DB_SCHEMA = """
PostgreSQL 시계열 데이터베이스 스키마
테이블: history_table (시계열 이력)
tagname TEXT - 태그명 (모두 소문자, 예: 'ficq-6113.pv') — 대소문자 구분
node_id TEXT - OPC UA 노드 ID
value TEXT - 측정값, 수치 연산 시 ::double precision 캐스트 필요
recorded_at TIMESTAMPTZ - 기록 시각(UTC), 스냅샷 주기 약 60초
테이블: realtime_table (실시간 최신값)
tagname TEXT - 태그명 (모두 소문자)
node_id TEXT - OPC UA 노드 ID
livevalue TEXT - 현재값
timestamp TIMESTAMPTZ - 최종 갱신 시각
테이블: tag_metadata (태그 메타데이터 - 변경 드묾)
base_tag TEXT - 기본 태그명 (예: 'ficq-6101', 'xv-6124')
attribute TEXT - 속성명 ('desc', 'area')
value TEXT - 메타데이터 값
node_id TEXT - OPC UA 노드 ID
loaded_at TIMESTAMPTZ - 마지막 로드 시각
테이블: event_history_table (디지털 포인트 상태 변경 이벤트)
id BIGSERIAL - PK
tagname TEXT - 태그명 (소문자)
node_id TEXT
prev_value TEXT - 직전 값
curr_value TEXT - 현재 값
event_type TEXT - 'ALARM' / 'TRIP' / 'NORMAL' / 'RUN' / 'CHANGE'
event_time TIMESTAMPTZ - 이벤트 발생 시각(UTC)
area TEXT - tag_metadata.area 복사본
section TEXT - 태그명 패턴에서 추출한 차수(예: '6-1차')
duration_seconds INT - 직전 상태에서 머문 시간
metadata JSONB - 부가 정보 (interlock 등)
created_at TIMESTAMPTZ
뷰: v_tag_summary (실시간값 + 메타데이터 통합 뷰)
base_tag TEXT - 기본 태그명
pv TEXT - 현재 프로세스 값
sp TEXT - 설정값
op TEXT - 출력값
instate0 TEXT - 상태 비트 0 (true/false)
instate1 TEXT - 상태 비트 1 (true/false)
instate2 TEXT - 상태 비트 2 (true/false)
description TEXT - 장비 설명 (tag_metadata.desc)
area TEXT - 소속 플랜트 (tag_metadata.area)
새로운 태그 타입:
- 아날로그: ficq-6101.pv/sp/op (Double)
- 디지털 XV: xv-6124.pv/op (Int32), xv-6124.instate0~7 (Boolean)
- Pump: p-6102.pv/op (Int32), p-6102.instate0~7 (Boolean)
- 메타데이터: desc (String), area (Enum)
BCD 상태 조회 팁:
- instate0~7은 Boolean (true/false)
- pv 값이 EnumValueType 형식인 경우 `{코드 | DisplayName | }`에서 DisplayName으로 상태 확인 가능
- v_tag_summary 뷰를 사용하면 실시간값+메타데이터 한 번에 조회 가능
N분 간격 집계 공식 (time_bucket 금지, date_trunc 사용):
1분 버킷: date_trunc('minute', recorded_at) AS bucket
2분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120) AS bucket
5분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/300)*300) AS bucket
10분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/600)*600) AS bucket
N분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)) AS bucket
예시 (2분 간격, 여러 태그):
SELECT to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120) AS bucket,
tagname, AVG(value::double precision) AS avg_val
FROM history_table
WHERE tagname IN ('tag1', 'tag2')
AND recorded_at >= NOW() - INTERVAL '3 hours'
GROUP BY bucket, tagname ORDER BY bucket, tagname
규칙:
- SELECT만 허용 (INSERT/UPDATE/DELETE/DROP 등 불가)
- tagname은 모두 소문자로 정확히 입력
- value 컬럼은 TEXT이므로 집계 시 ::double precision 캐스트 필수
- time_bucket 함수 사용 금지 — 위의 to_timestamp/FLOOR/EPOCH 공식 사용
"""
# ── RAG 도구 ─────────────────────────────────────────────────────────────────
@mcp.tool()
async def search_codebase(query: str, top_k: int = 6) -> str:
"""ExperionCrawler 프로젝트 소스코드 검색 (우리가 개발한 .NET 8 C# 코드).
Experion HS R530 공식 문서가 아닌, ExperionCrawler 구현 코드를 검색함.
사용 시점: ExperionCrawler 코드의 구현 방법, 버그, 구조를 알고 싶을 때.
⚠️ Experion HS R530 제품 동작/설정/스펙을 알고 싶으면 search_r530_docs 사용.
Args:
query: 검색어 (예: "OPC UA 구독 시작", "히스토리 스냅샷", "TextToSql 서비스")
top_k: 반환 결과 수 (기본 6)
"""
return await _search(COL_CODEBASE, query, top_k)
@mcp.tool()
async def search_r530_docs(query: str, top_k: int = 5) -> str:
"""Honeywell Experion HS R530 공식 제품 문서 검색.
ExperionCrawler 코드가 아닌, Honeywell 공식 HTM 문서를 검색함.
사용 시점: Experion HS R530의 OPC UA 설정, 인증서, 보안 정책, 포인트 주소 형식,
채널/컨트롤러 속성, 문제해결 등 제품 스펙과 동작을 알고 싶을 때.
Args:
query: 검색어 (예: "certificate configuration", "endpoint security policy")
top_k: 반환 결과 수 (기본 5)
"""
return await _search(COL_OPC_DOCS, query, top_k)
@mcp.tool()
def ask_iiot_llm(question: str, context: str = "") -> str:
"""LLM에게 IIoT/OPC UA 질문 (컨텍스트 없이 LLM 직접 질문).
사용 시점: search_codebase 또는 search_r530_docs 결과를 context로 넘겨
종합 분석·답변이 필요할 때. 또는 일반 IIoT/OPC UA 개념 질문.
Args:
question: 질문 내용
context: (선택) search_codebase 또는 search_r530_docs 검색 결과
"""
system = (
"당신은 IIoT(산업용 IoT), OPC UA, Honeywell Experion PKS/HS R530 전문가입니다.\n"
"컨텍스트가 제공된 경우 컨텍스트를 우선 근거로 삼아 한국어로 답변합니다.\n"
"컨텍스트 출처가 'Experion HS R530 공식 문서'인지 'ExperionCrawler 코드'인지 명확히 구분하여 설명합니다."
)
user_msg = f"컨텍스트:\n{context}\n\n질문: {question}" if context else question
resp = _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
],
max_tokens=2048,
temperature=0.1,
)
return resp.choices[0].message.content or "(응답 없음)"
@mcp.tool()
async def rag_query(
question: str,
search_code: bool = False,
search_docs: bool = True,
search_kb: bool = False,
kb_collections: list[str] | None = None,
) -> str:
"""검색 → LLM 답변 생성 (통합 RAG).
기본값: Experion HS R530 공식 문서만 검색.
사용자 KB 검색을 포함하려면 search_kb=True. 코드 검색은 search_code=True.
Args:
question: 질문
search_docs: Experion HS R530 공식 문서 검색 여부 (기본 True)
search_code: ExperionCrawler 소스코드 검색 여부 (기본 False)
search_kb: 사용자 KB 검색 여부 (기본 False)
kb_collections: 검색 대상 KB 컬렉션 키 목록. None이면 전체.
예: ["plant_operation", "procedure"]
"""
context_parts: list[str] = []
if search_docs:
context_parts.append(f"=== Experion HS R530 공식 문서 ===\n{await _search(COL_OPC_DOCS, question, 4)}")
if search_code:
context_parts.append(f"=== ExperionCrawler 구현 코드 ===\n{await _search(COL_CODEBASE, question, 3)}")
if search_kb:
kb_text = await _format_kb_results(question, kb_collections, top_k=6)
context_parts.append(f"=== 사용자 지식 베이스 ===\n{kb_text}")
return ask_iiot_llm(question, "\n\n".join(context_parts))
async def _format_kb_results(
query: str,
collection_keys: list[str] | None,
top_k: int,
tags: list[str] | None = None,
since: str | None = None,
boost_recent: bool = True,
) -> str:
"""search_kb 내부 헬퍼: 다중 컬렉션 의미검색 후 인용 텍스트로 직렬화."""
hits = await _search_kb_raw(query, collection_keys, top_k, tags, since, boost_recent)
if not hits:
return "관련 KB 결과 없음."
parts = []
for h in hits:
title = h.get("title") or "(제목없음)"
loc = h.get("locator") or ""
score = h.get("score", 0.0)
text = (h.get("text") or "").strip()
# 인용 헤더: "[score=0.812] 정비이력_2026Q1.xlsx > 시트:Pump-A > 행 12"
loc_str = f" > {loc}" if loc else ""
parts.append(f"[score={score:.3f}] {title}{loc_str}\n{text[:700]}")
return "\n\n---\n\n".join(parts)
async def _search_kb_raw(
query: str,
collection_keys: list[str] | None,
top_k: int,
tags: list[str] | None,
since: str | None,
boost_recent: bool,
) -> list[dict]:
"""KB 검색 핵심 로직 — 다중 컬렉션 의미검색 + 최신 가중치 + 후필터."""
targets = collection_keys or list(KB_COLLECTIONS.keys())
qdrant_names = [KB_COLLECTIONS[k] for k in targets if k in KB_COLLECTIONS]
if not qdrant_names:
return []
vec = await _embed(query)
per_coll_k = max(top_k, 8)
results: list[dict] = []
for qname in qdrant_names:
hits = await _search_kb_collection(qname, vec, per_coll_k, tags=tags)
for h in hits:
p = h.get("payload", {})
uploaded_at = p.get("uploaded_at")
if since and uploaded_at:
try:
if uploaded_at < since:
continue
except Exception:
pass
base_score = h.get("score", 0.0)
recency = _recency_factor(uploaded_at) if boost_recent else 1.0
results.append({
"score": base_score * recency,
"raw_score": base_score,
"doc_id": p.get("doc_id"),
"collection_key": p.get("collection_key"),
"title": p.get("title"),
"text": p.get("text", ""),
"chunk_kind": p.get("chunk_kind"),
"locator": p.get("locator"),
"uploaded_at": uploaded_at,
"tags": p.get("tags") or [],
})
# 점수 내림차순 정렬, 동일 doc_id 중복 dedup(최고점만)
results.sort(key=lambda r: r["score"], reverse=True)
seen: set[str] = set()
unique: list[dict] = []
for r in results:
key = f'{r.get("doc_id")}::{r.get("locator")}'
if key in seen:
continue
seen.add(key)
unique.append(r)
if len(unique) >= top_k:
break
return unique
@mcp.tool()
async def search_kb(
query: str,
collection_keys: list[str] | None = None,
top_k: int = 8,
tags: list[str] | None = None,
since: str | None = None,
boost_recent: bool = True,
) -> str:
"""사용자 지식 베이스(KB) 다중 컬렉션 의미 검색.
관리탭에서 업로드/인덱싱한 문서에서 질의와 의미적으로 가까운 청크를 찾는다.
Args:
query: 검색어 또는 자연어 질문
collection_keys: 대상 컬렉션 키 목록. None이면 전체.
가능한 값: system_instrument, plant_operation,
procedure, report, vendor_doc
top_k: 반환 결과 수 (기본 8)
tags: 태그 필터 (any 매칭). 예: ["unit-a", "P-6201"]
since: 이 ISO 시각 이후 업로드된 문서만. 예: "2026-04-01T00:00:00Z"
boost_recent: True이면 uploaded_at 기준 최신 가중치 적용 (기본 True)
Returns:
JSON 문자열: { success, count, hits: [{ doc_id, collection_key, title,
text, chunk_kind, locator, score, uploaded_at, tags }, ...] }
"""
try:
hits = await _search_kb_raw(query, collection_keys, top_k, tags, since, boost_recent)
return json.dumps(
{"success": True, "count": len(hits), "hits": hits},
ensure_ascii=False,
default=str,
)
except Exception as e:
return json.dumps({"success": False, "error": f"search_kb 실패: {e}"}, ensure_ascii=False)
# ── NL2SQL 도구 ───────────────────────────────────────────────────────────────
async def _execute_sql_internal(sql: str) -> str:
"""SQL 실행 내부 함수 (run_sql과 query_with_nl에서 공유).
가드: LIMIT 미지정 시 자동 LIMIT 부착(SQL_MAX_ROWS), statement_timeout 적용.
"""
valid, err = _validate_sql(sql)
if not valid:
return json.dumps({"success": False, "error": f"SQL 검증 실패: {err}"}, ensure_ascii=False)
capped_sql = _apply_sql_guards(sql)
conn = None
try:
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(capped_sql)
rows = cur.fetchall()
columns = [desc[0] for desc in cur.description]
result_data = [dict(zip(columns, row)) for row in rows]
return json.dumps({
"success": True,
"columns": columns,
"count": len(result_data),
"row_limit": SQL_MAX_ROWS,
"data": result_data
}, ensure_ascii=False, default=str)
except Exception as e:
return json.dumps({"success": False, "error": f"SQL 실행 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def run_sql(sql: str) -> str:
"""SQL 쿼리 실행 (SELECT/WITH만 허용).
안전 가드:
- 위험 키워드(INSERT/UPDATE/DELETE/DROP/ALTER/CREATE/GRANT/REVOKE/TRUNCATE/COPY/EXEC) 차단
- 다중 문장(세미콜론) 차단
- LIMIT 미지정 시 SQL_MAX_ROWS(기본 1000)로 자동 제한
- statement_timeout = SQL_STATEMENT_TIMEOUT_MS(기본 30s)
Args:
sql: 실행할 SELECT/WITH SQL 문자열 (최대 2000자)
Returns:
JSON: { success, columns, count, row_limit, data } 또는 { success, error }
"""
return await _execute_sql_internal(sql)
@mcp.tool()
async def query_pv_history(tag_names: list[str], time_from: str, time_to: str, limit: int = 100) -> str:
"""과거 값(PV) 히스토리 조회.
Args:
tag_names: 태그 이름 목록 (예: ["ficq-6113.pv", "ti-6101.pv"])
time_from: 시작 시간 (ISO 8601, 예: "2026-04-01T00:00:00")
time_to: 종료 시간 (ISO 8601, 예: "2026-04-02T00:00:00")
limit: 반환 행 수 제한 (기본 100, 최대 5000)
Returns:
JSON: { success, tag_names, time_range, limit, data }
"""
conn = None
try:
limit = min(limit, 5000)
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(
"""SELECT tagname, recorded_at, value
FROM history_table
WHERE tagname = ANY(%s)
AND recorded_at >= %s AND recorded_at <= %s
ORDER BY recorded_at, tagname
LIMIT %s""",
(tag_names, time_from, time_to, limit)
)
rows = cur.fetchall()
data = [{"tag_name": r[0], "timestamp": r[1].isoformat(), "value": r[2]} for r in rows]
return json.dumps({
"success": True,
"tag_names": tag_names,
"time_range": f"{time_from} ~ {time_to}",
"count": len(data),
"data": data
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"히스토리 쿼리 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def get_tag_metadata(query: str, limit: int = 10) -> str:
"""태그 메타데이터 검색 (realtime_table 기반).
Args:
query: 태그명 검색어 (패턴 매칭)
limit: 반환 태그 수 제한 (기본 10)
Returns:
JSON: { success, query, count, tags }
"""
conn = None
try:
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(
"""SELECT tagname, livevalue, timestamp, node_id
FROM realtime_table
WHERE tagname ILIKE %s
ORDER BY tagname LIMIT %s""",
(f"%{query}%", limit)
)
rows = cur.fetchall()
tags = [{"tag_name": r[0], "current_value": r[1],
"last_updated": r[2].isoformat() if r[2] else None,
"node_id": r[3]} for r in rows]
return json.dumps({"success": True, "query": query, "count": len(tags), "tags": tags},
ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"태그 메타데이터 검색 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def list_drawings(unit_no: str | None = None) -> str:
"""단위별 도면 목록 조회 (node_map_master.name 기반).
Args:
unit_no: 단위 번호 접두사 (예: "A", "B"). None이면 전체 목록
Returns:
JSON: { success, unit_no, count, names }
"""
conn = None
try:
conn = await _get_db_connection()
with conn.cursor() as cur:
if unit_no:
cur.execute(
"SELECT DISTINCT name FROM node_map_master WHERE name ILIKE %s ORDER BY name LIMIT 100",
(f"{unit_no}%",)
)
else:
cur.execute("SELECT DISTINCT name FROM node_map_master ORDER BY name LIMIT 100")
rows = cur.fetchall()
return json.dumps({"success": True, "unit_no": unit_no,
"count": len(rows), "names": [r[0] for r in rows]},
ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"도면 목록 조회 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def query_with_nl(question: str) -> str:
"""자연어 질문을 LLM이 SQL로 변환하고 시계열 DB를 조회합니다.
Args:
question: 자연어 질문 (예: "FICQ-6113.PV 최근 1시간 값을 1분 단위로 표시")
Returns:
JSON: { sql, success, columns, count, data } 또는 { sql, success, error }
"""
import asyncio
import json as json_module
system = (
"You are a PostgreSQL SQL expert.\n"
"Convert the user's question into a SELECT SQL using the schema below.\n"
"IMPORTANT rules:\n"
"- Use ONLY PostgreSQL syntax. No DATE_FORMAT, no INTERVAL N DAY.\n"
"- Time column is 'recorded_at' (TIMESTAMPTZ). Do NOT use 'timestamp'.\n"
"- NEVER use time_bucket(). For N-minute buckets use to_timestamp/FLOOR/EPOCH formula.\n"
"- INTERVAL rule:\n"
" * If the question specifies an interval (e.g. '2분 간격', '5-minute interval'):\n"
" use: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)) AS bucket\n"
" with GROUP BY bucket, tagname and AVG(value::double precision) AS avg_val\n"
" * If NO interval is specified: SELECT recorded_at, tagname, value — NO GROUP BY.\n"
"- Current year is 2026. '4월 27일' means 2026-04-27.\n"
"- All times in DB are UTC. Korean input is KST (UTC+9). Convert: KST 12:00 = UTC 03:00.\n"
"- value column is TEXT; cast with ::double precision only when aggregating.\n"
"- All tagnames are lowercase (e.g. 'ficq-6113.pv'). Match exactly.\n"
"- PostgreSQL LIKE: dot has no special meaning, no escaping needed.\n"
"- Return ONLY the SQL statement. No explanation, no markdown.\n\n"
f"{_DB_SCHEMA}"
)
try:
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": question},
],
max_tokens=8192,
temperature=0.1,
)
resp = await asyncio.to_thread(_call_llm)
sql = (resp.choices[0].message.content or "").strip()
# 마크다운 코드 블록 제거
if sql.startswith("```"):
lines = sql.splitlines()
sql = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]).strip()
if not sql:
return json.dumps({"success": False, "sql": "", "error": "LLM이 SQL을 생성하지 못했습니다."}, ensure_ascii=False)
except Exception as e:
return json.dumps({"success": False, "sql": "", "error": f"LLM SQL 생성 실패: {e}"}, ensure_ascii=False)
# SQL 실행
raw = await _execute_sql_internal(sql)
result = json.loads(raw)
result["sql"] = sql
# long format → pivot 변환 (tagname 컬럼이 있으면 자동 PIVOT)
if result.get("success") and "data" in result:
cols = result.get("columns", [])
data = result["data"]
if "tagname" in cols and data:
time_col = next((c for c in cols if c not in ("tagname", "value", "livevalue", "avg_val")), None)
val_col = next((c for c in ("avg_val", "value") if c in cols), cols[-1])
if time_col:
tag_names_list = sorted(dict.fromkeys(row["tagname"] for row in data))
pivoted: dict = {}
for row in data:
key = str(row[time_col])
if key not in pivoted:
pivoted[key] = {time_col: row[time_col]}
pivoted[key][row["tagname"]] = row.get(val_col)
result["data"] = list(pivoted.values())
result["columns"] = [time_col] + tag_names_list
result["count"] = len(result["data"])
return json.dumps(result, ensure_ascii=False, default=str)
# ── Phase 6 보강 도구 (이벤트/태그/보고서) ─────────────────────────────────────
_VALID_EVENT_TYPES = ("ALARM", "TRIP", "NORMAL", "RUN", "CHANGE")
@mcp.tool()
async def find_tags(query: str, area: str | None = None, top_k: int = 20) -> str:
"""태그 검색 — base_tag/설명(desc)/area 통합 검색 (v_tag_summary 뷰 기반).
사용 시점: 사용자가 "온도", "Tower 1 압력", "운전 중인 펌프" 같은 자연어로
태그를 지칭할 때 실제 base_tag(예: 'ti-6101', 'p-6102')를 역으로 찾기 위해.
get_tag_metadata와 차이: 단순 tagname LIKE만 보지 않고 description/area에도
매칭하며, 현재 PV/SP/OP/description/area를 함께 반환.
Args:
query: 검색어 (base_tag 또는 description 부분 일치, 대소문자 무시)
area: (선택) area 필터 (예: 'tower-1', 'utility'). NULL이면 전체
top_k: 반환 태그 수 (기본 20, 최대 100)
Returns:
JSON: { success, query, count, tags: [{base_tag, pv, sp, op, description, area}] }
"""
conn = None
try:
top_k = max(1, min(top_k, 100))
q = f"%{query.strip()}%"
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
if area:
cur.execute(
"""SELECT base_tag, pv, sp, op, description, area
FROM v_tag_summary
WHERE (base_tag ILIKE %s OR description ILIKE %s)
AND area = %s
ORDER BY base_tag
LIMIT %s""",
(q, q, area, top_k)
)
else:
cur.execute(
"""SELECT base_tag, pv, sp, op, description, area
FROM v_tag_summary
WHERE base_tag ILIKE %s OR description ILIKE %s
ORDER BY base_tag
LIMIT %s""",
(q, q, top_k)
)
rows = cur.fetchall()
tags = [
{"base_tag": r[0], "pv": r[1], "sp": r[2], "op": r[3],
"description": r[4], "area": r[5]}
for r in rows
]
return json.dumps({
"success": True, "query": query, "area": area,
"count": len(tags), "tags": tags
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"태그 검색 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def query_events(
tag_name: str | None = None,
event_type: str | None = None,
area: str | None = None,
since: str | None = None,
until: str | None = None,
limit: int = 100,
) -> str:
"""이벤트 히스토리 조회 (event_history_table — 디지털 포인트 상태 변경).
Args:
tag_name: (선택) 태그명 LIKE 패턴 (예: 'p-6102', 'xv-%')
event_type: (선택) ALARM / TRIP / NORMAL / RUN / CHANGE 중 하나
area: (선택) area 정확 매칭
since: (선택) 시작 시간 ISO 8601. 기본 24시간 전
until: (선택) 종료 시간 ISO 8601. 기본 현재
limit: 반환 행 수 (기본 100, 최대 1000)
Returns:
JSON: { success, count, time_range, events: [...] }
"""
if event_type and event_type.upper() not in _VALID_EVENT_TYPES:
return json.dumps({
"success": False,
"error": f"event_type은 {list(_VALID_EVENT_TYPES)} 중 하나여야 합니다."
}, ensure_ascii=False)
conn = None
try:
limit = max(1, min(limit, 1000))
where = ["event_time >= COALESCE(%s::timestamptz, NOW() - INTERVAL '24 hours')",
"event_time <= COALESCE(%s::timestamptz, NOW())"]
params: list = [since, until]
if tag_name:
where.append("tagname ILIKE %s")
params.append(f"%{tag_name}%")
if event_type:
where.append("event_type = %s")
params.append(event_type.upper())
if area:
where.append("area = %s")
params.append(area)
sql = f"""
SELECT id, tagname, prev_value, curr_value, event_type, event_time,
area, section, duration_seconds, metadata
FROM event_history_table
WHERE {' AND '.join(where)}
ORDER BY event_time DESC
LIMIT %s
"""
params.append(limit)
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(sql, params)
rows = cur.fetchall()
events = [
{"id": r[0], "tag_name": r[1], "prev_value": r[2], "curr_value": r[3],
"event_type": r[4], "event_time": r[5].isoformat() if r[5] else None,
"area": r[6], "section": r[7], "duration_seconds": r[8], "metadata": r[9]}
for r in rows
]
return json.dumps({
"success": True,
"count": len(events),
"time_range": f"{since or '24h ago'} ~ {until or 'now'}",
"filters": {"tag_name": tag_name, "event_type": event_type, "area": area},
"events": events,
}, ensure_ascii=False, indent=2, default=str)
except Exception as e:
return json.dumps({"success": False, "error": f"이벤트 조회 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def active_alarms(area: str | None = None, limit: int = 100) -> str:
"""현재 활성 알람 — 각 태그의 최신 이벤트가 ALARM 또는 TRIP인 경우만 반환.
동작: event_history_table에서 태그별 최신 이벤트(DISTINCT ON)를 가져온 뒤,
event_type이 ALARM 또는 TRIP인 것만 필터. 이후 NORMAL이 들어왔다면 해제된 것이므로 제외.
Args:
area: (선택) area 필터
limit: 최대 반환 수 (기본 100)
Returns:
JSON: { success, count, alarms: [{tag_name, event_type, since, duration_seconds, area, ...}] }
"""
conn = None
try:
limit = max(1, min(limit, 500))
sql = """
WITH latest AS (
SELECT DISTINCT ON (tagname)
id, tagname, curr_value, event_type, event_time,
area, section, duration_seconds, metadata
FROM event_history_table
WHERE event_time >= NOW() - INTERVAL '30 days'
ORDER BY tagname, event_time DESC
)
SELECT id, tagname, curr_value, event_type, event_time,
area, section, duration_seconds, metadata,
EXTRACT(EPOCH FROM (NOW() - event_time))::bigint AS age_seconds
FROM latest
WHERE event_type IN ('ALARM', 'TRIP')
AND (%s::text IS NULL OR area = %s)
ORDER BY event_time DESC
LIMIT %s
"""
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(sql, (area, area, limit))
rows = cur.fetchall()
alarms = [
{"id": r[0], "tag_name": r[1], "curr_value": r[2], "event_type": r[3],
"since": r[4].isoformat() if r[4] else None,
"area": r[5], "section": r[6], "duration_seconds": r[7], "metadata": r[8],
"age_seconds": r[9]}
for r in rows
]
return json.dumps({
"success": True, "area": area,
"count": len(alarms), "alarms": alarms,
}, ensure_ascii=False, indent=2, default=str)
except Exception as e:
return json.dumps({"success": False, "error": f"활성 알람 조회 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def summarize_events(
since: str | None = None,
area: str | None = None,
event_type: str | None = None,
max_events: int = 200,
focus: str = "",
) -> str:
"""이벤트 히스토리를 LLM으로 한글 요약.
내부 동작: query_events로 이벤트를 가져와 LLM에 요약 요청. 큰 건수는 잘림.
Args:
since: (선택) ISO 8601 시작 시간. 기본 24시간 전
area: (선택) area 필터
event_type: (선택) event_type 필터
max_events: 분석에 포함할 최대 이벤트 (기본 200, 최대 500)
focus: (선택) 요약 시 강조할 관점 (예: "interlock 동작", "교대 시점 이상")
Returns:
JSON: { success, summary, stats: {by_type, by_area, count} }
"""
max_events = max(10, min(max_events, 500))
raw = await query_events(event_type=event_type, area=area, since=since, limit=max_events)
parsed = json.loads(raw)
if not parsed.get("success"):
return raw
events = parsed.get("events", [])
if not events:
return json.dumps({
"success": True, "summary": "지정된 조건 범위에 이벤트가 없습니다.",
"stats": {"count": 0}
}, ensure_ascii=False, indent=2)
# 통계
by_type: dict[str, int] = {}
by_area: dict[str, int] = {}
for ev in events:
by_type[ev["event_type"]] = by_type.get(ev["event_type"], 0) + 1
a = ev.get("area") or "(unknown)"
by_area[a] = by_area.get(a, 0) + 1
# LLM 요약 — 가벼운 토큰 수로 제한
sample = events[:max_events]
bullet_lines = [
f"- [{ev['event_type']}] {ev['tag_name']} @ {ev['event_time']}"
f" ({ev.get('prev_value')}{ev.get('curr_value')})"
f"{(' area=' + ev['area']) if ev.get('area') else ''}"
for ev in sample
]
focus_line = f"\n특히 다음 관점을 우선해 설명하세요: {focus}\n" if focus else ""
system = (
"당신은 IIoT/공장 운전 분석 전문가입니다. 디지털 포인트의 상태 변경 이벤트 로그를 보고 "
"한국어로 6~10줄 요약을 만듭니다. 다음 구조를 따릅니다:\n"
"1) 핵심 현황 (총 이벤트 수, 주요 area)\n"
"2) 알람/트립 (ALARM/TRIP) 핵심 케이스 — 태그·시각·전후값\n"
"3) 패턴/특이점 (반복 발생, 동시 발생, area 집중)\n"
"4) 다음 점검 권고 (있다면)\n"
"구체적인 태그명과 시각을 포함하되 추측은 자제합니다."
)
user_msg = (
f"분석 대상 이벤트 {len(sample)}건 (전체 {parsed.get('count')}건). "
f"통계: type={by_type}, area={by_area}.{focus_line}\n\n"
+ "\n".join(bullet_lines)
)
def _call():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
],
max_tokens=1200,
temperature=0.2,
)
try:
resp = await asyncio.to_thread(_call)
summary = resp.choices[0].message.content or "(요약 없음)"
except Exception as e:
summary = f"(LLM 요약 실패: {e})"
return json.dumps({
"success": True,
"summary": summary,
"stats": {"count": parsed.get("count"), "by_type": by_type, "by_area": by_area},
"time_range": parsed.get("time_range"),
}, ensure_ascii=False, indent=2)
@mcp.tool()
async def generate_status_report(area: str | None = None, hours: int = 24) -> str:
"""공장 운전 상태 종합 보고서 — 활성 알람 + 최근 이벤트 + 추세를 LLM이 한 장으로 정리.
Args:
area: (선택) area 필터 (전체 공장이면 NULL)
hours: 이벤트 분석 윈도우 (기본 24시간, 최대 168=7일)
Returns:
JSON: { success, report, sections: {active_alarms, recent_events, by_type}, generated_at }
"""
hours = max(1, min(hours, 168))
since_iso = None # query_events가 24h 기본을 쓰지만, hours로 명시 전달
from datetime import datetime, timezone, timedelta
since_iso = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
# 1) 활성 알람
alarms_raw = await active_alarms(area=area, limit=50)
alarms = json.loads(alarms_raw).get("alarms", [])
# 2) 최근 이벤트 통계
events_raw = await query_events(area=area, since=since_iso, limit=500)
events_parsed = json.loads(events_raw)
events = events_parsed.get("events", [])
by_type: dict[str, int] = {}
for ev in events:
by_type[ev["event_type"]] = by_type.get(ev["event_type"], 0) + 1
# 3) LLM 보고서
alarm_lines = [
f"- [{a['event_type']}] {a['tag_name']} since {a['since']} "
f"({a.get('age_seconds', 0)}s){' area=' + a['area'] if a.get('area') else ''}"
for a in alarms[:30]
] or ["- 활성 알람 없음"]
recent_lines = [
f"- [{ev['event_type']}] {ev['tag_name']} @ {ev['event_time']} "
f"({ev.get('prev_value')}{ev.get('curr_value')})"
for ev in events[:40]
] or ["- 최근 이벤트 없음"]
system = (
"당신은 공장 운전 교대 보고서를 작성하는 운전원 보조 AI입니다. "
"다음 형식으로 한국어 보고서를 작성하세요. 마크다운 사용 가능.\n\n"
"# 운전 상태 종합 보고서\n\n"
"## 1. 요약\n(2~3줄로 핵심 상황)\n\n"
"## 2. 활성 알람\n(있으면 표 또는 bullet, 없으면 \"없음\")\n\n"
"## 3. 최근 N시간 이벤트 분석\n(주요 패턴, 빈번한 태그, 동시 발생)\n\n"
"## 4. 권고 조치\n(점검할 태그/area, 우선순위)\n\n"
"수치는 통계 데이터를 그대로 인용하고, 추측은 명시적으로 표시하세요."
)
user_msg = (
f"대상 area: {area or '전체'}\n"
f"분석 윈도우: 최근 {hours}시간\n"
f"이벤트 통계 (type별): {by_type}\n"
f"활성 알람 {len(alarms)}건:\n" + "\n".join(alarm_lines) + "\n\n"
f"최근 이벤트 표본 (최대 40건):\n" + "\n".join(recent_lines)
)
def _call():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
],
max_tokens=2048,
temperature=0.2,
)
try:
resp = await asyncio.to_thread(_call)
report = resp.choices[0].message.content or "(보고서 생성 실패)"
except Exception as e:
report = f"(LLM 보고서 실패: {e})"
from datetime import datetime as _dt, timezone as _tz
return json.dumps({
"success": True,
"report": report,
"sections": {
"active_alarms_count": len(alarms),
"recent_events_count": len(events),
"by_type": by_type,
"window_hours": hours,
"area": area,
},
"generated_at": _dt.now(_tz.utc).isoformat(),
}, ensure_ascii=False, indent=2)
# ── P&ID 추출 도구 ──────────────────────────────────────────────────────────────
@mcp.tool()
async def extract_pid_tags(text: str, source_type: str) -> str:
"""P&ID 도면(DXF/PDF)에서 태그 정보를 추출합니다.
Args:
text: DXF/PDF에서 추출한 텍스트
source_type: 'dxf' 또는 'pdf'
Returns:
JSON: { success, count, tags: [{tagNo, equipmentName, instrumentType, lineNumber, pidDrawingNo, confidence}] }
"""
import asyncio
import logging
import re
import json as json_module
system = (
"You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
"Extract all instrument and equipment tags from the provided text.\n"
"Return ONLY a valid JSON array. Each element must have exactly these fields:\n"
'{"tagNo":"FCV-101","equipmentName":null,"instrumentType":"FCV","lineNumber":null,"pidDrawingNo":null,"confidence":0.95}\n'
"Rules:\n"
"- tagNo: any token matching [LETTERS]-[DIGITS] or [LETTERS]-[DIGITS]-[SUFFIX]\n"
" Examples: FCV-101, P-10101, T-10100, VG-6203-15A-F1A-n, BT-6200, DP-10101\n"
"- instrumentType: leading letters of tagNo (e.g. FCV, P, T, VG, BT, DP, PSV)\n"
"- equipmentName: descriptive name if present in text near the tag, else null\n"
"- lineNumber: null unless a line number is explicitly associated\n"
"- pidDrawingNo: null unless a drawing number is explicitly associated\n"
"- confidence: 0.95 for clear tags, lower for ambiguous ones\n"
"- Output ONLY the JSON array, no markdown, no explanation.\n"
"- If no tags found, return: []\n"
)
try:
truncated_text = text[:100000] if len(text) > 100000 else text
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Source: {source_type}\n\nText:\n{truncated_text}"},
],
max_tokens=32768,
temperature=0.1,
extra_body={"chat_template_kwargs": {"enable_thinking": False}},
)
resp = await asyncio.to_thread(_call_llm)
raw = (resp.choices[0].message.content or "").strip()
finish_reason = resp.choices[0].finish_reason
# 마크다운 코드 블록 제거
if raw.startswith("```"):
lines = raw.splitlines()
raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()
# finish_reason=length 로 잘린 경우: 마지막 완전한 객체까지 살린 뒤 배열 닫기
if finish_reason == "length":
last_close = raw.rfind("}")
if last_close != -1:
raw = raw[:last_close + 1] + "]"
# 유효한 JSON 배열 추출 (가장 긴 균형 잡힌 [...] 선택)
def _extract_array(s: str) -> str:
depth = 0; start = -1; best = ""
for i, c in enumerate(s):
if c == '[':
if depth == 0: start = i
depth += 1
elif c == ']':
depth -= 1
if depth == 0 and start >= 0:
cand = s[start:i+1]
if len(cand) > len(best): best = cand
return best if best else "[]"
raw = _extract_array(raw)
# JSON 파싱 — 실패 시 개별 객체 추출로 폴백
try:
data = json_module.loads(raw)
except json_module.JSONDecodeError:
objects = re.findall(r'\{[^{}]*\}', raw, re.DOTALL)
data = []
for obj in objects:
try:
data.append(json_module.loads(obj))
except json_module.JSONDecodeError:
pass
if not data:
return json_module.dumps({"success": False, "count": 0, "tags": []}, ensure_ascii=False)
logging.info(f"[extract_pid_tags] source={source_type} count={len(data) if isinstance(data, list) else 0}")
return json_module.dumps({
"success": True,
"count": len(data),
"tags": data
}, ensure_ascii=False, indent=2)
except Exception as e:
logging.error(f"P&ID 태그 추출 실패: {e}")
logging.error(f"Raw response: {raw[:1000]}")
return json.dumps({"success": False, "error": f"P&ID 태그 추출 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
async def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str:
"""P&ID 태그를 Experion 태그에 매핑합니다.
Args:
pid_tags: P&ID에서 추출한 태그 목록 (예: ["FT-101", "PT-201"])
experion_tags: Experion 시스템 태그 목록 (예: ["ficq-6113.pv", "pt-201.pv"])
Returns:
JSON: { success, count, mappings: [{pidTag, experionTag, confidence}] }
"""
import asyncio
import re
import json as json_module
system = (
"You are a P&ID to Experion tag matching expert.\n"
"Match P&ID tags to Experion tags based on similarity.\n"
"Return ONLY a JSON array of objects with the following structure:\n"
'[{"pidTag":"FT-101","experionTag":"ft-101.pv","confidence":0.92},...]\n'
"IMPORTANT rules:\n"
"- pidTag: The original P&ID tag from input\n"
"- experionTag: The matched Experion tag (lowercase, with .pv/.sp/.mv suffix)\n"
"- confidence: 0.0 to 1.0 based on match quality\n"
"- If no good match found, set confidence < 0.5 and leave experionTag null\n"
"- Do NOT include any explanation, only the JSON array.\n"
"- If no matches found, return an empty array: []\n"
"- temperature=0.1 for deterministic output.\n"
)
try:
pid_str = "\n".join(pid_tags)
experion_str = "\n".join(experion_tags)
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"P&ID Tags:\n{pid_str}\n\nExperion Tags:\n{experion_str}"},
],
max_tokens=16384,
temperature=0.1,
extra_body={"chat_template_kwargs": {"enable_thinking": False}},
)
resp = await asyncio.to_thread(_call_llm)
raw = (resp.choices[0].message.content or "").strip()
finish_reason = resp.choices[0].finish_reason
if raw.startswith("```"):
lines = raw.splitlines()
raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()
if finish_reason == "length":
last_close = raw.rfind("}")
if last_close != -1:
raw = raw[:last_close + 1] + "]"
match = re.search(r'\[.*\]', raw, re.DOTALL)
raw = match.group(0) if match else "[]"
data = json_module.loads(raw)
return json_module.dumps({"success": True, "count": len(data), "mappings": data},
ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"P&ID 태그 매핑 실패: {e}"}, ensure_ascii=False)
# ── P&ID 파싱 도구 (DXF/PDF/DWG) ───────────────────────────────────────────────
@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
"""ezdxf 기반 DXF 파일 파싱. 텍스트 추출 후 LLM으로 태그 자동 추출.
Args:
filepath: DXF 파일 경로
Returns:
JSON: { success, text, count, tags: [{tagNo, equipmentName, ...}] }
"""
import asyncio
import json
import re
try:
def _extract_text():
return _extract_text_from_dxf(filepath)
text = await asyncio.to_thread(_extract_text)
if not text.strip():
return json.dumps({
"success": True,
"text": "",
"count": 0,
"tags": []
}, ensure_ascii=False, indent=2)
# LLM으로 태그 추출
system = (
"You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
"Extract instrument and equipment tags from the provided text.\n"
"Return ONLY a JSON array of objects with the following structure:\n"
'[{"tagNo":"FIT-10115","equipmentName":"Flow Transmitter","instrumentType":"FT" OR "FIT OR "TIA","lineNumber":"L-101","pidDrawingNo":"P&ID-001","confidence":0.95},...]\n'
"IMPORTANT rules:\n"
"- tagNo: Standard tag format with these patterns:\n"
" * Instrument: [Function][Loop]-[Number] (e.g., FT-101, PT-201, LI-301, FICQ-6113)\n"
" * Equipment: [Type]-[Number] (e.g., P-10101, T-10100, C-9111, E-10119)\n"
" * Complex: [Type]-[Number]-[Size]-[Class]-[Material]-[Option] (e.g., VG-6203-15A-F1A-n, CD-10513-40A-S1A-H50)\n"
" * Real examples from DXF: BT-6200, SARF-#6-PID-002, P-6101, DP-10101, CHS-6630-100A-F-C50\n"
"- instrumentType: First 2-4 letters of tagNo (FIT, PT, LI, FICQ, TCV, FCV, PCV, PG, TG, etc.)\n"
"- equipmentName: Descriptive name if available, otherwise null\n"
"- lineNumber: Line number if available, otherwise null\n"
"- pidDrawingNo: Drawing number if available, otherwise null\n"
"- confidence: 0.0 to 1.0 based on how clearly the tag was identified\n"
"- Do NOT include any explanation, only the JSON array.\n"
"- If no tags found, return an empty array: []\n"
"- temperature=0.1 for deterministic output.\n"
)
truncated_text = text[:12000] if len(text) > 12000 else text
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Source: dxf\n\nText:\n{truncated_text}"},
],
max_tokens=4096,
temperature=0.1,
)
resp = await asyncio.to_thread(_call_llm)
raw = (resp.choices[0].message.content or "").strip()
# 마크다운 코드 블록 제거
if raw.startswith("```"):
lines = raw.splitlines()
raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()
# JSON 배열 추출
match = re.search(r'\[.*\]', raw, re.DOTALL)
if match:
raw = match.group(0)
# JSON 파싱 시도
try:
data = json.loads(raw)
except json.JSONDecodeError:
# JSON 배열 추출 시도 (더 엄격한 패턴)
match = re.search(r'\[\s*\{.*?\}\s*\]', raw, re.DOTALL)
if match:
raw_clean = match.group(0)
try:
data = json.loads(raw_clean)
except json.JSONDecodeError:
# 마지막으로, JSON 배열을 개별 객체로 분리하여 파싱 시도
objects = re.findall(r'\{[^{}]*\}', raw, re.DOTALL)
data = []
for obj in objects:
try:
data.append(json.loads(obj))
except json.JSONDecodeError:
pass
if not isinstance(data, list):
data = []
return json.dumps({
"success": True,
"text": text[:10000], # 제한
"count": len(text),
"tags": data
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"DXF 파싱 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
async def parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
"""PyMuPDF 기반 PDF 파일 파싱. 텍스트 추출 후 LLM으로 태그 자동 추출.
Args:
filepath: PDF 파일 경로
use_ocr: OCR 사용 여부 (기본 True, 고정밀도)
Returns:
JSON: { success, text, count, tags: [{tagNo, equipmentName, ...}] }
"""
import asyncio
import json
import re
try:
def _extract_text():
if use_ocr:
return _extract_text_from_pdf_ocr(filepath)
else:
return _extract_text_from_pdf(filepath)
text = await asyncio.to_thread(_extract_text)
if not text.strip():
return json.dumps({
"success": True,
"text": "",
"count": 0,
"tags": []
}, ensure_ascii=False, indent=2)
# LLM으로 태그 추출
system = (
"You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
"Extract instrument and equipment tags from the provided text.\n"
"Return ONLY a JSON array of objects with the following structure:\n"
'[{"tagNo":"FIT-10115","equipmentName":"Flow Transmitter","instrumentType":"FT" OR "FIT OR "TIA","lineNumber":"L-101","pidDrawingNo":"P&ID-001","confidence":0.95},...]\n'
"IMPORTANT rules:\n"
"- tagNo: Standard tag format with these patterns:\n"
" * Instrument: [Function][Loop]-[Number] (e.g., FT-101, PT-201, LI-301, FICQ-6113)\n"
" * Equipment: [Type]-[Number] (e.g., P-10101, T-10100, C-9111, E-10119)\n"
" * Complex: [Type]-[Number]-[Size]-[Class]-[Material]-[Option] (e.g., VG-6203-15A-F1A-n, CD-10513-40A-S1A-H50)\n"
" * Real examples from DXF: BT-6200, SARF-#6-PID-002, P-6101, DP-10101, CHS-6630-100A-F-C50\n"
"- instrumentType: First 2-4 letters of tagNo (FIT, PT, LI, FICQ, TCV, FCV, PCV, PG, TG, etc.)\n"
"- equipmentName: Descriptive name if available, otherwise null\n"
"- lineNumber: Line number if available, otherwise null\n"
"- pidDrawingNo: Drawing number if available, otherwise null\n"
"- confidence: 0.0 to 1.0 based on how clearly the tag was identified\n"
"- Do NOT include any explanation, only the JSON array.\n"
"- If no tags found, return an empty array: []\n"
"- temperature=0.1 for deterministic output.\n"
)
truncated_text = text[:12000] if len(text) > 12000 else text
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Source: pdf\n\nText:\n{truncated_text}"},
],
max_tokens=4096,
temperature=0.1,
)
resp = await asyncio.to_thread(_call_llm)
raw = (resp.choices[0].message.content or "").strip()
# 마크다운 코드 블록 제거
if raw.startswith("```"):
lines = raw.splitlines()
raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()
# JSON 배열 추출
match = re.search(r'\[.*\]', raw, re.DOTALL)
if match:
raw = match.group(0)
# JSON 파싱 시도
try:
data = json.loads(raw)
except json.JSONDecodeError:
# JSON 배열 추출 시도 (더 엄격한 패턴)
match = re.search(r'\[\s*\{.*?\}\s*\]', raw, re.DOTALL)
if match:
raw_clean = match.group(0)
try:
data = json.loads(raw_clean)
except json.JSONDecodeError:
# 마지막으로, JSON 배열을 개별 객체로 분리하여 파싱 시도
objects = re.findall(r'\{[^{}]*\}', raw, re.DOTALL)
data = []
for obj in objects:
try:
data.append(json.loads(obj))
except json.JSONDecodeError:
pass
if not isinstance(data, list):
data = []
return json.dumps({
"success": True,
"text": text[:10000],
"count": len(text),
"tags": data
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"PDF 파싱 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
async def build_pid_graph_parallel(filepath: str) -> str:
"""
분산 처리 기법을 적용한 P&ID 그래프 생성 툴.
전처리 -> 병렬 분산 추출 -> 위상 모델링 -> 저장 과정을 수행합니다.
"""
import asyncio
import json
try:
# 1. 전처리 (Phase 1: Geometric Extraction)
def _extract_and_save():
extractor = PidGeometricExtractor(filepath)
geo_data_path = f"mcp-server/storage/{os.path.basename(filepath)}_geo.json"
geo_data_list = extractor.extract_and_save(geo_data_path)
return geo_data_path
geo_data_path = await asyncio.to_thread(_extract_and_save)
# geo_data_list는 경로를 반환하므로 다시 로드
def _load_geo_data():
with open(geo_data_path, 'r', encoding='utf-8') as f:
return json.load(f)
geo_data = await asyncio.to_thread(_load_geo_data)
# 2. 병렬 분산 추출 (Phase 3: Intelligent Mapping)
# 시스템 태그 목록 가져오기 (DB에서 조회하는 로직 필요, 여기서는 예시로 빈 리스트 또는 기본값)
# 실제로는 get_tag_metadata 등을 통해 전체 태그 리스트를 확보해야 함
system_tags = []
try:
conn = await _get_db_connection()
try:
with conn.cursor() as cur:
cur.execute("SELECT tagname FROM realtime_table")
system_tags = [r[0] for r in cur.fetchall()]
finally:
conn.close()
except Exception as e:
logging.warning(f"Failed to fetch system tags: {e}")
# 그래프 임시 생성 (Mapper가 위상 정보를 사용하므로 필요)
builder = PidTopologyBuilder(geo_data)
builder.build_graph()
# Mapper 설정
from openai import AsyncOpenAI
api_client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
mapper = IntelligentMapper(builder.G, system_tags, api_client=api_client, model_name=VLLM_MODEL)
# 분류별 노드 분리
nodes = list(builder.G.nodes())
transmitter_nodes = [n for n, d in builder.G.nodes(data=True) if d.get('value', '').upper() in ['FIT', 'FT', 'LT', 'PT', 'TE']] # 단순화된 필터
valve_nodes = [n for n, d in builder.G.nodes(data=True) if d.get('value', '').upper() in ['FCV', 'LCV', 'TCV', 'PCV', 'XV']]
equipment_nodes = [n for n, d in builder.G.nodes(data=True) if d.get('type') not in ['TEXT', 'LINE', 'LWPOLYLINE']]
# 병렬 호출 (vLLM Batching 유도)
tasks = [
mapper.extract_transmitters(transmitter_nodes),
mapper.extract_valves(valve_nodes),
mapper.extract_equipment(equipment_nodes)
]
extracted_results = await asyncio.gather(*tasks)
# 결과 통합
all_mapped_tags = []
for res_dict in extracted_results:
for node_id, mapping in res_dict.items():
if mapping.resolved_tag != "UNKNOWN":
# TopologyBuilder가 기대하는 형식으로 변환
node_data = builder.G.nodes[node_id]
all_mapped_tags.append({
"entity_id": node_id,
"tagName": mapping.resolved_tag,
"bbox": node_data['bbox'].bounds if hasattr(node_data['bbox'], 'bounds') else node_data['bbox'],
"clean_value": mapping.resolved_tag
})
# 3. 최종 위상 모델링 (Phase 2)
final_builder = PidTopologyBuilder(geo_data, all_extracted_tags=all_mapped_tags)
final_builder.build_graph()
# 4. 저장
graph_id = os.path.basename(filepath).replace(".dxf", "_graph.json")
graph_path = f"mcp-server/storage/{graph_id}"
final_builder.save_graph(graph_path)
return json.dumps({
"success": True,
"data": {
"graph_id": graph_id,
"graph_path": graph_path,
"nodes": final_builder.G.number_of_nodes(),
"edges": final_builder.G.number_of_edges()
},
"message": "그래프 생성 완료"
}, ensure_ascii=False)
except Exception as e:
logging.error(f"build_pid_graph_parallel failed: {e}")
return json.dumps({"success": False, "data": None, "error": str(e), "message": "그래프 생성 실패"}, ensure_ascii=False)
@mcp.tool()
async def analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
"""
구축된 그래프를 기반으로 특정 설비 장애 시 영향도 분석을 수행합니다.
"""
import asyncio
try:
graph_path = f"mcp-server/storage/{graph_id}"
mapping_path = graph_path.replace("_graph.json", "_mapping.json")
def _analyze():
analyzer = PidAnalysisEngine(graph_path, mapping_path)
return analyzer.analyze_impact(start_node_id)
result = await asyncio.to_thread(_analyze)
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"Impact analysis failed: {e}"}, ensure_ascii=False)
@mcp.tool()
async def parse_pid_drawing(filepath: str) -> str:
"""확장자 자동 감지하여 P&ID 도면 파싱.
Args:
filepath: DXF/DWG/PDF 파일 경로
Returns:
JSON: { success, text, count, tags, format }
"""
import os
ext = os.path.splitext(filepath)[1].lower()
if ext == ".dxf":
return await parse_pid_dxf(filepath)
elif ext == ".dwg":
# DWG 파일은 사전에 DXF로 변환하여 업로드해야 합니다.
# Linux에서 DWG를 DXF로 변환하는 도구는 제한되어 있습니다.
return json.dumps({
"success": False,
"error": "DWG 파일은 현재 직접 파싱할 수 없습니다.\n" +
"사전에 DXF로 변환하여 업로드해 주세요.\n" +
"\n변환 방법:\n" +
"1. Windows에서 AutoCAD 또는 ODA File Converter 사용\n" +
"2. 온라인 DWG → DXF 변환기 사용\n" +
"3. LibreOffice Draw (Windows/macOS 전용) 사용"
}, ensure_ascii=False)
elif ext == ".pdf":
return await parse_pid_pdf(filepath)
else:
return json.dumps({
"success": False,
"error": f"Unsupported format: {ext}. Supported: .dxf, .dwg, .pdf"
}, ensure_ascii=False)
# ── KB ingest 파서 ────────────────────────────────────────────────────────────
@mcp.tool()
async def parse_document(
doc_id: str,
title: str,
file_path: str,
mime_type: str = "",
collection_key: str = "",
chunking_policy: str = "",
) -> str:
"""KB ingest 파서. 파일 확장자에 따라 적절한 청킹을 수행한다.
Args:
doc_id: 문서 ID (UUID 문자열)
title: 제목 (오류 메시지에만 사용)
file_path: 절대 경로
mime_type: 정보용 (옵션)
collection_key: 정보용 (옵션)
chunking_policy: JSON 문자열, 향후 정책 분기에 사용
Returns:
JSON 문자열: {"success": true, "chunks": [{"text", "chunk_kind", "locator"}, ...]}
or {"success": false, "error": "..."}
"""
import os
if not os.path.isfile(file_path):
return json.dumps({"success": False, "error": f"file not found: {file_path}"}, ensure_ascii=False)
ext = os.path.splitext(file_path)[1].lower()
try:
if ext in (".xlsx", ".xlsm"):
from parsers import xlsx_parser
chunks = await asyncio.to_thread(xlsx_parser.parse, file_path)
elif ext == ".pdf":
from parsers import pdf_parser
chunks = await asyncio.to_thread(pdf_parser.parse, file_path)
elif ext == ".docx":
from parsers import docx_parser
chunks = await asyncio.to_thread(docx_parser.parse, file_path)
elif ext in (".md", ".txt", ".markdown"):
from parsers import text_parser
chunks = await asyncio.to_thread(text_parser.parse, file_path)
else:
return json.dumps(
{"success": False, "error": f"unsupported extension: {ext}"},
ensure_ascii=False
)
return json.dumps(
{"success": True, "doc_id": doc_id, "chunks": chunks, "count": len(chunks)},
ensure_ascii=False
)
except Exception as e:
return json.dumps({"success": False, "error": f"parse failed: {e}"}, ensure_ascii=False)
# ── 엔트리포인트 ──────────────────────────────────────────────────────────────
def main():
"""HTTP 모드로 실행 — C# McpClient (localhost:5001) 용."""
mcp.run(transport="streamable-http")
if __name__ == "__main__":
# --http 플래그: HTTP 모드 (C# McpClient 용)
# 플래그 없음: stdio 모드 (Claude Code / Roo Code MCP 용)
if "--http" in sys.argv:
mcp.run(transport="streamable-http")
else:
mcp.run(transport="stdio")