diff --git a/mcp-server/server.py b/mcp-server/server.py index 7ec8c16..f1f8aa3 100644 --- a/mcp-server/server.py +++ b/mcp-server/server.py @@ -79,6 +79,12 @@ from pipeline.analyzer import PidAnalysisEngine import networkx as nx import asyncio +# ── Verifier (Phase B MVP — R1·R2·R4) ── +from verifier.validators import ( + validate_tag, validate_area, validate_direction, validate_max_depth, + log_rejection, VerifierError, +) + # ── 임베딩 (Ollama) ─────────────────────────────────────────────────────────── async def _embed(text: str) -> list[float]: @@ -248,20 +254,37 @@ _PID_TAG_RE = re.compile(r'^([A-Z]{1,4})-(\d{3,6})([A-Z])?$') +# DCS 함수블록 prefix frozenset — compound형 포함 (ISA _systemFuncLetters 기준) +# ⚠️ C# Boot DDL UPDATE 목록과 수동 동기화 필요 (향후 dcs_prefixes.py 분리 고려) +_DCS_PREFIXES: frozenset[str] = frozenset({ + "FIC", "FICA", "FICQ", "FICR", + "TIC", "TICA", "TICQ", + "PIC", "PICA", + "LIC", "LICA", + "FY", "TY", "PY", "LY", + "FV", "TV", "PV", "LV", +}) + + def _classify_pid_tag(tag_no: str) -> dict: """tagNo의 prefix로 장비/계기 종류 분류 (좌표·LLM 사용 안 함).""" m = _PID_TAG_RE.match(tag_no) if not m: - return {"kind": "unknown", "prefix": None, "type": None} + return {"kind": "unknown", "prefix": None, "type": None, "tag_dcs": False} prefix = m.group(1) if prefix in _PID_EQUIPMENT_PREFIX: - return {"kind": "equipment", "prefix": prefix, "type": _PID_EQUIPMENT_PREFIX[prefix]} + return {"kind": "equipment", "prefix": prefix, "type": _PID_EQUIPMENT_PREFIX[prefix], "tag_dcs": False} if 2 <= len(prefix) <= 4 and prefix[0] in _PID_INSTRUMENT_FIRST: measure = _PID_INSTRUMENT_FIRST[prefix[0]] mods = [_PID_INSTRUMENT_MODIFIER[c] for c in prefix[1:] if c in _PID_INSTRUMENT_MODIFIER] type_name = (measure + " " + " ".join(mods)).strip() - return {"kind": "instrument", "prefix": prefix, "type": type_name} - return {"kind": "unknown", "prefix": prefix, "type": None} + return { + "kind": "instrument", + "prefix": prefix, + "type": type_name, + "tag_dcs": prefix in _DCS_PREFIXES, + } + return {"kind": "unknown", "prefix": prefix, "type": None, "tag_dcs": False} def _parse_pid_lineno(token: str) -> dict | None: @@ -619,6 +642,21 @@ async def _get_db_connection(): return await asyncio.to_thread(_connect) +def _get_db_connection_sync(): + """PostgreSQL DB 연결 획득 (sync — Verifier 내부용).""" + import psycopg + return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT) + + +def _check(tool: str, params: dict, *errs) -> str | None: + """첫 번째 비-None VerifierError를 로그 + JSON str 반환. 없으면 None.""" + for e in errs: + if e: + log_rejection(tool, params, e) + return json.dumps(e.to_dict(), ensure_ascii=False) + return None + + def _validate_sql(sql: str) -> tuple[bool, str]: """SQL 안전 검증 — SELECT/WITH만 허용, 위험 키워드 차단.""" if len(sql) > 4000: @@ -660,6 +698,10 @@ Tables: realtime_table(tagname TEXT, livevalue TEXT, timestamp TIMESTAMPTZ) tag_metadata(base_tag TEXT, attribute TEXT, value TEXT) event_history_table(tagname TEXT, prev_value TEXT, curr_value TEXT, event_type TEXT, event_time TIMESTAMPTZ, duration_seconds INT) + pid_equipment(tag_no TEXT, category TEXT, tag_dcs BOOL, tag_class TEXT, instrument_type TEXT, from_tag TEXT, to_tag TEXT) + -- tag_dcs=TRUE: DCS 함수블록(FIC/TIC/PIC류), FALSE: 현장 물리 계기(FT/FCV류) + -- tag_class: 'field'(현장) / 'system'(DCS) — tag_dcs 기반 + -- from_tag(상류) → tag_no → to_tag(하류) 연결 추적 Views: v_tag_summary(base_tag TEXT, pv TEXT, sp TEXT, op TEXT, description TEXT, area TEXT, sub_area TEXT) @@ -974,11 +1016,12 @@ async def upsert_pid_connection( role: str | None = None, category: str | None = None, tag_class: str | None = None, + tag_dcs: bool | None = None, ) -> str: """운전자가 작성한 from/to 연결 1건을 pid_equipment 에 멱등(idempotent) 반영. ⚠️ 오직 pid_equipment 테이블만, 아래 컬럼만 변경한다(나머지는 절대 안 건드림): - from_tag, to_tag, from_at, to_at, role, category, tag_class, connection_locked, updated_at + from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs, connection_locked, updated_at equipment_name/instrument_type/line_number/pid_drawing_no/pos_x/pos_y/confidence/is_active 는 보존. 매칭 규칙(문서규칙 §5): @@ -993,7 +1036,8 @@ async def upsert_pid_connection( Args: tag_no: 태그번호 (필수, 대소문자 무시 매칭). from_tag/to_tag: 상류/하류 태그. 병렬이면 호출 측에서 "A, B" 콤마 병합해 전달(이 도구는 분리하지 않음). - from_at/to_at: 위치/서술 텍스트. role/category/tag_class: 그대로 저장(None이면 기존 유지). + from_at/to_at: 위치/서술 텍스트. role/category/tag_class/tag_dcs: 그대로 저장(None이면 기존 유지). + tag_dcs: TRUE=DCS 함수블록(FIC/TIC류), FALSE=현장 물리 계기(FT/FCV류). None이면 기존 유지. Returns: JSON { success, action(update_existing|update_filled|insert), id, tag_no, before, after } @@ -1007,14 +1051,18 @@ async def upsert_pid_connection( tag_no = _n(tag_no) if not tag_no: return json.dumps({"success": False, "error": "tag_no 가 비었습니다."}, ensure_ascii=False) + err = _check("upsert_pid_connection", {"tag_no": tag_no}, validate_tag(tag_no, _get_db_connection_sync)) + if err: return err from_tag, to_tag = _n(from_tag), _n(to_tag) from_at, to_at = _n(from_at), _n(to_at) role, category, tag_class = _n(role), _n(category), _n(tag_class) + # bool은 _n() 미사용 — None이면 기존 유지, 값이면 bool로 강제 변환 + tag_dcs_val = bool(tag_dcs) if tag_dcs is not None else None locked = (from_tag is not None) or (to_tag is not None) - _SNAP = ["tag_no", "from_tag", "to_tag", "from_at", "to_at", "role", "category", "tag_class", "connection_locked"] + _SNAP = ["tag_no", "from_tag", "to_tag", "from_at", "to_at", "role", "category", "tag_class", "tag_dcs", "connection_locked"] def _snap(cur, _id): - cur.execute("""SELECT tag_no, from_tag, to_tag, from_at, to_at, role, category, tag_class, connection_locked + cur.execute("""SELECT tag_no, from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs, connection_locked FROM pid_equipment WHERE id=%s""", (_id,)) r = cur.fetchone() return dict(zip(_SNAP, r)) if r else None @@ -1056,26 +1104,28 @@ async def upsert_pid_connection( from_tag=%s, to_tag=%s, from_at=%s, to_at=%s, role=%s, category=COALESCE(%s, category), tag_class=COALESCE(%s, tag_class), + tag_dcs=COALESCE(%s, tag_dcs), connection_locked=%s, updated_at=now() WHERE id=%s""", - (from_tag, to_tag, from_at, to_at, role, category, tag_class, locked, target_id)) + (from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs_val, locked, target_id)) else: action = "insert" before = None # 같은 tag 기존 행에서 정적 메타 복사 (없으면 NULL) - cur.execute("""SELECT equipment_name, instrument_type, line_number, pid_drawing_no, category + cur.execute("""SELECT equipment_name, instrument_type, line_number, pid_drawing_no, category, tag_dcs FROM pid_equipment WHERE lower(tag_no)=lower(%s) ORDER BY id LIMIT 1""", (tag_no,)) - meta = cur.fetchone() or (None, None, None, None, None) - eq_name, inst_type, line_no, draw_no, ex_cat = meta + meta = cur.fetchone() or (None, None, None, None, None, None) + eq_name, inst_type, line_no, draw_no, ex_cat, ex_dcs = meta cur.execute( """INSERT INTO pid_equipment (tag_no, equipment_name, instrument_type, line_number, pid_drawing_no, - from_tag, to_tag, from_at, to_at, role, category, tag_class, + from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs, connection_locked, confidence, is_active, extracted_at, updated_at) - VALUES (%s,%s,%s,%s,%s, %s,%s,%s,%s,%s, COALESCE(%s,%s),%s, %s, 0, TRUE, now(), now()) + VALUES (%s,%s,%s,%s,%s, %s,%s,%s,%s,%s, COALESCE(%s,%s),%s, COALESCE(%s,%s,FALSE), %s, 0, TRUE, now(), now()) RETURNING id""", (tag_no, eq_name, inst_type, line_no, draw_no, - from_tag, to_tag, from_at, to_at, role, category, ex_cat, tag_class, locked)) + from_tag, to_tag, from_at, to_at, role, category, ex_cat, tag_class, + tag_dcs_val, ex_dcs, locked)) target_id = cur.fetchone()[0] after = _snap(cur, target_id) @@ -1108,6 +1158,13 @@ async def query_pv_history(tag_names: list[str], time_from: str, time_to: str, l Returns: JSON: { success, tag_names, time_range, limit, data } """ + _first_bad = None + for t in (tag_names or []): + _first_bad = validate_tag(t, _get_db_connection_sync) + if _first_bad: + break + err = _check("query_pv_history", {"tag_names": tag_names}, _first_bad) + if err: return err conn = None try: limit = min(limit, 5000) @@ -1359,35 +1416,44 @@ _VALID_EVENT_TYPES = ("ALARM", "TRIP", "NORMAL", "RUN", "CHANGE") @mcp.tool() -async def find_tags(query: str, area: str | None = None, sub_area: str | None = None, top_k: int = 20) -> str: +async def find_tags(query: str | None = None, area: str | None = None, sub_area: str | None = None, top_k: int = 20) -> str: """태그 검색 — base_tag/설명(desc)/area/sub_area 통합 검색 (v_tag_summary 뷰 기반). 사용 시점: 사용자가 "온도", "Tower 1 압력", "운전 중인 펌프" 같은 자연어로 태그를 지칭할 때 실제 base_tag(예: 'ti-6101', 'p-6102')를 역으로 찾기 위해. + sub_area만 지정하면 해당 area 전체 태그를 반환한다. get_tag_metadata와 차이: 단순 tagname LIKE만 보지 않고 description/area에도 매칭하며, 현재 PV/SP/OP/description/area/sub_area를 함께 반환. Args: - query: 검색어 (base_tag 또는 description 부분 일치, 대소문자 무시) + query: (선택) 검색어 (base_tag 또는 description 부분 일치, 대소문자 무시). + 생략하면 sub_area/area 조건만으로 전체 조회. area: (선택) area 필터 (예: 'P6'). "P6-1"처럼 '-'가 있으면 sub_area로 자동 처리. sub_area: (선택) sub_area 필터 (예: 'P6-1'). 공용 태그("P6-1,P6-2")도 매칭됨. + query 생략 시 해당 sub_area 전체 태그 반환. top_k: 반환 태그 수 (기본 20, 최대 100) Returns: JSON: { success, query, count, tags: [{base_tag, pv, sp, op, description, area, sub_area}] } """ + err = _check("find_tags", {"query": query, "area": area, "sub_area": sub_area}, + validate_area(area), validate_area(sub_area, "sub_area")) + if err: return err conn = None try: top_k = max(1, min(top_k, 100)) - q = f"%{query.strip()}%" # sub_area 명시 파라미터 우선, 없으면 area가 "P6-1" 형식이면 sub_area로 간주 effective_sub = sub_area or (area if (area and "-" in area) else None) effective_area = None if (area and "-" in area) else area - conds = ["(base_tag ILIKE %s OR description ILIKE %s)"] - params: list = [q, q] + conds: list[str] = [] + params: list = [] + if query: + q = f"%{query.strip()}%" + conds.append("(base_tag ILIKE %s OR description ILIKE %s)") + params.extend([q, q]) if effective_sub: conds.append("%s = ANY(string_to_array(sub_area, ','))") params.append(effective_sub) @@ -1395,6 +1461,8 @@ async def find_tags(query: str, area: str | None = None, sub_area: str | None = # v_tag_summary.area는 '{12 | P6 | }' 원본 형식이므로 코드만 추출해 비교 conds.append("trim(split_part(area, '|', 2)) = %s") params.append(effective_area) + if not conds: + conds.append("TRUE") sql = ( "SELECT base_tag, pv, sp, op, description, area, sub_area " @@ -1441,6 +1509,11 @@ async def trace_connections(start_tag: str, direction: str = "downstream", max_d - from_tag/to_tag에 쉼표가 여러 개면 병렬 펌프·라인 → 모두 경로에 포함됨(누락 없음). - live_state: 해당 태그의 실시간 상태(예: R-RUN/L-STOP). 병렬 펌프 중 R-RUN/L-RUN이 현재 공급원. """ + err = _check("trace_connections", {"start_tag": start_tag, "direction": direction, "max_depth": max_depth}, + validate_tag(start_tag, _get_db_connection_sync), + validate_direction(direction), + validate_max_depth(max_depth)) + if err: return err conn = None try: start_tag = start_tag.strip().upper() @@ -1596,6 +1669,10 @@ async def query_events( Returns: JSON: { success, count, time_range, events: [...] } """ + err = _check("query_events", {"tag_name": tag_name, "area": area}, + validate_tag(tag_name, _get_db_connection_sync) if tag_name else None, + validate_area(area)) + if err: return err if event_type and event_type.upper() not in _VALID_EVENT_TYPES: return json.dumps({ "success": False, @@ -1669,6 +1746,8 @@ async def active_alarms(area: str | None = None, limit: int = 100) -> str: Returns: JSON: { success, count, alarms: [{tag_name, event_type, since, prev_state_duration_s, area, ...}] } """ + err = _check("active_alarms", {"area": area}, validate_area(area)) + if err: return err conn = None try: limit = max(1, min(limit, 500)) @@ -1737,6 +1816,8 @@ async def summarize_events( Returns: JSON: { success, summary, stats: {by_type, by_area, count} } """ + err = _check("summarize_events", {"area": area}, validate_area(area)) + if err: return err max_events = max(10, min(max_events, 500)) raw = await query_events(event_type=event_type, area=area, since=since, limit=max_events) parsed = json.loads(raw) @@ -1822,6 +1903,8 @@ async def generate_status_report(area: str | None = None, hours: int = 24) -> st Returns: JSON: { success, report, sections: {active_alarms, recent_events, by_type}, generated_at } """ + err = _check("generate_status_report", {"area": area, "hours": hours}, validate_area(area)) + if err: return err hours = max(1, min(hours, 168)) since_iso = None # query_events가 24h 기본을 쓰지만, hours로 명시 전달 from datetime import datetime, timezone, timedelta diff --git a/mcp-server/worker/sql_prompt.py b/mcp-server/worker/sql_prompt.py new file mode 100644 index 0000000..0942744 --- /dev/null +++ b/mcp-server/worker/sql_prompt.py @@ -0,0 +1,141 @@ +"""NL2SQL 프롬프트 단일 소스 (production + eval 공유). + +`nl2sql_worker._generate_sql` 와 `eval/run_eval.py` 가 **동일 프롬프트**를 쓰도록 여기서 정의한다. +순수 문자열 상수만 — 무거운 의존성 없음(import 안전). 프롬프트를 고칠 땐 여기만 고치면 됨. +(참고: server.py 에도 별도 _DB_SCHEMA 사본이 있음 — 추후 통합 대상) +""" + +# DB 스키마 +DB_SCHEMA = """ +PostgreSQL 시계열 데이터베이스 스키마 + +테이블: pid_equipment (P&ID 추출 장비/계기) + tag_no TEXT - 태그번호 (예: FIC-6113, FT-6113) + category TEXT - 'instrument' / 'power_equipment' / 'storage_equipment' / ... + tag_dcs BOOL - TRUE=DCS 함수블록(FIC/TIC/PIC류), FALSE=현장 물리 계기(FT/FCV류) + tag_class TEXT - 'field'(현장) / 'system'(DCS) — tag_dcs 기반 + instrument_type TEXT - ISA prefix (FT/FIC/P 등) + from_tag TEXT - 연결 상류 태그 + to_tag TEXT - 연결 하류 태그 +※ DCS 태그: SELECT WHERE tag_dcs=TRUE, 현장 계기: WHERE tag_dcs=FALSE AND category='instrument' + +테이블: history_table (시계열 이력) + tagname TEXT - 태그명 (모두 소문자, 예: 'ficq-6113.pv') — 대소문자 구분 + node_id TEXT - OPC UA 노드 ID + value TEXT - 측정값, 수치 연산 시 ::double precision 캐스트 필요 + recorded_at TIMESTAMPTZ - 기록 시각(UTC), 스냅샷 주기 약 60초 + +테이블: realtime_table (실시간 최신값) + tagname TEXT - 태그명 (모두 소문자) + node_id TEXT - OPC UA 노드 ID + livevalue TEXT - 현재값 + timestamp TIMESTAMPTZ - 최종 갱신 시각 + +테이블: tag_metadata (태그 메타데이터 - 변경 드묾) + base_tag TEXT - 기본 태그명 (예: 'ficq-6101', 'xv-6124') + attribute TEXT - 속성명 ('desc', 'area') + value TEXT - 메타데이터 값 + node_id TEXT - OPC UA 노드 ID + loaded_at TIMESTAMPTZ - 마지막 로드 시각 + +뷰: v_tag_summary (실시간값 + 메타데이터 통합 뷰) + base_tag TEXT - 기본 태그명 + pv TEXT - 현재 프로세스 값 + sp TEXT - 설정값 + op TEXT - 출력값 + instate0 TEXT - 상태 비트 0 (true/false) + instate1 TEXT - 상태 비트 1 (true/false) + instate2 TEXT - 상태 비트 2 (true/false) + description TEXT - 장비 설명 (tag_metadata.desc) + area TEXT - 소속 플랜트 (tag_metadata.area) + +뷰: v_plant_running_state (area별 펌프 운전 판정 — "어떤 플랜트가 운전 중" 질문 1순위) + area_code TEXT - 정규화 area (예: P3, P4, P5, P6, P8) + status TEXT - 'RUNNING' / 'TRIPPED' / 'STOPPED' (펌프 1대라도 RUN이면 RUNNING) + running_pumps INT - R-RUN/L-RUN 펌프 수 + tripped_pumps INT - R-TRIP/L-TRIP 펌프 수 + stopped_pumps INT - R-STOP/L-STOP 펌프 수 + total_pumps INT - 펌프 enum 보유 태그 수 + running_pump_tags TEXT[] - 현재 RUN 상태 펌프 base_tag 배열 + ※ "운전 중인 플랜트/펌프", "트립 펌프" 류 질문은 이 뷰를 직접 SELECT (펌프 상태 SQL 직접 작성 금지) + ※ 결과에 없는 area = 펌프 미등록 → 운전 여부 단정 금지. 이 뷰는 area 레벨(sub_area 없음) + +뷰: v_plant_running_state_corroborated (펌프별 실질 운전 — 유량/진공 교차검증, sub_area 지원) + base_tag TEXT - 펌프 base_tag (예: 'p-6102', 'vp-6117') + area_code TEXT - 정규화 area + sub_area TEXT - 세부 area (예: 'P6-1'; 공용은 'P6-1,P6-2'). 필터는 LIKE '%P6-1%' + corroborated_status TEXT - CONFIRMED_RUNNING / SUSPICIOUS_RUNNING / STALE / INDETERMINATE_RUNNING / STOPPED / TRIPPED + flow_kg_hr DOUBLE PRECISION - 연결 유량(kg/hr) + vacuum_torr DOUBLE PRECISION - 연결 진공압(torr=mmHg) + ※ "6-1차/6-2차" 등 sub_area 필터가 필요한 질문은 **반드시 이 뷰** 사용 (아래 agg/기본뷰는 sub_area 없음) + +뷰: v_instrument_range (계기 단위/레인지 — tag_metadata에서 추출) + base_tag TEXT - 기본 태그명, 접미사 없음 (예: 'ficq-6113', 'pica-6111') + unit TEXT - 단위 (예: 'kg/hr', 'mmHg') + eu_lo DOUBLE PRECISION - 레인지 하한 + eu_hi DOUBLE PRECISION - 레인지 상한 + ※ 계기 레인지/상하한/단위 질문에 사용. base_tag는 '.pv' 등 접미사를 떼고 매칭 + +참고(직접 쓰지 말 것): v_plant_running_state_agg 도 있으나 area 레벨 집계라 sub_area가 없음. +sub_area 질문엔 위 v_plant_running_state_corroborated 를 사용. + +새로운 태그 타입: + - 아날로그: ficq-6101.pv/sp/op (Double) + - 디지털 XV: xv-6124.pv/op (Int32), xv-6124.instate0~7 (Boolean) + - Pump: p-6102.pv/op (Int32), p-6102.instate0~7 (Boolean) + - 메타데이터: desc (String), area (Enum) + +BCD 상태 조회 팁: + - instate0~7은 Boolean (true/false) + - pv 값이 EnumValueType 형식인 경우 `{코드 | DisplayName | }`에서 DisplayName으로 상태 확인 가능 + - v_tag_summary 뷰를 사용하면 실시간값+메타데이터 한 번에 조회 가능 + +N분 간격 집계 공식 (time_bucket 금지, date_trunc 사용): + 1분 버킷: date_trunc('minute', recorded_at) AS bucket + 2분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120) AS bucket + 5분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/300)*300) AS bucket + 10분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/600)*600) AS bucket + N분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)) AS bucket + +예시 (2분 간격, 여러 태그, KST 표시): + SELECT to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120) AT TIME ZONE 'Asia/Seoul' AS bucket, + tagname, AVG(value::double precision) AS avg_val + FROM history_table + WHERE tagname IN ('tag1', 'tag2') + AND recorded_at >= NOW() - INTERVAL '3 hours' + GROUP BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120), tagname + ORDER BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120), tagname + +규칙: + - SELECT만 허용 (INSERT/UPDATE/DELETE/DROP 등 불가) + - tagname은 모두 소문자로 정확히 입력 + - value 컬럼은 TEXT이므로 집계 시 ::double precision 캐스트 필수 + - time_bucket 함수 사용 금지 — 위의 to_timestamp/FLOOR/EPOCH 공식 사용 +""" + +# SQL 생성 system 프롬프트 (nl2sql_worker._generate_sql 와 동일) +SQL_SYSTEM_PROMPT = ( + "You are a PostgreSQL SQL expert.\n" + "Convert the user's question into a SELECT SQL using the schema below.\n" + "IMPORTANT rules:\n" + "- Use ONLY PostgreSQL syntax. No DATE_FORMAT, no INTERVAL N DAY.\n" + "- Time column is 'recorded_at' (TIMESTAMPTZ). Do NOT use 'timestamp'.\n" + "- NEVER use time_bucket(). For N-minute buckets use to_timestamp/FLOOR/EPOCH formula.\n" + "- INTERVAL rule:\n" + " * If the question specifies an interval (e.g. '2분 간격', '5-minute interval'):\n" + " use: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)) AS bucket\n" + " with GROUP BY bucket, tagname and AVG(value::double precision) AS avg_val\n" + " * If NO interval is specified: SELECT recorded_at, tagname, value — NO GROUP BY.\n" + "- Current year is 2026. '4월 27일' means 2026-04-27.\n" + "- All times in DB are UTC. Korean input is KST (UTC+9). Convert KST→UTC for WHERE: KST 12:00 = UTC 03:00.\n" + "- Display times in KST: always apply AT TIME ZONE 'Asia/Seoul' on time columns in SELECT.\n" + " * Non-aggregated: SELECT recorded_at AT TIME ZONE 'Asia/Seoul' AS recorded_at, ...\n" + " * Aggregated bucket: GROUP BY the raw UTC expression, then convert only in SELECT:\n" + " SELECT to_timestamp(...) AT TIME ZONE 'Asia/Seoul' AS bucket, AVG(...) AS avg_val\n" + " FROM ... GROUP BY to_timestamp(...), tagname ORDER BY to_timestamp(...), tagname\n" + "- value column is TEXT; cast with ::double precision only when aggregating.\n" + "- All tagnames are lowercase (e.g. 'ficq-6113.pv'). Match exactly.\n" + "- PostgreSQL LIKE: dot has no special meaning, no escaping needed.\n" + "- Return ONLY the SQL statement. No explanation, no markdown.\n\n" + f"{DB_SCHEMA}" +) diff --git a/prompts/plant_context.md b/prompts/plant_context.md index 7ef997d..3176768 100644 --- a/prompts/plant_context.md +++ b/prompts/plant_context.md @@ -327,3 +327,28 @@ AND from_tag IN ('FT-6118','P-6118',...); - 설명할 때 "이 장비는 2개 경로가 있음"이라고 명시 - OR 병합된 값("A, B" 같은)은 없음 — 각 행은 단일 경로 - `from_at`, `to_at`은 연결 지점 상세 (예: "C-6111 중상부 제품 노즐") + +--- + +## pid_equipment.tag_dcs — 현장 계기 vs DCS 함수블록 구별 + +- **tag_dcs = TRUE**: DCS 내부 함수블록 (FIC, TIC, PIC, LIC, FY, TY, PY, LY 등 compound형 포함) + - 물리 기기 없음. Experion DB 포인트로만 존재 + - 예: FIC-6113(유량제어기), TIC-6201(온도제어기) +- **tag_dcs = FALSE**: 현장 물리 계기 (FT, PT, LT, FCV, PSV, XV 등) + - P&ID 도면에 기기 심벌로 표시되는 실물. Experion 연결 여부 무관하게 field + - 예: FT-6113(유량전송기), FCV-6113(제어밸브) + +### 쿼리 예시 + +```sql +-- DCS 태그 수 +SELECT COUNT(*) FROM pid_equipment WHERE tag_dcs = TRUE; + +-- 현장 계기 목록 +SELECT tag_no, instrument_type FROM pid_equipment +WHERE tag_dcs = FALSE AND category = 'instrument'; + +-- 특정 태그 DCS 여부 확인 +SELECT tag_no, tag_dcs, tag_class FROM pid_equipment WHERE tag_no = 'FIC-6113'; +``` diff --git a/src/Core/Application/DTOs/PidPrefixRuleDto.cs b/src/Core/Application/DTOs/PidPrefixRuleDto.cs index 5e44695..53a705e 100644 --- a/src/Core/Application/DTOs/PidPrefixRuleDto.cs +++ b/src/Core/Application/DTOs/PidPrefixRuleDto.cs @@ -4,6 +4,7 @@ public record PidPrefixRuleDto( int Id, string Prefix, string Category, + bool TagDcs, string? Description, int SortOrder, DateTime CreatedAt); @@ -11,11 +12,13 @@ public record PidPrefixRuleDto( public record CreatePidPrefixRuleRequest( string Prefix, string Category, - string? Description, + bool TagDcs = false, + string? Description = null, int SortOrder = 0); public record UpdatePidPrefixRuleRequest( string Prefix, string Category, - string? Description, + bool TagDcs = false, + string? Description = null, int SortOrder = 0); diff --git a/src/Core/Application/Services/PidExtractorService.cs b/src/Core/Application/Services/PidExtractorService.cs index f4abc95..54b3cc3 100644 --- a/src/Core/Application/Services/PidExtractorService.cs +++ b/src/Core/Application/Services/PidExtractorService.cs @@ -88,7 +88,8 @@ public class PidExtractorService : IPidExtractorService : await FindFallbackTagAsync(item.TagNo); var category = await MatchCategoryAsync(item.TagNo); - var tagClass = ClassifyTagClass(item.TagNo, category, experionTag != null); + var tagDcs = await ResolveTagDcsAsync(item.TagNo); + var tagClass = ClassifyTagClass(item.TagNo, category, tagDcs, experionTag != null); var newItem = new PidEquipment { @@ -101,6 +102,7 @@ public class PidExtractorService : IPidExtractorService Confidence = item.Confidence, ExperionTagId = experionTag?.Id, Category = category, + TagDcs = tagDcs, TagClass = tagClass, ExtractedAt = DateTime.UtcNow, UpdatedAt = DateTime.UtcNow @@ -532,9 +534,10 @@ public class PidExtractorService : IPidExtractorService worksheet.Cells[1, 14].Value = "From_at"; worksheet.Cells[1, 15].Value = "To_at"; worksheet.Cells[1, 16].Value = "태그분류"; - worksheet.Cells[1, 17].Value = "id"; + worksheet.Cells[1, 17].Value = "id"; // 안정 키(라운드트립 매칭용) — col17 고정 + worksheet.Cells[1, 18].Value = "DCS태그"; // tag_dcs: DCS 함수블록 여부 - using var headerRange = worksheet.Cells[1, 1, 1, 17]; + using var headerRange = worksheet.Cells[1, 1, 1, 18]; headerRange.Style.Font.Bold = true; headerRange.Style.Fill.PatternType = OfficeOpenXml.Style.ExcelFillStyle.Solid; headerRange.Style.Fill.BackgroundColor.SetColor(System.Drawing.Color.LightGray); @@ -566,6 +569,7 @@ public class PidExtractorService : IPidExtractorService _ => "" }; worksheet.Cells[row, 17].Value = item.Id; // 안정 키(라운드트립 매칭용) + worksheet.Cells[row, 18].Value = item.TagDcs ? "DCS" : "현장"; row++; } @@ -620,8 +624,11 @@ public class PidExtractorService : IPidExtractorService StringComparison.Ordinal)) continue; // col17 헤더가 "id" 면 안정 키 매칭, 아니면 옛 포맷(태그번호 매칭)으로 폴백 - bool hasIdCol = string.Equals(ws.Cells[1, 17].Text?.Trim(), "id", - StringComparison.Ordinal); + bool hasIdCol = string.Equals(ws.Cells[1, 17].Text?.Trim(), "id", + StringComparison.Ordinal); + // col18 헤더가 "DCS태그" 면 tag_dcs 읽기 + bool hasDcsCol = string.Equals(ws.Cells[1, 18].Text?.Trim(), "DCS태그", + StringComparison.Ordinal); sheets++; for (int r = 2; r <= ws.Dimension.End.Row; r++) @@ -673,6 +680,12 @@ public class PidExtractorService : IPidExtractorService e.FromAt = fromAt; e.ToAt = toAt; e.TagClass = tagClass; + // col18: DCS태그 (DCS=true, 현장=false). 헤더 없는 옛 파일은 기존 값 유지. + if (hasDcsCol) + { + var dcsVal = Norm(ws, r, 18); + e.TagDcs = dcsVal == "DCS"; + } // From/To 를 채운 행만 잠금(사람이 연결을 교정한 행). // 둘 다 비우면 잠금 해제 → 연결분석이 다시 도출 가능. e.ConnectionLocked = fromTag != null || toTag != null; @@ -794,39 +807,39 @@ public class PidExtractorService : IPidExtractorService tagNo.StartsWith(r.Prefix, StringComparison.OrdinalIgnoreCase))?.Category; } - // 태그 선두 알파벳 (첫 비알파벳 이전) — ISA 기능코드 후보 - private static readonly Regex _tagPrefixRe = new(@"^([A-Za-z]+)", RegexOptions.Compiled); - - // 제어시스템 함수 후속문자: I(지시) C(제어) A(알람) Q(적산) Y(연산) R(기록) - // → 이 함수들은 현장 기기가 아니라 DCS/SCADA/PLC 내부에서 구현됨 - private static readonly HashSet _systemFuncLetters = - ['I', 'C', 'A', 'Q', 'Y', 'R']; + /// + /// prefix rule에서 tag_dcs 값을 조회. StartsWith 매칭으로 compound형(FICQ/FICA 등) 자동 커버. + /// 가장 긴 prefix 우선(FIC보다 FICQ가 더 구체적이면 FICQ rule 우선). + /// + private async Task ResolveTagDcsAsync(string tagNo) + { + var rules = await GetRulesCachedAsync(); + var upper = tagNo.ToUpperInvariant(); + var rule = rules + .Where(r => upper.StartsWith(r.Prefix.ToUpperInvariant())) + .OrderByDescending(r => r.Prefix.Length) // 가장 긴 prefix 우선 + .FirstOrDefault(); + return rule?.TagDcs ?? false; + } /// - /// 계기(instrument) 하위 분류. experion(realtime) 연결을 1순위 확정 신호로 사용. - /// experion 연결됨 → system (DCS/SCADA DB 포인트, ground truth) - /// ISA 후속문자에 I/C/A/Q/Y/R → system (제어시스템 함수 블록) - /// 그 외(1차 측정요소·전송·게이지·기계식) → field (현장 실물 기기) + /// 계기(instrument) 하위 분류. + /// tag_dcs(prefix rule 기반)를 우선 신호로 사용: + /// tag_dcs=TRUE → system (DCS 함수블록 확정. FT 전송기가 Experion에 연결돼도 field) + /// tag_dcs=FALSE → field (현장 물리 계기) + /// hasExperionLink는 더 이상 TagClass 결정에 사용하지 않음 + /// (ExperionTagId FK로 연결 정보는 보존됨). /// instrument 가 아닌 카테고리는 null. /// - private static string? ClassifyTagClass(string tagNo, string? category, bool hasExperionLink) + private static string? ClassifyTagClass(string tagNo, string? category, bool tagDcs, bool hasExperionLink) { if (category != PidEquipment.CategoryInstrument) return null; - if (hasExperionLink) - return PidEquipment.TagClassSystem; - - var m = _tagPrefixRe.Match(tagNo); - if (m.Success && m.Groups[1].Value.Length >= 2) - { - var prefix = m.Groups[1].Value.ToUpperInvariant(); - // 첫 글자(측정변수) 이후 후속문자에 제어시스템 함수가 있으면 system - for (int i = 1; i < prefix.Length; i++) - if (_systemFuncLetters.Contains(prefix[i])) - return PidEquipment.TagClassSystem; - } + // tag_dcs=TRUE: prefix rule이 ground truth → system 확정 + if (tagDcs) return PidEquipment.TagClassSystem; + // tag_dcs=FALSE: 현장 계기 → field (hasExperionLink 무관) return PidEquipment.TagClassField; } @@ -846,6 +859,7 @@ public class PidExtractorService : IPidExtractorService { Prefix = request.Prefix.Trim(), Category = request.Category, + TagDcs = request.TagDcs, Description = request.Description?.Trim(), SortOrder = request.SortOrder, CreatedAt = DateTime.UtcNow, @@ -863,6 +877,7 @@ public class PidExtractorService : IPidExtractorService if (rule == null) return null; rule.Prefix = request.Prefix.Trim(); rule.Category = request.Category; + rule.TagDcs = request.TagDcs; rule.Description = request.Description?.Trim(); rule.SortOrder = request.SortOrder; rule.UpdatedAt = DateTime.UtcNow; @@ -898,8 +913,10 @@ public class PidExtractorService : IPidExtractorService var category = await MatchCategoryAsync(item.TagNo); if (category != null) { + var tagDcs = await ResolveTagDcsAsync(item.TagNo); item.Category = category; - item.TagClass = ClassifyTagClass(item.TagNo, category, item.ExperionTagId != null); + item.TagDcs = tagDcs; + item.TagClass = ClassifyTagClass(item.TagNo, category, tagDcs, item.ExperionTagId != null); item.UpdatedAt = DateTime.UtcNow; total++; } @@ -919,7 +936,9 @@ public class PidExtractorService : IPidExtractorService foreach (var item in batch) { - item.TagClass = ClassifyTagClass(item.TagNo, item.Category, item.ExperionTagId != null); + var tagDcs = await ResolveTagDcsAsync(item.TagNo); + item.TagDcs = tagDcs; + item.TagClass = ClassifyTagClass(item.TagNo, item.Category, tagDcs, item.ExperionTagId != null); item.UpdatedAt = DateTime.UtcNow; total++; } diff --git a/src/Core/Domain/Entities/PidEquipment.cs b/src/Core/Domain/Entities/PidEquipment.cs index f920f65..7d97654 100644 --- a/src/Core/Domain/Entities/PidEquipment.cs +++ b/src/Core/Domain/Entities/PidEquipment.cs @@ -77,6 +77,10 @@ public class PidEquipment [Column("tag_class")] public string? TagClass { get; set; } + /// prefix rule 기반 DCS 함수블록 여부: TRUE=FIC/TIC류 DCS 블록, FALSE=FT/FCV류 현장 계기 + [Column("tag_dcs")] + public bool TagDcs { get; set; } = false; + [MaxLength(100)] [Column("role")] public string? Role { get; set; } diff --git a/src/Core/Domain/Entities/PidPrefixRule.cs b/src/Core/Domain/Entities/PidPrefixRule.cs index 41199ee..095f7be 100644 --- a/src/Core/Domain/Entities/PidPrefixRule.cs +++ b/src/Core/Domain/Entities/PidPrefixRule.cs @@ -27,6 +27,10 @@ public class PidPrefixRule [Column("sort_order")] public int SortOrder { get; set; } + /// DCS 함수블록 여부: TRUE=FIC/TIC/PIC/FY 등 DCS 내부 블록, FALSE=FT/FCV 등 현장 계기 + [Column("tag_dcs")] + public bool TagDcs { get; set; } = false; + [Column("created_at")] public DateTime CreatedAt { get; set; } = DateTime.UtcNow; diff --git a/src/Infrastructure/Database/ExperionDbContext.cs b/src/Infrastructure/Database/ExperionDbContext.cs index b868faa..47302a1 100644 --- a/src/Infrastructure/Database/ExperionDbContext.cs +++ b/src/Infrastructure/Database/ExperionDbContext.cs @@ -591,6 +591,36 @@ public class ExperionDbService : IExperionDbService await _ctx.Database.ExecuteSqlRawAsync( "ALTER TABLE pid_equipment ADD COLUMN IF NOT EXISTS tag_class TEXT;"); + // ── tag_dcs 마이그레이션 (DCS 함수블록 vs 현장 계기 구별) ───────────────── + // Step 1: pid_prefix_rules 컬럼 추가 + await _ctx.Database.ExecuteSqlRawAsync(""" + ALTER TABLE pid_prefix_rules + ADD COLUMN IF NOT EXISTS tag_dcs BOOLEAN NOT NULL DEFAULT FALSE; + """); + + // Step 2: DCS prefix 마킹 (기본형 — compound형은 Step 4 StartsWith로 커버) + await _ctx.Database.ExecuteSqlRawAsync(""" + UPDATE pid_prefix_rules + SET tag_dcs = TRUE + WHERE prefix IN ('FIC','TIC','PIC','LIC','FY','TY','PY','LY','FV','TV','PV','LV'); + """); + + // Step 3: pid_equipment 컬럼 추가 + await _ctx.Database.ExecuteSqlRawAsync(""" + ALTER TABLE pid_equipment + ADD COLUMN IF NOT EXISTS tag_dcs BOOLEAN NOT NULL DEFAULT FALSE; + """); + + // Step 4: 기존 행 backfill — StartsWith 매칭 (FICQ/FICA 등 compound형 자동 포함) + await _ctx.Database.ExecuteSqlRawAsync(""" + UPDATE pid_equipment pe + SET tag_dcs = TRUE + FROM pid_prefix_rules pr + WHERE pe.instrument_type LIKE (pr.prefix || '%') + AND pr.tag_dcs = TRUE; + """); + // ───────────────────────────────────────────────────────────────────────── + // pid_equipment 좌표/파일명 컬럼 await _ctx.Database.ExecuteSqlRawAsync(""" DO $$ diff --git a/src/Web/Controllers/PidController.cs b/src/Web/Controllers/PidController.cs index 3cc8fbe..b9e12f1 100644 --- a/src/Web/Controllers/PidController.cs +++ b/src/Web/Controllers/PidController.cs @@ -274,6 +274,7 @@ public class PidController : ControllerBase id = r.Id, prefix = r.Prefix, category = r.Category, + tagDcs = r.TagDcs, description = r.Description, sortOrder = r.SortOrder, createdAt = r.CreatedAt diff --git a/src/Web/wwwroot/js/pid.js b/src/Web/wwwroot/js/pid.js index f272e59..0277e0d 100644 --- a/src/Web/wwwroot/js/pid.js +++ b/src/Web/wwwroot/js/pid.js @@ -404,6 +404,10 @@ async function pidRefreshPrefixRules() { + @@ -414,6 +418,10 @@ async function pidRefreshPrefixRules() { + @@ -435,6 +443,7 @@ async function pidAddPrefixRule(category) { const prefix = row.querySelector('.pid-cat-prefix-input').value.trim(); const desc = row.querySelector('.pid-cat-desc-input').value.trim(); const order = parseInt(row.querySelector('.pid-cat-order-input').value) || 10; + const tagDcs = row.querySelector('.pid-cat-dcs-input')?.checked ?? false; if (!prefix) { alert('Prefix를 입력하세요.'); return; } @@ -442,7 +451,7 @@ async function pidAddPrefixRule(category) { const res = await fetch('/api/pid/prefix-rules', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ prefix, category, description: desc, sortOrder: order }) + body: JSON.stringify({ prefix, category, tagDcs, description: desc, sortOrder: order }) }); if (!res.ok) { const err = await res.json().catch(() => ({ error: res.statusText })); @@ -464,6 +473,7 @@ async function pidUpdatePrefixRule(id, btn) { const prefix = row.querySelector('.pid-cat-prefix-input').value.trim(); const desc = row.querySelector('.pid-cat-desc-input').value.trim(); const order = parseInt(row.querySelector('.pid-cat-order-input').value) || 10; + const tagDcs = row.querySelector('.pid-cat-dcs-input')?.checked ?? false; if (!prefix) { alert('Prefix를 입력하세요.'); return; } @@ -471,7 +481,7 @@ async function pidUpdatePrefixRule(id, btn) { const res = await fetch(`/api/pid/prefix-rules/${id}`, { method: 'PUT', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ prefix, category: cat, description: desc, sortOrder: order }) + body: JSON.stringify({ prefix, category: cat, tagDcs, description: desc, sortOrder: order }) }); if (!res.ok) { const err = await res.json().catch(() => ({ error: res.statusText }));