Files
HC900-Crawler/mcp-server/server.py
windpacer 19c8c2e95c feat: MCP server RAG/NL2SQL/PID 개선
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 08:11:44 +09:00

2667 lines
107 KiB
Python

#!/usr/bin/env python3
"""
ExperionCrawler Unified MCP Server
- RAG: Qdrant + Ollama nomic-embed-text + vLLM (llm-model.json)
- NL2SQL: 자연어 → LLM SQL 생성 → PostgreSQL 실행
- 사용처:
stdio 모드 (기본): Claude Code MCP / Roo Code MCP
HTTP 모드 (--http): C# McpClient (localhost:5001)
"""
from __future__ import annotations
import sys
import os
import re
import json
import logging
import httpx
from functools import lru_cache
from mcp.server.fastmcp import FastMCP
logging.basicConfig(level=logging.WARNING, stream=sys.stderr)
# ── 시간 변환 ───────────────────────────────────────────────────────────────────
KST = __import__("zoneinfo").ZoneInfo("Asia/Seoul")
def _kst_str(dt_iso: str | None) -> str:
"""UTC ISO 문자열 → KST ISO 문자열 (초 단위, +09:00). None이면 그대로 None."""
if not dt_iso:
return str(dt_iso) if dt_iso is None else ""
from datetime import datetime
if isinstance(dt_iso, datetime):
return dt_iso.astimezone(KST).strftime("%Y-%m-%dT%H:%M:%S+09:00")
try:
dt = datetime.fromisoformat(dt_iso)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=__import__("datetime").timezone.utc)
return dt.astimezone(KST).strftime("%Y-%m-%dT%H:%M:%S+09:00")
except Exception:
return dt_iso # fallback
# ── 설정 ──────────────────────────────────────────────────────────────────────
QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
EMBED_MODEL = os.environ.get("EMBED_MODEL", "nomic-embed-text")
VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8001/v1")
from config import get_vllm_model
VLLM_MODEL = get_vllm_model()
# Qdrant 컬렉션
COL_CODEBASE = "ws-65f457145aee80b2" # ExperionCrawler 소스코드
COL_OPC_DOCS = "experion-opc-docs" # Experion HS R530 OPC UA 공식 문서 (266 chunks)
# 사용자 KB 컬렉션 (kb_collections 시드 5종과 일치)
KB_COLLECTIONS = {
"system_instrument": "kb_system_instrument",
"plant_operation": "kb_plant_operation",
"procedure": "kb_procedure",
"report": "kb_report",
"vendor_doc": "kb_vendor_doc",
}
# PostgreSQL 연결
DB_CONNECTION_STRING = os.environ.get("DB_CONNECTION_STRING", "postgresql://postgres:postgres@localhost:5432/iiot_platform")
# 라이브 데이터는 hc900 스키마에 있음. search_path 미지정 시 기본 public(레거시/stale)로 해석되므로 강제 지정.
if "search_path" not in DB_CONNECTION_STRING and "options=" not in DB_CONNECTION_STRING:
DB_CONNECTION_STRING += ("&" if "?" in DB_CONNECTION_STRING else "?") + "options=-csearch_path%3Dhc900"
DB_TIMEOUT = int(os.environ.get("DB_TIMEOUT", "10"))
# C# McpClient(localhost:5001)와 통신: json_response+stateless로 단순 POST→JSON 방식
mcp = FastMCP(
"iiot-rag",
port=5001,
json_response=True,
stateless_http=True,
)
# Pipeline Imports
from pipeline.extractor import PidGeometricExtractor
from pipeline.topology import PidTopologyBuilder
from pipeline.mapper import IntelligentMapper
from pipeline.analyzer import PidAnalysisEngine
import networkx as nx
import asyncio
# ── Verifier (Phase B MVP — R1·R2·R4) ──
from verifier.validators import (
validate_tag, validate_area, validate_direction, validate_max_depth,
log_rejection, VerifierError,
)
# ── 임베딩 (Ollama) ───────────────────────────────────────────────────────────
async def _embed(text: str) -> list[float]:
"""Ollama nomic-embed-text로 768-dim 벡터 생성."""
import asyncio
def _call_embed():
with httpx.Client(timeout=30) as client:
resp = client.post(
f"{OLLAMA_URL}/api/embeddings",
json={"model": EMBED_MODEL, "prompt": text},
)
resp.raise_for_status()
return resp.json()["embedding"]
return await asyncio.to_thread(_call_embed)
# ── LLM (vLLM) ──────────────────────────────────────────────────────
@lru_cache(maxsize=1)
def _llm():
from openai import OpenAI
return OpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
def _strip_think(text: str) -> str:
"""Qwen/DeepSeek 계열 모델의 <think>...</think> reasoning 블록 제거"""
if not text:
return text
for tag in ("think", "skip", "reason"):
pat = re.compile(rf"</?{tag}>.*?</?{tag}>", re.DOTALL)
text = pat.sub("", text).strip()
# 태그가 열렸으나 닫히지 않은 경우 (truncated) — 태그부터 끝까지 제거
for tag in ("think", "skip", "reason"):
if f"<{tag}>" in text and f"</{tag}>" not in text:
idx = text.index(f"<{tag}>")
text = text[:idx].strip()
if f"</{tag}>" in text and f"<{tag}>" not in text:
idx = text.index(f"</{tag}>") + len(f"</{tag}>")
text = text[idx:].strip()
return text
# ── PaddleOCR 싱글톤 (PDF fallback용) ──────────────────────────────────────────
@lru_cache(maxsize=1)
def _ocr():
"""PaddleOCR 인스턴스 (한/영, GPU). 첫 호출 시 ~50MB 모델 다운로드."""
from paddleocr import PaddleOCR
import os
use_gpu = os.environ.get("PADDLE_USE_GPU", "true").lower() == "true"
try:
ocr = PaddleOCR(
use_angle_cls=True,
lang="korean",
use_gpu=use_gpu,
show_log=False,
)
return ocr
except Exception as e:
# GPU 실패 시 CPU 폴백
if use_gpu:
os.environ["PADDLE_USE_GPU"] = "false"
return _ocr()
raise e
# ── DXF/PDF 텍스트 추출 헬퍼 ───────────────────────────────────────────────────
async def _extract_text_from_dxf(filepath: str) -> str:
"""ezdxf로 DXF 파일에서 텍스트 추출 (MTEXT 포맷 코드 제거)."""
import asyncio
import ezdxf
from ezdxf.tools.text import plain_mtext
def _extract():
doc = ezdxf.readfile(filepath)
msp = doc.modelspace()
texts = []
for entity in msp:
if entity.dxftype() == "TEXT":
texts.append(entity.dxf.text)
elif entity.dxftype() == "MTEXT":
try:
plain = plain_mtext(entity.dxf.text)
if plain.strip():
texts.append(plain)
except Exception:
pass
return "\n".join(texts)
return await asyncio.to_thread(_extract)
# ── P&ID 빠른 구조 추출 (좌표 계산 없음, LLM 호출 없음) ─────────────────────────
#
# 설계 원칙:
# 1) DXF의 layer 정보를 사용 — LINENO 레이어 텍스트는 라인 마스터로 직접 파싱
# 2) tag/LineNo 정규식 — 100% 결정론적
# 3) FLUID NAME ABBREVIATION은 Symbol & Legend에서 1회 추출한 사전을 내장
# (다른 플랜트 도면을 추가할 때만 사전 갱신)
# 4) 좌표 계산·KD-tree·NetworkX 그래프 없음 → I/O 시간만 소요
#
# LineNo 형식: SERVICE - LINENUM - SIZE - MaterialSpec+FlangeRating+InsulationCode - InsulationThickness
# 예) P-10138-600A-F2A-H100, CD-10513-40A-S1A-H50, VG-6203-15A-F1A-n
_PID_FLUID_DICT: dict[str, str] = {
"P": "PROCESS FLUID",
"CHE": "PROCESS FLUID",
"CWS": "COOLING WATER SUPPLY",
"CWR": "COOLING WATER RETURN",
"CHS": "CHILLED WATER SUPPLY",
"CHR": "CHILLED WATER RETURN",
"WW": "WASTE WATER",
"ST": "STEAM",
"CD": "STEAM CONDENSATE",
"IA": "INSTRUMENT AIR",
"AIR": "INSTRUMENT AIR",
"N2": "NITROGEN",
"VG": "VENT GAS",
"SCR": "VENT GAS",
"SC": "VENT GAS",
"DIW": "DEIONIZED WATER",
"SW": "SOFT WATER",
"SAM": "SAMPLE LINE",
"NBD": "NITROGEN BLOW DOWN",
}
_PID_EQUIPMENT_PREFIX: dict[str, str] = {
"P": "Pump",
"T": "Tank",
"F": "Filter",
"C": "Column",
"E": "Heat Exchanger",
"D": "Drum",
"V": "Vessel",
"BT": "Buffer Tank",
"DP": "Drainage Point",
"CH": "Chiller",
"CT": "Cooling Tower",
"VP": "Vacuum Pump",
"R": "Reactor",
"S": "Separator",
}
_PID_INSTRUMENT_FIRST: dict[str, str] = {
"P": "Pressure", "T": "Temperature", "F": "Flow", "L": "Level",
"A": "Analysis", "S": "Speed", "H": "Hand", "Q": "Quantity", "W": "Weight",
}
_PID_INSTRUMENT_MODIFIER: dict[str, str] = {
"I": "Indicator", "C": "Control", "T": "Transmitter", "R": "Recorder",
"A": "Alarm", "S": "Switch", "V": "Valve", "Q": "Totalizer",
"Y": "Computing", "E": "Element", "G": "Gauge",
}
# 5~7필드 배관번호 지원: SERVICE-LINENUM-SIZE-PIPESPEC-INSUL
# 10차: P-10101-25A-F1A-n (pipe_spec=F1A)
# 9차: P-9107-25A-F-n (pipe_spec=F, insul=n)
# 냉각: CHR-9641-50A-F-C50 (pipe_spec=F, insul=C50)
_PID_LINENO_FULL_RE = re.compile(
r'^([A-Z][A-Z0-9]{0,3})-(\d{3,6})-(\d{1,4}[A-Z]?)-([A-Za-z][A-Za-z0-9]*)-([A-Za-z0-9]+)$'
)
_PID_LINENO_SHORT_RE = re.compile(r'^([A-Z][A-Z0-9]{0,3})-(\d{3,6})$')
_PID_TAG_RE = re.compile(r'^([A-Z]{1,4})-(\d{3,6})([A-Z])?$')
# DCS 함수블록 prefix frozenset — compound형 포함 (ISA _systemFuncLetters 기준)
# ⚠️ C# Boot DDL UPDATE 목록과 수동 동기화 필요 (향후 dcs_prefixes.py 분리 고려)
_DCS_PREFIXES: frozenset[str] = frozenset({
"FIC", "FICA", "FICQ", "FICR",
"TIC", "TICA", "TICQ",
"PIC", "PICA",
"LIC", "LICA",
"FY", "TY", "PY", "LY",
"FV", "TV", "PV", "LV",
})
def _classify_pid_tag(tag_no: str) -> dict:
"""tagNo의 prefix로 장비/계기 종류 분류 (좌표·LLM 사용 안 함)."""
m = _PID_TAG_RE.match(tag_no)
if not m:
return {"kind": "unknown", "prefix": None, "type": None, "tag_dcs": False}
prefix = m.group(1)
if prefix in _PID_EQUIPMENT_PREFIX:
return {"kind": "equipment", "prefix": prefix, "type": _PID_EQUIPMENT_PREFIX[prefix], "tag_dcs": False}
if 2 <= len(prefix) <= 4 and prefix[0] in _PID_INSTRUMENT_FIRST:
measure = _PID_INSTRUMENT_FIRST[prefix[0]]
mods = [_PID_INSTRUMENT_MODIFIER[c] for c in prefix[1:] if c in _PID_INSTRUMENT_MODIFIER]
type_name = (measure + " " + " ".join(mods)).strip()
return {
"kind": "instrument",
"prefix": prefix,
"type": type_name,
"tag_dcs": prefix in _DCS_PREFIXES,
}
return {"kind": "unknown", "prefix": prefix, "type": None, "tag_dcs": False}
def _parse_pid_lineno(token: str) -> dict | None:
"""LineNo 토큰을 필드로 분해. 5~7필드 모두 지원. 매칭 실패 시 None."""
m = _PID_LINENO_FULL_RE.match(token)
if m:
service, line_no, size, pipe_spec, insul = m.groups()
return {
"raw": token,
"service": service,
"fluid": _PID_FLUID_DICT.get(service, service),
"line_no": line_no,
"size": size,
"pipe_spec": pipe_spec,
"insul": insul,
}
sm = _PID_LINENO_SHORT_RE.match(token)
if sm:
service, line_no = sm.groups()
return {
"raw": token,
"service": service,
"fluid": _PID_FLUID_DICT.get(service, service),
"line_no": line_no,
"size": None,
"pipe_spec": None,
"insul": None,
}
return None
async def _extract_pid_dxf_fast(filepath: str) -> dict:
"""DXF에서 layer + regex만으로 구조 추출. 좌표 계산·LLM 호출 없음."""
import ezdxf
from ezdxf.tools.text import plain_mtext
from collections import Counter
def _work():
doc = ezdxf.readfile(filepath)
msp = doc.modelspace()
linenos: list[dict] = []
tags: list[dict] = []
seen_tags: set[str] = set()
for e in msp.query('TEXT MTEXT'):
if e.dxftype() == 'TEXT':
txt = e.dxf.text or ""
else:
try:
txt = plain_mtext(e.dxf.text or "")
except Exception:
txt = e.dxf.text or ""
txt = txt.strip()
if not txt:
continue
layer = e.dxf.layer
# 1) 완전한 배관번호 형식 (FULL_RE) → 레이어 이름 무관하게 배관번호로 처리
if _PID_LINENO_FULL_RE.match(txt):
parsed = _parse_pid_lineno(txt)
if parsed is not None:
linenos.append(parsed)
continue
# 2) 짧은 형식(P-10101)은 레이어 이름에 'LINENO'가 포함된 경우에만 배관번호
# (펌프 태그와 텍스트가 동일하므로 레이어 힌트 불가피)
if 'LINENO' in layer.upper():
parsed = _parse_pid_lineno(txt)
if parsed is not None:
linenos.append(parsed)
continue
# 3) 일반 장비/계기 태그
if _PID_TAG_RE.match(txt):
if txt in seen_tags:
continue
seen_tags.add(txt)
cls = _classify_pid_tag(txt)
tags.append({"tagNo": txt, **cls, "layer": layer})
fluid_dist = Counter(l['service'] for l in linenos)
equipment_dist = Counter(t['prefix'] for t in tags if t['kind'] == 'equipment')
instrument_dist = Counter(t['prefix'] for t in tags if t['kind'] == 'instrument')
unique_pipes = len({(l['service'], l['line_no']) for l in linenos})
return {
"fluid_dictionary": _PID_FLUID_DICT,
"linenos": linenos,
"tags": tags,
"stats": {
"lineno_label_count": len(linenos),
"unique_pipes": unique_pipes,
"tag_count": len(tags),
"fluid_distribution": dict(fluid_dist.most_common()),
"equipment_distribution": dict(equipment_dist.most_common()),
"instrument_distribution": dict(instrument_dist.most_common()),
},
}
return await asyncio.to_thread(_work)
# 장비 prefix와 안 겹치는 fluid-only 코드 (짧은 LineNo `XXX-NNNN`을 안전하게 pipe로 인식)
_PID_FLUID_ONLY_CODES: set[str] = set(_PID_FLUID_DICT.keys()) - set(_PID_EQUIPMENT_PREFIX.keys())
def _extract_pid_tags_from_text(text: str) -> list[dict]:
"""plain text(PDF/OCR/필터된 DXF text)에서 tag/LineNo를 regex로 추출.
분기 우선순위:
1) 완전한 LineNo (`FLUID-NUM-SIZE-SPEC-INSUL` 5필드, 모호하지 않음) → pipe
2) 짧은 LineNo (`FLUID-NUM`)이지만 prefix가 fluid 전용 코드일 때 → pipe
3) 일반 tag (`XXX-NNNN[A]`) → 장비/계기 분류 + instrumentType 설정
"""
seen: set[str] = set()
out: list[dict] = []
for token in re.split(r'[\s,;|()\[\]<>{}"\']+', text or ''):
token = token.strip()
if not token or token in seen:
continue
# 1) 완전한 배관번호 (5~7필드) — FULL_RE 매칭
m_full = _PID_LINENO_FULL_RE.match(token)
if m_full:
seen.add(token)
parsed = _parse_pid_lineno(token)
out.append({
"tagNo": token,
"kind": "pipe",
"prefix": parsed["service"],
"type": parsed["fluid"],
"lineNumber": parsed["line_no"],
"size": parsed["size"],
"pipeSpec": parsed["pipe_spec"],
"insul": parsed["insul"],
"confidence": 0.99,
})
continue
# 2) 짧은 LineNo — fluid 전용 코드일 때만
m_short = _PID_LINENO_SHORT_RE.match(token)
if m_short and m_short.group(1) in _PID_FLUID_ONLY_CODES:
seen.add(token)
parsed = _parse_pid_lineno(token)
out.append({
"tagNo": token,
"kind": "pipe",
"prefix": parsed["service"],
"type": parsed["fluid"],
"lineNumber": parsed["line_no"],
"size": None,
"confidence": 0.95,
})
continue
# 3) 일반 tag
if _PID_TAG_RE.match(token):
seen.add(token)
cls = _classify_pid_tag(token)
out.append({
"tagNo": token,
**cls,
"instrumentType": cls["prefix"],
"confidence": 0.95 if cls["kind"] != "unknown" else 0.5,
})
return out
async def _extract_text_from_pdf(filepath: str) -> str:
"""PyMuPDF로 PDF 파일에서 텍스트 추출."""
import asyncio
import fitz # pymupdf
def _extract():
doc = fitz.open(filepath)
texts = []
for page in doc:
texts.append(page.get_text())
return "\n".join(texts)
return await asyncio.to_thread(_extract)
async def _extract_text_from_pdf_ocr(filepath: str) -> str:
"""PaddleOCR로 PDF에서 이미지 추출 후 OCR (고정밀도)."""
import asyncio
import fitz # pymupdf
from PIL import Image
import numpy as np
def _extract():
doc = fitz.open(filepath)
all_texts = []
for page_idx, page in enumerate(doc):
# 페이지를 이미지로 변환
mat = fitz.Matrix(300 / 72) # 300 DPI
pix = page.get_pixmap(matrix=mat)
img_data = pix.tobytes("png")
img = Image.open(__import__("io").BytesIO(img_data))
# OCR 실행
result = _ocr().ocr(np.array(img), cls=True)
if result[0]:
for line in result[0]:
all_texts.append(line[1][0])
return "\n".join(all_texts)
return await asyncio.to_thread(_extract)
async def _convert_dwg_to_dxf_dxflib(filepath: str) -> str:
"""libreoffice로 DWG를 DXF로 변환."""
import asyncio
import subprocess
import os
dxf_path = filepath.replace(".dwg", ".dxf")
def _convert():
try:
# LibreOffice로 변환
result = subprocess.run(
[
"libreoffice",
"--headless",
"--convert-to", "dxf:AutoCAD DXF",
"--outdir", os.path.dirname(filepath) or ".",
filepath
],
check=True,
timeout=120,
capture_output=True,
text=True
)
if os.path.exists(dxf_path):
return dxf_path
else:
raise FileNotFoundError("DXF 변환 파일이 생성되지 않았습니다.")
except subprocess.CalledProcessError as e:
raise Exception(f"LibreOffice 변환 실패: {e.stderr}")
return await asyncio.to_thread(_convert)
# ── Qdrant 검색 헬퍼 ──────────────────────────────────────────────────────────
async def _search(collection: str, query: str, top_k: int, threshold: float = 0.25) -> str:
import asyncio
def _call_embed():
return _embed(query)
vec = await _call_embed()
def _call_search():
with httpx.Client(timeout=20) as client:
resp = client.post(
f"{QDRANT_URL}/collections/{collection}/points/search",
json={
"vector": vec,
"limit": top_k,
"with_payload": True,
"score_threshold": threshold,
},
)
resp.raise_for_status()
return resp.json().get("result", [])
hits = await asyncio.to_thread(_call_search)
if not hits:
return "관련 결과 없음."
parts = []
for h in hits:
p = h.get("payload", {})
file_path = p.get("filePath", p.get("path", "unknown"))
chunk = p.get("codeChunk", p.get("content", p.get("text", "")))
start_line = p.get("startLine", "")
loc = f"{file_path}:{start_line}" if start_line else file_path
parts.append(f"[score={h['score']:.3f}] {loc}\n```\n{chunk[:700]}\n```")
return "\n\n---\n\n".join(parts)
async def _search_kb_collection(
qdrant_name: str,
vec: list[float],
top_k: int,
tags: list[str] | None = None,
) -> list[dict]:
"""KB 컬렉션 1개에 대해 의미 검색. 결과를 정규화된 dict 리스트로 반환."""
must = []
if tags:
must.append({"key": "tags", "match": {"any": tags}})
body: dict = {
"vector": vec,
"limit": top_k,
"with_payload": True,
"score_threshold": 0.20,
}
if must:
body["filter"] = {"must": must}
def _call():
with httpx.Client(timeout=20) as client:
resp = client.post(f"{QDRANT_URL}/collections/{qdrant_name}/points/search", json=body)
if resp.status_code == 404:
return []
resp.raise_for_status()
return resp.json().get("result", [])
try:
return await asyncio.to_thread(_call)
except Exception as e:
logging.warning(f"[search_kb] {qdrant_name} 검색 실패: {e}")
return []
def _recency_factor(uploaded_at_iso: str | None) -> float:
"""uploaded_at 기준 최신 가중치. 최근 7일 +10%, 30일 +5%, 90일 +2%, 그 외 1.0."""
if not uploaded_at_iso:
return 1.0
try:
from datetime import datetime, timezone
ts = datetime.fromisoformat(uploaded_at_iso.replace("Z", "+00:00"))
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - ts).total_seconds() / 86400.0
if age < 7: return 1.10
if age < 30: return 1.05
if age < 90: return 1.02
return 1.0
except Exception:
return 1.0
# ── DB 헬퍼 ──────────────────────────────────────────────────────────────────
async def _get_db_connection():
"""PostgreSQL DB 연결 획득."""
import asyncio
import psycopg
def _connect():
return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
return await asyncio.to_thread(_connect)
def _get_db_connection_sync():
"""PostgreSQL DB 연결 획득 (sync — Verifier 내부용)."""
import psycopg
return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
def _check(tool: str, params: dict, *errs) -> str | None:
"""첫 번째 비-None VerifierError를 로그 + JSON str 반환. 없으면 None."""
for e in errs:
if e:
log_rejection(tool, params, e)
return json.dumps(e.to_dict(), ensure_ascii=False)
return None
def _validate_sql(sql: str) -> tuple[bool, str]:
"""SQL 안전 검증 — SELECT/WITH만 허용, 위험 키워드 차단."""
if len(sql) > 4000:
return False, "쿼리 길이 4000자를 초과했습니다."
dangerous = ['EXEC', 'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'GRANT', 'REVOKE', 'TRUNCATE', 'COPY']
sql_upper = sql.upper()
for kw in dangerous:
if re.search(rf"\b{kw}\b", sql_upper):
return False, f"허용되지 않은 키워드 '{kw}'를 사용했습니다."
head = sql_upper.lstrip().lstrip('(').lstrip()
if not (head.startswith('SELECT') or head.startswith('WITH')):
return False, "SELECT 또는 WITH 쿼리만 허용됩니다."
if '..' in sql or '~' in sql:
return False, "파일 경로 표현은 허용되지 않습니다."
if ';' in sql.rstrip().rstrip(';'):
return False, "다중 문장(세미콜론)은 허용되지 않습니다."
return True, ""
# SQL 가드 — auto-LIMIT + statement_timeout
SQL_MAX_ROWS = int(os.environ.get("SQL_MAX_ROWS", "1000"))
SQL_STATEMENT_TIMEOUT_MS = int(os.environ.get("SQL_STATEMENT_TIMEOUT_MS", "30000"))
_RE_LIMIT_TAIL = re.compile(r"\bLIMIT\b\s+\d+(\s+OFFSET\s+\d+)?\s*$", re.IGNORECASE)
def _apply_sql_guards(sql: str, max_rows: int = SQL_MAX_ROWS) -> str:
"""LIMIT가 없으면 서브쿼리로 감싸 강제 부착. 이미 끝부분에 LIMIT가 있으면 그대로."""
s = sql.strip().rstrip(';').strip()
if _RE_LIMIT_TAIL.search(s):
return s
return f"SELECT * FROM ({s}) _capped LIMIT {max_rows}"
# Compact DB schema for LLM SQL generation
_DB_SCHEMA = """
Tables:
history_table(tagname TEXT, value TEXT, recorded_at TIMESTAMPTZ)
realtime_table(tagname TEXT, livevalue TEXT, timestamp TIMESTAMPTZ)
tag_metadata(base_tag TEXT, attribute TEXT, value TEXT)
event_history_table(tagname TEXT, prev_value TEXT, curr_value TEXT, event_type TEXT, event_time TIMESTAMPTZ, duration_seconds INT)
pid_equipment(tag_no TEXT, category TEXT, tag_dcs BOOL, tag_class TEXT, instrument_type TEXT, from_tag TEXT, to_tag TEXT)
-- tag_dcs=TRUE: DCS 함수블록(FIC/TIC/PIC류), FALSE: 현장 물리 계기(FT/FCV류)
-- tag_class: 'field'(현장) / 'system'(DCS) — tag_dcs 기반
-- from_tag(상류) → tag_no → to_tag(하류) 연결 추적
Views:
v_tag_summary(base_tag TEXT, pv TEXT, sp TEXT, op TEXT, description TEXT, area TEXT, sub_area TEXT)
Rules:
- SELECT only. tagname UPPERCASE exact match (e.g. 'FICQ-6113.PV').
- If user input is lowercase, convert to UPPERCASE before querying.
- value is TEXT; cast ::double precision when aggregating.
- time_bucket() banned. For N-min buckets: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60))
- KST input = UTC-9 in DB.
- sub_area는 "P6-1" 또는 공용 "P6-1,P6-2" 형식. 매칭은 항상 토큰 비교: 'P6-1' = ANY(string_to_array(sub_area, ','))
"""
def _area_or_subarea_filter(area: str | None, tagname_col: str, area_col: str) -> tuple[str, list]:
"""area 필터 SQL 조건 + 파라미터 반환.
- area가 "P6-1"처럼 '-'를 포함하면 sub_area 코드로 간주 →
tag_metadata(attribute='sub_area')를 EXISTS 조인하여 토큰 매칭 (공용 "P6-1,P6-2" 포함).
이벤트 테이블의 tagname은 'p-6116.pv' 형식이므로 split_part로 base_tag 추출.
- 그 외(area 코드, 예 "P6")는 기존 컬럼 정확 매칭.
- area가 None이면 항상 참(TRUE).
"""
if not area:
return "TRUE", []
if "-" in area: # sub_area 코드
return (
f"EXISTS (SELECT 1 FROM tag_metadata tm "
f"WHERE tm.base_tag = split_part({tagname_col}, '.', 1) "
f"AND tm.attribute = 'sub_area' AND %s = ANY(string_to_array(tm.value, ',')))",
[area],
)
return f"{area_col} = %s", [area]
# ── RAG 도구 ─────────────────────────────────────────────────────────────────
@mcp.tool()
async def search_codebase(query: str, top_k: int = 6) -> str:
"""ExperionCrawler 프로젝트 소스코드 검색 (우리가 개발한 .NET 8 C# 코드).
Experion HS R530 공식 문서가 아닌, ExperionCrawler 구현 코드를 검색함.
사용 시점: ExperionCrawler 코드의 구현 방법, 버그, 구조를 알고 싶을 때.
⚠️ Experion HS R530 제품 동작/설정/스펙을 알고 싶으면 search_r530_docs 사용.
Args:
query: 검색어 (예: "OPC UA 구독 시작", "히스토리 스냅샷", "TextToSql 서비스")
top_k: 반환 결과 수 (기본 6)
"""
return await _search(COL_CODEBASE, query, top_k)
@mcp.tool()
async def search_r530_docs(query: str, top_k: int = 5) -> str:
"""Honeywell Experion HS R530 공식 제품 문서 검색.
ExperionCrawler 코드가 아닌, Honeywell 공식 HTM 문서를 검색함.
사용 시점: Experion HS R530의 OPC UA 설정, 인증서, 보안 정책, 포인트 주소 형식,
채널/컨트롤러 속성, 문제해결 등 제품 스펙과 동작을 알고 싶을 때.
Args:
query: 검색어 (예: "certificate configuration", "endpoint security policy")
top_k: 반환 결과 수 (기본 5)
"""
return await _search(COL_OPC_DOCS, query, top_k)
@mcp.tool()
def ask_iiot_llm(question: str, context: str = "") -> str:
"""LLM에게 IIoT/OPC UA 질문 (컨텍스트 없이 LLM 직접 질문).
사용 시점: search_codebase 또는 search_r530_docs 결과를 context로 넘겨
종합 분석·답변이 필요할 때. 또는 일반 IIoT/OPC UA 개념 질문.
Args:
question: 질문 내용
context: (선택) search_codebase 또는 search_r530_docs 검색 결과
"""
system = (
"당신은 IIoT(산업용 IoT), OPC UA, Honeywell Experion PKS/HS R530 전문가입니다.\n"
"컨텍스트가 제공된 경우 컨텍스트를 우선 근거로 삼아 한국어로 답변합니다.\n"
"컨텍스트 출처가 'Experion HS R530 공식 문서'인지 'ExperionCrawler 코드'인지 명확히 구분하여 설명합니다."
)
user_msg = f"컨텍스트:\n{context}\n\n질문: {question}" if context else question
resp = _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
],
max_tokens=2048,
temperature=0.1,
)
content = resp.choices[0].message.content or "(응답 없음)"
return _strip_think(content)
@mcp.tool()
async def rag_query(
question: str,
search_code: bool = False,
search_docs: bool = True,
search_kb: bool = False,
kb_collections: list[str] | None = None,
) -> str:
"""검색 → LLM 답변 생성 (통합 RAG).
기본값: Experion HS R530 공식 문서만 검색.
사용자 KB 검색을 포함하려면 search_kb=True. 코드 검색은 search_code=True.
Args:
question: 질문
search_docs: Experion HS R530 공식 문서 검색 여부 (기본 True)
search_code: ExperionCrawler 소스코드 검색 여부 (기본 False)
search_kb: 사용자 KB 검색 여부 (기본 False)
kb_collections: 검색 대상 KB 컬렉션 키 목록. None이면 전체.
예: ["plant_operation", "procedure"]
"""
context_parts: list[str] = []
if search_docs:
context_parts.append(f"=== Experion HS R530 공식 문서 ===\n{await _search(COL_OPC_DOCS, question, 4)}")
if search_code:
context_parts.append(f"=== ExperionCrawler 구현 코드 ===\n{await _search(COL_CODEBASE, question, 3)}")
if search_kb:
kb_text = await _format_kb_results(question, kb_collections, top_k=6)
context_parts.append(f"=== 사용자 지식 베이스 ===\n{kb_text}")
return ask_iiot_llm(question, "\n\n".join(context_parts))
async def _format_kb_results(
query: str,
collection_keys: list[str] | None,
top_k: int,
tags: list[str] | None = None,
since: str | None = None,
boost_recent: bool = True,
) -> str:
"""search_kb 내부 헬퍼: 다중 컬렉션 의미검색 후 인용 텍스트로 직렬화."""
hits = await _search_kb_raw(query, collection_keys, top_k, tags, since, boost_recent)
if not hits:
return "관련 KB 결과 없음."
parts = []
for h in hits:
title = h.get("title") or "(제목없음)"
loc = h.get("locator") or ""
score = h.get("score", 0.0)
text = (h.get("text") or "").strip()
# 인용 헤더: "[score=0.812] 정비이력_2026Q1.xlsx > 시트:Pump-A > 행 12"
loc_str = f" > {loc}" if loc else ""
parts.append(f"[score={score:.3f}] {title}{loc_str}\n{text[:700]}")
return "\n\n---\n\n".join(parts)
async def _search_kb_raw(
query: str,
collection_keys: list[str] | None,
top_k: int,
tags: list[str] | None,
since: str | None,
boost_recent: bool,
) -> list[dict]:
"""KB 검색 핵심 로직 — 다중 컬렉션 의미검색 + 최신 가중치 + 후필터."""
targets = collection_keys or list(KB_COLLECTIONS.keys())
qdrant_names = [KB_COLLECTIONS[k] for k in targets if k in KB_COLLECTIONS]
if not qdrant_names:
return []
vec = await _embed(query)
per_coll_k = max(top_k, 8)
results: list[dict] = []
for qname in qdrant_names:
hits = await _search_kb_collection(qname, vec, per_coll_k, tags=tags)
for h in hits:
p = h.get("payload", {})
uploaded_at = p.get("uploaded_at")
if since and uploaded_at:
try:
if uploaded_at < since:
continue
except Exception:
pass
base_score = h.get("score", 0.0)
recency = _recency_factor(uploaded_at) if boost_recent else 1.0
results.append({
"score": base_score * recency,
"raw_score": base_score,
"doc_id": p.get("doc_id"),
"collection_key": p.get("collection_key"),
"title": p.get("title"),
"text": p.get("text", ""),
"chunk_kind": p.get("chunk_kind"),
"locator": p.get("locator"),
"uploaded_at": uploaded_at,
"tags": p.get("tags") or [],
})
# 점수 내림차순 정렬, 동일 doc_id 중복 dedup(최고점만)
results.sort(key=lambda r: r["score"], reverse=True)
seen: set[str] = set()
unique: list[dict] = []
for r in results:
key = f'{r.get("doc_id")}::{r.get("locator")}'
if key in seen:
continue
seen.add(key)
unique.append(r)
if len(unique) >= top_k:
break
return unique
@mcp.tool()
async def search_kb(
query: str,
collection_keys: list[str] | None = None,
top_k: int = 8,
tags: list[str] | None = None,
since: str | None = None,
boost_recent: bool = True,
) -> str:
"""사용자 지식 베이스(KB) 다중 컬렉션 의미 검색.
관리탭에서 업로드/인덱싱한 문서에서 질의와 의미적으로 가까운 청크를 찾는다.
Args:
query: 검색어 또는 자연어 질문
collection_keys: 대상 컬렉션 키 목록. None이면 전체.
가능한 값: system_instrument, plant_operation,
procedure, report, vendor_doc
top_k: 반환 결과 수 (기본 8)
tags: 태그 필터 (any 매칭). 예: ["unit-a", "P-6201"]
since: 이 ISO 시각 이후 업로드된 문서만. 예: "2026-04-01T00:00:00Z"
boost_recent: True이면 uploaded_at 기준 최신 가중치 적용 (기본 True)
Returns:
JSON 문자열: { success, count, hits: [{ doc_id, collection_key, title,
text, chunk_kind, locator, score, uploaded_at, tags }, ...] }
"""
try:
hits = await _search_kb_raw(query, collection_keys, top_k, tags, since, boost_recent)
return json.dumps(
{"success": True, "count": len(hits), "hits": hits},
ensure_ascii=False,
default=str,
)
except Exception as e:
return json.dumps({"success": False, "error": f"search_kb 실패: {e}"}, ensure_ascii=False)
# ── NL2SQL 도구 ───────────────────────────────────────────────────────────────
async def _execute_sql_internal(sql: str) -> str:
"""SQL 실행 내부 함수 (run_sql과 query_with_nl에서 공유).
가드: LIMIT 미지정 시 자동 LIMIT 부착(SQL_MAX_ROWS), statement_timeout 적용.
"""
valid, err = _validate_sql(sql)
if not valid:
return json.dumps({"success": False, "error": f"SQL 검증 실패: {err}"}, ensure_ascii=False)
capped_sql = _apply_sql_guards(sql)
conn = None
try:
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(capped_sql)
rows = cur.fetchall()
columns = [desc[0] for desc in cur.description]
result_data = [dict(zip(columns, row)) for row in rows]
return json.dumps({
"success": True,
"columns": columns,
"count": len(result_data),
"row_limit": SQL_MAX_ROWS,
"data": result_data
}, ensure_ascii=False, default=str)
except Exception as e:
return json.dumps({"success": False, "error": f"SQL 실행 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def run_sql(sql: str) -> str:
"""SQL 쿼리 실행 (SELECT/WITH만 허용).
안전 가드:
- 위험 키워드(INSERT/UPDATE/DELETE/DROP/ALTER/CREATE/GRANT/REVOKE/TRUNCATE/COPY/EXEC) 차단
- 다중 문장(세미콜론) 차단
- LIMIT 미지정 시 SQL_MAX_ROWS(기본 1000)로 자동 제한
- statement_timeout = SQL_STATEMENT_TIMEOUT_MS(기본 30s)
Args:
sql: 실행할 SELECT/WITH SQL 문자열 (최대 2000자)
Returns:
JSON: { success, columns, count, row_limit, data } 또는 { success, error }
"""
return await _execute_sql_internal(sql)
@mcp.tool()
async def upsert_pid_connection(
tag_no: str,
from_tag: str | None = None,
to_tag: str | None = None,
from_at: str | None = None,
to_at: str | None = None,
role: str | None = None,
category: str | None = None,
tag_class: str | None = None,
tag_dcs: bool | None = None,
) -> str:
"""운전자가 작성한 from/to 연결 1건을 pid_equipment 에 멱등(idempotent) 반영.
⚠️ 오직 pid_equipment 테이블만, 아래 컬럼만 변경한다(나머지는 절대 안 건드림):
from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs, connection_locked, updated_at
equipment_name/instrument_type/line_number/pid_drawing_no/pos_x/pos_y/confidence/is_active 는 보존.
매칭 규칙(문서규칙 §5):
(1) 같은 연결((tag_no, from_tag, to_tag) 동일)이 있으면 그 행 UPDATE ← 재실행 안전
(2) 없고, 해당 tag_no 의 from_tag·to_tag 가 비어있는 미완성 행(DXF)이 있으면 그 행을 채워 UPDATE
(3) 둘 다 없으면 INSERT (같은 tag 추가 경로면 기존 행의 equipment_name/type/line/drawing/category 복사)
자동 처리:
- from_tag 또는 to_tag 가 있으면 connection_locked=TRUE (연결분석이 덮어쓰지 않게)
- updated_at = now()
Args:
tag_no: 태그번호 (필수, 대소문자 무시 매칭).
from_tag/to_tag: 상류/하류 태그. 병렬이면 호출 측에서 "A, B" 콤마 병합해 전달(이 도구는 분리하지 않음).
from_at/to_at: 위치/서술 텍스트. role/category/tag_class/tag_dcs: 그대로 저장(None이면 기존 유지).
tag_dcs: TRUE=DCS 함수블록(FIC/TIC류), FALSE=현장 물리 계기(FT/FCV류). None이면 기존 유지.
Returns:
JSON { success, action(update_existing|update_filled|insert), id, tag_no, before, after }
"""
def _n(v):
if v is None:
return None
v = str(v).strip()
return v or None
tag_no = _n(tag_no)
if not tag_no:
return json.dumps({"success": False, "error": "tag_no 가 비었습니다."}, ensure_ascii=False)
err = _check("upsert_pid_connection", {"tag_no": tag_no}, validate_tag(tag_no, _get_db_connection_sync))
if err: return err
from_tag, to_tag = _n(from_tag), _n(to_tag)
from_at, to_at = _n(from_at), _n(to_at)
role, category, tag_class = _n(role), _n(category), _n(tag_class)
# bool은 _n() 미사용 — None이면 기존 유지, 값이면 bool로 강제 변환
tag_dcs_val = bool(tag_dcs) if tag_dcs is not None else None
locked = (from_tag is not None) or (to_tag is not None)
_SNAP = ["tag_no", "from_tag", "to_tag", "from_at", "to_at", "role", "category", "tag_class", "tag_dcs", "connection_locked"]
def _snap(cur, _id):
cur.execute("""SELECT tag_no, from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs, connection_locked
FROM pid_equipment WHERE id=%s""", (_id,))
r = cur.fetchone()
return dict(zip(_SNAP, r)) if r else None
conn = None
try:
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
# (1) 같은 연결 (재실행 멱등)
cur.execute(
"""SELECT id FROM pid_equipment
WHERE lower(tag_no)=lower(%s)
AND from_tag IS NOT DISTINCT FROM %s
AND to_tag IS NOT DISTINCT FROM %s
ORDER BY id LIMIT 1""",
(tag_no, from_tag, to_tag))
row = cur.fetchone()
target_id = row[0] if row else None
action = "update_existing" if row else None
# (2) 빈 미완성(DXF) 행 채우기
if target_id is None:
cur.execute(
"""SELECT id FROM pid_equipment
WHERE lower(tag_no)=lower(%s)
AND (from_tag IS NULL OR from_tag='')
AND (to_tag IS NULL OR to_tag='')
ORDER BY id LIMIT 1""",
(tag_no,))
r2 = cur.fetchone()
if r2:
target_id, action = r2[0], "update_filled"
if target_id is not None:
before = _snap(cur, target_id)
cur.execute(
"""UPDATE pid_equipment SET
from_tag=%s, to_tag=%s, from_at=%s, to_at=%s, role=%s,
category=COALESCE(%s, category),
tag_class=COALESCE(%s, tag_class),
tag_dcs=COALESCE(%s, tag_dcs),
connection_locked=%s, updated_at=now()
WHERE id=%s""",
(from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs_val, locked, target_id))
else:
action = "insert"
before = None
# 같은 tag 기존 행에서 정적 메타 복사 (없으면 NULL)
cur.execute("""SELECT equipment_name, instrument_type, line_number, pid_drawing_no, category, tag_dcs
FROM pid_equipment WHERE lower(tag_no)=lower(%s) ORDER BY id LIMIT 1""", (tag_no,))
meta = cur.fetchone() or (None, None, None, None, None, None)
eq_name, inst_type, line_no, draw_no, ex_cat, ex_dcs = meta
cur.execute(
"""INSERT INTO pid_equipment
(tag_no, equipment_name, instrument_type, line_number, pid_drawing_no,
from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs,
connection_locked, confidence, is_active, extracted_at, updated_at)
VALUES (%s,%s,%s,%s,%s, %s,%s,%s,%s,%s, COALESCE(%s,%s),%s, COALESCE(%s,%s,FALSE), %s, 0, TRUE, now(), now())
RETURNING id""",
(tag_no, eq_name, inst_type, line_no, draw_no,
from_tag, to_tag, from_at, to_at, role, category, ex_cat, tag_class,
tag_dcs_val, ex_dcs, locked))
target_id = cur.fetchone()[0]
after = _snap(cur, target_id)
conn.commit()
return json.dumps({"success": True, "action": action, "id": target_id,
"tag_no": tag_no, "before": before, "after": after},
ensure_ascii=False, default=str)
except Exception as e:
if conn:
try:
conn.rollback()
except Exception:
pass
return json.dumps({"success": False, "error": f"upsert 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def query_pv_history(tag_names: list[str], time_from: str, time_to: str, limit: int = 100) -> str:
"""과거 값(PV) 히스토리 조회.
Args:
tag_names: 태그 이름 목록 (예: ["ficq-6113.pv", "ti-6101.pv"])
time_from: 시작 시간 (ISO 8601, 예: "2026-04-01T00:00:00")
time_to: 종료 시간 (ISO 8601, 예: "2026-04-02T00:00:00")
limit: 반환 행 수 제한 (기본 100, 최대 5000)
Returns:
JSON: { success, tag_names, time_range, limit, data }
"""
_first_bad = None
for t in (tag_names or []):
_first_bad = validate_tag(t, _get_db_connection_sync)
if _first_bad:
break
err = _check("query_pv_history", {"tag_names": tag_names}, _first_bad)
if err: return err
conn = None
try:
limit = min(limit, 5000)
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(
"""SELECT tagname, recorded_at, value
FROM history_table
WHERE tagname = ANY(%s)
AND recorded_at >= %s AND recorded_at <= %s
ORDER BY recorded_at, tagname
LIMIT %s""",
(tag_names, time_from, time_to, limit)
)
rows = cur.fetchall()
data = [{"tag_name": r[0], "timestamp": r[1].isoformat(), "value": r[2]} for r in rows]
return json.dumps({
"success": True,
"tag_names": tag_names,
"time_range": f"{time_from} ~ {time_to}",
"count": len(data),
"data": data
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"히스토리 쿼리 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def get_tag_metadata(query: str, limit: int = 10) -> str:
"""태그 메타데이터 검색 (realtime_table 기반).
Args:
query: 태그명 검색어 (패턴 매칭)
limit: 반환 태그 수 제한 (기본 10)
Returns:
JSON: { success, query, count, tags }
"""
conn = None
try:
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(
"""SELECT tagname, livevalue, timestamp, node_id
FROM realtime_table
WHERE tagname ILIKE %s
ORDER BY tagname LIMIT %s""",
(f"%{query}%", limit)
)
rows = cur.fetchall()
tags = [{"tag_name": r[0], "current_value": r[1],
"last_updated": r[2].isoformat() if r[2] else None,
"node_id": r[3]} for r in rows]
return json.dumps({"success": True, "query": query, "count": len(tags), "tags": tags},
ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"태그 메타데이터 검색 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def list_drawings(unit_no: str | None = None) -> str:
"""단위별 도면 목록 조회 (node_map_master.name 기반).
Args:
unit_no: 단위 번호 접두사 (예: "A", "B"). None이면 전체 목록
Returns:
JSON: { success, unit_no, count, names }
"""
conn = None
try:
conn = await _get_db_connection()
with conn.cursor() as cur:
if unit_no:
cur.execute(
"SELECT DISTINCT name FROM node_map_master WHERE name ILIKE %s ORDER BY name LIMIT 100",
(f"{unit_no}%",)
)
else:
cur.execute("SELECT DISTINCT name FROM node_map_master ORDER BY name LIMIT 100")
rows = cur.fetchall()
return json.dumps({"success": True, "unit_no": unit_no,
"count": len(rows), "names": [r[0] for r in rows]},
ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"도면 목록 조회 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
# ── Phase 7.1: NL2SQL 의도 라우터 ──────────────────────────────────
# 키워드/정규식 기반 (ML 불필요). query_with_nl 진입 시 분류 → 적절한 도구로 위임.
_CLASSIFY_RULES = [
# (정규식, 라우팅 대상 도구명)
(r'활성.*알람|현재.*알람|지금.*알람|active.*alarm', 'active_alarms'),
(r'트립|trip', 'active_alarms'),
(r'상태\s*보고서|교대.*보고|status.*report|운전.*보고', 'generate_status_report'),
# "이상 상황 보고" 패턴 — 사용자가 특정 일자/area 범위로 비정상 상황 요약을 요청하는 경우.
# summarize_events가 since/until/area 파라미터를 받으므로 generate_status_report보다 적합.
(r'이상.*상황|상황.*보고|이상.*보고|비정상.*상황|abnormal', 'summarize_events'),
(r'요약|보고서|리포트|summary|summarize|report', 'summarize_events'),
(r'태그.*찾|tag.*찾|찾아\s*줘|find.*tag|어떤.*태그', 'find_tags'),
(r'이벤트.*조회|이벤트.*목록|event.*list|event.*query|로그.*조회', 'query_events'),
]
def _classify_intent(question: str) -> str:
"""질문을 키워드 정규식으로 분류. 매칭 실패 시 'query_with_nl' (SQL 경로) 반환."""
if not question:
return 'query_with_nl'
q = question.lower()
for pattern, target in _CLASSIFY_RULES:
if re.search(pattern, q, re.IGNORECASE):
return target
return 'query_with_nl'
@mcp.tool()
async def classify_intent(question: str) -> str:
"""질문 의도를 분류하여 적절한 도구명을 반환합니다 (실행하지는 않음).
반환값(도구명): active_alarms, generate_status_report, summarize_events,
find_tags, query_events, query_with_nl(기본)
Args:
question: 자연어 질문
Returns:
JSON: { success, question, route }
"""
route = _classify_intent(question)
return json.dumps({"success": True, "question": question, "route": route}, ensure_ascii=False)
@mcp.tool()
async def query_with_nl(question: str) -> str:
"""자연어 질문을 LLM이 SQL로 변환하고 시계열 DB를 조회합니다.
Args:
question: 자연어 질문 (예: "FICQ-6113.PV 최근 1시간 값을 1분 단위로 표시")
Returns:
JSON: { sql, success, columns, count, data } 또는 { sql, success, error }
"""
import asyncio
import json as json_module
# Phase 7.1: 의도 라우팅 — 명백한 비-SQL 질문은 전용 도구로 위임
route = _classify_intent(question)
if route != 'query_with_nl':
try:
if route == 'active_alarms':
payload = await active_alarms()
elif route == 'find_tags':
payload = await find_tags(query=question)
elif route == 'query_events':
payload = await query_events()
elif route == 'summarize_events':
payload = await summarize_events()
elif route == 'generate_status_report':
payload = await generate_status_report()
else:
payload = None
if payload is not None:
try:
obj = json.loads(payload)
except Exception:
obj = {"raw": payload}
obj["_routed_from"] = "query_with_nl"
obj["_route"] = route
return json.dumps(obj, ensure_ascii=False, default=str)
except Exception as e:
# 라우팅 실패 시 원래 SQL 경로로 fallback
pass
system = (
"You are a PostgreSQL SQL expert.\n"
"Convert the user's question into a SELECT SQL.\n"
"Return ONLY the SQL. No explanation, no markdown, NO <think> tags.\n"
"Use PostgreSQL syntax. tagname UPPERCASE exact match (e.g. 'FICQ-6113.PV').\n"
"If user input is lowercase, convert to UPPERCASE before querying.\n"
"value is TEXT; cast ::double precision when aggregating.\n"
"KST input = UTC-9. Example: KST 12:00 = UTC 03:00.\n"
"For N-min buckets: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)).\n"
"No GROUP BY if no interval specified.\n\n"
f"{_DB_SCHEMA}"
)
try:
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": question},
],
max_tokens=8192,
temperature=0.1,
)
resp = await asyncio.to_thread(_call_llm)
sql = _strip_think(resp.choices[0].message.content or "").strip()
# 마크다운 코드 블록 제거
if sql.startswith("```"):
lines = sql.splitlines()
sql = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]).strip()
if not sql:
return json.dumps({"success": False, "sql": "", "error": "LLM이 SQL을 생성하지 못했습니다."}, ensure_ascii=False)
except Exception as e:
return json.dumps({"success": False, "sql": "", "error": f"LLM SQL 생성 실패: {e}"}, ensure_ascii=False)
# SQL 실행
raw = await _execute_sql_internal(sql)
result = json.loads(raw)
result["sql"] = sql
# long format → pivot 변환 (tagname 컬럼이 있으면 자동 PIVOT)
if result.get("success") and "data" in result:
cols = result.get("columns", [])
data = result["data"]
if "tagname" in cols and data:
time_col = next((c for c in cols if c not in ("tagname", "value", "livevalue", "avg_val")), None)
val_col = next((c for c in ("avg_val", "value") if c in cols), cols[-1])
if time_col:
tag_names_list = sorted(dict.fromkeys(row["tagname"] for row in data))
pivoted: dict = {}
for row in data:
key = str(row[time_col])
if key not in pivoted:
pivoted[key] = {time_col: row[time_col]}
pivoted[key][row["tagname"]] = row.get(val_col)
result["data"] = list(pivoted.values())
result["columns"] = [time_col] + tag_names_list
result["count"] = len(result["data"])
return json.dumps(result, ensure_ascii=False, default=str)
# ── Phase 6 보강 도구 (이벤트/태그/보고서) ─────────────────────────────────────
_VALID_EVENT_TYPES = ("ALARM", "TRIP", "NORMAL", "RUN", "CHANGE")
@mcp.tool()
async def find_tags(query: str | None = None, area: str | None = None, sub_area: str | None = None, top_k: int = 20) -> str:
"""태그 검색 — base_tag/설명(desc)/area/sub_area 통합 검색 (v_tag_summary 뷰 기반).
사용 시점: 사용자가 "온도", "Tower 1 압력", "운전 중인 펌프" 같은 자연어로
태그를 지칭할 때 실제 base_tag(예: 'ti-6101', 'p-6102')를 역으로 찾기 위해.
sub_area만 지정하면 해당 area 전체 태그를 반환한다.
get_tag_metadata와 차이: 단순 tagname LIKE만 보지 않고 description/area에도
매칭하며, 현재 PV/SP/OP/description/area/sub_area를 함께 반환.
Args:
query: (선택) 검색어 (base_tag 또는 description 부분 일치, 대소문자 무시).
생략하면 sub_area/area 조건만으로 전체 조회.
area: (선택) area 필터 (예: 'P6'). "P6-1"처럼 '-'가 있으면 sub_area로 자동 처리.
sub_area: (선택) sub_area 필터 (예: 'P6-1'). 공용 태그("P6-1,P6-2")도 매칭됨.
query 생략 시 해당 sub_area 전체 태그 반환.
top_k: 반환 태그 수 (기본 20, 최대 100)
Returns:
JSON: { success, query, count, tags: [{base_tag, pv, sp, op, description, area, sub_area}] }
"""
err = _check("find_tags", {"query": query, "area": area, "sub_area": sub_area},
validate_area(area), validate_area(sub_area, "sub_area"))
if err: return err
conn = None
try:
top_k = max(1, min(top_k, 100))
# sub_area 명시 파라미터 우선, 없으면 area가 "P6-1" 형식이면 sub_area로 간주
effective_sub = sub_area or (area if (area and "-" in area) else None)
effective_area = None if (area and "-" in area) else area
conds: list[str] = []
params: list = []
if query:
q = f"%{query.strip()}%"
conds.append("(base_tag ILIKE %s OR description ILIKE %s)")
params.extend([q, q])
if effective_sub:
conds.append("%s = ANY(string_to_array(sub_area, ','))")
params.append(effective_sub)
if effective_area:
# v_tag_summary.area는 '{12 | P6 | }' 원본 형식이므로 코드만 추출해 비교
conds.append("trim(split_part(area, '|', 2)) = %s")
params.append(effective_area)
if not conds:
conds.append("TRUE")
sql = (
"SELECT base_tag, pv, sp, op, description, area, sub_area "
"FROM v_tag_summary WHERE " + " AND ".join(conds) +
" ORDER BY base_tag LIMIT %s"
)
params.append(top_k)
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(sql, params)
rows = cur.fetchall()
tags = [
{"base_tag": r[0], "pv": r[1], "sp": r[2], "op": r[3],
"description": r[4], "area": r[5], "sub_area": r[6]}
for r in rows
]
return json.dumps({
"success": True, "query": query, "area": area, "sub_area": effective_sub,
"count": len(tags), "tags": tags
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"태그 검색 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def trace_connections(start_tag: str, direction: str = "downstream", max_depth: int = 20) -> str:
"""pid_equipment 테이블의 from_tag/to_tag를 활용해 장비 연결 경로를 추적.
사용 시점: "스팀 경로 설명해줘", "원료 흐름 따라가줘", "T-203에서 어디로 가?" 같은 질문.
개별 태그 검색 + SQL 조합(10+ 라운드) → trace_connections 1회 호출로 대체.
Args:
start_tag: 시작 태그명 (예: 'FT-6115', 'T-203')
direction: 'downstream'(하류) 또는 'upstream'(상류). 기본 downstream.
max_depth: 최대 추적 깊이 (기본 20)
Returns:
JSON: { success, start_tag, direction, path: [{step, tag_no, from_tag, to_tag, role, live_state}] }
- from_tag/to_tag에 쉼표가 여러 개면 병렬 펌프·라인 → 모두 경로에 포함됨(누락 없음).
- live_state: 해당 태그의 실시간 상태(예: R-RUN/L-STOP). 병렬 펌프 중 R-RUN/L-RUN이 현재 공급원.
"""
err = _check("trace_connections", {"start_tag": start_tag, "direction": direction, "max_depth": max_depth},
validate_tag(start_tag, _get_db_connection_sync),
validate_direction(direction),
validate_max_depth(max_depth))
if err: return err
conn = None
try:
start_tag = start_tag.strip().upper()
direction = direction.strip().lower()
if direction not in ("downstream", "upstream"):
return json.dumps({"success": False, "error": "direction은 'downstream' 또는 'upstream'"}, ensure_ascii=False)
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
def _split_tags(tag_str):
if not tag_str:
return []
return [t.strip() for t in tag_str.split(',') if t.strip()]
def _build_or_condition(tags):
if not tags:
return "", []
conditions = []
params = []
for t in tags:
conditions.append("from_tag LIKE %s")
params.append(f'%{t}%')
conditions.append("tag_no = %s")
params.append(t)
return f"({' OR '.join(conditions)})", params
def _trace_downstream(current_tags, visited, depth):
if depth > max_depth or not current_tags:
return
or_clause, params = _build_or_condition(current_tags)
if not or_clause:
return
cur.execute(f"""
SELECT tag_no, from_tag, to_tag, role
FROM pid_equipment
WHERE {or_clause}
ORDER BY tag_no
""", params)
rows = cur.fetchall()
for row in rows:
tag_no = row[0]
if tag_no in visited:
continue
visited.add(tag_no)
path.append({
"step": len(path) + 1,
"tag_no": tag_no,
"from_tag": row[1],
"to_tag": row[2],
"role": row[3],
})
next_tags = _split_tags(row[2])
_trace_downstream(next_tags, visited, depth + 1)
path = []
visited = set()
visited.add(start_tag)
start_tags = _split_tags(start_tag)
_trace_downstream(start_tags, visited, 0)
# upstream
if direction == "upstream":
def _trace_upstream(current_tags, visited, depth):
if depth > max_depth or not current_tags:
return
conditions = []
params = []
for t in current_tags:
conditions.append("to_tag LIKE %s")
params.append(f'%{t}%')
conditions.append("tag_no = %s")
params.append(t)
if not conditions:
return
or_clause = f"({' OR '.join(conditions)})"
cur.execute(f"""
SELECT tag_no, from_tag, to_tag, role
FROM pid_equipment
WHERE {or_clause}
ORDER BY tag_no
""", params)
rows = cur.fetchall()
for row in rows:
tag_no = row[0]
if tag_no in visited:
continue
visited.add(tag_no)
path.append({
"step": len(path) + 1,
"tag_no": tag_no,
"from_tag": row[1],
"to_tag": row[2],
"role": row[3],
})
prev_tags = _split_tags(row[1])
_trace_upstream(prev_tags, visited, depth + 1)
path = []
visited = set()
visited.add(start_tag)
start_tags = _split_tags(start_tag)
_trace_upstream(start_tags, visited, 0)
# 각 노드에 실시간 상태(pv) 부착 — 병렬 펌프 중 '실제 가동 중'인 것을 식별.
# 예: F-6101A/B 상류에 P-6102(R-RUN)·P-6201(L-STOP)이 병렬이면 현재 공급원은 P-6102.
if path:
pv_tags = [p["tag_no"].upper() + ".PV" for p in path]
cur.execute(
"SELECT tagname, livevalue FROM realtime_table WHERE tagname = ANY(%s)",
(pv_tags,),
)
pv_map = {}
for tn, lv in cur.fetchall():
m = re.match(r'\{\s*\d+\s*\|\s*([^|]+?)\s*\|', lv or '')
pv_map[tn[:-3]] = (m.group(1).strip() if m else (lv or None))
for p in path:
p["live_state"] = pv_map.get(p["tag_no"])
return json.dumps({
"success": True,
"start_tag": start_tag,
"direction": direction,
"path": path,
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"연결 추적 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def query_events(
tag_name: str | None = None,
event_type: str | None = None,
area: str | None = None,
since: str | None = None,
until: str | None = None,
limit: int = 100,
) -> str:
"""이벤트 히스토리 조회 (event_history_table — 디지털 포인트 상태 변경).
Args:
tag_name: (선택) 태그명 LIKE 패턴 (예: 'p-6102', 'xv-%')
event_type: (선택) ALARM / TRIP / NORMAL / RUN / CHANGE 중 하나
area: (선택) area 코드("P6") 정확 매칭. "P6-1"처럼 '-'가 있으면 sub_area로 자동 처리(공용 포함)
since: (선택) 시작 시간 ISO 8601. 기본 24시간 전
until: (선택) 종료 시간 ISO 8601. 기본 현재
limit: 반환 행 수 (기본 100, 최대 1000)
Returns:
JSON: { success, count, time_range, events: [...] }
"""
err = _check("query_events", {"tag_name": tag_name, "area": area},
validate_tag(tag_name, _get_db_connection_sync) if tag_name else None,
validate_area(area))
if err: return err
if event_type and event_type.upper() not in _VALID_EVENT_TYPES:
return json.dumps({
"success": False,
"error": f"event_type은 {list(_VALID_EVENT_TYPES)} 중 하나여야 합니다."
}, ensure_ascii=False)
conn = None
try:
limit = max(1, min(limit, 1000))
where = ["event_time >= COALESCE(%s::timestamptz, NOW() - INTERVAL '24 hours')",
"event_time <= COALESCE(%s::timestamptz, NOW())"]
params: list = [since, until]
if tag_name:
where.append("tagname ILIKE %s")
params.append(f"%{tag_name}%")
if event_type:
where.append("event_type = %s")
params.append(event_type.upper())
if area:
# area 코드 정확매칭 / "P6-1" sub_area 코드면 tag_metadata 토큰 매칭
cond, cparams = _area_or_subarea_filter(area, "tagname", "area")
where.append(cond)
params.extend(cparams)
sql = f"""
SELECT id, tagname, prev_value, curr_value, event_type, event_time,
area, sub_area, duration_seconds, metadata
FROM event_history_table
WHERE {' AND '.join(where)}
ORDER BY event_time DESC
LIMIT %s
"""
params.append(limit)
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(sql, params)
rows = cur.fetchall()
events = [
{"id": r[0], "tag_name": r[1], "prev_value": r[2], "curr_value": r[3],
"event_type": r[4], "event_time": r[5].isoformat() if r[5] else None,
"area": r[6], "sub_area": r[7], "prev_state_duration_s": r[8], "metadata": r[9]}
for r in rows
]
return json.dumps({
"success": True,
"count": len(events),
"time_range": f"{since or '24h ago'} ~ {until or 'now'}",
"filters": {"tag_name": tag_name, "event_type": event_type, "area": area},
"events": events,
}, ensure_ascii=False, indent=2, default=str)
except Exception as e:
return json.dumps({"success": False, "error": f"이벤트 조회 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def active_alarms(area: str | None = None, limit: int = 100) -> str:
"""현재 활성 알람 — 각 태그의 최신 이벤트가 ALARM 또는 TRIP인 경우만 반환.
동작: event_history_table에서 태그별 최신 이벤트(DISTINCT ON)를 가져온 뒤,
event_type이 ALARM 또는 TRIP인 것만 필터. 이후 NORMAL이 들어왔다면 해제된 것이므로 제외.
Args:
area: (선택) area 코드("P6") 필터. "P6-1"처럼 '-'가 있으면 sub_area로 자동 처리(공용 포함)
limit: 최대 반환 수 (기본 100)
Returns:
JSON: { success, count, alarms: [{tag_name, event_type, since, prev_state_duration_s, area, ...}] }
"""
err = _check("active_alarms", {"area": area}, validate_area(area))
if err: return err
conn = None
try:
limit = max(1, min(limit, 500))
# area 코드 정확매칭 / "P6-1" sub_area 코드면 tag_metadata 토큰 매칭 (공용 포함)
area_cond, area_params = _area_or_subarea_filter(area, "tagname", "area")
sql = f"""
WITH latest AS (
SELECT DISTINCT ON (tagname)
id, tagname, curr_value, event_type, event_time,
area, sub_area, duration_seconds, metadata
FROM event_history_table
WHERE event_time >= NOW() - INTERVAL '30 days'
ORDER BY tagname, event_time DESC
)
SELECT id, tagname, curr_value, event_type, event_time,
area, sub_area, duration_seconds, metadata,
EXTRACT(EPOCH FROM (NOW() - event_time))::bigint AS age_seconds
FROM latest
WHERE event_type IN ('ALARM', 'TRIP')
AND {area_cond}
ORDER BY event_time DESC
LIMIT %s
"""
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(sql, (*area_params, limit))
rows = cur.fetchall()
alarms = [
{"id": r[0], "tag_name": r[1], "curr_value": r[2], "event_type": r[3],
"since": r[4].isoformat() if r[4] else None,
"area": r[5], "sub_area": r[6], "prev_state_duration_s": r[7], "metadata": r[8],
"age_seconds": r[9]}
for r in rows
]
return json.dumps({
"success": True, "area": area,
"count": len(alarms), "alarms": alarms,
}, ensure_ascii=False, indent=2, default=str)
except Exception as e:
return json.dumps({"success": False, "error": f"활성 알람 조회 실패: {e}"}, ensure_ascii=False)
finally:
if conn:
conn.close()
@mcp.tool()
async def summarize_events(
since: str | None = None,
area: str | None = None,
event_type: str | None = None,
max_events: int = 200,
focus: str = "",
) -> str:
"""이벤트 히스토리를 LLM으로 한글 요약.
내부 동작: query_events로 이벤트를 가져와 LLM에 요약 요청. 큰 건수는 잘림.
Args:
since: (선택) ISO 8601 시작 시간. 기본 24시간 전
area: (선택) area 필터
event_type: (선택) event_type 필터
max_events: 분석에 포함할 최대 이벤트 (기본 200, 최대 500)
focus: (선택) 요약 시 강조할 관점 (예: "interlock 동작", "교대 시점 이상")
Returns:
JSON: { success, summary, stats: {by_type, by_area, count} }
"""
err = _check("summarize_events", {"area": area}, validate_area(area))
if err: return err
max_events = max(10, min(max_events, 500))
raw = await query_events(event_type=event_type, area=area, since=since, limit=max_events)
parsed = json.loads(raw)
if not parsed.get("success"):
return raw
events = parsed.get("events", [])
if not events:
return json.dumps({
"success": True, "summary": "지정된 조건 범위에 이벤트가 없습니다.",
"stats": {"count": 0}
}, ensure_ascii=False, indent=2)
# 통계
by_type: dict[str, int] = {}
by_area: dict[str, int] = {}
for ev in events:
by_type[ev["event_type"]] = by_type.get(ev["event_type"], 0) + 1
a = ev.get("area") or "(unknown)"
by_area[a] = by_area.get(a, 0) + 1
# LLM 요약 — 가벼운 토큰 수로 제한
sample = events[:max_events]
bullet_lines = [
f"- [{ev['event_type']}] {ev['tag_name']} @ {_kst_str(ev['event_time'])}"
f" ({ev.get('prev_value')}{ev.get('curr_value')})"
f" (직전상태유지={ev.get('prev_state_duration_s', '?')}s)"
f"{(' area=' + ev['area']) if ev.get('area') else ''}"
for ev in sample
]
focus_line = f"\n특히 다음 관점을 우선해 설명하세요: {focus}\n" if focus else ""
system = (
"당신은 IIoT/공장 운전 분석 전문가입니다. 디지털 포인트의 상태 변경 이벤트 로그를 보고 "
"한국어로 6~10줄 요약을 만듭니다. 다음 구조를 따릅니다:\n"
"1) 핵심 현황 (총 이벤트 수, 주요 area)\n"
"2) 알람/트립 (ALARM/TRIP) 핵심 케이스 — 태그·시각·전후값\n"
"3) 패턴/특이점 (반복 발생, 동시 발생, area 집중)\n"
"4) 다음 점검 권고 (있다면)\n"
"구체적인 태그명과 시각을 포함하되 추측은 자제합니다.\n\n"
"참고 - 모든 시각은 KST(UTC+9, Asia/Seoul)입니다. "
"`직전상태유지`는 이 이벤트 직전에 태그가 머물렀던 시간(초)입니다."
)
user_msg = (
f"분석 대상 이벤트 {len(sample)}건 (전체 {parsed.get('count')}건). "
f"통계: type={by_type}, area={by_area}.{focus_line}\n\n"
+ "\n".join(bullet_lines)
)
def _call():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
],
max_tokens=1200,
temperature=0.2,
)
try:
resp = await asyncio.to_thread(_call)
summary = _strip_think(resp.choices[0].message.content) or "(요약 없음)"
except Exception as e:
summary = f"(LLM 요약 실패: {e})"
return json.dumps({
"success": True,
"summary": summary,
"stats": {"count": parsed.get("count"), "by_type": by_type, "by_area": by_area},
"time_range": parsed.get("time_range"),
}, ensure_ascii=False, indent=2)
@mcp.tool()
async def generate_status_report(area: str | None = None, hours: int = 24) -> str:
"""공장 운전 상태 종합 보고서 — 활성 알람 + 최근 이벤트 + 추세를 LLM이 한 장으로 정리.
Args:
area: (선택) area 필터 (전체 공장이면 NULL)
hours: 이벤트 분석 윈도우 (기본 24시간, 최대 168=7일)
Returns:
JSON: { success, report, sections: {active_alarms, recent_events, by_type}, generated_at }
"""
err = _check("generate_status_report", {"area": area, "hours": hours}, validate_area(area))
if err: return err
hours = max(1, min(hours, 168))
since_iso = None # query_events가 24h 기본을 쓰지만, hours로 명시 전달
from datetime import datetime, timezone, timedelta
since_iso = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
# 1) 활성 알람
alarms_raw = await active_alarms(area=area, limit=50)
alarms = json.loads(alarms_raw).get("alarms", [])
# 2) 최근 이벤트 통계
events_raw = await query_events(area=area, since=since_iso, limit=500)
events_parsed = json.loads(events_raw)
events = events_parsed.get("events", [])
by_type: dict[str, int] = {}
for ev in events:
by_type[ev["event_type"]] = by_type.get(ev["event_type"], 0) + 1
# 2.5) 펌프 운전 교차검증 (유량/진공 기반) — v_plant_running_state_agg
pump_corr: list[dict] = []
try:
corr_raw = await _execute_sql_internal(
"SELECT area_code, status, total_pumps, confirmed_running, suspicious_running, "
"stale_running, indeterminate_running, tripped_pumps, "
"confirmed_tags, suspicious_tags, stale_tags "
"FROM v_plant_running_state_agg ORDER BY area_code"
)
corr_parsed = json.loads(corr_raw)
if corr_parsed.get("success"):
pump_corr = corr_parsed.get("data", [])
if area:
_f = [r for r in pump_corr if (r.get("area_code") or "").upper() == area.upper()]
if _f:
pump_corr = _f
except Exception:
pump_corr = []
# 3) LLM 보고서
alarm_lines = [
f"- [{a['event_type']}] {a['tag_name']} since {_kst_str(a['since'])} "
f"({a.get('age_seconds', 0)}s){' area=' + a['area'] if a.get('area') else ''}"
for a in alarms[:30]
] or ["- 활성 알람 없음"]
recent_lines = [
f"- [{ev['event_type']}] {ev['tag_name']} @ {_kst_str(ev['event_time'])} "
f"({ev.get('prev_value')}{ev.get('curr_value')})"
f" (직전상태유지={ev.get('prev_state_duration_s', '?')}s)"
for ev in events[:40]
]
corr_lines = [
f"- {r.get('area_code')}: {r.get('status')} "
f"(확인 {r.get('confirmed_running', 0)}, 의심 {r.get('suspicious_running', 0)}, "
f"정체 {r.get('stale_running', 0)}, 트립 {r.get('tripped_pumps', 0)})"
+ (f" 의심펌프={r.get('suspicious_tags')}" if r.get('suspicious_running') else "")
+ (f" 정체펌프={r.get('stale_tags')}" if r.get('stale_running') else "")
for r in pump_corr
] or ["- 펌프 교차검증 데이터 없음"]
system = (
"당신은 IIoT/공장 운전 분석 전문가입니다. 디지털 포인트의 상태 변경 이벤트 로그를 보고 "
"한국어로 6~10줄 요약을 만듭니다. 다음 구조를 따릅니다:\n"
"1) 핵심 현황 (총 이벤트 수, 주요 area)\n"
"2) 알람/트립 (ALARM/TRIP) 핵심 케이스 — 태그·시각·전후값\n"
"3) 패턴/특이점 (반복 발생, 동시 발생, area 집중)\n"
"4) 다음 점검 권고 (있다면)\n"
"구체적인 태그명과 시각을 포함하되 추측은 자제합니다.\n\n"
"참고 - 모든 시각은 KST(UTC+9, Asia/Seoul)입니다. "
"`직전상태유지`는 이 이벤트 직전에 태그가 머물렀던 시간(초)입니다.\n"
"펌프 운전 교차검증: 펌프 RUN 상태를 유량(kg/hr)·진공압(torr)으로 확인한 결과입니다. "
"'확인'=실질 운전, '의심'=RUN인데 유량/진공 없음(deadhead·센서이상 가능), "
"'정체'=실시간 수집 지연으로 판정 보류. 의심/정체가 있으면 보고서에 우선 명시하세요."
)
user_msg = (
f"대상 area: {area or '전체'}\n"
f"분석 윈도우: 최근 {hours}시간\n"
f"이벤트 통계 (type별): {by_type}\n"
f"활성 알람 {len(alarms)}건:\n" + "\n".join(alarm_lines) + "\n\n"
f"펌프 운전 교차검증 (area별):\n" + "\n".join(corr_lines) + "\n\n"
f"최근 이벤트 표본 (최대 40건):\n" + "\n".join(recent_lines)
)
def _call():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
],
max_tokens=2048,
temperature=0.2,
)
try:
resp = await asyncio.to_thread(_call)
report = _strip_think(resp.choices[0].message.content) or "(보고서 생성 실패)"
except Exception as e:
report = f"(LLM 보고서 실패: {e})"
from datetime import datetime as _dt, timezone as _tz
return json.dumps({
"success": True,
"report": report,
"sections": {
"active_alarms_count": len(alarms),
"recent_events_count": len(events),
"by_type": by_type,
"pump_corroboration": pump_corr,
"window_hours": hours,
"area": area,
},
"generated_at": _dt.now(_tz.utc).isoformat(),
}, ensure_ascii=False, indent=2)
# ── P&ID 추출 도구 ──────────────────────────────────────────────────────────────
@mcp.tool()
async def extract_pid_tags(text: str, source_type: str) -> str:
"""텍스트에서 P&ID 태그를 regex로 결정론적 추출. LLM 호출 없음.
Args:
text: DXF/PDF에서 추출한 plain text
source_type: 'dxf' 또는 'pdf' (로깅용)
Returns:
JSON: { success, count, tags: [{tagNo, kind, prefix, type, confidence, ...}] }
- kind: 'pipe' | 'equipment' | 'instrument' | 'unknown'
- pipe면 lineNumber/size 포함, fluid는 type 필드
"""
try:
tags = _extract_pid_tags_from_text(text or "")
logging.info(f"[extract_pid_tags] source={source_type} count={len(tags)}")
return json.dumps({"success": True, "count": len(tags), "tags": tags},
ensure_ascii=False, indent=2)
except Exception as e:
logging.error(f"P&ID 태그 추출 실패: {e}")
return json.dumps({"success": False, "error": str(e), "count": 0, "tags": []},
ensure_ascii=False)
@mcp.tool()
async def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str:
"""P&ID 태그를 Experion 태그에 결정론적으로 매핑. LLM 호출 없음.
매칭 전략 (보수적 — false positive 방지):
1) 정규화(lowercase + Experion suffix 제거) 후 동일 → confidence 0.99
2) Experion이 PID의 확장 (예: "FT-101""ft-101.pv") → confidence 0.95
단, PID prefix가 최소 4글자 이상이어야 (`P-1` 같은 짧은 식별자가 다수와 매칭되는 것 방지)
3) 그 외 → experionTag=null, confidence=0.0
(UI에서는 "매핑" 버튼 표시 → 운전원 수동 매핑)
이전 difflib fuzzy(cutoff 0.75)는 PSV-10101 ↔ p-10101.pv 같은 거짓 매칭을 만들어 제거함.
숫자 부분만 같으면 종류(PSV/P/XV/VP)가 달라도 묶이는 문제.
Args:
pid_tags: P&ID에서 추출한 태그 목록
experion_tags: Experion 시스템 태그 목록 (보통 ".pv" 등 suffix 포함)
Returns:
JSON: { success, count, mappings: [{pidTag, experionTag, confidence}] }
"""
_EX_SUFFIXES = ('.pv', '.sp', '.mv', '.op', '.cv', '.fieldvalue', '.qv', '.hzset')
_MIN_PREFIX_LEN = 4 # prefix 매칭 최소 길이
def _norm(tag: str) -> str:
t = (tag or "").strip().lower()
for s in _EX_SUFFIXES:
if t.endswith(s):
t = t[:-len(s)]
break
return t
try:
# 정규화된 experion → 원본 (충돌 시 더 짧은 원본 우선)
ex_index: dict[str, str] = {}
for ex in experion_tags or []:
n = _norm(ex)
if not n:
continue
if n not in ex_index or len(ex) < len(ex_index[n]):
ex_index[n] = ex
# prefix 매칭 후보 inverted index (n -> 자기 자신)
ex_norms = list(ex_index.keys())
mappings = []
for pid in pid_tags or []:
pid_norm = _norm(pid)
if not pid_norm:
mappings.append({"pidTag": pid, "experionTag": None, "confidence": 0.0})
continue
# 1) 정확 매칭
if pid_norm in ex_index:
mappings.append({"pidTag": pid, "experionTag": ex_index[pid_norm], "confidence": 0.99})
continue
# 2) prefix 매칭 — 단, pid가 충분히 구체적일 때만 (≥ 4 chars)
if len(pid_norm) >= _MIN_PREFIX_LEN:
hit = next(
(n for n in ex_norms
if n.startswith(pid_norm + ".") or n.startswith(pid_norm + "-")),
None,
)
if hit:
mappings.append({"pidTag": pid, "experionTag": ex_index[hit], "confidence": 0.95})
continue
# 3) 매칭 없음
mappings.append({"pidTag": pid, "experionTag": None, "confidence": 0.0})
return json.dumps({"success": True, "count": len(mappings), "mappings": mappings},
ensure_ascii=False, indent=2)
except Exception as e:
logging.error(f"match_pid_tags failed: {e}")
return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)
# ── P&ID 파싱 도구 (DXF/PDF/DWG) ───────────────────────────────────────────────
@mcp.tool()
async def parse_pid_dxf(filepath: str) -> str:
"""DXF에서 구조화된 P&ID 정보를 빠르게 추출. 좌표 계산 없음, LLM 호출 없음.
- LINENO 레이어 → 라인 마스터 (service/line_no/size/material_spec/flange/insul_*)
- 그 외 TEXT/MTEXT → 태그 후보 (prefix로 장비·계기 분류)
- fluid 사전은 Symbol & Legend FLUID NAME ABBREVIATION 기준 내장
- 그래프/위상 분석이 필요하면 별도 도구 `build_pid_graph_parallel` 사용
Args:
filepath: DXF 파일 경로
Returns:
JSON: {
success, fluid_dictionary, linenos, tags, stats
}
"""
try:
data = await _extract_pid_dxf_fast(filepath)
return json.dumps({"success": True, **data}, ensure_ascii=False, indent=2)
except Exception as e:
logging.error(f"parse_pid_dxf failed: {e}")
return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)
@mcp.tool()
async def parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
"""PyMuPDF 기반 PDF 파일 파싱. 텍스트 추출 후 LLM으로 태그 자동 추출.
Args:
filepath: PDF 파일 경로
use_ocr: OCR 사용 여부 (기본 True, 고정밀도)
Returns:
JSON: { success, text, count, tags: [{tagNo, equipmentName, ...}] }
"""
import asyncio
import json
import re
try:
def _extract_text():
if use_ocr:
return _extract_text_from_pdf_ocr(filepath)
else:
return _extract_text_from_pdf(filepath)
text = await asyncio.to_thread(_extract_text)
if not text.strip():
return json.dumps({
"success": True,
"text": "",
"count": 0,
"tags": []
}, ensure_ascii=False, indent=2)
# LLM으로 태그 추출
system = (
"You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
"Extract instrument and equipment tags from the provided text.\n"
"Return ONLY a JSON array of objects with the following structure:\n"
'[{"tagNo":"FIT-10115","equipmentName":"Flow Transmitter","instrumentType":"FT" OR "FIT OR "TIA","lineNumber":"L-101","pidDrawingNo":"P&ID-001","confidence":0.95},...]\n'
"IMPORTANT rules:\n"
"- tagNo: Standard tag format with these patterns:\n"
" * Instrument: [Function][Loop]-[Number] (e.g., FT-101, PT-201, LI-301, FICQ-6113)\n"
" * Equipment: [Type]-[Number] (e.g., P-10101, T-10100, C-9111, E-10119)\n"
" * Complex: [Type]-[Number]-[Size]-[Class]-[Material]-[Option] (e.g., VG-6203-15A-F1A-n, CD-10513-40A-S1A-H50)\n"
" * Real examples from DXF: BT-6200, SARF-#6-PID-002, P-6101, DP-10101, CHS-6630-100A-F-C50\n"
"- instrumentType: First 2-4 letters of tagNo (FIT, PT, LI, FICQ, TCV, FCV, PCV, PG, TG, etc.)\n"
"- equipmentName: Descriptive name if available, otherwise null\n"
"- lineNumber: Line number if available, otherwise null\n"
"- pidDrawingNo: Drawing number if available, otherwise null\n"
"- confidence: 0.0 to 1.0 based on how clearly the tag was identified\n"
"- Do NOT include any explanation, only the JSON array.\n"
"- If no tags found, return an empty array: []\n"
"- temperature=0.1 for deterministic output.\n"
)
truncated_text = text[:12000] if len(text) > 12000 else text
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Source: pdf\n\nText:\n{truncated_text}"},
],
max_tokens=4096,
temperature=0.1,
)
resp = await asyncio.to_thread(_call_llm)
raw = _strip_think(resp.choices[0].message.content or "").strip()
# 마크다운 코드 블록 제거
if raw.startswith("```"):
lines = raw.splitlines()
raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()
# JSON 배열 추출
match = re.search(r'\[.*\]', raw, re.DOTALL)
if match:
raw = match.group(0)
# JSON 파싱 시도
try:
data = json.loads(raw)
except json.JSONDecodeError:
# JSON 배열 추출 시도 (더 엄격한 패턴)
match = re.search(r'\[\s*\{.*?\}\s*\]', raw, re.DOTALL)
if match:
raw_clean = match.group(0)
try:
data = json.loads(raw_clean)
except json.JSONDecodeError:
# 마지막으로, JSON 배열을 개별 객체로 분리하여 파싱 시도
objects = re.findall(r'\{[^{}]*\}', raw, re.DOTALL)
data = []
for obj in objects:
try:
data.append(json.loads(obj))
except json.JSONDecodeError:
pass
if not isinstance(data, list):
data = []
return json.dumps({
"success": True,
"text": text[:10000],
"count": len(text),
"tags": data
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"PDF 파싱 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
async def build_pid_graph_parallel(filepath: str) -> str:
"""
분산 처리 기법을 적용한 P&ID 그래프 생성 툴.
전처리 -> 병렬 분산 추출 -> 위상 모델링 -> 저장 과정을 수행합니다.
"""
import asyncio
import json
try:
# 1. 전처리 (Phase 1: Geometric Extraction)
def _extract_and_save():
extractor = PidGeometricExtractor(filepath)
geo_data_path = f"mcp-server/storage/{os.path.basename(filepath)}_geo.json"
geo_data_list = extractor.extract_and_save(geo_data_path)
return geo_data_path
geo_data_path = await asyncio.to_thread(_extract_and_save)
# geo_data_list는 경로를 반환하므로 다시 로드
def _load_geo_data():
with open(geo_data_path, 'r', encoding='utf-8') as f:
return json.load(f)
geo_data = await asyncio.to_thread(_load_geo_data)
# 2. 병렬 분산 추출 (Phase 3: Intelligent Mapping)
# 시스템 태그 목록 가져오기 (DB에서 조회하는 로직 필요, 여기서는 예시로 빈 리스트 또는 기본값)
# 실제로는 get_tag_metadata 등을 통해 전체 태그 리스트를 확보해야 함
system_tags = []
try:
conn = await _get_db_connection()
try:
with conn.cursor() as cur:
cur.execute("SELECT tagname FROM realtime_table")
system_tags = [r[0] for r in cur.fetchall()]
finally:
conn.close()
except Exception as e:
logging.warning(f"Failed to fetch system tags: {e}")
# 그래프 임시 생성 (Mapper가 위상 정보를 사용하므로 필요)
builder = PidTopologyBuilder(geo_data)
builder.build_graph()
# Mapper 설정
from openai import AsyncOpenAI
api_client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
mapper = IntelligentMapper(builder.G, system_tags, api_client=api_client, model_name=VLLM_MODEL)
# 분류별 노드 분리
nodes = list(builder.G.nodes())
transmitter_nodes = [n for n, d in builder.G.nodes(data=True) if d.get('value', '').upper() in ['FIT', 'FT', 'LT', 'PT', 'TE']] # 단순화된 필터
valve_nodes = [n for n, d in builder.G.nodes(data=True) if d.get('value', '').upper() in ['FCV', 'LCV', 'TCV', 'PCV', 'XV']]
equipment_nodes = [n for n, d in builder.G.nodes(data=True) if d.get('type') not in ['TEXT', 'LINE', 'LWPOLYLINE']]
# 병렬 호출 (vLLM Batching 유도)
tasks = [
mapper.extract_transmitters(transmitter_nodes),
mapper.extract_valves(valve_nodes),
mapper.extract_equipment(equipment_nodes)
]
extracted_results = await asyncio.gather(*tasks)
# 결과 통합
all_mapped_tags = []
for res_dict in extracted_results:
for node_id, mapping in res_dict.items():
if mapping.resolved_tag != "UNKNOWN":
# TopologyBuilder가 기대하는 형식으로 변환
node_data = builder.G.nodes[node_id]
all_mapped_tags.append({
"entity_id": node_id,
"tagName": mapping.resolved_tag,
"bbox": node_data['bbox'].bounds if hasattr(node_data['bbox'], 'bounds') else node_data['bbox'],
"clean_value": mapping.resolved_tag
})
# 3. 최종 위상 모델링 (Phase 2)
final_builder = PidTopologyBuilder(geo_data, all_extracted_tags=all_mapped_tags)
final_builder.build_graph()
# 4. 저장
graph_id = os.path.basename(filepath).replace(".dxf", "_graph.json")
graph_path = f"mcp-server/storage/{graph_id}"
final_builder.save_graph(graph_path)
return json.dumps({
"success": True,
"data": {
"graph_id": graph_id,
"graph_path": graph_path,
"nodes": final_builder.G.number_of_nodes(),
"edges": final_builder.G.number_of_edges()
},
"message": "그래프 생성 완료"
}, ensure_ascii=False)
except Exception as e:
logging.error(f"build_pid_graph_parallel failed: {e}")
return json.dumps({"success": False, "data": None, "error": str(e), "message": "그래프 생성 실패"}, ensure_ascii=False)
@mcp.tool()
async def analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
"""
구축된 그래프를 기반으로 특정 설비 장애 시 영향도 분석을 수행합니다.
"""
import asyncio
try:
graph_path = f"mcp-server/storage/{graph_id}"
mapping_path = graph_path.replace("_graph.json", "_mapping.json")
def _analyze():
analyzer = PidAnalysisEngine(graph_path, mapping_path)
return analyzer.analyze_impact(start_node_id)
result = await asyncio.to_thread(_analyze)
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"Impact analysis failed: {e}"}, ensure_ascii=False)
@mcp.tool()
async def parse_pid_drawing(filepath: str) -> str:
"""확장자 자동 감지하여 P&ID 도면 파싱.
Args:
filepath: DXF/DWG/PDF 파일 경로
Returns:
JSON: { success, text, count, tags, format }
"""
import os
ext = os.path.splitext(filepath)[1].lower()
if ext == ".dxf":
return await parse_pid_dxf(filepath)
elif ext == ".dwg":
# DWG 파일은 사전에 DXF로 변환하여 업로드해야 합니다.
# Linux에서 DWG를 DXF로 변환하는 도구는 제한되어 있습니다.
return json.dumps({
"success": False,
"error": "DWG 파일은 현재 직접 파싱할 수 없습니다.\n" +
"사전에 DXF로 변환하여 업로드해 주세요.\n" +
"\n변환 방법:\n" +
"1. Windows에서 AutoCAD 또는 ODA File Converter 사용\n" +
"2. 온라인 DWG → DXF 변환기 사용\n" +
"3. LibreOffice Draw (Windows/macOS 전용) 사용"
}, ensure_ascii=False)
elif ext == ".pdf":
return await parse_pid_pdf(filepath)
else:
return json.dumps({
"success": False,
"error": f"Unsupported format: {ext}. Supported: .dxf, .dwg, .pdf"
}, ensure_ascii=False)
# ── KB ingest 파서 ────────────────────────────────────────────────────────────
@mcp.tool()
async def parse_document(
doc_id: str,
title: str,
file_path: str,
mime_type: str = "",
collection_key: str = "",
chunking_policy: str = "",
) -> str:
"""KB ingest 파서. 파일 확장자에 따라 적절한 청킹을 수행한다.
Args:
doc_id: 문서 ID (UUID 문자열)
title: 제목 (오류 메시지에만 사용)
file_path: 절대 경로
mime_type: 정보용 (옵션)
collection_key: 정보용 (옵션)
chunking_policy: JSON 문자열, 향후 정책 분기에 사용
Returns:
JSON 문자열: {"success": true, "chunks": [{"text", "chunk_kind", "locator"}, ...]}
or {"success": false, "error": "..."}
"""
import os
if not os.path.isfile(file_path):
return json.dumps({"success": False, "error": f"file not found: {file_path}"}, ensure_ascii=False)
ext = os.path.splitext(file_path)[1].lower()
try:
if ext in (".xlsx", ".xlsm"):
from parsers import xlsx_parser
chunks = await asyncio.to_thread(xlsx_parser.parse, file_path)
elif ext == ".pdf":
from parsers import pdf_parser
chunks = await asyncio.to_thread(pdf_parser.parse, file_path)
elif ext == ".docx":
from parsers import docx_parser
chunks = await asyncio.to_thread(docx_parser.parse, file_path)
elif ext in (".md", ".txt", ".markdown"):
from parsers import text_parser
chunks = await asyncio.to_thread(text_parser.parse, file_path)
else:
return json.dumps(
{"success": False, "error": f"unsupported extension: {ext}"},
ensure_ascii=False
)
return json.dumps(
{"success": True, "doc_id": doc_id, "chunks": chunks, "count": len(chunks)},
ensure_ascii=False
)
except Exception as e:
return json.dumps({"success": False, "error": f"parse failed: {e}"}, ensure_ascii=False)
# ── Field Instrument Inference ──────────────────────────────────────────────
@mcp.tool()
def infer_field_instruments(
use_llm: bool = False,
seed_doc_id: str | None = None,
seed_excel_path: str | None = None,
) -> str:
"""
v_tag_summary의 모든 base_tag를 대상으로 현장 계기를 자동 유추하여 Excel 초안 생성.
Args:
use_llm: LLM으로 description 한국어 초안 보강 (기본 False, Phase C에서 활성화)
seed_doc_id: 기존 Excel doc_id 지정 시 수정사항 보존 + 신규만 추가
seed_excel_path: 기존 Excel 파일 경로 (C# 측에서 전달)
Returns:
JSON: { success, doc_path, instrument_count, unmatched_count, message }
"""
from instrument_inference.rules import load_rules
from instrument_inference.infer import infer_instruments_for_base_tag
from instrument_inference.excel import generate_excel
# [H-3 수정] v_tag_summary에서 base_tag별 dp 집합을 SQL로 한 번에 추출
def _fetch_tags():
import psycopg
conn = psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
try:
with conn.cursor() as cur:
cur.execute("""
SELECT
base_tag,
area,
CASE WHEN pv IS NOT NULL THEN '.pv' END,
CASE WHEN sp IS NOT NULL THEN '.sp' END,
CASE WHEN op IS NOT NULL THEN '.op' END,
CASE WHEN instate0 IS NOT NULL THEN '.instate0' END,
CASE WHEN instate1 IS NOT NULL THEN '.instate1' END,
CASE WHEN instate2 IS NOT NULL THEN '.instate2' END
FROM v_tag_summary
ORDER BY base_tag
""")
return cur.fetchall()
finally:
conn.close()
# [L-3 수정] .qv는 v_tag_summary에 없으므로 별도 조회
def _fetch_qv_tags():
import psycopg
conn = psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
try:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT SUBSTRING(tagname FROM '(.*)\\.qv') AS base_tag
FROM realtime_table
WHERE tagname LIKE '%.qv'
""")
return set(r[0] for r in cur.fetchall())
finally:
conn.close()
tags = _fetch_tags()
# [M-1 수정] asyncio.to_thread 데드 코드 삭제 — sync def이므로 blocking 호출
qv_tags = _fetch_qv_tags()
all_instruments = []
power_equipment = []
unmatched_tags = []
for row in tags:
base_tag = row[0]
area = row[1]
if not base_tag:
continue
# [H-3 수정] 실제 DB 컬럼 NULL 여부로 dp 집합 구성
dp = [d for d in row[2:8] if d is not None]
if base_tag in qv_tags:
dp.append(".qv")
try:
result = infer_instruments_for_base_tag(base_tag, dp, area or "")
# [H-4 수정] 빈 리스트 가드
if not result:
unmatched_tags.append(base_tag)
continue
if result[0].get("needs_review") and result[0].get("inference_basis") == "unmatched_prefix":
unmatched_tags.append(base_tag)
else:
for inst in result:
if inst.get("role") == "power_equipment":
power_equipment.append(inst)
else:
all_instruments.append(inst)
except Exception as e:
logging.warning(f"[infer] {base_tag} 처리 실패: {e}")
unmatched_tags.append(base_tag)
# [M-5 수정] instrument_id 기준 dedup
def _dedup(inst_list):
seen = {}
deduped = []
for inst in inst_list:
iid = inst["instrument_id"]
if iid in seen:
existing = seen[iid]
existing["inference_basis"] = f"{existing['inference_basis']}; also from {inst.get('parent_base_tag', '?')}"
else:
seen[iid] = inst
deduped.append(inst)
return deduped
all_instruments = _dedup(all_instruments)
power_equipment = _dedup(power_equipment)
# [M-3 수정] seed_excel_path 기반 머지 로직
if seed_excel_path and os.path.exists(seed_excel_path):
all_instruments = _merge_with_seed_excel(seed_excel_path, all_instruments)
# Excel 생성 (4시트: instruments, naming_convention, unmatched_tags, power_equipment)
xlsx_path = generate_excel(all_instruments, unmatched_tags, power_equipment)
# LLM 보강 (Phase C — 아직 stub)
if use_llm:
xlsx_path = _enrich_descriptions_llm(xlsx_path)
return json.dumps({
"success": True,
"doc_path": xlsx_path,
"instrument_count": len(all_instruments),
"power_equipment_count": len(power_equipment),
"unmatched_count": len(unmatched_tags),
"message": f"{len(all_instruments)}개 계기, {len(power_equipment)}개 동력기기 유추 완료 ({len(unmatched_tags)}개 미매칭)",
})
def _merge_with_seed_excel(seed_path: str, new_instruments: list[dict]) -> list[dict]:
"""[M-3 수정] 기존 Excel에서 instruments 시트 로드 후 머지."""
from openpyxl import load_workbook
wb = load_workbook(seed_path, read_only=True, data_only=True)
ws = wb["instruments"]
rows = list(ws.iter_rows(values_only=True))
if not rows:
return new_instruments
headers = [str(c) for c in rows[0]]
existing = []
for r in rows[1:]:
inst = dict(zip(headers, r))
if inst.get("delete") in (True, "TRUE", "True", "YES", "Yes", "Y", "y"):
continue
existing.append(inst)
existing_ids = {i["instrument_id"] for i in existing}
merged = list(existing)
for inst in new_instruments:
if inst["instrument_id"] not in existing_ids:
merged.append(inst)
return merged
def _enrich_descriptions_llm(xlsx_path: str) -> str:
"""LLM으로 description 한국어 초안 보강 (Phase C stub)."""
# [M-2 수정] Phase C 구현 전까지 no-op. UI는 disabled로 처리.
return xlsx_path
# ── 엔트리포인트 ──────────────────────────────────────────────────────────────
def main():
"""HTTP 모드로 실행 — C# McpClient (localhost:5001) 용."""
mcp.run(transport="streamable-http")
if __name__ == "__main__":
# --http 플래그: HTTP 모드 (C# McpClient 용)
# 플래그 없음: stdio 모드 (Claude Code / Roo Code MCP 용)
if "--http" in sys.argv:
mcp.run(transport="streamable-http")
else:
mcp.run(transport="stdio")