feat: pid_equipment에 tag_dcs BOOLEAN 추가 — DCS 함수블록 vs 현장 계기 구별

## 변경 내용

### DB 스키마 (Boot DDL)
- pid_prefix_rules.tag_dcs BOOLEAN NOT NULL DEFAULT FALSE 추가
- DCS prefix 시드 마킹: FIC/TIC/PIC/LIC/FY/TY/PY/LY/FV/TV/PV/LV → tag_dcs=TRUE
- pid_equipment.tag_dcs BOOLEAN NOT NULL DEFAULT FALSE 추가
- 기존 행 backfill: instrument_type LIKE prefix% StartsWith 매칭 (FICQ/FICA 자동 포함)

### C# 도메인/서비스
- PidPrefixRule: TagDcs bool 프로퍼티 추가
- PidEquipment: TagDcs bool 프로퍼티 추가
- PidPrefixRuleDto (3개 record): TagDcs 추가
- PidExtractorService:
  - ResolveTagDcsAsync() 신규 — StartsWith 매칭, 가장 긴 prefix 우선
  - ClassifyTagClass() 재설계 — tagDcs 우선 (hasExperionLink 제거)
  - 추출 저장 시 TagDcs 채우기
  - ExportToExcelAsync() col18=DCS태그 추가 (col17=id 보호)
  - ImportFromExcelAsync() col18 읽기 (hasDcsCol 감지)
  - ApplyCategoriesToExistingAsync() 두 루프에 tag_dcs backfill 추가
  - CreatePrefixRuleAsync/UpdatePrefixRuleAsync TagDcs 저장

### Web Controller
- PidController.GetPrefixRules: tagDcs: r.TagDcs 추가

### Web UI (pid.js)
- PREFIX 그룹 각 행에 DCS/현장 배지 + 체크박스
- Add/Update body에 tagDcs 전송

### MCP/LLM
- server.py: _DCS_PREFIXES frozenset 추가
- _classify_pid_tag(): tag_dcs 반환 필드 추가
- _DB_SCHEMA: pid_equipment 테이블 설명 추가
- upsert_pid_connection: tag_dcs 파라미터 + UPDATE/INSERT SQL 수정
- sql_prompt.py: pid_equipment 테이블 추가
- prompts/plant_context.md: tag_dcs 설명 + 쿼리 예시 추가

## 설계 결정
- FT 전송기는 Experion 연결 여부와 무관하게 현장 계기 (tag_dcs=FALSE)
- tag_dcs=TRUE: prefix rule이 ground truth → system 확정
- hasExperionLink는 TagClass 결정에서 제거 (연결 정보는 ExperionTagId FK로 보존)
- compound prefix (FICQ/FICA): LIKE StartsWith 매칭으로 자동 커버

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
windpacer
2026-05-27 13:12:45 +09:00
parent 95ec160e98
commit c1d228d1f6
10 changed files with 375 additions and 55 deletions

View File

