diff --git a/.claude/settings.json b/.claude/settings.json index 10e68e0..fb2f327 100644 --- a/.claude/settings.json +++ b/.claude/settings.json @@ -6,7 +6,7 @@ "/home/windpacer/projects/ExperionCrawler/mcp-server/server.py" ], "env": {}, - "description": "ExperionCrawler RAG — Qdrant(코드베이스+OPC UA 문서) + GLM-4.7-Flash" + "description": "ExperionCrawler RAG — Qdrant(코드베이스+OPC UA 문서), 현재 LLM은 mcp-server/llm-model.json 참조" } } } diff --git a/mcp-server/server.py b/mcp-server/server.py index 5cd3cdb..90b477b 100644 --- a/mcp-server/server.py +++ b/mcp-server/server.py @@ -20,11 +20,29 @@ from mcp.server.fastmcp import FastMCP logging.basicConfig(level=logging.WARNING, stream=sys.stderr) +# ── 시간 변환 ─────────────────────────────────────────────────────────────────── +KST = __import__("zoneinfo").ZoneInfo("Asia/Seoul") + +def _kst_str(dt_iso: str | None) -> str: + """UTC ISO 문자열 → KST ISO 문자열 (초 단위, +09:00). None이면 그대로 None.""" + if not dt_iso: + return str(dt_iso) if dt_iso is None else "" + from datetime import datetime + if isinstance(dt_iso, datetime): + return dt_iso.astimezone(KST).strftime("%Y-%m-%dT%H:%M:%S+09:00") + try: + dt = datetime.fromisoformat(dt_iso) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=__import__("datetime").timezone.utc) + return dt.astimezone(KST).strftime("%Y-%m-%dT%H:%M:%S+09:00") + except Exception: + return dt_iso # fallback + # ── 설정 ────────────────────────────────────────────────────────────────────── QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333") OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434") EMBED_MODEL = os.environ.get("EMBED_MODEL", "nomic-embed-text") -VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8000/v1") +VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8001/v1") from config import get_vllm_model VLLM_MODEL = get_vllm_model() @@ -134,10 +152,268 @@ async def _extract_text_from_dxf(filepath: str) -> str: except Exception: pass return "\n".join(texts) - + return await asyncio.to_thread(_extract) +# ── P&ID 빠른 구조 추출 (좌표 계산 없음, LLM 호출 없음) ───────────────────────── +# +# 설계 원칙: +# 1) DXF의 layer 정보를 사용 — LINENO 레이어 텍스트는 라인 마스터로 직접 파싱 +# 2) tag/LineNo 정규식 — 100% 결정론적 +# 3) FLUID NAME ABBREVIATION은 Symbol & Legend에서 1회 추출한 사전을 내장 +# (다른 플랜트 도면을 추가할 때만 사전 갱신) +# 4) 좌표 계산·KD-tree·NetworkX 그래프 없음 → I/O 시간만 소요 +# +# LineNo 형식: SERVICE - LINENUM - SIZE - MaterialSpec+FlangeRating+InsulationCode - InsulationThickness +# 예) P-10138-600A-F2A-H100, CD-10513-40A-S1A-H50, VG-6203-15A-F1A-n + +_PID_FLUID_DICT: dict[str, str] = { + "P": "PROCESS FLUID", + "CHE": "PROCESS FLUID", + "CWS": "COOLING WATER SUPPLY", + "CWR": "COOLING WATER RETURN", + "CHS": "CHILLED WATER SUPPLY", + "CHR": "CHILLED WATER RETURN", + "WW": "WASTE WATER", + "ST": "STEAM", + "CD": "STEAM CONDENSATE", + "IA": "INSTRUMENT AIR", + "AIR": "INSTRUMENT AIR", + "N2": "NITROGEN", + "VG": "VENT GAS", + "SCR": "VENT GAS", + "SC": "VENT GAS", + "DIW": "DEIONIZED WATER", + "SW": "SOFT WATER", + "SAM": "SAMPLE LINE", + "NBD": "NITROGEN BLOW DOWN", +} + +_PID_EQUIPMENT_PREFIX: dict[str, str] = { + "P": "Pump", + "T": "Tank", + "F": "Filter", + "C": "Column", + "E": "Heat Exchanger", + "D": "Drum", + "V": "Vessel", + "BT": "Buffer Tank", + "DP": "Drainage Point", + "CH": "Chiller", + "CT": "Cooling Tower", + "VP": "Vacuum Pump", + "R": "Reactor", + "S": "Separator", +} + +_PID_INSTRUMENT_FIRST: dict[str, str] = { + "P": "Pressure", "T": "Temperature", "F": "Flow", "L": "Level", + "A": "Analysis", "S": "Speed", "H": "Hand", "Q": "Quantity", "W": "Weight", +} + +_PID_INSTRUMENT_MODIFIER: dict[str, str] = { + "I": "Indicator", "C": "Control", "T": "Transmitter", "R": "Recorder", + "A": "Alarm", "S": "Switch", "V": "Valve", "Q": "Totalizer", + "Y": "Computing", "E": "Element", "G": "Gauge", +} + +# 5~7필드 배관번호 지원: SERVICE-LINENUM-SIZE-PIPESPEC-INSUL +# 10차: P-10101-25A-F1A-n (pipe_spec=F1A) +# 9차: P-9107-25A-F-n (pipe_spec=F, insul=n) +# 냉각: CHR-9641-50A-F-C50 (pipe_spec=F, insul=C50) +_PID_LINENO_FULL_RE = re.compile( + r'^([A-Z][A-Z0-9]{0,3})-(\d{3,6})-(\d{1,4}[A-Z]?)-([A-Za-z][A-Za-z0-9]*)-([A-Za-z0-9]+)$' +) +_PID_LINENO_SHORT_RE = re.compile(r'^([A-Z][A-Z0-9]{0,3})-(\d{3,6})$') +_PID_TAG_RE = re.compile(r'^([A-Z]{1,4})-(\d{3,6})([A-Z])?$') + + + +def _classify_pid_tag(tag_no: str) -> dict: + """tagNo의 prefix로 장비/계기 종류 분류 (좌표·LLM 사용 안 함).""" + m = _PID_TAG_RE.match(tag_no) + if not m: + return {"kind": "unknown", "prefix": None, "type": None} + prefix = m.group(1) + if prefix in _PID_EQUIPMENT_PREFIX: + return {"kind": "equipment", "prefix": prefix, "type": _PID_EQUIPMENT_PREFIX[prefix]} + if 2 <= len(prefix) <= 4 and prefix[0] in _PID_INSTRUMENT_FIRST: + measure = _PID_INSTRUMENT_FIRST[prefix[0]] + mods = [_PID_INSTRUMENT_MODIFIER[c] for c in prefix[1:] if c in _PID_INSTRUMENT_MODIFIER] + type_name = (measure + " " + " ".join(mods)).strip() + return {"kind": "instrument", "prefix": prefix, "type": type_name} + return {"kind": "unknown", "prefix": prefix, "type": None} + + +def _parse_pid_lineno(token: str) -> dict | None: + """LineNo 토큰을 필드로 분해. 5~7필드 모두 지원. 매칭 실패 시 None.""" + m = _PID_LINENO_FULL_RE.match(token) + if m: + service, line_no, size, pipe_spec, insul = m.groups() + return { + "raw": token, + "service": service, + "fluid": _PID_FLUID_DICT.get(service, service), + "line_no": line_no, + "size": size, + "pipe_spec": pipe_spec, + "insul": insul, + } + sm = _PID_LINENO_SHORT_RE.match(token) + if sm: + service, line_no = sm.groups() + return { + "raw": token, + "service": service, + "fluid": _PID_FLUID_DICT.get(service, service), + "line_no": line_no, + "size": None, + "pipe_spec": None, + "insul": None, + } + return None + + +async def _extract_pid_dxf_fast(filepath: str) -> dict: + """DXF에서 layer + regex만으로 구조 추출. 좌표 계산·LLM 호출 없음.""" + import ezdxf + from ezdxf.tools.text import plain_mtext + from collections import Counter + + def _work(): + doc = ezdxf.readfile(filepath) + msp = doc.modelspace() + + linenos: list[dict] = [] + tags: list[dict] = [] + seen_tags: set[str] = set() + + for e in msp.query('TEXT MTEXT'): + if e.dxftype() == 'TEXT': + txt = e.dxf.text or "" + else: + try: + txt = plain_mtext(e.dxf.text or "") + except Exception: + txt = e.dxf.text or "" + txt = txt.strip() + if not txt: + continue + + layer = e.dxf.layer + + # 1) 완전한 배관번호 형식 (FULL_RE) → 레이어 이름 무관하게 배관번호로 처리 + if _PID_LINENO_FULL_RE.match(txt): + parsed = _parse_pid_lineno(txt) + if parsed is not None: + linenos.append(parsed) + continue + + # 2) 짧은 형식(P-10101)은 레이어 이름에 'LINENO'가 포함된 경우에만 배관번호 + # (펌프 태그와 텍스트가 동일하므로 레이어 힌트 불가피) + if 'LINENO' in layer.upper(): + parsed = _parse_pid_lineno(txt) + if parsed is not None: + linenos.append(parsed) + continue + + # 3) 일반 장비/계기 태그 + if _PID_TAG_RE.match(txt): + if txt in seen_tags: + continue + seen_tags.add(txt) + cls = _classify_pid_tag(txt) + tags.append({"tagNo": txt, **cls, "layer": layer}) + + fluid_dist = Counter(l['service'] for l in linenos) + equipment_dist = Counter(t['prefix'] for t in tags if t['kind'] == 'equipment') + instrument_dist = Counter(t['prefix'] for t in tags if t['kind'] == 'instrument') + unique_pipes = len({(l['service'], l['line_no']) for l in linenos}) + + return { + "fluid_dictionary": _PID_FLUID_DICT, + "linenos": linenos, + "tags": tags, + "stats": { + "lineno_label_count": len(linenos), + "unique_pipes": unique_pipes, + "tag_count": len(tags), + "fluid_distribution": dict(fluid_dist.most_common()), + "equipment_distribution": dict(equipment_dist.most_common()), + "instrument_distribution": dict(instrument_dist.most_common()), + }, + } + + return await asyncio.to_thread(_work) + + +# 장비 prefix와 안 겹치는 fluid-only 코드 (짧은 LineNo `XXX-NNNN`을 안전하게 pipe로 인식) +_PID_FLUID_ONLY_CODES: set[str] = set(_PID_FLUID_DICT.keys()) - set(_PID_EQUIPMENT_PREFIX.keys()) + + +def _extract_pid_tags_from_text(text: str) -> list[dict]: + """plain text(PDF/OCR/필터된 DXF text)에서 tag/LineNo를 regex로 추출. + + 분기 우선순위: + 1) 완전한 LineNo (`FLUID-NUM-SIZE-SPEC-INSUL` 5필드, 모호하지 않음) → pipe + 2) 짧은 LineNo (`FLUID-NUM`)이지만 prefix가 fluid 전용 코드일 때 → pipe + 3) 일반 tag (`XXX-NNNN[A]`) → 장비/계기 분류 + instrumentType 설정 + """ + seen: set[str] = set() + out: list[dict] = [] + for token in re.split(r'[\s,;|()\[\]<>{}"\']+', text or ''): + token = token.strip() + if not token or token in seen: + continue + + # 1) 완전한 배관번호 (5~7필드) — FULL_RE 매칭 + m_full = _PID_LINENO_FULL_RE.match(token) + if m_full: + seen.add(token) + parsed = _parse_pid_lineno(token) + out.append({ + "tagNo": token, + "kind": "pipe", + "prefix": parsed["service"], + "type": parsed["fluid"], + "lineNumber": parsed["line_no"], + "size": parsed["size"], + "pipeSpec": parsed["pipe_spec"], + "insul": parsed["insul"], + "confidence": 0.99, + }) + continue + + # 2) 짧은 LineNo — fluid 전용 코드일 때만 + m_short = _PID_LINENO_SHORT_RE.match(token) + if m_short and m_short.group(1) in _PID_FLUID_ONLY_CODES: + seen.add(token) + parsed = _parse_pid_lineno(token) + out.append({ + "tagNo": token, + "kind": "pipe", + "prefix": parsed["service"], + "type": parsed["fluid"], + "lineNumber": parsed["line_no"], + "size": None, + "confidence": 0.95, + }) + continue + + # 3) 일반 tag + if _PID_TAG_RE.match(token): + seen.add(token) + cls = _classify_pid_tag(token) + out.append({ + "tagNo": token, + **cls, + "instrumentType": cls["prefix"], + "confidence": 0.95 if cls["kind"] != "unknown" else 0.5, + }) + return out + + async def _extract_text_from_pdf(filepath: str) -> str: """PyMuPDF로 PDF 파일에서 텍스트 추출.""" import asyncio @@ -820,6 +1096,51 @@ async def list_drawings(unit_no: str | None = None) -> str: conn.close() +# ── Phase 7.1: NL2SQL 의도 라우터 ────────────────────────────────── +# 키워드/정규식 기반 (ML 불필요). query_with_nl 진입 시 분류 → 적절한 도구로 위임. + +_CLASSIFY_RULES = [ + # (정규식, 라우팅 대상 도구명) + (r'활성.*알람|현재.*알람|지금.*알람|active.*alarm', 'active_alarms'), + (r'트립|trip', 'active_alarms'), + (r'상태\s*보고서|교대.*보고|status.*report|운전.*보고', 'generate_status_report'), + # "이상 상황 보고" 패턴 — 사용자가 특정 일자/area 범위로 비정상 상황 요약을 요청하는 경우. + # summarize_events가 since/until/area 파라미터를 받으므로 generate_status_report보다 적합. + (r'이상.*상황|상황.*보고|이상.*보고|비정상.*상황|abnormal', 'summarize_events'), + (r'요약|보고서|리포트|summary|summarize|report', 'summarize_events'), + (r'태그.*찾|tag.*찾|찾아\s*줘|find.*tag|어떤.*태그', 'find_tags'), + (r'이벤트.*조회|이벤트.*목록|event.*list|event.*query|로그.*조회', 'query_events'), +] + + +def _classify_intent(question: str) -> str: + """질문을 키워드 정규식으로 분류. 매칭 실패 시 'query_with_nl' (SQL 경로) 반환.""" + if not question: + return 'query_with_nl' + q = question.lower() + for pattern, target in _CLASSIFY_RULES: + if re.search(pattern, q, re.IGNORECASE): + return target + return 'query_with_nl' + + +@mcp.tool() +async def classify_intent(question: str) -> str: + """질문 의도를 분류하여 적절한 도구명을 반환합니다 (실행하지는 않음). + + 반환값(도구명): active_alarms, generate_status_report, summarize_events, + find_tags, query_events, query_with_nl(기본) + + Args: + question: 자연어 질문 + + Returns: + JSON: { success, question, route } + """ + route = _classify_intent(question) + return json.dumps({"success": True, "question": question, "route": route}, ensure_ascii=False) + + @mcp.tool() async def query_with_nl(question: str) -> str: """자연어 질문을 LLM이 SQL로 변환하고 시계열 DB를 조회합니다. @@ -832,6 +1153,34 @@ async def query_with_nl(question: str) -> str: """ import asyncio import json as json_module + + # Phase 7.1: 의도 라우팅 — 명백한 비-SQL 질문은 전용 도구로 위임 + route = _classify_intent(question) + if route != 'query_with_nl': + try: + if route == 'active_alarms': + payload = await active_alarms() + elif route == 'find_tags': + payload = await find_tags(query=question) + elif route == 'query_events': + payload = await query_events() + elif route == 'summarize_events': + payload = await summarize_events() + elif route == 'generate_status_report': + payload = await generate_status_report() + else: + payload = None + if payload is not None: + try: + obj = json.loads(payload) + except Exception: + obj = {"raw": payload} + obj["_routed_from"] = "query_with_nl" + obj["_route"] = route + return json.dumps(obj, ensure_ascii=False, default=str) + except Exception as e: + # 라우팅 실패 시 원래 SQL 경로로 fallback + pass system = ( "You are a PostgreSQL SQL expert.\n" @@ -1032,7 +1381,7 @@ async def query_events( events = [ {"id": r[0], "tag_name": r[1], "prev_value": r[2], "curr_value": r[3], "event_type": r[4], "event_time": r[5].isoformat() if r[5] else None, - "area": r[6], "section": r[7], "duration_seconds": r[8], "metadata": r[9]} + "area": r[6], "section": r[7], "prev_state_duration_s": r[8], "metadata": r[9]} for r in rows ] return json.dumps({ @@ -1061,7 +1410,7 @@ async def active_alarms(area: str | None = None, limit: int = 100) -> str: limit: 최대 반환 수 (기본 100) Returns: - JSON: { success, count, alarms: [{tag_name, event_type, since, duration_seconds, area, ...}] } + JSON: { success, count, alarms: [{tag_name, event_type, since, prev_state_duration_s, area, ...}] } """ conn = None try: @@ -1092,7 +1441,7 @@ async def active_alarms(area: str | None = None, limit: int = 100) -> str: alarms = [ {"id": r[0], "tag_name": r[1], "curr_value": r[2], "event_type": r[3], "since": r[4].isoformat() if r[4] else None, - "area": r[5], "section": r[6], "duration_seconds": r[7], "metadata": r[8], + "area": r[5], "section": r[6], "prev_state_duration_s": r[7], "metadata": r[8], "age_seconds": r[9]} for r in rows ] @@ -1153,8 +1502,9 @@ async def summarize_events( # LLM 요약 — 가벼운 토큰 수로 제한 sample = events[:max_events] bullet_lines = [ - f"- [{ev['event_type']}] {ev['tag_name']} @ {ev['event_time']}" + f"- [{ev['event_type']}] {ev['tag_name']} @ {_kst_str(ev['event_time'])}" f" ({ev.get('prev_value')}→{ev.get('curr_value')})" + f" (직전상태유지={ev.get('prev_state_duration_s', '?')}s)" f"{(' area=' + ev['area']) if ev.get('area') else ''}" for ev in sample ] @@ -1167,7 +1517,9 @@ async def summarize_events( "2) 알람/트립 (ALARM/TRIP) 핵심 케이스 — 태그·시각·전후값\n" "3) 패턴/특이점 (반복 발생, 동시 발생, area 집중)\n" "4) 다음 점검 권고 (있다면)\n" - "구체적인 태그명과 시각을 포함하되 추측은 자제합니다." + "구체적인 태그명과 시각을 포함하되 추측은 자제합니다.\n\n" + "참고 - 모든 시각은 KST(UTC+9, Asia/Seoul)입니다. " + "`직전상태유지`는 이 이벤트 직전에 태그가 머물렀던 시간(초)입니다." ) user_msg = ( f"분석 대상 이벤트 {len(sample)}건 (전체 {parsed.get('count')}건). " @@ -1230,25 +1582,28 @@ async def generate_status_report(area: str | None = None, hours: int = 24) -> st # 3) LLM 보고서 alarm_lines = [ - f"- [{a['event_type']}] {a['tag_name']} since {a['since']} " + f"- [{a['event_type']}] {a['tag_name']} since {_kst_str(a['since'])} " f"({a.get('age_seconds', 0)}s){' area=' + a['area'] if a.get('area') else ''}" for a in alarms[:30] ] or ["- 활성 알람 없음"] recent_lines = [ - f"- [{ev['event_type']}] {ev['tag_name']} @ {ev['event_time']} " + f"- [{ev['event_type']}] {ev['tag_name']} @ {_kst_str(ev['event_time'])} " f"({ev.get('prev_value')}→{ev.get('curr_value')})" - for ev in events[:40] - ] or ["- 최근 이벤트 없음"] + f" (직전상태유지={ev.get('prev_state_duration_s', '?')}s)" + for ev in sample + ] + focus_line = f"\n특히 다음 관점을 우선해 설명하세요: {focus}\n" if focus else "" system = ( - "당신은 공장 운전 교대 보고서를 작성하는 운전원 보조 AI입니다. " - "다음 형식으로 한국어 보고서를 작성하세요. 마크다운 사용 가능.\n\n" - "# 운전 상태 종합 보고서\n\n" - "## 1. 요약\n(2~3줄로 핵심 상황)\n\n" - "## 2. 활성 알람\n(있으면 표 또는 bullet, 없으면 \"없음\")\n\n" - "## 3. 최근 N시간 이벤트 분석\n(주요 패턴, 빈번한 태그, 동시 발생)\n\n" - "## 4. 권고 조치\n(점검할 태그/area, 우선순위)\n\n" - "수치는 통계 데이터를 그대로 인용하고, 추측은 명시적으로 표시하세요." + "당신은 IIoT/공장 운전 분석 전문가입니다. 디지털 포인트의 상태 변경 이벤트 로그를 보고 " + "한국어로 6~10줄 요약을 만듭니다. 다음 구조를 따릅니다:\n" + "1) 핵심 현황 (총 이벤트 수, 주요 area)\n" + "2) 알람/트립 (ALARM/TRIP) 핵심 케이스 — 태그·시각·전후값\n" + "3) 패턴/특이점 (반복 발생, 동시 발생, area 집중)\n" + "4) 다음 점검 권고 (있다면)\n" + "구체적인 태그명과 시각을 포함하되 추측은 자제합니다.\n\n" + "참고 - 모든 시각은 KST(UTC+9, Asia/Seoul)입니다. " + "`직전상태유지`는 이 이벤트 직전에 태그가 머물렀던 시간(초)입니다." ) user_msg = ( f"대상 area: {area or '전체'}\n" @@ -1294,181 +1649,104 @@ async def generate_status_report(area: str | None = None, hours: int = 24) -> st @mcp.tool() async def extract_pid_tags(text: str, source_type: str) -> str: - """P&ID 도면(DXF/PDF)에서 태그 정보를 추출합니다. + """텍스트에서 P&ID 태그를 regex로 결정론적 추출. LLM 호출 없음. Args: - text: DXF/PDF에서 추출한 텍스트 - source_type: 'dxf' 또는 'pdf' + text: DXF/PDF에서 추출한 plain text + source_type: 'dxf' 또는 'pdf' (로깅용) Returns: - JSON: { success, count, tags: [{tagNo, equipmentName, instrumentType, lineNumber, pidDrawingNo, confidence}] } + JSON: { success, count, tags: [{tagNo, kind, prefix, type, confidence, ...}] } + - kind: 'pipe' | 'equipment' | 'instrument' | 'unknown' + - pipe면 lineNumber/size 포함, fluid는 type 필드 """ - import asyncio - import logging - import re - import json as json_module - - system = ( - "You are a P&ID (Piping and Instrumentation Diagram) expert.\n" - "Extract all instrument and equipment tags from the provided text.\n" - "Return ONLY a valid JSON array. Each element must have exactly these fields:\n" - '{"tagNo":"FCV-101","equipmentName":null,"instrumentType":"FCV","lineNumber":null,"pidDrawingNo":null,"confidence":0.95}\n' - "Rules:\n" - "- tagNo: any token matching [LETTERS]-[DIGITS] or [LETTERS]-[DIGITS]-[SUFFIX]\n" - " Examples: FCV-101, P-10101, T-10100, VG-6203-15A-F1A-n, BT-6200, DP-10101\n" - "- instrumentType: leading letters of tagNo (e.g. FCV, P, T, VG, BT, DP, PSV)\n" - "- equipmentName: descriptive name if present in text near the tag, else null\n" - "- lineNumber: null unless a line number is explicitly associated\n" - "- pidDrawingNo: null unless a drawing number is explicitly associated\n" - "- confidence: 0.95 for clear tags, lower for ambiguous ones\n" - "- Output ONLY the JSON array, no markdown, no explanation.\n" - "- If no tags found, return: []\n" - ) - try: - truncated_text = text[:100000] if len(text) > 100000 else text - - def _call_llm(): - return _llm().chat.completions.create( - model=VLLM_MODEL, - messages=[ - {"role": "system", "content": system}, - {"role": "user", "content": f"Source: {source_type}\n\nText:\n{truncated_text}"}, - ], - max_tokens=32768, - temperature=0.1, - extra_body={"chat_template_kwargs": {"enable_thinking": False}}, - ) - - resp = await asyncio.to_thread(_call_llm) - - raw = (resp.choices[0].message.content or "").strip() - finish_reason = resp.choices[0].finish_reason - - # 마크다운 코드 블록 제거 - if raw.startswith("```"): - lines = raw.splitlines() - raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip() - - # finish_reason=length 로 잘린 경우: 마지막 완전한 객체까지 살린 뒤 배열 닫기 - if finish_reason == "length": - last_close = raw.rfind("}") - if last_close != -1: - raw = raw[:last_close + 1] + "]" - - # 유효한 JSON 배열 추출 (가장 긴 균형 잡힌 [...] 선택) - def _extract_array(s: str) -> str: - depth = 0; start = -1; best = "" - for i, c in enumerate(s): - if c == '[': - if depth == 0: start = i - depth += 1 - elif c == ']': - depth -= 1 - if depth == 0 and start >= 0: - cand = s[start:i+1] - if len(cand) > len(best): best = cand - return best if best else "[]" - - raw = _extract_array(raw) - - # JSON 파싱 — 실패 시 개별 객체 추출로 폴백 - try: - data = json_module.loads(raw) - except json_module.JSONDecodeError: - objects = re.findall(r'\{[^{}]*\}', raw, re.DOTALL) - data = [] - for obj in objects: - try: - data.append(json_module.loads(obj)) - except json_module.JSONDecodeError: - pass - if not data: - return json_module.dumps({"success": False, "count": 0, "tags": []}, ensure_ascii=False) - - logging.info(f"[extract_pid_tags] source={source_type} count={len(data) if isinstance(data, list) else 0}") - - return json_module.dumps({ - "success": True, - "count": len(data), - "tags": data - }, ensure_ascii=False, indent=2) - + tags = _extract_pid_tags_from_text(text or "") + logging.info(f"[extract_pid_tags] source={source_type} count={len(tags)}") + return json.dumps({"success": True, "count": len(tags), "tags": tags}, + ensure_ascii=False, indent=2) except Exception as e: logging.error(f"P&ID 태그 추출 실패: {e}") - logging.error(f"Raw response: {raw[:1000]}") - return json.dumps({"success": False, "error": f"P&ID 태그 추출 실패: {e}"}, ensure_ascii=False) + return json.dumps({"success": False, "error": str(e), "count": 0, "tags": []}, + ensure_ascii=False) @mcp.tool() async def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str: - """P&ID 태그를 Experion 태그에 매핑합니다. + """P&ID 태그를 Experion 태그에 결정론적으로 매핑. LLM 호출 없음. + + 매칭 전략 (보수적 — false positive 방지): + 1) 정규화(lowercase + Experion suffix 제거) 후 동일 → confidence 0.99 + 2) Experion이 PID의 확장 (예: "FT-101" ↔ "ft-101.pv") → confidence 0.95 + 단, PID prefix가 최소 4글자 이상이어야 (`P-1` 같은 짧은 식별자가 다수와 매칭되는 것 방지) + 3) 그 외 → experionTag=null, confidence=0.0 + (UI에서는 "매핑" 버튼 표시 → 운전원 수동 매핑) + + 이전 difflib fuzzy(cutoff 0.75)는 PSV-10101 ↔ p-10101.pv 같은 거짓 매칭을 만들어 제거함. + 숫자 부분만 같으면 종류(PSV/P/XV/VP)가 달라도 묶이는 문제. Args: - pid_tags: P&ID에서 추출한 태그 목록 (예: ["FT-101", "PT-201"]) - experion_tags: Experion 시스템 태그 목록 (예: ["ficq-6113.pv", "pt-201.pv"]) + pid_tags: P&ID에서 추출한 태그 목록 + experion_tags: Experion 시스템 태그 목록 (보통 ".pv" 등 suffix 포함) Returns: JSON: { success, count, mappings: [{pidTag, experionTag, confidence}] } """ - import asyncio - import re - import json as json_module + _EX_SUFFIXES = ('.pv', '.sp', '.mv', '.op', '.cv', '.fieldvalue', '.qv', '.hzset') + _MIN_PREFIX_LEN = 4 # prefix 매칭 최소 길이 - system = ( - "You are a P&ID to Experion tag matching expert.\n" - "Match P&ID tags to Experion tags based on similarity.\n" - "Return ONLY a JSON array of objects with the following structure:\n" - '[{"pidTag":"FT-101","experionTag":"ft-101.pv","confidence":0.92},...]\n' - "IMPORTANT rules:\n" - "- pidTag: The original P&ID tag from input\n" - "- experionTag: The matched Experion tag (lowercase, with .pv/.sp/.mv suffix)\n" - "- confidence: 0.0 to 1.0 based on match quality\n" - "- If no good match found, set confidence < 0.5 and leave experionTag null\n" - "- Do NOT include any explanation, only the JSON array.\n" - "- If no matches found, return an empty array: []\n" - "- temperature=0.1 for deterministic output.\n" - ) + def _norm(tag: str) -> str: + t = (tag or "").strip().lower() + for s in _EX_SUFFIXES: + if t.endswith(s): + t = t[:-len(s)] + break + return t try: - pid_str = "\n".join(pid_tags) - experion_str = "\n".join(experion_tags) + # 정규화된 experion → 원본 (충돌 시 더 짧은 원본 우선) + ex_index: dict[str, str] = {} + for ex in experion_tags or []: + n = _norm(ex) + if not n: + continue + if n not in ex_index or len(ex) < len(ex_index[n]): + ex_index[n] = ex - def _call_llm(): - return _llm().chat.completions.create( - model=VLLM_MODEL, - messages=[ - {"role": "system", "content": system}, - {"role": "user", "content": f"P&ID Tags:\n{pid_str}\n\nExperion Tags:\n{experion_str}"}, - ], - max_tokens=16384, - temperature=0.1, - extra_body={"chat_template_kwargs": {"enable_thinking": False}}, - ) + # prefix 매칭 후보 inverted index (n -> 자기 자신) + ex_norms = list(ex_index.keys()) - resp = await asyncio.to_thread(_call_llm) + mappings = [] + for pid in pid_tags or []: + pid_norm = _norm(pid) + if not pid_norm: + mappings.append({"pidTag": pid, "experionTag": None, "confidence": 0.0}) + continue - raw = (resp.choices[0].message.content or "").strip() - finish_reason = resp.choices[0].finish_reason + # 1) 정확 매칭 + if pid_norm in ex_index: + mappings.append({"pidTag": pid, "experionTag": ex_index[pid_norm], "confidence": 0.99}) + continue - if raw.startswith("```"): - lines = raw.splitlines() - raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip() + # 2) prefix 매칭 — 단, pid가 충분히 구체적일 때만 (≥ 4 chars) + if len(pid_norm) >= _MIN_PREFIX_LEN: + hit = next( + (n for n in ex_norms + if n.startswith(pid_norm + ".") or n.startswith(pid_norm + "-")), + None, + ) + if hit: + mappings.append({"pidTag": pid, "experionTag": ex_index[hit], "confidence": 0.95}) + continue - if finish_reason == "length": - last_close = raw.rfind("}") - if last_close != -1: - raw = raw[:last_close + 1] + "]" - - match = re.search(r'\[.*\]', raw, re.DOTALL) - raw = match.group(0) if match else "[]" - - data = json_module.loads(raw) - return json_module.dumps({"success": True, "count": len(data), "mappings": data}, - ensure_ascii=False, indent=2) + # 3) 매칭 없음 + mappings.append({"pidTag": pid, "experionTag": None, "confidence": 0.0}) + return json.dumps({"success": True, "count": len(mappings), "mappings": mappings}, + ensure_ascii=False, indent=2) except Exception as e: - return json.dumps({"success": False, "error": f"P&ID 태그 매핑 실패: {e}"}, ensure_ascii=False) + logging.error(f"match_pid_tags failed: {e}") + return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False) # ── P&ID 파싱 도구 (DXF/PDF/DWG) ─────────────────────────────────────────────── @@ -1476,111 +1754,27 @@ async def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str: @mcp.tool() async def parse_pid_dxf(filepath: str) -> str: - """ezdxf 기반 DXF 파일 파싱. 텍스트 추출 후 LLM으로 태그 자동 추출. + """DXF에서 구조화된 P&ID 정보를 빠르게 추출. 좌표 계산 없음, LLM 호출 없음. + + - LINENO 레이어 → 라인 마스터 (service/line_no/size/material_spec/flange/insul_*) + - 그 외 TEXT/MTEXT → 태그 후보 (prefix로 장비·계기 분류) + - fluid 사전은 Symbol & Legend FLUID NAME ABBREVIATION 기준 내장 + - 그래프/위상 분석이 필요하면 별도 도구 `build_pid_graph_parallel` 사용 Args: filepath: DXF 파일 경로 Returns: - JSON: { success, text, count, tags: [{tagNo, equipmentName, ...}] } + JSON: { + success, fluid_dictionary, linenos, tags, stats + } """ - import asyncio - import json - import re - try: - def _extract_text(): - return _extract_text_from_dxf(filepath) - text = await asyncio.to_thread(_extract_text) - - if not text.strip(): - return json.dumps({ - "success": True, - "text": "", - "count": 0, - "tags": [] - }, ensure_ascii=False, indent=2) - - # LLM으로 태그 추출 - system = ( - "You are a P&ID (Piping and Instrumentation Diagram) expert.\n" - "Extract instrument and equipment tags from the provided text.\n" - "Return ONLY a JSON array of objects with the following structure:\n" - '[{"tagNo":"FIT-10115","equipmentName":"Flow Transmitter","instrumentType":"FT" OR "FIT OR "TIA","lineNumber":"L-101","pidDrawingNo":"P&ID-001","confidence":0.95},...]\n' - "IMPORTANT rules:\n" - "- tagNo: Standard tag format with these patterns:\n" - " * Instrument: [Function][Loop]-[Number] (e.g., FT-101, PT-201, LI-301, FICQ-6113)\n" - " * Equipment: [Type]-[Number] (e.g., P-10101, T-10100, C-9111, E-10119)\n" - " * Complex: [Type]-[Number]-[Size]-[Class]-[Material]-[Option] (e.g., VG-6203-15A-F1A-n, CD-10513-40A-S1A-H50)\n" - " * Real examples from DXF: BT-6200, SARF-#6-PID-002, P-6101, DP-10101, CHS-6630-100A-F-C50\n" - "- instrumentType: First 2-4 letters of tagNo (FIT, PT, LI, FICQ, TCV, FCV, PCV, PG, TG, etc.)\n" - "- equipmentName: Descriptive name if available, otherwise null\n" - "- lineNumber: Line number if available, otherwise null\n" - "- pidDrawingNo: Drawing number if available, otherwise null\n" - "- confidence: 0.0 to 1.0 based on how clearly the tag was identified\n" - "- Do NOT include any explanation, only the JSON array.\n" - "- If no tags found, return an empty array: []\n" - "- temperature=0.1 for deterministic output.\n" - ) - - truncated_text = text[:12000] if len(text) > 12000 else text - - def _call_llm(): - return _llm().chat.completions.create( - model=VLLM_MODEL, - messages=[ - {"role": "system", "content": system}, - {"role": "user", "content": f"Source: dxf\n\nText:\n{truncated_text}"}, - ], - max_tokens=4096, - temperature=0.1, - ) - - resp = await asyncio.to_thread(_call_llm) - - raw = (resp.choices[0].message.content or "").strip() - - # 마크다운 코드 블록 제거 - if raw.startswith("```"): - lines = raw.splitlines() - raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip() - - # JSON 배열 추출 - match = re.search(r'\[.*\]', raw, re.DOTALL) - if match: - raw = match.group(0) - - # JSON 파싱 시도 - try: - data = json.loads(raw) - except json.JSONDecodeError: - # JSON 배열 추출 시도 (더 엄격한 패턴) - match = re.search(r'\[\s*\{.*?\}\s*\]', raw, re.DOTALL) - if match: - raw_clean = match.group(0) - try: - data = json.loads(raw_clean) - except json.JSONDecodeError: - # 마지막으로, JSON 배열을 개별 객체로 분리하여 파싱 시도 - objects = re.findall(r'\{[^{}]*\}', raw, re.DOTALL) - data = [] - for obj in objects: - try: - data.append(json.loads(obj)) - except json.JSONDecodeError: - pass - - if not isinstance(data, list): - data = [] - - return json.dumps({ - "success": True, - "text": text[:10000], # 제한 - "count": len(text), - "tags": data - }, ensure_ascii=False, indent=2) + data = await _extract_pid_dxf_fast(filepath) + return json.dumps({"success": True, **data}, ensure_ascii=False, indent=2) except Exception as e: - return json.dumps({"success": False, "error": f"DXF 파싱 실패: {e}"}, ensure_ascii=False) + logging.error(f"parse_pid_dxf failed: {e}") + return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False) @mcp.tool() @@ -1911,6 +2105,173 @@ async def parse_document( return json.dumps({"success": False, "error": f"parse failed: {e}"}, ensure_ascii=False) +# ── Field Instrument Inference ────────────────────────────────────────────── + +@mcp.tool() +def infer_field_instruments( + use_llm: bool = False, + seed_doc_id: str | None = None, + seed_excel_path: str | None = None, +) -> str: + """ + v_tag_summary의 모든 base_tag를 대상으로 현장 계기를 자동 유추하여 Excel 초안 생성. + + Args: + use_llm: LLM으로 description 한국어 초안 보강 (기본 False, Phase C에서 활성화) + seed_doc_id: 기존 Excel doc_id 지정 시 수정사항 보존 + 신규만 추가 + seed_excel_path: 기존 Excel 파일 경로 (C# 측에서 전달) + + Returns: + JSON: { success, doc_path, instrument_count, unmatched_count, message } + """ + from instrument_inference.rules import load_rules + from instrument_inference.infer import infer_instruments_for_base_tag + from instrument_inference.excel import generate_excel + + # [H-3 수정] v_tag_summary에서 base_tag별 dp 집합을 SQL로 한 번에 추출 + def _fetch_tags(): + import psycopg + conn = psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT) + try: + with conn.cursor() as cur: + cur.execute(""" + SELECT + base_tag, + area, + CASE WHEN pv IS NOT NULL THEN '.pv' END, + CASE WHEN sp IS NOT NULL THEN '.sp' END, + CASE WHEN op IS NOT NULL THEN '.op' END, + CASE WHEN instate0 IS NOT NULL THEN '.instate0' END, + CASE WHEN instate1 IS NOT NULL THEN '.instate1' END, + CASE WHEN instate2 IS NOT NULL THEN '.instate2' END + FROM v_tag_summary + ORDER BY base_tag + """) + return cur.fetchall() + finally: + conn.close() + + # [L-3 수정] .qv는 v_tag_summary에 없으므로 별도 조회 + def _fetch_qv_tags(): + import psycopg + conn = psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT) + try: + with conn.cursor() as cur: + cur.execute(""" + SELECT DISTINCT SUBSTRING(tagname FROM '(.*)\\.qv') AS base_tag + FROM realtime_table + WHERE tagname LIKE '%.qv' + """) + return set(r[0] for r in cur.fetchall()) + finally: + conn.close() + + tags = _fetch_tags() + # [M-1 수정] asyncio.to_thread 데드 코드 삭제 — sync def이므로 blocking 호출 + + qv_tags = _fetch_qv_tags() + + all_instruments = [] + power_equipment = [] + unmatched_tags = [] + + for row in tags: + base_tag = row[0] + area = row[1] + if not base_tag: + continue + + # [H-3 수정] 실제 DB 컬럼 NULL 여부로 dp 집합 구성 + dp = [d for d in row[2:8] if d is not None] + if base_tag in qv_tags: + dp.append(".qv") + + try: + result = infer_instruments_for_base_tag(base_tag, dp, area or "") + # [H-4 수정] 빈 리스트 가드 + if not result: + unmatched_tags.append(base_tag) + continue + if result[0].get("needs_review") and result[0].get("inference_basis") == "unmatched_prefix": + unmatched_tags.append(base_tag) + else: + for inst in result: + if inst.get("role") == "power_equipment": + power_equipment.append(inst) + else: + all_instruments.append(inst) + except Exception as e: + logging.warning(f"[infer] {base_tag} 처리 실패: {e}") + unmatched_tags.append(base_tag) + + # [M-5 수정] instrument_id 기준 dedup + def _dedup(inst_list): + seen = {} + deduped = [] + for inst in inst_list: + iid = inst["instrument_id"] + if iid in seen: + existing = seen[iid] + existing["inference_basis"] = f"{existing['inference_basis']}; also from {inst.get('parent_base_tag', '?')}" + else: + seen[iid] = inst + deduped.append(inst) + return deduped + + all_instruments = _dedup(all_instruments) + power_equipment = _dedup(power_equipment) + + # [M-3 수정] seed_excel_path 기반 머지 로직 + if seed_excel_path and os.path.exists(seed_excel_path): + all_instruments = _merge_with_seed_excel(seed_excel_path, all_instruments) + + # Excel 생성 (4시트: instruments, naming_convention, unmatched_tags, power_equipment) + xlsx_path = generate_excel(all_instruments, unmatched_tags, power_equipment) + + # LLM 보강 (Phase C — 아직 stub) + if use_llm: + xlsx_path = _enrich_descriptions_llm(xlsx_path) + + return json.dumps({ + "success": True, + "doc_path": xlsx_path, + "instrument_count": len(all_instruments), + "power_equipment_count": len(power_equipment), + "unmatched_count": len(unmatched_tags), + "message": f"{len(all_instruments)}개 계기, {len(power_equipment)}개 동력기기 유추 완료 ({len(unmatched_tags)}개 미매칭)", + }) + + +def _merge_with_seed_excel(seed_path: str, new_instruments: list[dict]) -> list[dict]: + """[M-3 수정] 기존 Excel에서 instruments 시트 로드 후 머지.""" + from openpyxl import load_workbook + wb = load_workbook(seed_path, read_only=True, data_only=True) + ws = wb["instruments"] + rows = list(ws.iter_rows(values_only=True)) + if not rows: + return new_instruments + headers = [str(c) for c in rows[0]] + existing = [] + for r in rows[1:]: + inst = dict(zip(headers, r)) + if inst.get("delete") in (True, "TRUE", "True", "YES", "Yes", "Y", "y"): + continue + existing.append(inst) + + existing_ids = {i["instrument_id"] for i in existing} + merged = list(existing) + for inst in new_instruments: + if inst["instrument_id"] not in existing_ids: + merged.append(inst) + return merged + + +def _enrich_descriptions_llm(xlsx_path: str) -> str: + """LLM으로 description 한국어 초안 보강 (Phase C stub).""" + # [M-2 수정] Phase C 구현 전까지 no-op. UI는 disabled로 처리. + return xlsx_path + + # ── 엔트리포인트 ────────────────────────────────────────────────────────────── def main(): diff --git a/mcp-server/worker/pid_extract_prompts.py b/mcp-server/worker/pid_extract_prompts.py index 9c2549d..bb2af7a 100644 --- a/mcp-server/worker/pid_extract_prompts.py +++ b/mcp-server/worker/pid_extract_prompts.py @@ -59,13 +59,17 @@ Examples: PG-101, TG-201, LG-301, PG-10101, TG-10201 # 펌프: P-10101, VP-10117, DP-10101 등 _PUMP_PROMPT = _PROMPT_HEADER + """ -Extract ONLY pumps and compressors. +Extract ONLY pumps and compressors (simple equipment tags, NO pipe size suffix). -Target equipment types: P (pump), VP (vertical pump), DP (dual pump), -C (compressor), CP (centrifugal pump), BP (booster pump), +Target equipment types: P (pump), VP (vertical pump), DP (dual pump), +C (compressor), CP (centrifugal pump), BP (booster pump), SP (sump pump), and their variants. -Examples: P-10101, VP-10117, DP-10101, C-10201, CP-10301, BP-10401 +Examples (4~5 digit loop numbers): P-10101, VP-10117, DP-10101, C-10201, P-9101, P-9116, VP-9201 + +IMPORTANT: Do NOT extract pipeline/line numbers that have a pipe size suffix (e.g. 25A, 50A, 100A). + SKIP (pipeline, not a pump): P-10101-25A-F1A-n, P-9107-25A-F-n, CHR-9641-50A-F-C50 + INCLUDE (pump tag): P-10101, VP-10117, P-9101 """ # 프롬프트 매핑 diff --git a/src/Core/Application/Services/PidExtractorService.cs b/src/Core/Application/Services/PidExtractorService.cs index 248aea2..dac4af9 100644 --- a/src/Core/Application/Services/PidExtractorService.cs +++ b/src/Core/Application/Services/PidExtractorService.cs @@ -17,6 +17,8 @@ public class PidExtractorService : IPidExtractorService private readonly McpClient _mcp; private readonly ExperionDbContext _dbContext; private readonly ILogger _logger; + private readonly SemaphoreSlim _cacheLock = new(1, 1); + private List? _cachedRules; public PidExtractorService(McpClient mcp, ExperionDbContext dbContext, ILogger logger) { @@ -62,15 +64,27 @@ public class PidExtractorService : IPidExtractorService var mappingJson = await _mcp.MatchPidTagsAsync(pidTagNos, experionTagNames); var mappings = ParseMappingJson(mappingJson); + // 중복 체크: 기존 DB에 있는 TagNo는 제외 (대소문자 구분 없음) + var existingTagNos = new HashSet( + await _dbContext.PidEquipment.Select(e => e.TagNo).ToListAsync(), + StringComparer.OrdinalIgnoreCase); + var newItems = extractedItems.Where(i => !existingTagNos.Contains(i.TagNo)).ToList(); + int skippedCount = extractedItems.Count - newItems.Count; + + if (skippedCount > 0) + _logger.LogInformation("P&ID 중복 제외: {Skipped}건 스킵 (이미 존재)", skippedCount); + // DB 저장 var dbItems = new List(); - foreach (var item in extractedItems) + foreach (var item in newItems) { mappings.TryGetValue(item.TagNo, out var matched); var experionTag = matched != null ? await _dbContext.RealtimePoints.FirstOrDefaultAsync(r => r.TagName == matched) : await FindFallbackTagAsync(item.TagNo); + var category = await MatchCategoryAsync(item.TagNo); + dbItems.Add(new PidEquipment { TagNo = item.TagNo, @@ -80,20 +94,27 @@ public class PidExtractorService : IPidExtractorService PidDrawingNo = item.PidDrawingNo, Confidence = item.Confidence, ExperionTagId = experionTag?.Id, + Category = category, ExtractedAt = DateTime.UtcNow, UpdatedAt = DateTime.UtcNow }); } - await _dbContext.PidEquipment.AddRangeAsync(dbItems); - await _dbContext.SaveChangesAsync(); + if (dbItems.Count > 0) + { + await _dbContext.PidEquipment.AddRangeAsync(dbItems); + await _dbContext.SaveChangesAsync(); + } - _logger.LogInformation("P&ID 추출 완료: {Total}건 저장 (파일: {FileName})", dbItems.Count, fileName); + _logger.LogInformation( + "P&ID 추출 완료: {Total}건 저장, {Skipped}건 중복 스킵 (파일: {FileName})", + dbItems.Count, skippedCount, fileName); return new PidExtractionResult( TotalCount: dbItems.Count, ConfidenceItems: dbItems.Count(i => i.Confidence >= 0.7), - LowConfidenceItems: dbItems.Count(i => i.Confidence < 0.5)); + LowConfidenceItems: dbItems.Count(i => i.Confidence < 0.5), + SkippedDuplicates: skippedCount); } private string ExtractDxfText(Stream stream) @@ -143,7 +164,7 @@ public class PidExtractorService : IPidExtractorService // - 단일 글자 장비 태그 포함: P-10101, T-10100, E-10119, C-10111 // - 다중 글자 계측 태그: FCV-101, FICQ-6113, PSV-6203 // - 복합 태그: VG-6203-15A-F1A-n, CD-10513-40A - if (Regex.IsMatch(trimmed, @"[A-Z]{1,6}-\d{2,6}(-[A-Z0-9]+)*")) + if (Regex.IsMatch(trimmed, @"[A-Z]{1,6}-\d{2,6}(-[A-Z0-9]+)*", RegexOptions.IgnoreCase)) { filteredLines.Add(trimmed); } @@ -265,6 +286,15 @@ public class PidExtractorService : IPidExtractorService await _dbContext.SaveChangesAsync(); } + public async Task DeleteAsync(long id) + { + var e = await _dbContext.PidEquipment.FindAsync(id); + if (e == null) return false; + _dbContext.PidEquipment.Remove(e); + await _dbContext.SaveChangesAsync(); + return true; + } + public Task GetTotalCountAsync() => _dbContext.PidEquipment.CountAsync(); public Task GetConfidenceItemsCountAsync() => _dbContext.PidEquipment.CountAsync(e => e.Confidence >= 0.7); public Task GetLowConfidenceItemsCountAsync() => _dbContext.PidEquipment.CountAsync(e => e.Confidence < 0.5); @@ -281,13 +311,16 @@ public class PidExtractorService : IPidExtractorService }; } - public Task ExportToCsvAsync(IEnumerable items) + public async Task ExportToCsvAsync(IEnumerable items) { - var sb = new StringBuilder(); - sb.AppendLine("TagNo,EquipmentName,InstrumentType,LineNumber,PidDrawingNo,Confidence,IsActive,ExtractedAt,ExperionTagId"); - foreach (var i in items) - sb.AppendLine($"{Csv(i.TagNo)},{Csv(i.EquipmentName)},{Csv(i.InstrumentType)},{Csv(i.LineNumber)},{Csv(i.PidDrawingNo)},{i.Confidence},{i.IsActive},{i.ExtractedAt:O},{i.ExperionTagId}"); - return Task.FromResult(sb.ToString()); + return await Task.Run(() => + { + var sb = new StringBuilder(); + sb.AppendLine("TagNo,EquipmentName,InstrumentType,LineNumber,PidDrawingNo,Confidence,IsActive,ExtractedAt,ExperionTagId,Category,Role,From,To"); + foreach (var i in items) + sb.AppendLine($"{Csv(i.TagNo)},{Csv(i.EquipmentName)},{Csv(i.InstrumentType)},{Csv(i.LineNumber)},{Csv(i.PidDrawingNo)},{i.Confidence},{i.IsActive},{i.ExtractedAt:O},{i.ExperionTagId},{Csv(i.Category)},{Csv(i.Role)},{Csv(i.FromTag)},{Csv(i.ToTag)}"); + return sb.ToString(); + }); } private static string Csv(string? v) @@ -297,38 +330,216 @@ public class PidExtractorService : IPidExtractorService ? $"\"{v.Replace("\"", "\"\"")}\"" : v; } - public Task ExportToExcelAsync(IEnumerable items) + public async Task ExportToExcelAsync(IEnumerable items) { - using var package = new OfficeOpenXml.ExcelPackage(); - var worksheet = package.Workbook.Worksheets.Add("P&ID Equipment"); - - // 헤더 - worksheet.Cells[1, 1].Value = "태그번호"; - worksheet.Cells[1, 2].Value = "장비명"; - worksheet.Cells[1, 3].Value = "계기유형"; - worksheet.Cells[1, 4].Value = "라인번호"; - worksheet.Cells[1, 5].Value = "도면번호"; - worksheet.Cells[1, 6].Value = "신뢰도"; - worksheet.Cells[1, 7].Value = "상태"; - worksheet.Cells[1, 8].Value = "추출일시"; - worksheet.Cells[1, 9].Value = "Experion 태그"; - - int row = 2; - foreach (var item in items) + return await Task.Run(async () => { - worksheet.Cells[row, 1].Value = item.TagNo; - worksheet.Cells[row, 2].Value = item.EquipmentName ?? ""; - worksheet.Cells[row, 3].Value = item.InstrumentType ?? ""; - worksheet.Cells[row, 4].Value = item.LineNumber ?? ""; - worksheet.Cells[row, 5].Value = item.PidDrawingNo ?? ""; - worksheet.Cells[row, 6].Value = item.Confidence; - worksheet.Cells[row, 7].Value = item.IsActive ? "활성" : "비활성"; - worksheet.Cells[row, 8].Value = item.ExtractedAt; - worksheet.Cells[row, 9].Value = item.ExperionTag?.TagName ?? ""; - row++; - } + using var package = new OfficeOpenXml.ExcelPackage(); - return Task.FromResult(package.GetAsByteArray()); + var rules = await GetRulesCachedAsync(); + var prefixToDesc = rules + .ToDictionary(r => r.Prefix.ToLowerInvariant(), r => r.Description ?? r.Prefix); + + var grouped = items + .GroupBy(i => string.IsNullOrEmpty(i.Category) ? "__unmatched__" : i.Category!) + .ToDictionary(g => g.Key, g => g.ToList()); + + var sheetOrder = new[] + { + PidEquipment.CategoryInstrument, + PidEquipment.CategoryPowerEquipment, + PidEquipment.CategoryStorageEquipment, + PidEquipment.CategoryProcessEquipment, + PidEquipment.CategoryUtilityEquipment, + PidEquipment.CategoryPipings, + "__unmatched__" + }; + + var sheetNames = new Dictionary + { + [PidEquipment.CategoryInstrument] = "Instrument", + [PidEquipment.CategoryPowerEquipment] = "Power Equipment", + [PidEquipment.CategoryStorageEquipment] = "Storage Equipment", + [PidEquipment.CategoryProcessEquipment] = "Process Equipment", + [PidEquipment.CategoryUtilityEquipment] = "Utility Equipment", + [PidEquipment.CategoryPipings] = "Pipings", + ["__unmatched__"] = "Unmatched" + }; + + foreach (var cat in sheetOrder) + { + if (!grouped.TryGetValue(cat, out var groupItems) || groupItems.Count == 0) + continue; + + var sheetName = sheetNames[cat]; + var worksheet = package.Workbook.Worksheets.Add(sheetName); + + worksheet.Cells[1, 1].Value = "태그번호"; + worksheet.Cells[1, 2].Value = "장비명"; + worksheet.Cells[1, 3].Value = "장비타입"; + worksheet.Cells[1, 4].Value = "라인번호"; + worksheet.Cells[1, 5].Value = "도면번호"; + worksheet.Cells[1, 6].Value = "신뢰도"; + worksheet.Cells[1, 7].Value = "상태"; + worksheet.Cells[1, 8].Value = "추출일시"; + worksheet.Cells[1, 9].Value = "Experion 태그"; + worksheet.Cells[1, 10].Value = "카테고리"; + worksheet.Cells[1, 11].Value = "Role"; + worksheet.Cells[1, 12].Value = "From"; + worksheet.Cells[1, 13].Value = "To"; + + using var headerRange = worksheet.Cells[1, 1, 1, 13]; + headerRange.Style.Font.Bold = true; + headerRange.Style.Fill.PatternType = OfficeOpenXml.Style.ExcelFillStyle.Solid; + headerRange.Style.Fill.BackgroundColor.SetColor(System.Drawing.Color.LightGray); + + int row = 2; + foreach (var item in groupItems) + { + worksheet.Cells[row, 1].Value = item.TagNo; + worksheet.Cells[row, 2].Value = item.EquipmentName ?? ""; + worksheet.Cells[row, 3].Value = item.InstrumentType ?? ""; + worksheet.Cells[row, 4].Value = item.LineNumber ?? ""; + worksheet.Cells[row, 5].Value = item.PidDrawingNo ?? ""; + worksheet.Cells[row, 6].Value = item.Confidence; + worksheet.Cells[row, 7].Value = item.IsActive ? "활성" : "비활성"; + worksheet.Cells[row, 8].Value = item.ExtractedAt; + worksheet.Cells[row, 9].Value = item.ExperionTag?.TagName ?? ""; + worksheet.Cells[row, 10].Value = item.Category ?? ""; + worksheet.Cells[row, 11].Value = item.Role ?? ""; + worksheet.Cells[row, 12].Value = item.FromTag ?? ""; + worksheet.Cells[row, 13].Value = item.ToTag ?? ""; + row++; + } + + worksheet.Cells.AutoFitColumns(); + } + + return package.GetAsByteArray(); + }); + } + + // ── Prefix Rule Cache ────────────────────────────────────────────────────── + + private async Task> GetRulesCachedAsync() + { + var rules = _cachedRules; + if (rules != null) return rules; + + await _cacheLock.WaitAsync(); + try + { + rules = _cachedRules; + if (rules != null) return rules; + rules = await _dbContext.PidPrefixRules + .OrderByDescending(r => r.Prefix.Length) + .ThenBy(r => r.SortOrder) + .ToListAsync(); + _cachedRules = rules; + return rules; + } + finally + { + _cacheLock.Release(); + } + } + + private void InvalidateRulesCache() + { + Interlocked.Exchange(ref _cachedRules, null); + } + + // 배관번호 패턴: SERVICE-LINENUM-SIZE(숫자+알파벳)-... 3번째 필드에 파이프 사이즈 존재 + private static readonly Regex _pipeLineNoRe = new( + @"^[A-Z][A-Z0-9]{0,3}-\d{3,6}-\d{1,4}[A-Za-z]-", + RegexOptions.Compiled); + + private async Task MatchCategoryAsync(string tagNo) + { + if (_pipeLineNoRe.IsMatch(tagNo)) + return PidEquipment.CategoryPipings; + + var rules = await GetRulesCachedAsync(); + return rules.FirstOrDefault(r => + tagNo.StartsWith(r.Prefix, StringComparison.OrdinalIgnoreCase))?.Category; + } + + // ── Prefix Rule CRUD ─────────────────────────────────────────────────────── + + public async Task> GetPrefixRulesAsync() + { + return await _dbContext.PidPrefixRules + .OrderBy(r => r.SortOrder) + .ThenBy(r => r.Prefix) + .ToListAsync(); + } + + public async Task CreatePrefixRuleAsync(CreatePidPrefixRuleRequest request) + { + var rule = new PidPrefixRule + { + Prefix = request.Prefix.Trim(), + Category = request.Category, + Description = request.Description?.Trim(), + SortOrder = request.SortOrder, + CreatedAt = DateTime.UtcNow, + UpdatedAt = DateTime.UtcNow + }; + _dbContext.PidPrefixRules.Add(rule); + await _dbContext.SaveChangesAsync(); + InvalidateRulesCache(); + return rule; + } + + public async Task UpdatePrefixRuleAsync(int id, UpdatePidPrefixRuleRequest request) + { + var rule = await _dbContext.PidPrefixRules.FindAsync(id); + if (rule == null) return null; + rule.Prefix = request.Prefix.Trim(); + rule.Category = request.Category; + rule.Description = request.Description?.Trim(); + rule.SortOrder = request.SortOrder; + rule.UpdatedAt = DateTime.UtcNow; + await _dbContext.SaveChangesAsync(); + InvalidateRulesCache(); + return rule; + } + + public async Task DeletePrefixRuleAsync(int id) + { + var rule = await _dbContext.PidPrefixRules.FindAsync(id); + if (rule == null) return false; + _dbContext.PidPrefixRules.Remove(rule); + await _dbContext.SaveChangesAsync(); + InvalidateRulesCache(); + return true; + } + + public async Task ApplyCategoriesToExistingAsync() + { + const int batchSize = 1000; + int total = 0; + while (true) + { + var batch = await _dbContext.PidEquipment + .Where(e => e.Category == null) + .Take(batchSize) + .ToListAsync(); + if (!batch.Any()) break; + + foreach (var item in batch) + { + var category = await MatchCategoryAsync(item.TagNo); + if (category != null) + { + item.Category = category; + item.UpdatedAt = DateTime.UtcNow; + total++; + } + } + await _dbContext.SaveChangesAsync(); + } + return total; } }