- P&ID: 연결 분석 API, Prefix 규칙 관리, 카테고리 분류, DXF 그래프 빌드 - LLM: 대화 요약, tool card 영구 보존, 시계열 차트(uPlot), 에이전트 모드 - KB: 청크 미리보기, Field Instrument Inference, 인증/Qdrant 클라이언트 - MCP: 서버 기능 확장, 파이프라인 수정, timeout 개선 - Frontend: P&ID UI, LLM UI, KB UI, OPC UA Write 탭 추가 - 설정: AGENTS.md, plant_context, README, opencode.json 업데이트 - 정리: 진단 체크리스트 문서 삭제
2369 lines
92 KiB
Python
2369 lines
92 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ExperionCrawler Unified MCP Server
|
|
- RAG: Qdrant + Ollama nomic-embed-text + vLLM (llm-model.json)
|
|
- NL2SQL: 자연어 → LLM SQL 생성 → PostgreSQL 실행
|
|
- 사용처:
|
|
stdio 모드 (기본): Claude Code MCP / Roo Code MCP
|
|
HTTP 모드 (--http): C# McpClient (localhost:5001)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import sys
|
|
import os
|
|
import re
|
|
import json
|
|
import logging
|
|
import httpx
|
|
from functools import lru_cache
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
logging.basicConfig(level=logging.WARNING, stream=sys.stderr)
|
|
|
|
# ── 시간 변환 ───────────────────────────────────────────────────────────────────
|
|
KST = __import__("zoneinfo").ZoneInfo("Asia/Seoul")
|
|
|
|
def _kst_str(dt_iso: str | None) -> str:
|
|
"""UTC ISO 문자열 → KST ISO 문자열 (초 단위, +09:00). None이면 그대로 None."""
|
|
if not dt_iso:
|
|
return str(dt_iso) if dt_iso is None else ""
|
|
from datetime import datetime
|
|
if isinstance(dt_iso, datetime):
|
|
return dt_iso.astimezone(KST).strftime("%Y-%m-%dT%H:%M:%S+09:00")
|
|
try:
|
|
dt = datetime.fromisoformat(dt_iso)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=__import__("datetime").timezone.utc)
|
|
return dt.astimezone(KST).strftime("%Y-%m-%dT%H:%M:%S+09:00")
|
|
except Exception:
|
|
return dt_iso # fallback
|
|
|
|
# ── 설정 ──────────────────────────────────────────────────────────────────────
|
|
QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
|
|
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
|
|
EMBED_MODEL = os.environ.get("EMBED_MODEL", "nomic-embed-text")
|
|
VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8001/v1")
|
|
from config import get_vllm_model
|
|
VLLM_MODEL = get_vllm_model()
|
|
|
|
# Qdrant 컬렉션
|
|
COL_CODEBASE = "ws-65f457145aee80b2" # ExperionCrawler 소스코드
|
|
COL_OPC_DOCS = "experion-opc-docs" # Experion HS R530 OPC UA 공식 문서 (266 chunks)
|
|
|
|
# 사용자 KB 컬렉션 (kb_collections 시드 5종과 일치)
|
|
KB_COLLECTIONS = {
|
|
"system_instrument": "kb_system_instrument",
|
|
"plant_operation": "kb_plant_operation",
|
|
"procedure": "kb_procedure",
|
|
"report": "kb_report",
|
|
"vendor_doc": "kb_vendor_doc",
|
|
}
|
|
|
|
# PostgreSQL 연결
|
|
DB_CONNECTION_STRING = os.environ.get("DB_CONNECTION_STRING", "postgresql://postgres:postgres@localhost:5432/iiot_platform")
|
|
DB_TIMEOUT = int(os.environ.get("DB_TIMEOUT", "10"))
|
|
|
|
# C# McpClient(localhost:5001)와 통신: json_response+stateless로 단순 POST→JSON 방식
|
|
mcp = FastMCP(
|
|
"iiot-rag",
|
|
port=5001,
|
|
json_response=True,
|
|
stateless_http=True,
|
|
)
|
|
|
|
# Pipeline Imports
|
|
from pipeline.extractor import PidGeometricExtractor
|
|
from pipeline.topology import PidTopologyBuilder
|
|
from pipeline.mapper import IntelligentMapper
|
|
from pipeline.analyzer import PidAnalysisEngine
|
|
import networkx as nx
|
|
import asyncio
|
|
|
|
# ── 임베딩 (Ollama) ───────────────────────────────────────────────────────────
|
|
|
|
async def _embed(text: str) -> list[float]:
|
|
"""Ollama nomic-embed-text로 768-dim 벡터 생성."""
|
|
import asyncio
|
|
|
|
def _call_embed():
|
|
with httpx.Client(timeout=30) as client:
|
|
resp = client.post(
|
|
f"{OLLAMA_URL}/api/embeddings",
|
|
json={"model": EMBED_MODEL, "prompt": text},
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()["embedding"]
|
|
|
|
return await asyncio.to_thread(_call_embed)
|
|
|
|
# ── LLM (vLLM) ──────────────────────────────────────────────────────
|
|
|
|
@lru_cache(maxsize=1)
|
|
def _llm():
|
|
from openai import OpenAI
|
|
return OpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
|
|
|
|
|
|
def _strip_think(text: str) -> str:
|
|
"""Qwen/DeepSeek 계열 모델의 <think>...</think> reasoning 블록 제거"""
|
|
if not text:
|
|
return text
|
|
for tag in ("think", "skip", "reason"):
|
|
pat = re.compile(rf"</?{tag}>.*?</?{tag}>", re.DOTALL)
|
|
text = pat.sub("", text).strip()
|
|
# 태그가 열렸으나 닫히지 않은 경우 (truncated) — 태그부터 끝까지 제거
|
|
for tag in ("think", "skip", "reason"):
|
|
if f"<{tag}>" in text and f"</{tag}>" not in text:
|
|
idx = text.index(f"<{tag}>")
|
|
text = text[:idx].strip()
|
|
if f"</{tag}>" in text and f"<{tag}>" not in text:
|
|
idx = text.index(f"</{tag}>") + len(f"</{tag}>")
|
|
text = text[idx:].strip()
|
|
return text
|
|
|
|
|
|
# ── PaddleOCR 싱글톤 (PDF fallback용) ──────────────────────────────────────────
|
|
|
|
@lru_cache(maxsize=1)
|
|
def _ocr():
|
|
"""PaddleOCR 인스턴스 (한/영, GPU). 첫 호출 시 ~50MB 모델 다운로드."""
|
|
from paddleocr import PaddleOCR
|
|
import os
|
|
|
|
use_gpu = os.environ.get("PADDLE_USE_GPU", "true").lower() == "true"
|
|
try:
|
|
ocr = PaddleOCR(
|
|
use_angle_cls=True,
|
|
lang="korean",
|
|
use_gpu=use_gpu,
|
|
show_log=False,
|
|
)
|
|
return ocr
|
|
except Exception as e:
|
|
# GPU 실패 시 CPU 폴백
|
|
if use_gpu:
|
|
os.environ["PADDLE_USE_GPU"] = "false"
|
|
return _ocr()
|
|
raise e
|
|
|
|
|
|
# ── DXF/PDF 텍스트 추출 헬퍼 ───────────────────────────────────────────────────
|
|
|
|
async def _extract_text_from_dxf(filepath: str) -> str:
|
|
"""ezdxf로 DXF 파일에서 텍스트 추출 (MTEXT 포맷 코드 제거)."""
|
|
import asyncio
|
|
import ezdxf
|
|
from ezdxf.tools.text import plain_mtext
|
|
|
|
def _extract():
|
|
doc = ezdxf.readfile(filepath)
|
|
msp = doc.modelspace()
|
|
texts = []
|
|
for entity in msp:
|
|
if entity.dxftype() == "TEXT":
|
|
texts.append(entity.dxf.text)
|
|
elif entity.dxftype() == "MTEXT":
|
|
try:
|
|
plain = plain_mtext(entity.dxf.text)
|
|
if plain.strip():
|
|
texts.append(plain)
|
|
except Exception:
|
|
pass
|
|
return "\n".join(texts)
|
|
|
|
return await asyncio.to_thread(_extract)
|
|
|
|
|
|
# ── P&ID 빠른 구조 추출 (좌표 계산 없음, LLM 호출 없음) ─────────────────────────
|
|
#
|
|
# 설계 원칙:
|
|
# 1) DXF의 layer 정보를 사용 — LINENO 레이어 텍스트는 라인 마스터로 직접 파싱
|
|
# 2) tag/LineNo 정규식 — 100% 결정론적
|
|
# 3) FLUID NAME ABBREVIATION은 Symbol & Legend에서 1회 추출한 사전을 내장
|
|
# (다른 플랜트 도면을 추가할 때만 사전 갱신)
|
|
# 4) 좌표 계산·KD-tree·NetworkX 그래프 없음 → I/O 시간만 소요
|
|
#
|
|
# LineNo 형식: SERVICE - LINENUM - SIZE - MaterialSpec+FlangeRating+InsulationCode - InsulationThickness
|
|
# 예) P-10138-600A-F2A-H100, CD-10513-40A-S1A-H50, VG-6203-15A-F1A-n
|
|
|
|
_PID_FLUID_DICT: dict[str, str] = {
|
|
"P": "PROCESS FLUID",
|
|
"CHE": "PROCESS FLUID",
|
|
"CWS": "COOLING WATER SUPPLY",
|
|
"CWR": "COOLING WATER RETURN",
|
|
"CHS": "CHILLED WATER SUPPLY",
|
|
"CHR": "CHILLED WATER RETURN",
|
|
"WW": "WASTE WATER",
|
|
"ST": "STEAM",
|
|
"CD": "STEAM CONDENSATE",
|
|
"IA": "INSTRUMENT AIR",
|
|
"AIR": "INSTRUMENT AIR",
|
|
"N2": "NITROGEN",
|
|
"VG": "VENT GAS",
|
|
"SCR": "VENT GAS",
|
|
"SC": "VENT GAS",
|
|
"DIW": "DEIONIZED WATER",
|
|
"SW": "SOFT WATER",
|
|
"SAM": "SAMPLE LINE",
|
|
"NBD": "NITROGEN BLOW DOWN",
|
|
}
|
|
|
|
_PID_EQUIPMENT_PREFIX: dict[str, str] = {
|
|
"P": "Pump",
|
|
"T": "Tank",
|
|
"F": "Filter",
|
|
"C": "Column",
|
|
"E": "Heat Exchanger",
|
|
"D": "Drum",
|
|
"V": "Vessel",
|
|
"BT": "Buffer Tank",
|
|
"DP": "Drainage Point",
|
|
"CH": "Chiller",
|
|
"CT": "Cooling Tower",
|
|
"VP": "Vacuum Pump",
|
|
"R": "Reactor",
|
|
"S": "Separator",
|
|
}
|
|
|
|
_PID_INSTRUMENT_FIRST: dict[str, str] = {
|
|
"P": "Pressure", "T": "Temperature", "F": "Flow", "L": "Level",
|
|
"A": "Analysis", "S": "Speed", "H": "Hand", "Q": "Quantity", "W": "Weight",
|
|
}
|
|
|
|
_PID_INSTRUMENT_MODIFIER: dict[str, str] = {
|
|
"I": "Indicator", "C": "Control", "T": "Transmitter", "R": "Recorder",
|
|
"A": "Alarm", "S": "Switch", "V": "Valve", "Q": "Totalizer",
|
|
"Y": "Computing", "E": "Element", "G": "Gauge",
|
|
}
|
|
|
|
# 5~7필드 배관번호 지원: SERVICE-LINENUM-SIZE-PIPESPEC-INSUL
|
|
# 10차: P-10101-25A-F1A-n (pipe_spec=F1A)
|
|
# 9차: P-9107-25A-F-n (pipe_spec=F, insul=n)
|
|
# 냉각: CHR-9641-50A-F-C50 (pipe_spec=F, insul=C50)
|
|
_PID_LINENO_FULL_RE = re.compile(
|
|
r'^([A-Z][A-Z0-9]{0,3})-(\d{3,6})-(\d{1,4}[A-Z]?)-([A-Za-z][A-Za-z0-9]*)-([A-Za-z0-9]+)$'
|
|
)
|
|
_PID_LINENO_SHORT_RE = re.compile(r'^([A-Z][A-Z0-9]{0,3})-(\d{3,6})$')
|
|
_PID_TAG_RE = re.compile(r'^([A-Z]{1,4})-(\d{3,6})([A-Z])?$')
|
|
|
|
|
|
|
|
def _classify_pid_tag(tag_no: str) -> dict:
|
|
"""tagNo의 prefix로 장비/계기 종류 분류 (좌표·LLM 사용 안 함)."""
|
|
m = _PID_TAG_RE.match(tag_no)
|
|
if not m:
|
|
return {"kind": "unknown", "prefix": None, "type": None}
|
|
prefix = m.group(1)
|
|
if prefix in _PID_EQUIPMENT_PREFIX:
|
|
return {"kind": "equipment", "prefix": prefix, "type": _PID_EQUIPMENT_PREFIX[prefix]}
|
|
if 2 <= len(prefix) <= 4 and prefix[0] in _PID_INSTRUMENT_FIRST:
|
|
measure = _PID_INSTRUMENT_FIRST[prefix[0]]
|
|
mods = [_PID_INSTRUMENT_MODIFIER[c] for c in prefix[1:] if c in _PID_INSTRUMENT_MODIFIER]
|
|
type_name = (measure + " " + " ".join(mods)).strip()
|
|
return {"kind": "instrument", "prefix": prefix, "type": type_name}
|
|
return {"kind": "unknown", "prefix": prefix, "type": None}
|
|
|
|
|
|
def _parse_pid_lineno(token: str) -> dict | None:
|
|
"""LineNo 토큰을 필드로 분해. 5~7필드 모두 지원. 매칭 실패 시 None."""
|
|
m = _PID_LINENO_FULL_RE.match(token)
|
|
if m:
|
|
service, line_no, size, pipe_spec, insul = m.groups()
|
|
return {
|
|
"raw": token,
|
|
"service": service,
|
|
"fluid": _PID_FLUID_DICT.get(service, service),
|
|
"line_no": line_no,
|
|
"size": size,
|
|
"pipe_spec": pipe_spec,
|
|
"insul": insul,
|
|
}
|
|
sm = _PID_LINENO_SHORT_RE.match(token)
|
|
if sm:
|
|
service, line_no = sm.groups()
|
|
return {
|
|
"raw": token,
|
|
"service": service,
|
|
"fluid": _PID_FLUID_DICT.get(service, service),
|
|
"line_no": line_no,
|
|
"size": None,
|
|
"pipe_spec": None,
|
|
"insul": None,
|
|
}
|
|
return None
|
|
|
|
|
|
async def _extract_pid_dxf_fast(filepath: str) -> dict:
|
|
"""DXF에서 layer + regex만으로 구조 추출. 좌표 계산·LLM 호출 없음."""
|
|
import ezdxf
|
|
from ezdxf.tools.text import plain_mtext
|
|
from collections import Counter
|
|
|
|
def _work():
|
|
doc = ezdxf.readfile(filepath)
|
|
msp = doc.modelspace()
|
|
|
|
linenos: list[dict] = []
|
|
tags: list[dict] = []
|
|
seen_tags: set[str] = set()
|
|
|
|
for e in msp.query('TEXT MTEXT'):
|
|
if e.dxftype() == 'TEXT':
|
|
txt = e.dxf.text or ""
|
|
else:
|
|
try:
|
|
txt = plain_mtext(e.dxf.text or "")
|
|
except Exception:
|
|
txt = e.dxf.text or ""
|
|
txt = txt.strip()
|
|
if not txt:
|
|
continue
|
|
|
|
layer = e.dxf.layer
|
|
|
|
# 1) 완전한 배관번호 형식 (FULL_RE) → 레이어 이름 무관하게 배관번호로 처리
|
|
if _PID_LINENO_FULL_RE.match(txt):
|
|
parsed = _parse_pid_lineno(txt)
|
|
if parsed is not None:
|
|
linenos.append(parsed)
|
|
continue
|
|
|
|
# 2) 짧은 형식(P-10101)은 레이어 이름에 'LINENO'가 포함된 경우에만 배관번호
|
|
# (펌프 태그와 텍스트가 동일하므로 레이어 힌트 불가피)
|
|
if 'LINENO' in layer.upper():
|
|
parsed = _parse_pid_lineno(txt)
|
|
if parsed is not None:
|
|
linenos.append(parsed)
|
|
continue
|
|
|
|
# 3) 일반 장비/계기 태그
|
|
if _PID_TAG_RE.match(txt):
|
|
if txt in seen_tags:
|
|
continue
|
|
seen_tags.add(txt)
|
|
cls = _classify_pid_tag(txt)
|
|
tags.append({"tagNo": txt, **cls, "layer": layer})
|
|
|
|
fluid_dist = Counter(l['service'] for l in linenos)
|
|
equipment_dist = Counter(t['prefix'] for t in tags if t['kind'] == 'equipment')
|
|
instrument_dist = Counter(t['prefix'] for t in tags if t['kind'] == 'instrument')
|
|
unique_pipes = len({(l['service'], l['line_no']) for l in linenos})
|
|
|
|
return {
|
|
"fluid_dictionary": _PID_FLUID_DICT,
|
|
"linenos": linenos,
|
|
"tags": tags,
|
|
"stats": {
|
|
"lineno_label_count": len(linenos),
|
|
"unique_pipes": unique_pipes,
|
|
"tag_count": len(tags),
|
|
"fluid_distribution": dict(fluid_dist.most_common()),
|
|
"equipment_distribution": dict(equipment_dist.most_common()),
|
|
"instrument_distribution": dict(instrument_dist.most_common()),
|
|
},
|
|
}
|
|
|
|
return await asyncio.to_thread(_work)
|
|
|
|
|
|
# 장비 prefix와 안 겹치는 fluid-only 코드 (짧은 LineNo `XXX-NNNN`을 안전하게 pipe로 인식)
|
|
_PID_FLUID_ONLY_CODES: set[str] = set(_PID_FLUID_DICT.keys()) - set(_PID_EQUIPMENT_PREFIX.keys())
|
|
|
|
|
|
def _extract_pid_tags_from_text(text: str) -> list[dict]:
|
|
"""plain text(PDF/OCR/필터된 DXF text)에서 tag/LineNo를 regex로 추출.
|
|
|
|
분기 우선순위:
|
|
1) 완전한 LineNo (`FLUID-NUM-SIZE-SPEC-INSUL` 5필드, 모호하지 않음) → pipe
|
|
2) 짧은 LineNo (`FLUID-NUM`)이지만 prefix가 fluid 전용 코드일 때 → pipe
|
|
3) 일반 tag (`XXX-NNNN[A]`) → 장비/계기 분류 + instrumentType 설정
|
|
"""
|
|
seen: set[str] = set()
|
|
out: list[dict] = []
|
|
for token in re.split(r'[\s,;|()\[\]<>{}"\']+', text or ''):
|
|
token = token.strip()
|
|
if not token or token in seen:
|
|
continue
|
|
|
|
# 1) 완전한 배관번호 (5~7필드) — FULL_RE 매칭
|
|
m_full = _PID_LINENO_FULL_RE.match(token)
|
|
if m_full:
|
|
seen.add(token)
|
|
parsed = _parse_pid_lineno(token)
|
|
out.append({
|
|
"tagNo": token,
|
|
"kind": "pipe",
|
|
"prefix": parsed["service"],
|
|
"type": parsed["fluid"],
|
|
"lineNumber": parsed["line_no"],
|
|
"size": parsed["size"],
|
|
"pipeSpec": parsed["pipe_spec"],
|
|
"insul": parsed["insul"],
|
|
"confidence": 0.99,
|
|
})
|
|
continue
|
|
|
|
# 2) 짧은 LineNo — fluid 전용 코드일 때만
|
|
m_short = _PID_LINENO_SHORT_RE.match(token)
|
|
if m_short and m_short.group(1) in _PID_FLUID_ONLY_CODES:
|
|
seen.add(token)
|
|
parsed = _parse_pid_lineno(token)
|
|
out.append({
|
|
"tagNo": token,
|
|
"kind": "pipe",
|
|
"prefix": parsed["service"],
|
|
"type": parsed["fluid"],
|
|
"lineNumber": parsed["line_no"],
|
|
"size": None,
|
|
"confidence": 0.95,
|
|
})
|
|
continue
|
|
|
|
# 3) 일반 tag
|
|
if _PID_TAG_RE.match(token):
|
|
seen.add(token)
|
|
cls = _classify_pid_tag(token)
|
|
out.append({
|
|
"tagNo": token,
|
|
**cls,
|
|
"instrumentType": cls["prefix"],
|
|
"confidence": 0.95 if cls["kind"] != "unknown" else 0.5,
|
|
})
|
|
return out
|
|
|
|
|
|
async def _extract_text_from_pdf(filepath: str) -> str:
|
|
"""PyMuPDF로 PDF 파일에서 텍스트 추출."""
|
|
import asyncio
|
|
import fitz # pymupdf
|
|
|
|
def _extract():
|
|
doc = fitz.open(filepath)
|
|
texts = []
|
|
for page in doc:
|
|
texts.append(page.get_text())
|
|
return "\n".join(texts)
|
|
|
|
return await asyncio.to_thread(_extract)
|
|
|
|
|
|
async def _extract_text_from_pdf_ocr(filepath: str) -> str:
|
|
"""PaddleOCR로 PDF에서 이미지 추출 후 OCR (고정밀도)."""
|
|
import asyncio
|
|
import fitz # pymupdf
|
|
from PIL import Image
|
|
import numpy as np
|
|
|
|
def _extract():
|
|
doc = fitz.open(filepath)
|
|
all_texts = []
|
|
|
|
for page_idx, page in enumerate(doc):
|
|
# 페이지를 이미지로 변환
|
|
mat = fitz.Matrix(300 / 72) # 300 DPI
|
|
pix = page.get_pixmap(matrix=mat)
|
|
img_data = pix.tobytes("png")
|
|
img = Image.open(__import__("io").BytesIO(img_data))
|
|
|
|
# OCR 실행
|
|
result = _ocr().ocr(np.array(img), cls=True)
|
|
if result[0]:
|
|
for line in result[0]:
|
|
all_texts.append(line[1][0])
|
|
|
|
return "\n".join(all_texts)
|
|
|
|
return await asyncio.to_thread(_extract)
|
|
|
|
|
|
async def _convert_dwg_to_dxf_dxflib(filepath: str) -> str:
|
|
"""libreoffice로 DWG를 DXF로 변환."""
|
|
import asyncio
|
|
import subprocess
|
|
import os
|
|
|
|
dxf_path = filepath.replace(".dwg", ".dxf")
|
|
|
|
def _convert():
|
|
try:
|
|
# LibreOffice로 변환
|
|
result = subprocess.run(
|
|
[
|
|
"libreoffice",
|
|
"--headless",
|
|
"--convert-to", "dxf:AutoCAD DXF",
|
|
"--outdir", os.path.dirname(filepath) or ".",
|
|
filepath
|
|
],
|
|
check=True,
|
|
timeout=120,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if os.path.exists(dxf_path):
|
|
return dxf_path
|
|
else:
|
|
raise FileNotFoundError("DXF 변환 파일이 생성되지 않았습니다.")
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
raise Exception(f"LibreOffice 변환 실패: {e.stderr}")
|
|
|
|
return await asyncio.to_thread(_convert)
|
|
|
|
|
|
# ── Qdrant 검색 헬퍼 ──────────────────────────────────────────────────────────
|
|
|
|
async def _search(collection: str, query: str, top_k: int, threshold: float = 0.25) -> str:
|
|
import asyncio
|
|
|
|
def _call_embed():
|
|
return _embed(query)
|
|
|
|
vec = await _call_embed()
|
|
|
|
def _call_search():
|
|
with httpx.Client(timeout=20) as client:
|
|
resp = client.post(
|
|
f"{QDRANT_URL}/collections/{collection}/points/search",
|
|
json={
|
|
"vector": vec,
|
|
"limit": top_k,
|
|
"with_payload": True,
|
|
"score_threshold": threshold,
|
|
},
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json().get("result", [])
|
|
|
|
hits = await asyncio.to_thread(_call_search)
|
|
|
|
if not hits:
|
|
return "관련 결과 없음."
|
|
|
|
parts = []
|
|
for h in hits:
|
|
p = h.get("payload", {})
|
|
file_path = p.get("filePath", p.get("path", "unknown"))
|
|
chunk = p.get("codeChunk", p.get("content", p.get("text", "")))
|
|
start_line = p.get("startLine", "")
|
|
loc = f"{file_path}:{start_line}" if start_line else file_path
|
|
parts.append(f"[score={h['score']:.3f}] {loc}\n```\n{chunk[:700]}\n```")
|
|
|
|
return "\n\n---\n\n".join(parts)
|
|
|
|
|
|
async def _search_kb_collection(
|
|
qdrant_name: str,
|
|
vec: list[float],
|
|
top_k: int,
|
|
tags: list[str] | None = None,
|
|
) -> list[dict]:
|
|
"""KB 컬렉션 1개에 대해 의미 검색. 결과를 정규화된 dict 리스트로 반환."""
|
|
must = []
|
|
if tags:
|
|
must.append({"key": "tags", "match": {"any": tags}})
|
|
|
|
body: dict = {
|
|
"vector": vec,
|
|
"limit": top_k,
|
|
"with_payload": True,
|
|
"score_threshold": 0.20,
|
|
}
|
|
if must:
|
|
body["filter"] = {"must": must}
|
|
|
|
def _call():
|
|
with httpx.Client(timeout=20) as client:
|
|
resp = client.post(f"{QDRANT_URL}/collections/{qdrant_name}/points/search", json=body)
|
|
if resp.status_code == 404:
|
|
return []
|
|
resp.raise_for_status()
|
|
return resp.json().get("result", [])
|
|
|
|
try:
|
|
return await asyncio.to_thread(_call)
|
|
except Exception as e:
|
|
logging.warning(f"[search_kb] {qdrant_name} 검색 실패: {e}")
|
|
return []
|
|
|
|
|
|
def _recency_factor(uploaded_at_iso: str | None) -> float:
|
|
"""uploaded_at 기준 최신 가중치. 최근 7일 +10%, 30일 +5%, 90일 +2%, 그 외 1.0."""
|
|
if not uploaded_at_iso:
|
|
return 1.0
|
|
try:
|
|
from datetime import datetime, timezone
|
|
ts = datetime.fromisoformat(uploaded_at_iso.replace("Z", "+00:00"))
|
|
if ts.tzinfo is None:
|
|
ts = ts.replace(tzinfo=timezone.utc)
|
|
age = (datetime.now(timezone.utc) - ts).total_seconds() / 86400.0
|
|
if age < 7: return 1.10
|
|
if age < 30: return 1.05
|
|
if age < 90: return 1.02
|
|
return 1.0
|
|
except Exception:
|
|
return 1.0
|
|
|
|
|
|
# ── DB 헬퍼 ──────────────────────────────────────────────────────────────────
|
|
|
|
async def _get_db_connection():
|
|
"""PostgreSQL DB 연결 획득."""
|
|
import asyncio
|
|
import psycopg
|
|
|
|
def _connect():
|
|
return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
|
|
|
|
return await asyncio.to_thread(_connect)
|
|
|
|
|
|
def _validate_sql(sql: str) -> tuple[bool, str]:
|
|
"""SQL 안전 검증 — SELECT/WITH만 허용, 위험 키워드 차단."""
|
|
if len(sql) > 4000:
|
|
return False, "쿼리 길이 4000자를 초과했습니다."
|
|
dangerous = ['EXEC', 'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'GRANT', 'REVOKE', 'TRUNCATE', 'COPY']
|
|
sql_upper = sql.upper()
|
|
for kw in dangerous:
|
|
if re.search(rf"\b{kw}\b", sql_upper):
|
|
return False, f"허용되지 않은 키워드 '{kw}'를 사용했습니다."
|
|
head = sql_upper.lstrip().lstrip('(').lstrip()
|
|
if not (head.startswith('SELECT') or head.startswith('WITH')):
|
|
return False, "SELECT 또는 WITH 쿼리만 허용됩니다."
|
|
if '..' in sql or '~' in sql:
|
|
return False, "파일 경로 표현은 허용되지 않습니다."
|
|
if ';' in sql.rstrip().rstrip(';'):
|
|
return False, "다중 문장(세미콜론)은 허용되지 않습니다."
|
|
return True, ""
|
|
|
|
|
|
# SQL 가드 — auto-LIMIT + statement_timeout
|
|
SQL_MAX_ROWS = int(os.environ.get("SQL_MAX_ROWS", "1000"))
|
|
SQL_STATEMENT_TIMEOUT_MS = int(os.environ.get("SQL_STATEMENT_TIMEOUT_MS", "30000"))
|
|
|
|
_RE_LIMIT_TAIL = re.compile(r"\bLIMIT\b\s+\d+(\s+OFFSET\s+\d+)?\s*$", re.IGNORECASE)
|
|
|
|
|
|
def _apply_sql_guards(sql: str, max_rows: int = SQL_MAX_ROWS) -> str:
|
|
"""LIMIT가 없으면 서브쿼리로 감싸 강제 부착. 이미 끝부분에 LIMIT가 있으면 그대로."""
|
|
s = sql.strip().rstrip(';').strip()
|
|
if _RE_LIMIT_TAIL.search(s):
|
|
return s
|
|
return f"SELECT * FROM ({s}) _capped LIMIT {max_rows}"
|
|
|
|
|
|
# Compact DB schema for LLM SQL generation
|
|
_DB_SCHEMA = """
|
|
Tables:
|
|
history_table(tagname TEXT, value TEXT, recorded_at TIMESTAMPTZ)
|
|
realtime_table(tagname TEXT, livevalue TEXT, timestamp TIMESTAMPTZ)
|
|
tag_metadata(base_tag TEXT, attribute TEXT, value TEXT)
|
|
event_history_table(tagname TEXT, prev_value TEXT, curr_value TEXT, event_type TEXT, event_time TIMESTAMPTZ, duration_seconds INT)
|
|
|
|
Views:
|
|
v_tag_summary(base_tag TEXT, pv TEXT, sp TEXT, op TEXT, description TEXT, area TEXT)
|
|
|
|
Rules:
|
|
- SELECT only. tagname lowercase exact match.
|
|
- value is TEXT; cast ::double precision when aggregating.
|
|
- time_bucket() banned. For N-min buckets: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60))
|
|
- KST input = UTC-9 in DB.
|
|
"""
|
|
|
|
# ── RAG 도구 ─────────────────────────────────────────────────────────────────
|
|
|
|
@mcp.tool()
|
|
async def search_codebase(query: str, top_k: int = 6) -> str:
|
|
"""ExperionCrawler 프로젝트 소스코드 검색 (우리가 개발한 .NET 8 C# 코드).
|
|
Experion HS R530 공식 문서가 아닌, ExperionCrawler 구현 코드를 검색함.
|
|
|
|
사용 시점: ExperionCrawler 코드의 구현 방법, 버그, 구조를 알고 싶을 때.
|
|
⚠️ Experion HS R530 제품 동작/설정/스펙을 알고 싶으면 search_r530_docs 사용.
|
|
|
|
Args:
|
|
query: 검색어 (예: "OPC UA 구독 시작", "히스토리 스냅샷", "TextToSql 서비스")
|
|
top_k: 반환 결과 수 (기본 6)
|
|
"""
|
|
return await _search(COL_CODEBASE, query, top_k)
|
|
|
|
|
|
@mcp.tool()
|
|
async def search_r530_docs(query: str, top_k: int = 5) -> str:
|
|
"""Honeywell Experion HS R530 공식 제품 문서 검색.
|
|
ExperionCrawler 코드가 아닌, Honeywell 공식 HTM 문서를 검색함.
|
|
|
|
사용 시점: Experion HS R530의 OPC UA 설정, 인증서, 보안 정책, 포인트 주소 형식,
|
|
채널/컨트롤러 속성, 문제해결 등 제품 스펙과 동작을 알고 싶을 때.
|
|
|
|
Args:
|
|
query: 검색어 (예: "certificate configuration", "endpoint security policy")
|
|
top_k: 반환 결과 수 (기본 5)
|
|
"""
|
|
return await _search(COL_OPC_DOCS, query, top_k)
|
|
|
|
|
|
@mcp.tool()
|
|
def ask_iiot_llm(question: str, context: str = "") -> str:
|
|
"""LLM에게 IIoT/OPC UA 질문 (컨텍스트 없이 LLM 직접 질문).
|
|
|
|
사용 시점: search_codebase 또는 search_r530_docs 결과를 context로 넘겨
|
|
종합 분석·답변이 필요할 때. 또는 일반 IIoT/OPC UA 개념 질문.
|
|
|
|
Args:
|
|
question: 질문 내용
|
|
context: (선택) search_codebase 또는 search_r530_docs 검색 결과
|
|
"""
|
|
system = (
|
|
"당신은 IIoT(산업용 IoT), OPC UA, Honeywell Experion PKS/HS R530 전문가입니다.\n"
|
|
"컨텍스트가 제공된 경우 컨텍스트를 우선 근거로 삼아 한국어로 답변합니다.\n"
|
|
"컨텍스트 출처가 'Experion HS R530 공식 문서'인지 'ExperionCrawler 코드'인지 명확히 구분하여 설명합니다."
|
|
)
|
|
user_msg = f"컨텍스트:\n{context}\n\n질문: {question}" if context else question
|
|
resp = _llm().chat.completions.create(
|
|
model=VLLM_MODEL,
|
|
messages=[
|
|
{"role": "system", "content": system},
|
|
{"role": "user", "content": user_msg},
|
|
],
|
|
max_tokens=2048,
|
|
temperature=0.1,
|
|
)
|
|
content = resp.choices[0].message.content or "(응답 없음)"
|
|
return _strip_think(content)
|
|
|
|
|
|
@mcp.tool()
|
|
async def rag_query(
|
|
question: str,
|
|
search_code: bool = False,
|
|
search_docs: bool = True,
|
|
search_kb: bool = False,
|
|
kb_collections: list[str] | None = None,
|
|
) -> str:
|
|
"""검색 → LLM 답변 생성 (통합 RAG).
|
|
|
|
기본값: Experion HS R530 공식 문서만 검색.
|
|
사용자 KB 검색을 포함하려면 search_kb=True. 코드 검색은 search_code=True.
|
|
|
|
Args:
|
|
question: 질문
|
|
search_docs: Experion HS R530 공식 문서 검색 여부 (기본 True)
|
|
search_code: ExperionCrawler 소스코드 검색 여부 (기본 False)
|
|
search_kb: 사용자 KB 검색 여부 (기본 False)
|
|
kb_collections: 검색 대상 KB 컬렉션 키 목록. None이면 전체.
|
|
예: ["plant_operation", "procedure"]
|
|
"""
|
|
context_parts: list[str] = []
|
|
if search_docs:
|
|
context_parts.append(f"=== Experion HS R530 공식 문서 ===\n{await _search(COL_OPC_DOCS, question, 4)}")
|
|
if search_code:
|
|
context_parts.append(f"=== ExperionCrawler 구현 코드 ===\n{await _search(COL_CODEBASE, question, 3)}")
|
|
if search_kb:
|
|
kb_text = await _format_kb_results(question, kb_collections, top_k=6)
|
|
context_parts.append(f"=== 사용자 지식 베이스 ===\n{kb_text}")
|
|
return ask_iiot_llm(question, "\n\n".join(context_parts))
|
|
|
|
|
|
async def _format_kb_results(
|
|
query: str,
|
|
collection_keys: list[str] | None,
|
|
top_k: int,
|
|
tags: list[str] | None = None,
|
|
since: str | None = None,
|
|
boost_recent: bool = True,
|
|
) -> str:
|
|
"""search_kb 내부 헬퍼: 다중 컬렉션 의미검색 후 인용 텍스트로 직렬화."""
|
|
hits = await _search_kb_raw(query, collection_keys, top_k, tags, since, boost_recent)
|
|
if not hits:
|
|
return "관련 KB 결과 없음."
|
|
|
|
parts = []
|
|
for h in hits:
|
|
title = h.get("title") or "(제목없음)"
|
|
loc = h.get("locator") or ""
|
|
score = h.get("score", 0.0)
|
|
text = (h.get("text") or "").strip()
|
|
# 인용 헤더: "[score=0.812] 정비이력_2026Q1.xlsx > 시트:Pump-A > 행 12"
|
|
loc_str = f" > {loc}" if loc else ""
|
|
parts.append(f"[score={score:.3f}] {title}{loc_str}\n{text[:700]}")
|
|
return "\n\n---\n\n".join(parts)
|
|
|
|
|
|
async def _search_kb_raw(
|
|
query: str,
|
|
collection_keys: list[str] | None,
|
|
top_k: int,
|
|
tags: list[str] | None,
|
|
since: str | None,
|
|
boost_recent: bool,
|
|
) -> list[dict]:
|
|
"""KB 검색 핵심 로직 — 다중 컬렉션 의미검색 + 최신 가중치 + 후필터."""
|
|
targets = collection_keys or list(KB_COLLECTIONS.keys())
|
|
qdrant_names = [KB_COLLECTIONS[k] for k in targets if k in KB_COLLECTIONS]
|
|
if not qdrant_names:
|
|
return []
|
|
|
|
vec = await _embed(query)
|
|
per_coll_k = max(top_k, 8)
|
|
|
|
results: list[dict] = []
|
|
for qname in qdrant_names:
|
|
hits = await _search_kb_collection(qname, vec, per_coll_k, tags=tags)
|
|
for h in hits:
|
|
p = h.get("payload", {})
|
|
uploaded_at = p.get("uploaded_at")
|
|
|
|
if since and uploaded_at:
|
|
try:
|
|
if uploaded_at < since:
|
|
continue
|
|
except Exception:
|
|
pass
|
|
|
|
base_score = h.get("score", 0.0)
|
|
recency = _recency_factor(uploaded_at) if boost_recent else 1.0
|
|
results.append({
|
|
"score": base_score * recency,
|
|
"raw_score": base_score,
|
|
"doc_id": p.get("doc_id"),
|
|
"collection_key": p.get("collection_key"),
|
|
"title": p.get("title"),
|
|
"text": p.get("text", ""),
|
|
"chunk_kind": p.get("chunk_kind"),
|
|
"locator": p.get("locator"),
|
|
"uploaded_at": uploaded_at,
|
|
"tags": p.get("tags") or [],
|
|
})
|
|
|
|
# 점수 내림차순 정렬, 동일 doc_id 중복 dedup(최고점만)
|
|
results.sort(key=lambda r: r["score"], reverse=True)
|
|
seen: set[str] = set()
|
|
unique: list[dict] = []
|
|
for r in results:
|
|
key = f'{r.get("doc_id")}::{r.get("locator")}'
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
unique.append(r)
|
|
if len(unique) >= top_k:
|
|
break
|
|
return unique
|
|
|
|
|
|
@mcp.tool()
|
|
async def search_kb(
|
|
query: str,
|
|
collection_keys: list[str] | None = None,
|
|
top_k: int = 8,
|
|
tags: list[str] | None = None,
|
|
since: str | None = None,
|
|
boost_recent: bool = True,
|
|
) -> str:
|
|
"""사용자 지식 베이스(KB) 다중 컬렉션 의미 검색.
|
|
|
|
관리탭에서 업로드/인덱싱한 문서에서 질의와 의미적으로 가까운 청크를 찾는다.
|
|
|
|
Args:
|
|
query: 검색어 또는 자연어 질문
|
|
collection_keys: 대상 컬렉션 키 목록. None이면 전체.
|
|
가능한 값: system_instrument, plant_operation,
|
|
procedure, report, vendor_doc
|
|
top_k: 반환 결과 수 (기본 8)
|
|
tags: 태그 필터 (any 매칭). 예: ["unit-a", "P-6201"]
|
|
since: 이 ISO 시각 이후 업로드된 문서만. 예: "2026-04-01T00:00:00Z"
|
|
boost_recent: True이면 uploaded_at 기준 최신 가중치 적용 (기본 True)
|
|
|
|
Returns:
|
|
JSON 문자열: { success, count, hits: [{ doc_id, collection_key, title,
|
|
text, chunk_kind, locator, score, uploaded_at, tags }, ...] }
|
|
"""
|
|
try:
|
|
hits = await _search_kb_raw(query, collection_keys, top_k, tags, since, boost_recent)
|
|
return json.dumps(
|
|
{"success": True, "count": len(hits), "hits": hits},
|
|
ensure_ascii=False,
|
|
default=str,
|
|
)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"search_kb 실패: {e}"}, ensure_ascii=False)
|
|
|
|
|
|
# ── NL2SQL 도구 ───────────────────────────────────────────────────────────────
|
|
|
|
async def _execute_sql_internal(sql: str) -> str:
|
|
"""SQL 실행 내부 함수 (run_sql과 query_with_nl에서 공유).
|
|
|
|
가드: LIMIT 미지정 시 자동 LIMIT 부착(SQL_MAX_ROWS), statement_timeout 적용.
|
|
"""
|
|
valid, err = _validate_sql(sql)
|
|
if not valid:
|
|
return json.dumps({"success": False, "error": f"SQL 검증 실패: {err}"}, ensure_ascii=False)
|
|
|
|
capped_sql = _apply_sql_guards(sql)
|
|
conn = None
|
|
try:
|
|
conn = await _get_db_connection()
|
|
with conn.cursor() as cur:
|
|
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
|
|
cur.execute(capped_sql)
|
|
rows = cur.fetchall()
|
|
columns = [desc[0] for desc in cur.description]
|
|
result_data = [dict(zip(columns, row)) for row in rows]
|
|
return json.dumps({
|
|
"success": True,
|
|
"columns": columns,
|
|
"count": len(result_data),
|
|
"row_limit": SQL_MAX_ROWS,
|
|
"data": result_data
|
|
}, ensure_ascii=False, default=str)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"SQL 실행 실패: {e}"}, ensure_ascii=False)
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
@mcp.tool()
|
|
async def run_sql(sql: str) -> str:
|
|
"""SQL 쿼리 실행 (SELECT/WITH만 허용).
|
|
|
|
안전 가드:
|
|
- 위험 키워드(INSERT/UPDATE/DELETE/DROP/ALTER/CREATE/GRANT/REVOKE/TRUNCATE/COPY/EXEC) 차단
|
|
- 다중 문장(세미콜론) 차단
|
|
- LIMIT 미지정 시 SQL_MAX_ROWS(기본 1000)로 자동 제한
|
|
- statement_timeout = SQL_STATEMENT_TIMEOUT_MS(기본 30s)
|
|
|
|
Args:
|
|
sql: 실행할 SELECT/WITH SQL 문자열 (최대 2000자)
|
|
|
|
Returns:
|
|
JSON: { success, columns, count, row_limit, data } 또는 { success, error }
|
|
"""
|
|
return await _execute_sql_internal(sql)
|
|
|
|
|
|
@mcp.tool()
|
|
async def query_pv_history(tag_names: list[str], time_from: str, time_to: str, limit: int = 100) -> str:
|
|
"""과거 값(PV) 히스토리 조회.
|
|
|
|
Args:
|
|
tag_names: 태그 이름 목록 (예: ["ficq-6113.pv", "ti-6101.pv"])
|
|
time_from: 시작 시간 (ISO 8601, 예: "2026-04-01T00:00:00")
|
|
time_to: 종료 시간 (ISO 8601, 예: "2026-04-02T00:00:00")
|
|
limit: 반환 행 수 제한 (기본 100, 최대 5000)
|
|
|
|
Returns:
|
|
JSON: { success, tag_names, time_range, limit, data }
|
|
"""
|
|
conn = None
|
|
try:
|
|
limit = min(limit, 5000)
|
|
conn = await _get_db_connection()
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""SELECT tagname, recorded_at, value
|
|
FROM history_table
|
|
WHERE tagname = ANY(%s)
|
|
AND recorded_at >= %s AND recorded_at <= %s
|
|
ORDER BY recorded_at, tagname
|
|
LIMIT %s""",
|
|
(tag_names, time_from, time_to, limit)
|
|
)
|
|
rows = cur.fetchall()
|
|
data = [{"tag_name": r[0], "timestamp": r[1].isoformat(), "value": r[2]} for r in rows]
|
|
return json.dumps({
|
|
"success": True,
|
|
"tag_names": tag_names,
|
|
"time_range": f"{time_from} ~ {time_to}",
|
|
"count": len(data),
|
|
"data": data
|
|
}, ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"히스토리 쿼리 실패: {e}"}, ensure_ascii=False)
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_tag_metadata(query: str, limit: int = 10) -> str:
|
|
"""태그 메타데이터 검색 (realtime_table 기반).
|
|
|
|
Args:
|
|
query: 태그명 검색어 (패턴 매칭)
|
|
limit: 반환 태그 수 제한 (기본 10)
|
|
|
|
Returns:
|
|
JSON: { success, query, count, tags }
|
|
"""
|
|
conn = None
|
|
try:
|
|
conn = await _get_db_connection()
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""SELECT tagname, livevalue, timestamp, node_id
|
|
FROM realtime_table
|
|
WHERE tagname ILIKE %s
|
|
ORDER BY tagname LIMIT %s""",
|
|
(f"%{query}%", limit)
|
|
)
|
|
rows = cur.fetchall()
|
|
tags = [{"tag_name": r[0], "current_value": r[1],
|
|
"last_updated": r[2].isoformat() if r[2] else None,
|
|
"node_id": r[3]} for r in rows]
|
|
return json.dumps({"success": True, "query": query, "count": len(tags), "tags": tags},
|
|
ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"태그 메타데이터 검색 실패: {e}"}, ensure_ascii=False)
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
@mcp.tool()
|
|
async def list_drawings(unit_no: str | None = None) -> str:
|
|
"""단위별 도면 목록 조회 (node_map_master.name 기반).
|
|
|
|
Args:
|
|
unit_no: 단위 번호 접두사 (예: "A", "B"). None이면 전체 목록
|
|
|
|
Returns:
|
|
JSON: { success, unit_no, count, names }
|
|
"""
|
|
conn = None
|
|
try:
|
|
conn = await _get_db_connection()
|
|
with conn.cursor() as cur:
|
|
if unit_no:
|
|
cur.execute(
|
|
"SELECT DISTINCT name FROM node_map_master WHERE name ILIKE %s ORDER BY name LIMIT 100",
|
|
(f"{unit_no}%",)
|
|
)
|
|
else:
|
|
cur.execute("SELECT DISTINCT name FROM node_map_master ORDER BY name LIMIT 100")
|
|
rows = cur.fetchall()
|
|
return json.dumps({"success": True, "unit_no": unit_no,
|
|
"count": len(rows), "names": [r[0] for r in rows]},
|
|
ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"도면 목록 조회 실패: {e}"}, ensure_ascii=False)
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
# ── Phase 7.1: NL2SQL 의도 라우터 ──────────────────────────────────
|
|
# 키워드/정규식 기반 (ML 불필요). query_with_nl 진입 시 분류 → 적절한 도구로 위임.
|
|
|
|
_CLASSIFY_RULES = [
|
|
# (정규식, 라우팅 대상 도구명)
|
|
(r'활성.*알람|현재.*알람|지금.*알람|active.*alarm', 'active_alarms'),
|
|
(r'트립|trip', 'active_alarms'),
|
|
(r'상태\s*보고서|교대.*보고|status.*report|운전.*보고', 'generate_status_report'),
|
|
# "이상 상황 보고" 패턴 — 사용자가 특정 일자/area 범위로 비정상 상황 요약을 요청하는 경우.
|
|
# summarize_events가 since/until/area 파라미터를 받으므로 generate_status_report보다 적합.
|
|
(r'이상.*상황|상황.*보고|이상.*보고|비정상.*상황|abnormal', 'summarize_events'),
|
|
(r'요약|보고서|리포트|summary|summarize|report', 'summarize_events'),
|
|
(r'태그.*찾|tag.*찾|찾아\s*줘|find.*tag|어떤.*태그', 'find_tags'),
|
|
(r'이벤트.*조회|이벤트.*목록|event.*list|event.*query|로그.*조회', 'query_events'),
|
|
]
|
|
|
|
|
|
def _classify_intent(question: str) -> str:
|
|
"""질문을 키워드 정규식으로 분류. 매칭 실패 시 'query_with_nl' (SQL 경로) 반환."""
|
|
if not question:
|
|
return 'query_with_nl'
|
|
q = question.lower()
|
|
for pattern, target in _CLASSIFY_RULES:
|
|
if re.search(pattern, q, re.IGNORECASE):
|
|
return target
|
|
return 'query_with_nl'
|
|
|
|
|
|
@mcp.tool()
|
|
async def classify_intent(question: str) -> str:
|
|
"""질문 의도를 분류하여 적절한 도구명을 반환합니다 (실행하지는 않음).
|
|
|
|
반환값(도구명): active_alarms, generate_status_report, summarize_events,
|
|
find_tags, query_events, query_with_nl(기본)
|
|
|
|
Args:
|
|
question: 자연어 질문
|
|
|
|
Returns:
|
|
JSON: { success, question, route }
|
|
"""
|
|
route = _classify_intent(question)
|
|
return json.dumps({"success": True, "question": question, "route": route}, ensure_ascii=False)
|
|
|
|
|
|
@mcp.tool()
|
|
async def query_with_nl(question: str) -> str:
|
|
"""자연어 질문을 LLM이 SQL로 변환하고 시계열 DB를 조회합니다.
|
|
|
|
Args:
|
|
question: 자연어 질문 (예: "FICQ-6113.PV 최근 1시간 값을 1분 단위로 표시")
|
|
|
|
Returns:
|
|
JSON: { sql, success, columns, count, data } 또는 { sql, success, error }
|
|
"""
|
|
import asyncio
|
|
import json as json_module
|
|
|
|
# Phase 7.1: 의도 라우팅 — 명백한 비-SQL 질문은 전용 도구로 위임
|
|
route = _classify_intent(question)
|
|
if route != 'query_with_nl':
|
|
try:
|
|
if route == 'active_alarms':
|
|
payload = await active_alarms()
|
|
elif route == 'find_tags':
|
|
payload = await find_tags(query=question)
|
|
elif route == 'query_events':
|
|
payload = await query_events()
|
|
elif route == 'summarize_events':
|
|
payload = await summarize_events()
|
|
elif route == 'generate_status_report':
|
|
payload = await generate_status_report()
|
|
else:
|
|
payload = None
|
|
if payload is not None:
|
|
try:
|
|
obj = json.loads(payload)
|
|
except Exception:
|
|
obj = {"raw": payload}
|
|
obj["_routed_from"] = "query_with_nl"
|
|
obj["_route"] = route
|
|
return json.dumps(obj, ensure_ascii=False, default=str)
|
|
except Exception as e:
|
|
# 라우팅 실패 시 원래 SQL 경로로 fallback
|
|
pass
|
|
|
|
system = (
|
|
"You are a PostgreSQL SQL expert.\n"
|
|
"Convert the user's question into a SELECT SQL.\n"
|
|
"Return ONLY the SQL. No explanation, no markdown, NO <think> tags.\n"
|
|
"Use PostgreSQL syntax. tagname lowercase exact match.\n"
|
|
"value is TEXT; cast ::double precision when aggregating.\n"
|
|
"KST input = UTC-9. Example: KST 12:00 = UTC 03:00.\n"
|
|
"For N-min buckets: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)).\n"
|
|
"No GROUP BY if no interval specified.\n\n"
|
|
f"{_DB_SCHEMA}"
|
|
)
|
|
|
|
try:
|
|
def _call_llm():
|
|
return _llm().chat.completions.create(
|
|
model=VLLM_MODEL,
|
|
messages=[
|
|
{"role": "system", "content": system},
|
|
{"role": "user", "content": question},
|
|
],
|
|
max_tokens=8192,
|
|
temperature=0.1,
|
|
)
|
|
|
|
resp = await asyncio.to_thread(_call_llm)
|
|
sql = _strip_think(resp.choices[0].message.content or "").strip()
|
|
# 마크다운 코드 블록 제거
|
|
if sql.startswith("```"):
|
|
lines = sql.splitlines()
|
|
sql = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]).strip()
|
|
if not sql:
|
|
return json.dumps({"success": False, "sql": "", "error": "LLM이 SQL을 생성하지 못했습니다."}, ensure_ascii=False)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "sql": "", "error": f"LLM SQL 생성 실패: {e}"}, ensure_ascii=False)
|
|
|
|
# SQL 실행
|
|
raw = await _execute_sql_internal(sql)
|
|
result = json.loads(raw)
|
|
result["sql"] = sql
|
|
|
|
# long format → pivot 변환 (tagname 컬럼이 있으면 자동 PIVOT)
|
|
if result.get("success") and "data" in result:
|
|
cols = result.get("columns", [])
|
|
data = result["data"]
|
|
if "tagname" in cols and data:
|
|
time_col = next((c for c in cols if c not in ("tagname", "value", "livevalue", "avg_val")), None)
|
|
val_col = next((c for c in ("avg_val", "value") if c in cols), cols[-1])
|
|
if time_col:
|
|
tag_names_list = sorted(dict.fromkeys(row["tagname"] for row in data))
|
|
pivoted: dict = {}
|
|
for row in data:
|
|
key = str(row[time_col])
|
|
if key not in pivoted:
|
|
pivoted[key] = {time_col: row[time_col]}
|
|
pivoted[key][row["tagname"]] = row.get(val_col)
|
|
result["data"] = list(pivoted.values())
|
|
result["columns"] = [time_col] + tag_names_list
|
|
result["count"] = len(result["data"])
|
|
|
|
return json.dumps(result, ensure_ascii=False, default=str)
|
|
|
|
|
|
# ── Phase 6 보강 도구 (이벤트/태그/보고서) ─────────────────────────────────────
|
|
|
|
_VALID_EVENT_TYPES = ("ALARM", "TRIP", "NORMAL", "RUN", "CHANGE")
|
|
|
|
|
|
@mcp.tool()
|
|
async def find_tags(query: str, area: str | None = None, top_k: int = 20) -> str:
|
|
"""태그 검색 — base_tag/설명(desc)/area 통합 검색 (v_tag_summary 뷰 기반).
|
|
|
|
사용 시점: 사용자가 "온도", "Tower 1 압력", "운전 중인 펌프" 같은 자연어로
|
|
태그를 지칭할 때 실제 base_tag(예: 'ti-6101', 'p-6102')를 역으로 찾기 위해.
|
|
|
|
get_tag_metadata와 차이: 단순 tagname LIKE만 보지 않고 description/area에도
|
|
매칭하며, 현재 PV/SP/OP/description/area를 함께 반환.
|
|
|
|
Args:
|
|
query: 검색어 (base_tag 또는 description 부분 일치, 대소문자 무시)
|
|
area: (선택) area 필터 (예: 'tower-1', 'utility'). NULL이면 전체
|
|
top_k: 반환 태그 수 (기본 20, 최대 100)
|
|
|
|
Returns:
|
|
JSON: { success, query, count, tags: [{base_tag, pv, sp, op, description, area}] }
|
|
"""
|
|
conn = None
|
|
try:
|
|
top_k = max(1, min(top_k, 100))
|
|
q = f"%{query.strip()}%"
|
|
conn = await _get_db_connection()
|
|
with conn.cursor() as cur:
|
|
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
|
|
if area:
|
|
cur.execute(
|
|
"""SELECT base_tag, pv, sp, op, description, area
|
|
FROM v_tag_summary
|
|
WHERE (base_tag ILIKE %s OR description ILIKE %s)
|
|
AND area = %s
|
|
ORDER BY base_tag
|
|
LIMIT %s""",
|
|
(q, q, area, top_k)
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"""SELECT base_tag, pv, sp, op, description, area
|
|
FROM v_tag_summary
|
|
WHERE base_tag ILIKE %s OR description ILIKE %s
|
|
ORDER BY base_tag
|
|
LIMIT %s""",
|
|
(q, q, top_k)
|
|
)
|
|
rows = cur.fetchall()
|
|
tags = [
|
|
{"base_tag": r[0], "pv": r[1], "sp": r[2], "op": r[3],
|
|
"description": r[4], "area": r[5]}
|
|
for r in rows
|
|
]
|
|
return json.dumps({
|
|
"success": True, "query": query, "area": area,
|
|
"count": len(tags), "tags": tags
|
|
}, ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"태그 검색 실패: {e}"}, ensure_ascii=False)
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
@mcp.tool()
|
|
async def trace_connections(start_tag: str, direction: str = "downstream", max_depth: int = 20) -> str:
|
|
"""pid_equipment 테이블의 from_tag/to_tag를 활용해 장비 연결 경로를 추적.
|
|
|
|
사용 시점: "스팀 경로 설명해줘", "원료 흐름 따라가줘", "T-203에서 어디로 가?" 같은 질문.
|
|
개별 태그 검색 + SQL 조합(10+ 라운드) → trace_connections 1회 호출로 대체.
|
|
|
|
Args:
|
|
start_tag: 시작 태그명 (예: 'FT-6115', 'T-203')
|
|
direction: 'downstream'(하류) 또는 'upstream'(상류). 기본 downstream.
|
|
max_depth: 최대 추적 깊이 (기본 20)
|
|
|
|
Returns:
|
|
JSON: { success, start_tag, direction, path: [{step, from_tag, to_tag, role, tag_no}] }
|
|
"""
|
|
conn = None
|
|
try:
|
|
start_tag = start_tag.strip().upper()
|
|
direction = direction.strip().lower()
|
|
if direction not in ("downstream", "upstream"):
|
|
return json.dumps({"success": False, "error": "direction은 'downstream' 또는 'upstream'"}, ensure_ascii=False)
|
|
|
|
conn = await _get_db_connection()
|
|
with conn.cursor() as cur:
|
|
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
|
|
|
|
def _split_tags(tag_str):
|
|
if not tag_str:
|
|
return []
|
|
return [t.strip() for t in tag_str.split(',') if t.strip()]
|
|
|
|
def _build_or_condition(tags):
|
|
if not tags:
|
|
return "", []
|
|
conditions = []
|
|
params = []
|
|
for t in tags:
|
|
conditions.append("from_tag LIKE %s")
|
|
params.append(f'%{t}%')
|
|
conditions.append("tag_no = %s")
|
|
params.append(t)
|
|
return f"({' OR '.join(conditions)})", params
|
|
|
|
def _trace_downstream(current_tags, visited, depth):
|
|
if depth > max_depth or not current_tags:
|
|
return
|
|
or_clause, params = _build_or_condition(current_tags)
|
|
if not or_clause:
|
|
return
|
|
cur.execute(f"""
|
|
SELECT tag_no, from_tag, to_tag, role
|
|
FROM pid_equipment
|
|
WHERE {or_clause}
|
|
ORDER BY tag_no
|
|
""", params)
|
|
rows = cur.fetchall()
|
|
for row in rows:
|
|
tag_no = row[0]
|
|
if tag_no in visited:
|
|
continue
|
|
visited.add(tag_no)
|
|
path.append({
|
|
"step": len(path) + 1,
|
|
"tag_no": tag_no,
|
|
"from_tag": row[1],
|
|
"to_tag": row[2],
|
|
"role": row[3],
|
|
})
|
|
next_tags = _split_tags(row[2])
|
|
_trace_downstream(next_tags, visited, depth + 1)
|
|
|
|
path = []
|
|
visited = set()
|
|
visited.add(start_tag)
|
|
start_tags = _split_tags(start_tag)
|
|
_trace_downstream(start_tags, visited, 0)
|
|
|
|
# upstream
|
|
if direction == "upstream":
|
|
def _trace_upstream(current_tags, visited, depth):
|
|
if depth > max_depth or not current_tags:
|
|
return
|
|
conditions = []
|
|
params = []
|
|
for t in current_tags:
|
|
conditions.append("to_tag LIKE %s")
|
|
params.append(f'%{t}%')
|
|
conditions.append("tag_no = %s")
|
|
params.append(t)
|
|
if not conditions:
|
|
return
|
|
or_clause = f"({' OR '.join(conditions)})"
|
|
cur.execute(f"""
|
|
SELECT tag_no, from_tag, to_tag, role
|
|
FROM pid_equipment
|
|
WHERE {or_clause}
|
|
ORDER BY tag_no
|
|
""", params)
|
|
rows = cur.fetchall()
|
|
for row in rows:
|
|
tag_no = row[0]
|
|
if tag_no in visited:
|
|
continue
|
|
visited.add(tag_no)
|
|
path.append({
|
|
"step": len(path) + 1,
|
|
"tag_no": tag_no,
|
|
"from_tag": row[1],
|
|
"to_tag": row[2],
|
|
"role": row[3],
|
|
})
|
|
prev_tags = _split_tags(row[1])
|
|
_trace_upstream(prev_tags, visited, depth + 1)
|
|
|
|
path = []
|
|
visited = set()
|
|
visited.add(start_tag)
|
|
start_tags = _split_tags(start_tag)
|
|
_trace_upstream(start_tags, visited, 0)
|
|
|
|
return json.dumps({
|
|
"success": True,
|
|
"start_tag": start_tag,
|
|
"direction": direction,
|
|
"path": path,
|
|
}, ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"연결 추적 실패: {e}"}, ensure_ascii=False)
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
@mcp.tool()
|
|
async def query_events(
|
|
tag_name: str | None = None,
|
|
event_type: str | None = None,
|
|
area: str | None = None,
|
|
since: str | None = None,
|
|
until: str | None = None,
|
|
limit: int = 100,
|
|
) -> str:
|
|
"""이벤트 히스토리 조회 (event_history_table — 디지털 포인트 상태 변경).
|
|
|
|
Args:
|
|
tag_name: (선택) 태그명 LIKE 패턴 (예: 'p-6102', 'xv-%')
|
|
event_type: (선택) ALARM / TRIP / NORMAL / RUN / CHANGE 중 하나
|
|
area: (선택) area 정확 매칭
|
|
since: (선택) 시작 시간 ISO 8601. 기본 24시간 전
|
|
until: (선택) 종료 시간 ISO 8601. 기본 현재
|
|
limit: 반환 행 수 (기본 100, 최대 1000)
|
|
|
|
Returns:
|
|
JSON: { success, count, time_range, events: [...] }
|
|
"""
|
|
if event_type and event_type.upper() not in _VALID_EVENT_TYPES:
|
|
return json.dumps({
|
|
"success": False,
|
|
"error": f"event_type은 {list(_VALID_EVENT_TYPES)} 중 하나여야 합니다."
|
|
}, ensure_ascii=False)
|
|
|
|
conn = None
|
|
try:
|
|
limit = max(1, min(limit, 1000))
|
|
where = ["event_time >= COALESCE(%s::timestamptz, NOW() - INTERVAL '24 hours')",
|
|
"event_time <= COALESCE(%s::timestamptz, NOW())"]
|
|
params: list = [since, until]
|
|
if tag_name:
|
|
where.append("tagname ILIKE %s")
|
|
params.append(f"%{tag_name}%")
|
|
if event_type:
|
|
where.append("event_type = %s")
|
|
params.append(event_type.upper())
|
|
if area:
|
|
where.append("area = %s")
|
|
params.append(area)
|
|
|
|
sql = f"""
|
|
SELECT id, tagname, prev_value, curr_value, event_type, event_time,
|
|
area, section, duration_seconds, metadata
|
|
FROM event_history_table
|
|
WHERE {' AND '.join(where)}
|
|
ORDER BY event_time DESC
|
|
LIMIT %s
|
|
"""
|
|
params.append(limit)
|
|
|
|
conn = await _get_db_connection()
|
|
with conn.cursor() as cur:
|
|
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
|
|
cur.execute(sql, params)
|
|
rows = cur.fetchall()
|
|
events = [
|
|
{"id": r[0], "tag_name": r[1], "prev_value": r[2], "curr_value": r[3],
|
|
"event_type": r[4], "event_time": r[5].isoformat() if r[5] else None,
|
|
"area": r[6], "section": r[7], "prev_state_duration_s": r[8], "metadata": r[9]}
|
|
for r in rows
|
|
]
|
|
return json.dumps({
|
|
"success": True,
|
|
"count": len(events),
|
|
"time_range": f"{since or '24h ago'} ~ {until or 'now'}",
|
|
"filters": {"tag_name": tag_name, "event_type": event_type, "area": area},
|
|
"events": events,
|
|
}, ensure_ascii=False, indent=2, default=str)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"이벤트 조회 실패: {e}"}, ensure_ascii=False)
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
@mcp.tool()
|
|
async def active_alarms(area: str | None = None, limit: int = 100) -> str:
|
|
"""현재 활성 알람 — 각 태그의 최신 이벤트가 ALARM 또는 TRIP인 경우만 반환.
|
|
|
|
동작: event_history_table에서 태그별 최신 이벤트(DISTINCT ON)를 가져온 뒤,
|
|
event_type이 ALARM 또는 TRIP인 것만 필터. 이후 NORMAL이 들어왔다면 해제된 것이므로 제외.
|
|
|
|
Args:
|
|
area: (선택) area 필터
|
|
limit: 최대 반환 수 (기본 100)
|
|
|
|
Returns:
|
|
JSON: { success, count, alarms: [{tag_name, event_type, since, prev_state_duration_s, area, ...}] }
|
|
"""
|
|
conn = None
|
|
try:
|
|
limit = max(1, min(limit, 500))
|
|
sql = """
|
|
WITH latest AS (
|
|
SELECT DISTINCT ON (tagname)
|
|
id, tagname, curr_value, event_type, event_time,
|
|
area, section, duration_seconds, metadata
|
|
FROM event_history_table
|
|
WHERE event_time >= NOW() - INTERVAL '30 days'
|
|
ORDER BY tagname, event_time DESC
|
|
)
|
|
SELECT id, tagname, curr_value, event_type, event_time,
|
|
area, section, duration_seconds, metadata,
|
|
EXTRACT(EPOCH FROM (NOW() - event_time))::bigint AS age_seconds
|
|
FROM latest
|
|
WHERE event_type IN ('ALARM', 'TRIP')
|
|
AND (%s::text IS NULL OR area = %s)
|
|
ORDER BY event_time DESC
|
|
LIMIT %s
|
|
"""
|
|
conn = await _get_db_connection()
|
|
with conn.cursor() as cur:
|
|
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
|
|
cur.execute(sql, (area, area, limit))
|
|
rows = cur.fetchall()
|
|
alarms = [
|
|
{"id": r[0], "tag_name": r[1], "curr_value": r[2], "event_type": r[3],
|
|
"since": r[4].isoformat() if r[4] else None,
|
|
"area": r[5], "section": r[6], "prev_state_duration_s": r[7], "metadata": r[8],
|
|
"age_seconds": r[9]}
|
|
for r in rows
|
|
]
|
|
return json.dumps({
|
|
"success": True, "area": area,
|
|
"count": len(alarms), "alarms": alarms,
|
|
}, ensure_ascii=False, indent=2, default=str)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"활성 알람 조회 실패: {e}"}, ensure_ascii=False)
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
@mcp.tool()
|
|
async def summarize_events(
|
|
since: str | None = None,
|
|
area: str | None = None,
|
|
event_type: str | None = None,
|
|
max_events: int = 200,
|
|
focus: str = "",
|
|
) -> str:
|
|
"""이벤트 히스토리를 LLM으로 한글 요약.
|
|
|
|
내부 동작: query_events로 이벤트를 가져와 LLM에 요약 요청. 큰 건수는 잘림.
|
|
|
|
Args:
|
|
since: (선택) ISO 8601 시작 시간. 기본 24시간 전
|
|
area: (선택) area 필터
|
|
event_type: (선택) event_type 필터
|
|
max_events: 분석에 포함할 최대 이벤트 (기본 200, 최대 500)
|
|
focus: (선택) 요약 시 강조할 관점 (예: "interlock 동작", "교대 시점 이상")
|
|
|
|
Returns:
|
|
JSON: { success, summary, stats: {by_type, by_area, count} }
|
|
"""
|
|
max_events = max(10, min(max_events, 500))
|
|
raw = await query_events(event_type=event_type, area=area, since=since, limit=max_events)
|
|
parsed = json.loads(raw)
|
|
if not parsed.get("success"):
|
|
return raw
|
|
|
|
events = parsed.get("events", [])
|
|
if not events:
|
|
return json.dumps({
|
|
"success": True, "summary": "지정된 조건 범위에 이벤트가 없습니다.",
|
|
"stats": {"count": 0}
|
|
}, ensure_ascii=False, indent=2)
|
|
|
|
# 통계
|
|
by_type: dict[str, int] = {}
|
|
by_area: dict[str, int] = {}
|
|
for ev in events:
|
|
by_type[ev["event_type"]] = by_type.get(ev["event_type"], 0) + 1
|
|
a = ev.get("area") or "(unknown)"
|
|
by_area[a] = by_area.get(a, 0) + 1
|
|
|
|
# LLM 요약 — 가벼운 토큰 수로 제한
|
|
sample = events[:max_events]
|
|
bullet_lines = [
|
|
f"- [{ev['event_type']}] {ev['tag_name']} @ {_kst_str(ev['event_time'])}"
|
|
f" ({ev.get('prev_value')}→{ev.get('curr_value')})"
|
|
f" (직전상태유지={ev.get('prev_state_duration_s', '?')}s)"
|
|
f"{(' area=' + ev['area']) if ev.get('area') else ''}"
|
|
for ev in sample
|
|
]
|
|
focus_line = f"\n특히 다음 관점을 우선해 설명하세요: {focus}\n" if focus else ""
|
|
|
|
system = (
|
|
"당신은 IIoT/공장 운전 분석 전문가입니다. 디지털 포인트의 상태 변경 이벤트 로그를 보고 "
|
|
"한국어로 6~10줄 요약을 만듭니다. 다음 구조를 따릅니다:\n"
|
|
"1) 핵심 현황 (총 이벤트 수, 주요 area)\n"
|
|
"2) 알람/트립 (ALARM/TRIP) 핵심 케이스 — 태그·시각·전후값\n"
|
|
"3) 패턴/특이점 (반복 발생, 동시 발생, area 집중)\n"
|
|
"4) 다음 점검 권고 (있다면)\n"
|
|
"구체적인 태그명과 시각을 포함하되 추측은 자제합니다.\n\n"
|
|
"참고 - 모든 시각은 KST(UTC+9, Asia/Seoul)입니다. "
|
|
"`직전상태유지`는 이 이벤트 직전에 태그가 머물렀던 시간(초)입니다."
|
|
)
|
|
user_msg = (
|
|
f"분석 대상 이벤트 {len(sample)}건 (전체 {parsed.get('count')}건). "
|
|
f"통계: type={by_type}, area={by_area}.{focus_line}\n\n"
|
|
+ "\n".join(bullet_lines)
|
|
)
|
|
|
|
def _call():
|
|
return _llm().chat.completions.create(
|
|
model=VLLM_MODEL,
|
|
messages=[
|
|
{"role": "system", "content": system},
|
|
{"role": "user", "content": user_msg},
|
|
],
|
|
max_tokens=1200,
|
|
temperature=0.2,
|
|
)
|
|
|
|
try:
|
|
resp = await asyncio.to_thread(_call)
|
|
summary = _strip_think(resp.choices[0].message.content) or "(요약 없음)"
|
|
except Exception as e:
|
|
summary = f"(LLM 요약 실패: {e})"
|
|
|
|
return json.dumps({
|
|
"success": True,
|
|
"summary": summary,
|
|
"stats": {"count": parsed.get("count"), "by_type": by_type, "by_area": by_area},
|
|
"time_range": parsed.get("time_range"),
|
|
}, ensure_ascii=False, indent=2)
|
|
|
|
|
|
@mcp.tool()
|
|
async def generate_status_report(area: str | None = None, hours: int = 24) -> str:
|
|
"""공장 운전 상태 종합 보고서 — 활성 알람 + 최근 이벤트 + 추세를 LLM이 한 장으로 정리.
|
|
|
|
Args:
|
|
area: (선택) area 필터 (전체 공장이면 NULL)
|
|
hours: 이벤트 분석 윈도우 (기본 24시간, 최대 168=7일)
|
|
|
|
Returns:
|
|
JSON: { success, report, sections: {active_alarms, recent_events, by_type}, generated_at }
|
|
"""
|
|
hours = max(1, min(hours, 168))
|
|
since_iso = None # query_events가 24h 기본을 쓰지만, hours로 명시 전달
|
|
from datetime import datetime, timezone, timedelta
|
|
since_iso = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
# 1) 활성 알람
|
|
alarms_raw = await active_alarms(area=area, limit=50)
|
|
alarms = json.loads(alarms_raw).get("alarms", [])
|
|
|
|
# 2) 최근 이벤트 통계
|
|
events_raw = await query_events(area=area, since=since_iso, limit=500)
|
|
events_parsed = json.loads(events_raw)
|
|
events = events_parsed.get("events", [])
|
|
by_type: dict[str, int] = {}
|
|
for ev in events:
|
|
by_type[ev["event_type"]] = by_type.get(ev["event_type"], 0) + 1
|
|
|
|
# 3) LLM 보고서
|
|
alarm_lines = [
|
|
f"- [{a['event_type']}] {a['tag_name']} since {_kst_str(a['since'])} "
|
|
f"({a.get('age_seconds', 0)}s){' area=' + a['area'] if a.get('area') else ''}"
|
|
for a in alarms[:30]
|
|
] or ["- 활성 알람 없음"]
|
|
recent_lines = [
|
|
f"- [{ev['event_type']}] {ev['tag_name']} @ {_kst_str(ev['event_time'])} "
|
|
f"({ev.get('prev_value')}→{ev.get('curr_value')})"
|
|
f" (직전상태유지={ev.get('prev_state_duration_s', '?')}s)"
|
|
for ev in sample
|
|
]
|
|
focus_line = f"\n특히 다음 관점을 우선해 설명하세요: {focus}\n" if focus else ""
|
|
|
|
system = (
|
|
"당신은 IIoT/공장 운전 분석 전문가입니다. 디지털 포인트의 상태 변경 이벤트 로그를 보고 "
|
|
"한국어로 6~10줄 요약을 만듭니다. 다음 구조를 따릅니다:\n"
|
|
"1) 핵심 현황 (총 이벤트 수, 주요 area)\n"
|
|
"2) 알람/트립 (ALARM/TRIP) 핵심 케이스 — 태그·시각·전후값\n"
|
|
"3) 패턴/특이점 (반복 발생, 동시 발생, area 집중)\n"
|
|
"4) 다음 점검 권고 (있다면)\n"
|
|
"구체적인 태그명과 시각을 포함하되 추측은 자제합니다.\n\n"
|
|
"참고 - 모든 시각은 KST(UTC+9, Asia/Seoul)입니다. "
|
|
"`직전상태유지`는 이 이벤트 직전에 태그가 머물렀던 시간(초)입니다."
|
|
)
|
|
user_msg = (
|
|
f"대상 area: {area or '전체'}\n"
|
|
f"분석 윈도우: 최근 {hours}시간\n"
|
|
f"이벤트 통계 (type별): {by_type}\n"
|
|
f"활성 알람 {len(alarms)}건:\n" + "\n".join(alarm_lines) + "\n\n"
|
|
f"최근 이벤트 표본 (최대 40건):\n" + "\n".join(recent_lines)
|
|
)
|
|
|
|
def _call():
|
|
return _llm().chat.completions.create(
|
|
model=VLLM_MODEL,
|
|
messages=[
|
|
{"role": "system", "content": system},
|
|
{"role": "user", "content": user_msg},
|
|
],
|
|
max_tokens=2048,
|
|
temperature=0.2,
|
|
)
|
|
|
|
try:
|
|
resp = await asyncio.to_thread(_call)
|
|
report = _strip_think(resp.choices[0].message.content) or "(보고서 생성 실패)"
|
|
except Exception as e:
|
|
report = f"(LLM 보고서 실패: {e})"
|
|
|
|
from datetime import datetime as _dt, timezone as _tz
|
|
return json.dumps({
|
|
"success": True,
|
|
"report": report,
|
|
"sections": {
|
|
"active_alarms_count": len(alarms),
|
|
"recent_events_count": len(events),
|
|
"by_type": by_type,
|
|
"window_hours": hours,
|
|
"area": area,
|
|
},
|
|
"generated_at": _dt.now(_tz.utc).isoformat(),
|
|
}, ensure_ascii=False, indent=2)
|
|
|
|
|
|
# ── P&ID 추출 도구 ──────────────────────────────────────────────────────────────
|
|
|
|
@mcp.tool()
|
|
async def extract_pid_tags(text: str, source_type: str) -> str:
|
|
"""텍스트에서 P&ID 태그를 regex로 결정론적 추출. LLM 호출 없음.
|
|
|
|
Args:
|
|
text: DXF/PDF에서 추출한 plain text
|
|
source_type: 'dxf' 또는 'pdf' (로깅용)
|
|
|
|
Returns:
|
|
JSON: { success, count, tags: [{tagNo, kind, prefix, type, confidence, ...}] }
|
|
- kind: 'pipe' | 'equipment' | 'instrument' | 'unknown'
|
|
- pipe면 lineNumber/size 포함, fluid는 type 필드
|
|
"""
|
|
try:
|
|
tags = _extract_pid_tags_from_text(text or "")
|
|
logging.info(f"[extract_pid_tags] source={source_type} count={len(tags)}")
|
|
return json.dumps({"success": True, "count": len(tags), "tags": tags},
|
|
ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
logging.error(f"P&ID 태그 추출 실패: {e}")
|
|
return json.dumps({"success": False, "error": str(e), "count": 0, "tags": []},
|
|
ensure_ascii=False)
|
|
|
|
|
|
@mcp.tool()
|
|
async def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str:
|
|
"""P&ID 태그를 Experion 태그에 결정론적으로 매핑. LLM 호출 없음.
|
|
|
|
매칭 전략 (보수적 — false positive 방지):
|
|
1) 정규화(lowercase + Experion suffix 제거) 후 동일 → confidence 0.99
|
|
2) Experion이 PID의 확장 (예: "FT-101" ↔ "ft-101.pv") → confidence 0.95
|
|
단, PID prefix가 최소 4글자 이상이어야 (`P-1` 같은 짧은 식별자가 다수와 매칭되는 것 방지)
|
|
3) 그 외 → experionTag=null, confidence=0.0
|
|
(UI에서는 "매핑" 버튼 표시 → 운전원 수동 매핑)
|
|
|
|
이전 difflib fuzzy(cutoff 0.75)는 PSV-10101 ↔ p-10101.pv 같은 거짓 매칭을 만들어 제거함.
|
|
숫자 부분만 같으면 종류(PSV/P/XV/VP)가 달라도 묶이는 문제.
|
|
|
|
Args:
|
|
pid_tags: P&ID에서 추출한 태그 목록
|
|
experion_tags: Experion 시스템 태그 목록 (보통 ".pv" 등 suffix 포함)
|
|
|
|
Returns:
|
|
JSON: { success, count, mappings: [{pidTag, experionTag, confidence}] }
|
|
"""
|
|
_EX_SUFFIXES = ('.pv', '.sp', '.mv', '.op', '.cv', '.fieldvalue', '.qv', '.hzset')
|
|
_MIN_PREFIX_LEN = 4 # prefix 매칭 최소 길이
|
|
|
|
def _norm(tag: str) -> str:
|
|
t = (tag or "").strip().lower()
|
|
for s in _EX_SUFFIXES:
|
|
if t.endswith(s):
|
|
t = t[:-len(s)]
|
|
break
|
|
return t
|
|
|
|
try:
|
|
# 정규화된 experion → 원본 (충돌 시 더 짧은 원본 우선)
|
|
ex_index: dict[str, str] = {}
|
|
for ex in experion_tags or []:
|
|
n = _norm(ex)
|
|
if not n:
|
|
continue
|
|
if n not in ex_index or len(ex) < len(ex_index[n]):
|
|
ex_index[n] = ex
|
|
|
|
# prefix 매칭 후보 inverted index (n -> 자기 자신)
|
|
ex_norms = list(ex_index.keys())
|
|
|
|
mappings = []
|
|
for pid in pid_tags or []:
|
|
pid_norm = _norm(pid)
|
|
if not pid_norm:
|
|
mappings.append({"pidTag": pid, "experionTag": None, "confidence": 0.0})
|
|
continue
|
|
|
|
# 1) 정확 매칭
|
|
if pid_norm in ex_index:
|
|
mappings.append({"pidTag": pid, "experionTag": ex_index[pid_norm], "confidence": 0.99})
|
|
continue
|
|
|
|
# 2) prefix 매칭 — 단, pid가 충분히 구체적일 때만 (≥ 4 chars)
|
|
if len(pid_norm) >= _MIN_PREFIX_LEN:
|
|
hit = next(
|
|
(n for n in ex_norms
|
|
if n.startswith(pid_norm + ".") or n.startswith(pid_norm + "-")),
|
|
None,
|
|
)
|
|
if hit:
|
|
mappings.append({"pidTag": pid, "experionTag": ex_index[hit], "confidence": 0.95})
|
|
continue
|
|
|
|
# 3) 매칭 없음
|
|
mappings.append({"pidTag": pid, "experionTag": None, "confidence": 0.0})
|
|
|
|
return json.dumps({"success": True, "count": len(mappings), "mappings": mappings},
|
|
ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
logging.error(f"match_pid_tags failed: {e}")
|
|
return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)
|
|
|
|
|
|
# ── P&ID 파싱 도구 (DXF/PDF/DWG) ───────────────────────────────────────────────
|
|
|
|
|
|
@mcp.tool()
|
|
async def parse_pid_dxf(filepath: str) -> str:
|
|
"""DXF에서 구조화된 P&ID 정보를 빠르게 추출. 좌표 계산 없음, LLM 호출 없음.
|
|
|
|
- LINENO 레이어 → 라인 마스터 (service/line_no/size/material_spec/flange/insul_*)
|
|
- 그 외 TEXT/MTEXT → 태그 후보 (prefix로 장비·계기 분류)
|
|
- fluid 사전은 Symbol & Legend FLUID NAME ABBREVIATION 기준 내장
|
|
- 그래프/위상 분석이 필요하면 별도 도구 `build_pid_graph_parallel` 사용
|
|
|
|
Args:
|
|
filepath: DXF 파일 경로
|
|
|
|
Returns:
|
|
JSON: {
|
|
success, fluid_dictionary, linenos, tags, stats
|
|
}
|
|
"""
|
|
try:
|
|
data = await _extract_pid_dxf_fast(filepath)
|
|
return json.dumps({"success": True, **data}, ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
logging.error(f"parse_pid_dxf failed: {e}")
|
|
return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)
|
|
|
|
|
|
@mcp.tool()
|
|
async def parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
|
|
"""PyMuPDF 기반 PDF 파일 파싱. 텍스트 추출 후 LLM으로 태그 자동 추출.
|
|
|
|
Args:
|
|
filepath: PDF 파일 경로
|
|
use_ocr: OCR 사용 여부 (기본 True, 고정밀도)
|
|
|
|
Returns:
|
|
JSON: { success, text, count, tags: [{tagNo, equipmentName, ...}] }
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import re
|
|
|
|
try:
|
|
def _extract_text():
|
|
if use_ocr:
|
|
return _extract_text_from_pdf_ocr(filepath)
|
|
else:
|
|
return _extract_text_from_pdf(filepath)
|
|
text = await asyncio.to_thread(_extract_text)
|
|
|
|
if not text.strip():
|
|
return json.dumps({
|
|
"success": True,
|
|
"text": "",
|
|
"count": 0,
|
|
"tags": []
|
|
}, ensure_ascii=False, indent=2)
|
|
|
|
# LLM으로 태그 추출
|
|
system = (
|
|
"You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
|
|
"Extract instrument and equipment tags from the provided text.\n"
|
|
"Return ONLY a JSON array of objects with the following structure:\n"
|
|
'[{"tagNo":"FIT-10115","equipmentName":"Flow Transmitter","instrumentType":"FT" OR "FIT OR "TIA","lineNumber":"L-101","pidDrawingNo":"P&ID-001","confidence":0.95},...]\n'
|
|
"IMPORTANT rules:\n"
|
|
"- tagNo: Standard tag format with these patterns:\n"
|
|
" * Instrument: [Function][Loop]-[Number] (e.g., FT-101, PT-201, LI-301, FICQ-6113)\n"
|
|
" * Equipment: [Type]-[Number] (e.g., P-10101, T-10100, C-9111, E-10119)\n"
|
|
" * Complex: [Type]-[Number]-[Size]-[Class]-[Material]-[Option] (e.g., VG-6203-15A-F1A-n, CD-10513-40A-S1A-H50)\n"
|
|
" * Real examples from DXF: BT-6200, SARF-#6-PID-002, P-6101, DP-10101, CHS-6630-100A-F-C50\n"
|
|
"- instrumentType: First 2-4 letters of tagNo (FIT, PT, LI, FICQ, TCV, FCV, PCV, PG, TG, etc.)\n"
|
|
"- equipmentName: Descriptive name if available, otherwise null\n"
|
|
"- lineNumber: Line number if available, otherwise null\n"
|
|
"- pidDrawingNo: Drawing number if available, otherwise null\n"
|
|
"- confidence: 0.0 to 1.0 based on how clearly the tag was identified\n"
|
|
"- Do NOT include any explanation, only the JSON array.\n"
|
|
"- If no tags found, return an empty array: []\n"
|
|
"- temperature=0.1 for deterministic output.\n"
|
|
)
|
|
|
|
truncated_text = text[:12000] if len(text) > 12000 else text
|
|
|
|
def _call_llm():
|
|
return _llm().chat.completions.create(
|
|
model=VLLM_MODEL,
|
|
messages=[
|
|
{"role": "system", "content": system},
|
|
{"role": "user", "content": f"Source: pdf\n\nText:\n{truncated_text}"},
|
|
],
|
|
max_tokens=4096,
|
|
temperature=0.1,
|
|
)
|
|
|
|
resp = await asyncio.to_thread(_call_llm)
|
|
|
|
raw = _strip_think(resp.choices[0].message.content or "").strip()
|
|
|
|
# 마크다운 코드 블록 제거
|
|
if raw.startswith("```"):
|
|
lines = raw.splitlines()
|
|
raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()
|
|
|
|
# JSON 배열 추출
|
|
match = re.search(r'\[.*\]', raw, re.DOTALL)
|
|
if match:
|
|
raw = match.group(0)
|
|
|
|
# JSON 파싱 시도
|
|
try:
|
|
data = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
# JSON 배열 추출 시도 (더 엄격한 패턴)
|
|
match = re.search(r'\[\s*\{.*?\}\s*\]', raw, re.DOTALL)
|
|
if match:
|
|
raw_clean = match.group(0)
|
|
try:
|
|
data = json.loads(raw_clean)
|
|
except json.JSONDecodeError:
|
|
# 마지막으로, JSON 배열을 개별 객체로 분리하여 파싱 시도
|
|
objects = re.findall(r'\{[^{}]*\}', raw, re.DOTALL)
|
|
data = []
|
|
for obj in objects:
|
|
try:
|
|
data.append(json.loads(obj))
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
if not isinstance(data, list):
|
|
data = []
|
|
|
|
return json.dumps({
|
|
"success": True,
|
|
"text": text[:10000],
|
|
"count": len(text),
|
|
"tags": data
|
|
}, ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"PDF 파싱 실패: {e}"}, ensure_ascii=False)
|
|
|
|
|
|
@mcp.tool()
|
|
async def build_pid_graph_parallel(filepath: str) -> str:
|
|
"""
|
|
분산 처리 기법을 적용한 P&ID 그래프 생성 툴.
|
|
전처리 -> 병렬 분산 추출 -> 위상 모델링 -> 저장 과정을 수행합니다.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
|
|
try:
|
|
# 1. 전처리 (Phase 1: Geometric Extraction)
|
|
def _extract_and_save():
|
|
extractor = PidGeometricExtractor(filepath)
|
|
geo_data_path = f"mcp-server/storage/{os.path.basename(filepath)}_geo.json"
|
|
geo_data_list = extractor.extract_and_save(geo_data_path)
|
|
return geo_data_path
|
|
geo_data_path = await asyncio.to_thread(_extract_and_save)
|
|
|
|
# geo_data_list는 경로를 반환하므로 다시 로드
|
|
def _load_geo_data():
|
|
with open(geo_data_path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
geo_data = await asyncio.to_thread(_load_geo_data)
|
|
|
|
# 2. 병렬 분산 추출 (Phase 3: Intelligent Mapping)
|
|
# 시스템 태그 목록 가져오기 (DB에서 조회하는 로직 필요, 여기서는 예시로 빈 리스트 또는 기본값)
|
|
# 실제로는 get_tag_metadata 등을 통해 전체 태그 리스트를 확보해야 함
|
|
system_tags = []
|
|
try:
|
|
conn = await _get_db_connection()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT tagname FROM realtime_table")
|
|
system_tags = [r[0] for r in cur.fetchall()]
|
|
finally:
|
|
conn.close()
|
|
except Exception as e:
|
|
logging.warning(f"Failed to fetch system tags: {e}")
|
|
|
|
# 그래프 임시 생성 (Mapper가 위상 정보를 사용하므로 필요)
|
|
builder = PidTopologyBuilder(geo_data)
|
|
builder.build_graph()
|
|
|
|
# Mapper 설정
|
|
from openai import AsyncOpenAI
|
|
api_client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
|
|
mapper = IntelligentMapper(builder.G, system_tags, api_client=api_client, model_name=VLLM_MODEL)
|
|
|
|
# 분류별 노드 분리
|
|
nodes = list(builder.G.nodes())
|
|
transmitter_nodes = [n for n, d in builder.G.nodes(data=True) if d.get('value', '').upper() in ['FIT', 'FT', 'LT', 'PT', 'TE']] # 단순화된 필터
|
|
valve_nodes = [n for n, d in builder.G.nodes(data=True) if d.get('value', '').upper() in ['FCV', 'LCV', 'TCV', 'PCV', 'XV']]
|
|
equipment_nodes = [n for n, d in builder.G.nodes(data=True) if d.get('type') not in ['TEXT', 'LINE', 'LWPOLYLINE']]
|
|
|
|
# 병렬 호출 (vLLM Batching 유도)
|
|
tasks = [
|
|
mapper.extract_transmitters(transmitter_nodes),
|
|
mapper.extract_valves(valve_nodes),
|
|
mapper.extract_equipment(equipment_nodes)
|
|
]
|
|
extracted_results = await asyncio.gather(*tasks)
|
|
|
|
# 결과 통합
|
|
all_mapped_tags = []
|
|
for res_dict in extracted_results:
|
|
for node_id, mapping in res_dict.items():
|
|
if mapping.resolved_tag != "UNKNOWN":
|
|
# TopologyBuilder가 기대하는 형식으로 변환
|
|
node_data = builder.G.nodes[node_id]
|
|
all_mapped_tags.append({
|
|
"entity_id": node_id,
|
|
"tagName": mapping.resolved_tag,
|
|
"bbox": node_data['bbox'].bounds if hasattr(node_data['bbox'], 'bounds') else node_data['bbox'],
|
|
"clean_value": mapping.resolved_tag
|
|
})
|
|
|
|
# 3. 최종 위상 모델링 (Phase 2)
|
|
final_builder = PidTopologyBuilder(geo_data, all_extracted_tags=all_mapped_tags)
|
|
final_builder.build_graph()
|
|
|
|
# 4. 저장
|
|
graph_id = os.path.basename(filepath).replace(".dxf", "_graph.json")
|
|
graph_path = f"mcp-server/storage/{graph_id}"
|
|
final_builder.save_graph(graph_path)
|
|
|
|
return json.dumps({
|
|
"success": True,
|
|
"data": {
|
|
"graph_id": graph_id,
|
|
"graph_path": graph_path,
|
|
"nodes": final_builder.G.number_of_nodes(),
|
|
"edges": final_builder.G.number_of_edges()
|
|
},
|
|
"message": "그래프 생성 완료"
|
|
}, ensure_ascii=False)
|
|
|
|
except Exception as e:
|
|
logging.error(f"build_pid_graph_parallel failed: {e}")
|
|
return json.dumps({"success": False, "data": None, "error": str(e), "message": "그래프 생성 실패"}, ensure_ascii=False)
|
|
|
|
@mcp.tool()
|
|
async def analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
|
|
"""
|
|
구축된 그래프를 기반으로 특정 설비 장애 시 영향도 분석을 수행합니다.
|
|
"""
|
|
import asyncio
|
|
|
|
try:
|
|
graph_path = f"mcp-server/storage/{graph_id}"
|
|
mapping_path = graph_path.replace("_graph.json", "_mapping.json")
|
|
|
|
def _analyze():
|
|
analyzer = PidAnalysisEngine(graph_path, mapping_path)
|
|
return analyzer.analyze_impact(start_node_id)
|
|
|
|
result = await asyncio.to_thread(_analyze)
|
|
return json.dumps(result, ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"Impact analysis failed: {e}"}, ensure_ascii=False)
|
|
|
|
@mcp.tool()
|
|
async def parse_pid_drawing(filepath: str) -> str:
|
|
"""확장자 자동 감지하여 P&ID 도면 파싱.
|
|
|
|
Args:
|
|
filepath: DXF/DWG/PDF 파일 경로
|
|
|
|
Returns:
|
|
JSON: { success, text, count, tags, format }
|
|
"""
|
|
import os
|
|
|
|
ext = os.path.splitext(filepath)[1].lower()
|
|
|
|
if ext == ".dxf":
|
|
return await parse_pid_dxf(filepath)
|
|
elif ext == ".dwg":
|
|
# DWG 파일은 사전에 DXF로 변환하여 업로드해야 합니다.
|
|
# Linux에서 DWG를 DXF로 변환하는 도구는 제한되어 있습니다.
|
|
return json.dumps({
|
|
"success": False,
|
|
"error": "DWG 파일은 현재 직접 파싱할 수 없습니다.\n" +
|
|
"사전에 DXF로 변환하여 업로드해 주세요.\n" +
|
|
"\n변환 방법:\n" +
|
|
"1. Windows에서 AutoCAD 또는 ODA File Converter 사용\n" +
|
|
"2. 온라인 DWG → DXF 변환기 사용\n" +
|
|
"3. LibreOffice Draw (Windows/macOS 전용) 사용"
|
|
}, ensure_ascii=False)
|
|
elif ext == ".pdf":
|
|
return await parse_pid_pdf(filepath)
|
|
else:
|
|
return json.dumps({
|
|
"success": False,
|
|
"error": f"Unsupported format: {ext}. Supported: .dxf, .dwg, .pdf"
|
|
}, ensure_ascii=False)
|
|
|
|
|
|
|
|
# ── KB ingest 파서 ────────────────────────────────────────────────────────────
|
|
|
|
@mcp.tool()
|
|
async def parse_document(
|
|
doc_id: str,
|
|
title: str,
|
|
file_path: str,
|
|
mime_type: str = "",
|
|
collection_key: str = "",
|
|
chunking_policy: str = "",
|
|
) -> str:
|
|
"""KB ingest 파서. 파일 확장자에 따라 적절한 청킹을 수행한다.
|
|
|
|
Args:
|
|
doc_id: 문서 ID (UUID 문자열)
|
|
title: 제목 (오류 메시지에만 사용)
|
|
file_path: 절대 경로
|
|
mime_type: 정보용 (옵션)
|
|
collection_key: 정보용 (옵션)
|
|
chunking_policy: JSON 문자열, 향후 정책 분기에 사용
|
|
|
|
Returns:
|
|
JSON 문자열: {"success": true, "chunks": [{"text", "chunk_kind", "locator"}, ...]}
|
|
or {"success": false, "error": "..."}
|
|
"""
|
|
import os
|
|
if not os.path.isfile(file_path):
|
|
return json.dumps({"success": False, "error": f"file not found: {file_path}"}, ensure_ascii=False)
|
|
|
|
ext = os.path.splitext(file_path)[1].lower()
|
|
try:
|
|
if ext in (".xlsx", ".xlsm"):
|
|
from parsers import xlsx_parser
|
|
chunks = await asyncio.to_thread(xlsx_parser.parse, file_path)
|
|
elif ext == ".pdf":
|
|
from parsers import pdf_parser
|
|
chunks = await asyncio.to_thread(pdf_parser.parse, file_path)
|
|
elif ext == ".docx":
|
|
from parsers import docx_parser
|
|
chunks = await asyncio.to_thread(docx_parser.parse, file_path)
|
|
elif ext in (".md", ".txt", ".markdown"):
|
|
from parsers import text_parser
|
|
chunks = await asyncio.to_thread(text_parser.parse, file_path)
|
|
else:
|
|
return json.dumps(
|
|
{"success": False, "error": f"unsupported extension: {ext}"},
|
|
ensure_ascii=False
|
|
)
|
|
|
|
return json.dumps(
|
|
{"success": True, "doc_id": doc_id, "chunks": chunks, "count": len(chunks)},
|
|
ensure_ascii=False
|
|
)
|
|
except Exception as e:
|
|
return json.dumps({"success": False, "error": f"parse failed: {e}"}, ensure_ascii=False)
|
|
|
|
|
|
# ── Field Instrument Inference ──────────────────────────────────────────────
|
|
|
|
@mcp.tool()
|
|
def infer_field_instruments(
|
|
use_llm: bool = False,
|
|
seed_doc_id: str | None = None,
|
|
seed_excel_path: str | None = None,
|
|
) -> str:
|
|
"""
|
|
v_tag_summary의 모든 base_tag를 대상으로 현장 계기를 자동 유추하여 Excel 초안 생성.
|
|
|
|
Args:
|
|
use_llm: LLM으로 description 한국어 초안 보강 (기본 False, Phase C에서 활성화)
|
|
seed_doc_id: 기존 Excel doc_id 지정 시 수정사항 보존 + 신규만 추가
|
|
seed_excel_path: 기존 Excel 파일 경로 (C# 측에서 전달)
|
|
|
|
Returns:
|
|
JSON: { success, doc_path, instrument_count, unmatched_count, message }
|
|
"""
|
|
from instrument_inference.rules import load_rules
|
|
from instrument_inference.infer import infer_instruments_for_base_tag
|
|
from instrument_inference.excel import generate_excel
|
|
|
|
# [H-3 수정] v_tag_summary에서 base_tag별 dp 집합을 SQL로 한 번에 추출
|
|
def _fetch_tags():
|
|
import psycopg
|
|
conn = psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
SELECT
|
|
base_tag,
|
|
area,
|
|
CASE WHEN pv IS NOT NULL THEN '.pv' END,
|
|
CASE WHEN sp IS NOT NULL THEN '.sp' END,
|
|
CASE WHEN op IS NOT NULL THEN '.op' END,
|
|
CASE WHEN instate0 IS NOT NULL THEN '.instate0' END,
|
|
CASE WHEN instate1 IS NOT NULL THEN '.instate1' END,
|
|
CASE WHEN instate2 IS NOT NULL THEN '.instate2' END
|
|
FROM v_tag_summary
|
|
ORDER BY base_tag
|
|
""")
|
|
return cur.fetchall()
|
|
finally:
|
|
conn.close()
|
|
|
|
# [L-3 수정] .qv는 v_tag_summary에 없으므로 별도 조회
|
|
def _fetch_qv_tags():
|
|
import psycopg
|
|
conn = psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
SELECT DISTINCT SUBSTRING(tagname FROM '(.*)\\.qv') AS base_tag
|
|
FROM realtime_table
|
|
WHERE tagname LIKE '%.qv'
|
|
""")
|
|
return set(r[0] for r in cur.fetchall())
|
|
finally:
|
|
conn.close()
|
|
|
|
tags = _fetch_tags()
|
|
# [M-1 수정] asyncio.to_thread 데드 코드 삭제 — sync def이므로 blocking 호출
|
|
|
|
qv_tags = _fetch_qv_tags()
|
|
|
|
all_instruments = []
|
|
power_equipment = []
|
|
unmatched_tags = []
|
|
|
|
for row in tags:
|
|
base_tag = row[0]
|
|
area = row[1]
|
|
if not base_tag:
|
|
continue
|
|
|
|
# [H-3 수정] 실제 DB 컬럼 NULL 여부로 dp 집합 구성
|
|
dp = [d for d in row[2:8] if d is not None]
|
|
if base_tag in qv_tags:
|
|
dp.append(".qv")
|
|
|
|
try:
|
|
result = infer_instruments_for_base_tag(base_tag, dp, area or "")
|
|
# [H-4 수정] 빈 리스트 가드
|
|
if not result:
|
|
unmatched_tags.append(base_tag)
|
|
continue
|
|
if result[0].get("needs_review") and result[0].get("inference_basis") == "unmatched_prefix":
|
|
unmatched_tags.append(base_tag)
|
|
else:
|
|
for inst in result:
|
|
if inst.get("role") == "power_equipment":
|
|
power_equipment.append(inst)
|
|
else:
|
|
all_instruments.append(inst)
|
|
except Exception as e:
|
|
logging.warning(f"[infer] {base_tag} 처리 실패: {e}")
|
|
unmatched_tags.append(base_tag)
|
|
|
|
# [M-5 수정] instrument_id 기준 dedup
|
|
def _dedup(inst_list):
|
|
seen = {}
|
|
deduped = []
|
|
for inst in inst_list:
|
|
iid = inst["instrument_id"]
|
|
if iid in seen:
|
|
existing = seen[iid]
|
|
existing["inference_basis"] = f"{existing['inference_basis']}; also from {inst.get('parent_base_tag', '?')}"
|
|
else:
|
|
seen[iid] = inst
|
|
deduped.append(inst)
|
|
return deduped
|
|
|
|
all_instruments = _dedup(all_instruments)
|
|
power_equipment = _dedup(power_equipment)
|
|
|
|
# [M-3 수정] seed_excel_path 기반 머지 로직
|
|
if seed_excel_path and os.path.exists(seed_excel_path):
|
|
all_instruments = _merge_with_seed_excel(seed_excel_path, all_instruments)
|
|
|
|
# Excel 생성 (4시트: instruments, naming_convention, unmatched_tags, power_equipment)
|
|
xlsx_path = generate_excel(all_instruments, unmatched_tags, power_equipment)
|
|
|
|
# LLM 보강 (Phase C — 아직 stub)
|
|
if use_llm:
|
|
xlsx_path = _enrich_descriptions_llm(xlsx_path)
|
|
|
|
return json.dumps({
|
|
"success": True,
|
|
"doc_path": xlsx_path,
|
|
"instrument_count": len(all_instruments),
|
|
"power_equipment_count": len(power_equipment),
|
|
"unmatched_count": len(unmatched_tags),
|
|
"message": f"{len(all_instruments)}개 계기, {len(power_equipment)}개 동력기기 유추 완료 ({len(unmatched_tags)}개 미매칭)",
|
|
})
|
|
|
|
|
|
def _merge_with_seed_excel(seed_path: str, new_instruments: list[dict]) -> list[dict]:
|
|
"""[M-3 수정] 기존 Excel에서 instruments 시트 로드 후 머지."""
|
|
from openpyxl import load_workbook
|
|
wb = load_workbook(seed_path, read_only=True, data_only=True)
|
|
ws = wb["instruments"]
|
|
rows = list(ws.iter_rows(values_only=True))
|
|
if not rows:
|
|
return new_instruments
|
|
headers = [str(c) for c in rows[0]]
|
|
existing = []
|
|
for r in rows[1:]:
|
|
inst = dict(zip(headers, r))
|
|
if inst.get("delete") in (True, "TRUE", "True", "YES", "Yes", "Y", "y"):
|
|
continue
|
|
existing.append(inst)
|
|
|
|
existing_ids = {i["instrument_id"] for i in existing}
|
|
merged = list(existing)
|
|
for inst in new_instruments:
|
|
if inst["instrument_id"] not in existing_ids:
|
|
merged.append(inst)
|
|
return merged
|
|
|
|
|
|
def _enrich_descriptions_llm(xlsx_path: str) -> str:
|
|
"""LLM으로 description 한국어 초안 보강 (Phase C stub)."""
|
|
# [M-2 수정] Phase C 구현 전까지 no-op. UI는 disabled로 처리.
|
|
return xlsx_path
|
|
|
|
|
|
# ── 엔트리포인트 ──────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
"""HTTP 모드로 실행 — C# McpClient (localhost:5001) 용."""
|
|
mcp.run(transport="streamable-http")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# --http 플래그: HTTP 모드 (C# McpClient 용)
|
|
# 플래그 없음: stdio 모드 (Claude Code / Roo Code MCP 용)
|
|
if "--http" in sys.argv:
|
|
mcp.run(transport="streamable-http")
|
|
else:
|
|
mcp.run(transport="stdio")
|