@@ -79,6 +79,12 @@ from pipeline.analyzer import PidAnalysisEngine
import networkx as nx
import asyncio
# ── Verifier (Phase B MVP — R1·R2·R4) ──
from verifier.validators import (
validate_tag, validate_area, validate_direction, validate_max_depth,
log_rejection, VerifierError,
)
# ── 임베딩 (Ollama) ───────────────────────────────────────────────────────────
async def _embed(text: str) -> list[float]:
@@ -248,20 +254,37 @@ _PID_TAG_RE = re.compile(r'^([A-Z]{1,4})-(\d{3,6})([A-Z])?$')
# DCS 함수블록 prefix frozenset — compound형 포함 (ISA _systemFuncLetters 기준)
# ⚠️ C# Boot DDL UPDATE 목록과 수동 동기화 필요 (향후 dcs_prefixes.py 분리 고려)
_DCS_PREFIXES: frozenset[str] = frozenset({
"FIC", "FICA", "FICQ", "FICR",
"TIC", "TICA", "TICQ",
"PIC", "PICA",
"LIC", "LICA",
"FY", "TY", "PY", "LY",
"FV", "TV", "PV", "LV",
})
def _classify_pid_tag(tag_no: str) -> dict:
"""tagNo의 prefix로 장비/계기 종류 분류 (좌표·LLM 사용 안 함)."""
m = _PID_TAG_RE.match(tag_no)
if not m:
return {"kind": "unknown", "prefix": None, "type": None}
return {"kind": "unknown", "prefix": None, "type": None, "tag_dcs": False}
prefix = m.group(1)
if prefix in _PID_EQUIPMENT_PREFIX:
return {"kind": "equipment", "prefix": prefix, "type": _PID_EQUIPMENT_PREFIX[prefix]}
return {"kind": "equipment", "prefix": prefix, "type": _PID_EQUIPMENT_PREFIX[prefix], "tag_dcs": False}
if 2 <= len(prefix) <= 4 and prefix[0] in _PID_INSTRUMENT_FIRST:
measure = _PID_INSTRUMENT_FIRST[prefix[0]]
mods = [_PID_INSTRUMENT_MODIFIER[c] for c in prefix[1:] if c in _PID_INSTRUMENT_MODIFIER]
type_name = (measure + " " + " ".join(mods)).strip()
return {"kind": "instrument", "prefix": prefix, "type": type_name}
return {"kind": "unknown", "prefix": prefix, "type": None}
return {
"kind": "instrument",
"prefix": prefix,
"type": type_name,
"tag_dcs": prefix in _DCS_PREFIXES,
}
return {"kind": "unknown", "prefix": prefix, "type": None, "tag_dcs": False}
def _parse_pid_lineno(token: str) -> dict | None:
@@ -619,6 +642,21 @@ async def _get_db_connection():
return await asyncio.to_thread(_connect)
def _get_db_connection_sync():
"""PostgreSQL DB 연결 획득 (sync — Verifier 내부용)."""
import psycopg
return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
def _check(tool: str, params: dict, *errs) -> str | None:
"""첫 번째 비-None VerifierError를 로그 + JSON str 반환. 없으면 None."""
for e in errs:
if e:
log_rejection(tool, params, e)
return json.dumps(e.to_dict(), ensure_ascii=False)
return None
def _validate_sql(sql: str) -> tuple[bool, str]:
"""SQL 안전 검증 — SELECT/WITH만 허용, 위험 키워드 차단."""
if len(sql) > 4000:
@@ -660,6 +698,10 @@ Tables:
realtime_table(tagname TEXT, livevalue TEXT, timestamp TIMESTAMPTZ)
tag_metadata(base_tag TEXT, attribute TEXT, value TEXT)
event_history_table(tagname TEXT, prev_value TEXT, curr_value TEXT, event_type TEXT, event_time TIMESTAMPTZ, duration_seconds INT)
pid_equipment(tag_no TEXT, category TEXT, tag_dcs BOOL, tag_class TEXT, instrument_type TEXT, from_tag TEXT, to_tag TEXT)
-- tag_dcs=TRUE: DCS 함수블록(FIC/TIC/PIC류), FALSE: 현장 물리 계기(FT/FCV류)
-- tag_class: 'field'(현장) / 'system'(DCS) — tag_dcs 기반
-- from_tag(상류) → tag_no → to_tag(하류) 연결 추적
Views:
v_tag_summary(base_tag TEXT, pv TEXT, sp TEXT, op TEXT, description TEXT, area TEXT, sub_area TEXT)
@@ -974,11 +1016,12 @@ async def upsert_pid_connection(
role: str | None = None,
category: str | None = None,
tag_class: str | None = None,
tag_dcs: bool | None = None,
) -> str:
"""운전자가 작성한 from/to 연결 1건을 pid_equipment 에 멱등(idempotent) 반영.
⚠️ 오직 pid_equipment 테이블만, 아래 컬럼만 변경한다(나머지는 절대 안 건드림):
from_tag, to_tag, from_at, to_at, role, category, tag_class, connection_locked, updated_at
from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs, connection_locked, updated_at
equipment_name/instrument_type/line_number/pid_drawing_no/pos_x/pos_y/confidence/is_active 는 보존.
매칭 규칙(문서규칙 §5):
@@ -993,7 +1036,8 @@ async def upsert_pid_connection(
Args:
tag_no: 태그번호 (필수, 대소문자 무시 매칭).
from_tag/to_tag: 상류/하류 태그. 병렬이면 호출 측에서 "A, B" 콤마 병합해 전달(이 도구는 분리하지 않음).
from_at/to_at: 위치/서술 텍스트. role/category/tag_class: 그대로 저장(None이면 기존 유지).
from_at/to_at: 위치/서술 텍스트. role/category/tag_class/tag_dcs: 그대로 저장(None이면 기존 유지).
tag_dcs: TRUE=DCS 함수블록(FIC/TIC류), FALSE=현장 물리 계기(FT/FCV류). None이면 기존 유지.
Returns:
JSON { success, action(update_existing|update_filled|insert), id, tag_no, before, after }
@@ -1007,14 +1051,18 @@ async def upsert_pid_connection(
tag_no = _n(tag_no)
if not tag_no:
return json.dumps({"success": False, "error": "tag_no 가 비었습니다."}, ensure_ascii=False)
err = _check("upsert_pid_connection", {"tag_no": tag_no}, validate_tag(tag_no, _get_db_connection_sync))
if err: return err
from_tag, to_tag = _n(from_tag), _n(to_tag)
from_at, to_at = _n(from_at), _n(to_at)
role, category, tag_class = _n(role), _n(category), _n(tag_class)
# bool은 _n() 미사용 — None이면 기존 유지, 값이면 bool로 강제 변환
tag_dcs_val = bool(tag_dcs) if tag_dcs is not None else None
locked = (from_tag is not None) or (to_tag is not None)
_SNAP = ["tag_no", "from_tag", "to_tag", "from_at", "to_at", "role", "category", "tag_class", "connection_locked"]
_SNAP = ["tag_no", "from_tag", "to_tag", "from_at", "to_at", "role", "category", "tag_class", "tag_dcs", "connection_locked"]
def _snap(cur, _id):
cur.execute("""SELECT tag_no, from_tag, to_tag, from_at, to_at, role, category, tag_class, connection_locked
cur.execute("""SELECT tag_no, from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs, connection_locked
FROM pid_equipment WHERE id=%s""", (_id,))
r = cur.fetchone()
return dict(zip(_SNAP, r)) if r else None
@@ -1056,26 +1104,28 @@ async def upsert_pid_connection(
from_tag=%s, to_tag=%s, from_at=%s, to_at=%s, role=%s,
category=COALESCE(%s, category),
tag_class=COALESCE(%s, tag_class),
tag_dcs=COALESCE(%s, tag_dcs),
connection_locked=%s, updated_at=now()
WHERE id=%s""",
(from_tag, to_tag, from_at, to_at, role, category, tag_class, locked, target_id))
(from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs_val, locked, target_id))
else:
action = "insert"
before = None
# 같은 tag 기존 행에서 정적 메타 복사 (없으면 NULL)
cur.execute("""SELECT equipment_name, instrument_type, line_number, pid_drawing_no, category
cur.execute("""SELECT equipment_name, instrument_type, line_number, pid_drawing_no, category, tag_dcs
FROM pid_equipment WHERE lower(tag_no)=lower(%s) ORDER BY id LIMIT 1""", (tag_no,))
meta = cur.fetchone() or (None, None, None, None, None)
eq_name, inst_type, line_no, draw_no, ex_cat = meta
meta = cur.fetchone() or (None, None, None, None, None, None)
eq_name, inst_type, line_no, draw_no, ex_cat, ex_dcs = meta
cur.execute(
"""INSERT INTO pid_equipment
(tag_no, equipment_name, instrument_type, line_number, pid_drawing_no,
from_tag, to_tag, from_at, to_at, role, category, tag_class,
from_tag, to_tag, from_at, to_at, role, category, tag_class, tag_dcs,
connection_locked, confidence, is_active, extracted_at, updated_at)
VALUES (%s,%s,%s,%s,%s, %s,%s,%s,%s,%s, COALESCE(%s,%s),%s, %s, 0, TRUE, now(), now())
VALUES (%s,%s,%s,%s,%s, %s,%s,%s,%s,%s, COALESCE(%s,%s),%s, COALESCE(%s,%s,FALSE), %s, 0, TRUE, now(), now())
RETURNING id""",
(tag_no, eq_name, inst_type, line_no, draw_no,
from_tag, to_tag, from_at, to_at, role, category, ex_cat, tag_class, locked))
from_tag, to_tag, from_at, to_at, role, category, ex_cat, tag_class,
tag_dcs_val, ex_dcs, locked))
target_id = cur.fetchone()[0]
after = _snap(cur, target_id)
@@ -1108,6 +1158,13 @@ async def query_pv_history(tag_names: list[str], time_from: str, time_to: str, l
Returns:
JSON: { success, tag_names, time_range, limit, data }
"""
_first_bad = None
for t in (tag_names or []):
_first_bad = validate_tag(t, _get_db_connection_sync)
if _first_bad:
break
err = _check("query_pv_history", {"tag_names": tag_names}, _first_bad)
if err: return err
conn = None
try:
limit = min(limit, 5000)
@@ -1359,35 +1416,44 @@ _VALID_EVENT_TYPES = ("ALARM", "TRIP", "NORMAL", "RUN", "CHANGE")
@mcp.tool()
async def find_tags(query: str, area: str | None = None, sub_area: str | None = None, top_k: int = 20) -> str:
async def find_tags(query: str | None = None, area: str | None = None, sub_area: str | None = None, top_k: int = 20) -> str:
"""태그 검색 — base_tag/설명(desc)/area/sub_area 통합 검색 (v_tag_summary 뷰 기반).
사용 시점: 사용자가 "온도", "Tower 1 압력", "운전 중인 펌프" 같은 자연어로
태그를 지칭할 때 실제 base_tag(예: 'ti-6101', 'p-6102')를 역으로 찾기 위해.
sub_area만 지정하면 해당 area 전체 태그를 반환한다.
get_tag_metadata와 차이: 단순 tagname LIKE만 보지 않고 description/area에도
매칭하며, 현재 PV/SP/OP/description/area/sub_area를 함께 반환.
Args:
query: 검색어 (base_tag 또는 description 부분 일치, 대소문자 무시)
query: (선택) 검색어 (base_tag 또는 description 부분 일치, 대소문자 무시).
생략하면 sub_area/area 조건만으로 전체 조회.
area: (선택) area 필터 (예: 'P6'). "P6-1"처럼 '-'가 있으면 sub_area로 자동 처리.
sub_area: (선택) sub_area 필터 (예: 'P6-1'). 공용 태그("P6-1,P6-2")도 매칭됨.
query 생략 시 해당 sub_area 전체 태그 반환.
top_k: 반환 태그 수 (기본 20, 최대 100)
Returns:
JSON: { success, query, count, tags: [{base_tag, pv, sp, op, description, area, sub_area}] }
"""
err = _check("find_tags", {"query": query, "area": area, "sub_area": sub_area},
validate_area(area), validate_area(sub_area, "sub_area"))
if err: return err
conn = None
try:
top_k = max(1, min(top_k, 100))
q = f"%{query.strip()}%"
# sub_area 명시 파라미터 우선, 없으면 area가 "P6-1" 형식이면 sub_area로 간주
effective_sub = sub_area or (area if (area and "-" in area) else None)
effective_area = None if (area and "-" in area) else area
conds = ["(base_tag ILIKE %s OR description ILIKE %s)"]
params: list = [q, q]
conds: list[str] = []
params: list = []
if query:
q = f"%{query.strip()}%"
conds.append("(base_tag ILIKE %s OR description ILIKE %s)")
params.extend([q, q])
if effective_sub:
conds.append("%s = ANY(string_to_array(sub_area, ','))")
params.append(effective_sub)
@@ -1395,6 +1461,8 @@ async def find_tags(query: str, area: str | None = None, sub_area: str | None =
# v_tag_summary.area는 '{12 | P6 | }' 원본 형식이므로 코드만 추출해 비교
conds.append("trim(split_part(area, '|', 2)) = %s")
params.append(effective_area)
if not conds:
conds.append("TRUE")
sql = (
"SELECT base_tag, pv, sp, op, description, area, sub_area "
@@ -1441,6 +1509,11 @@ async def trace_connections(start_tag: str, direction: str = "downstream", max_d
- from_tag/to_tag에 쉼표가 여러 개면 병렬 펌프·라인 → 모두 경로에 포함됨(누락 없음).
- live_state: 해당 태그의 실시간 상태(예: R-RUN/L-STOP). 병렬 펌프 중 R-RUN/L-RUN이 현재 공급원.
"""
err = _check("trace_connections", {"start_tag": start_tag, "direction": direction, "max_depth": max_depth},
validate_tag(start_tag, _get_db_connection_sync),
validate_direction(direction),
validate_max_depth(max_depth))
if err: return err
conn = None
try:
start_tag = start_tag.strip().upper()
@@ -1596,6 +1669,10 @@ async def query_events(
Returns:
JSON: { success, count, time_range, events: [...] }
"""
err = _check("query_events", {"tag_name": tag_name, "area": area},
validate_tag(tag_name, _get_db_connection_sync) if tag_name else None,
validate_area(area))
if err: return err
if event_type and event_type.upper() not in _VALID_EVENT_TYPES:
return json.dumps({
"success": False,
@@ -1669,6 +1746,8 @@ async def active_alarms(area: str | None = None, limit: int = 100) -> str:
Returns:
JSON: { success, count, alarms: [{tag_name, event_type, since, prev_state_duration_s, area, ...}] }
"""
err = _check("active_alarms", {"area": area}, validate_area(area))
if err: return err
conn = None
try:
limit = max(1, min(limit, 500))
@@ -1737,6 +1816,8 @@ async def summarize_events(
Returns:
JSON: { success, summary, stats: {by_type, by_area, count} }
"""
err = _check("summarize_events", {"area": area}, validate_area(area))
if err: return err
max_events = max(10, min(max_events, 500))
raw = await query_events(event_type=event_type, area=area, since=since, limit=max_events)
parsed = json.loads(raw)
@@ -1822,6 +1903,8 @@ async def generate_status_report(area: str | None = None, hours: int = 24) -> st
Returns:
JSON: { success, report, sections: {active_alarms, recent_events, by_type}, generated_at }
"""
err = _check("generate_status_report", {"area": area, "hours": hours}, validate_area(area))
if err: return err
hours = max(1, min(hours, 168))
since_iso = None # query_events가 24h 기본을 쓰지만, hours로 명시 전달
from datetime import datetime, timezone, timedelta