- P&ID: 연결 분석 API, Prefix 규칙 관리, 카테고리 분류, DXF 그래프 빌드 - LLM: 대화 요약, tool card 영구 보존, 시계열 차트(uPlot), 에이전트 모드 - KB: 청크 미리보기, Field Instrument Inference, 인증/Qdrant 클라이언트 - MCP: 서버 기능 확장, 파이프라인 수정, timeout 개선 - Frontend: P&ID UI, LLM UI, KB UI, OPC UA Write 탭 추가 - 설정: AGENTS.md, plant_context, README, opencode.json 업데이트 - 정리: 진단 체크리스트 문서 삭제
330 lines
10 KiB
Python
330 lines
10 KiB
Python
"""DCS base_tag → 현장 계기 자동 유추 알고리즘."""
|
|
from __future__ import annotations
|
|
import re
|
|
|
|
from .rules import get_measurement, get_modifier, get_special_prefix
|
|
|
|
|
|
def split_tag(base_tag: str) -> tuple[str, str]:
|
|
"""'ficq-6101' → ('ficq', '6101') / 'xv-6124' → ('xv', '6124')."""
|
|
m = re.match(r"^([a-zA-Z]+)-?(.*)", base_tag)
|
|
if not m:
|
|
return base_tag, ""
|
|
return m.group(1).lower(), m.group(2)
|
|
|
|
|
|
def infer_instruments_for_base_tag(
|
|
base_tag: str,
|
|
data_points: list[str],
|
|
area: str,
|
|
) -> list[dict]:
|
|
"""
|
|
단일 base_tag에 대해 현장 계기 목록을 유추.
|
|
|
|
Returns:
|
|
instrument dict 리스트 (§3.1 스키마와 일치).
|
|
매칭 실패 시 1행 포함 (confidence=low, needs_review=True).
|
|
"""
|
|
head, loop = split_tag(base_tag)
|
|
if not loop:
|
|
loop = "unknown"
|
|
|
|
dp_set = set(data_points) if data_points else set()
|
|
|
|
# 규칙에서 어긋난 태그 → unmatched
|
|
# loop에 '_' 포함 (fica-3102_op 등) 또는 'esd' 포함 (lt-9113-lo-esd 등 시스템 포인트)
|
|
if "_" in loop or "esd" in loop.lower():
|
|
return _build_unmatched(base_tag, area, head, loop)
|
|
|
|
# 1. 특수 prefix 우선 (정확한 전체 일치)
|
|
sp = get_special_prefix(head)
|
|
if sp:
|
|
return _build_special(head, loop, area, sp, dp_set)
|
|
|
|
# 2. 첫 글자 = 측정량
|
|
first = head[0]
|
|
meas = get_measurement(first)
|
|
if not meas:
|
|
return _build_unmatched(base_tag, area, head, loop)
|
|
|
|
instruments = []
|
|
has_transmitter = False
|
|
has_controller = False
|
|
|
|
# DCS 내부 기능: controller, totalizer, alarm, switch는 현장 계기가 아님
|
|
_dcs_internal_roles = {"controller", "totalizer", "alarm", "switch"}
|
|
|
|
# 3. 수식어 글자별로 계기 생성
|
|
for letter in head[1:]:
|
|
mod = get_modifier(letter)
|
|
if not mod:
|
|
continue
|
|
if mod.get("virtual"):
|
|
continue # I, R은 가상
|
|
|
|
role = mod["role"]
|
|
|
|
# DCS 내부 기능은 instruments에서 제외
|
|
if role in _dcs_internal_roles:
|
|
if role == "controller":
|
|
has_controller = True
|
|
continue
|
|
|
|
inst = _build_instrument(first, meas, role, loop, area, mod, dp_set, base_tag)
|
|
instruments.append(inst)
|
|
|
|
if role == "transmitter":
|
|
has_transmitter = True
|
|
|
|
# 4. T 글자가 명시 안 됐어도 컨트롤러가 있으면 송신기 암시
|
|
if not has_transmitter and has_controller:
|
|
inst = _build_implicit_transmitter(first, meas, loop, area, dp_set, base_tag)
|
|
instruments.insert(0, inst)
|
|
has_transmitter = True
|
|
|
|
# 5. 컨트롤러 → 제어밸브 자동 생성 (auto_pair)
|
|
if has_controller:
|
|
inst = _build_paired_valve(first, meas, loop, area, dp_set, base_tag)
|
|
instruments.append(inst)
|
|
|
|
# [H-4 수정] 빈 리스트일 때 implicit transmitter 강제 추가
|
|
# pi-XXXX/ti-XXXX 등 "측정량 + I(virtual)"만 있는 경우 instruments가 비어 있음
|
|
if not instruments:
|
|
inst = _build_implicit_transmitter(first, meas, loop, area, dp_set, base_tag)
|
|
instruments.append(inst)
|
|
|
|
# 6. from/to 채우기
|
|
_link_signal_flow(instruments, base_tag, area)
|
|
|
|
# 7. confidence 계산
|
|
for inst in instruments:
|
|
inst["confidence"] = _score_confidence(inst, dp_set)
|
|
inst["needs_review"] = inst["confidence"] == "low"
|
|
|
|
return instruments
|
|
|
|
|
|
def _build_instrument(
|
|
first_letter: str, meas: str, role: str, loop: str, area: str, mod: dict, dp_set: set, parent_tag: str
|
|
) -> dict:
|
|
role_id = _role_to_id(first_letter, role, loop)
|
|
display = f"{first_letter.upper()}{_role_suffix(role).upper()}-{loop.upper()}"
|
|
dps = mod.get("data_points", [])
|
|
matched_dps = [d for d in dps if d in dp_set]
|
|
|
|
return {
|
|
"instrument_id": role_id,
|
|
"display_name": display,
|
|
"parent_base_tag": parent_tag,
|
|
"role": role,
|
|
"loop": loop,
|
|
"area": area or "(none)",
|
|
"measures": meas,
|
|
"data_points": ",".join(matched_dps) if matched_dps else "(none)",
|
|
"from": "(none)",
|
|
"to": "(none)",
|
|
"description": "",
|
|
"confidence": "medium",
|
|
"needs_review": False,
|
|
"inference_basis": f"{first_letter.upper()}+{mod.get('role','?')}",
|
|
"operator_notes": "",
|
|
"delete": False,
|
|
}
|
|
|
|
|
|
def _build_special(head: str, loop: str, area: str, sp: dict, dp_set: set) -> list[dict]:
|
|
role = sp["role"]
|
|
meas = sp.get("measures")
|
|
equip_type = sp.get("equipment_type")
|
|
role_id = f"{head}-{loop}"
|
|
display = f"{head.upper()}-{loop.upper()}"
|
|
|
|
dp_map = {
|
|
"shutdown-valve": [".instate0", ".instate1"],
|
|
"interlock-relay": [".instate0", ".instate1"],
|
|
"positioner": [".op"],
|
|
"power_equipment": [".run", ".fault"],
|
|
}
|
|
expected_dps = dp_map.get(role, [])
|
|
matched_dps = [d for d in expected_dps if d in dp_set]
|
|
|
|
inst = {
|
|
"instrument_id": role_id,
|
|
"display_name": display,
|
|
"parent_base_tag": f"{head}-{loop}",
|
|
"role": role,
|
|
"loop": loop,
|
|
"area": area or "(none)",
|
|
"measures": meas or "(none)",
|
|
"data_points": ",".join(matched_dps) if matched_dps else "(none)",
|
|
"from": "(none)",
|
|
"to": "(none)",
|
|
"description": "",
|
|
"confidence": "medium",
|
|
"needs_review": False,
|
|
"inference_basis": f"special_prefix:{head}",
|
|
"operator_notes": "",
|
|
"delete": False,
|
|
}
|
|
|
|
if role == "power_equipment" and equip_type:
|
|
inst["equipment_type"] = equip_type
|
|
return [inst]
|
|
|
|
|
|
def _build_unmatched(base_tag: str, area: str, head: str, loop: str) -> list[dict]:
|
|
return [{
|
|
"instrument_id": f"{head}-{loop}",
|
|
"display_name": f"{head.upper()}-{loop.upper()}",
|
|
"parent_base_tag": base_tag,
|
|
"role": "equipment",
|
|
"loop": loop,
|
|
"area": area or "(none)",
|
|
"measures": "(none)",
|
|
"data_points": "(none)",
|
|
"from": "(none)",
|
|
"to": "(none)",
|
|
"description": "",
|
|
"confidence": "low",
|
|
"needs_review": True,
|
|
"inference_basis": "unmatched_prefix",
|
|
"operator_notes": "",
|
|
"delete": False,
|
|
}]
|
|
|
|
|
|
def _build_implicit_transmitter(first: str, meas: str, loop: str, area: str, dp_set: set, parent: str) -> dict:
|
|
role_id = f"{first}t-{loop}"
|
|
return {
|
|
"instrument_id": role_id,
|
|
"display_name": f"{first.upper()}T-{loop.upper()}",
|
|
"parent_base_tag": parent,
|
|
"role": "transmitter",
|
|
"loop": loop,
|
|
"area": area or "(none)",
|
|
"measures": meas,
|
|
"data_points": ".pv" if ".pv" in dp_set else "(none)",
|
|
"from": f"process/{area or 'unknown'}-{loop}-inlet",
|
|
"to": f"tag/{parent}",
|
|
"description": "",
|
|
"confidence": "medium",
|
|
"needs_review": False,
|
|
"inference_basis": f"{first.upper()}+(implied T)",
|
|
"operator_notes": "",
|
|
"delete": False,
|
|
}
|
|
|
|
|
|
def _build_paired_valve(first: str, meas: str, loop: str, area: str, dp_set: set, parent: str) -> dict:
|
|
role_id = f"{first}cv-{loop}"
|
|
return {
|
|
"instrument_id": role_id,
|
|
"display_name": f"{first.upper()}CV-{loop.upper()}",
|
|
"parent_base_tag": parent,
|
|
"role": "control-valve",
|
|
"loop": loop,
|
|
"area": area or "(none)",
|
|
"measures": "(none)",
|
|
"data_points": ".op" if ".op" in dp_set else "(none)",
|
|
"from": f"tag/{first}ic-{loop}",
|
|
"to": f"process/{area or 'unknown'}-{loop}-downstream",
|
|
"description": "",
|
|
"confidence": "high",
|
|
"needs_review": False,
|
|
"inference_basis": "C -> CV auto_pair",
|
|
"operator_notes": "",
|
|
"delete": False,
|
|
}
|
|
|
|
|
|
def _role_suffix(role: str) -> str:
|
|
mapping = {
|
|
"transmitter": "t",
|
|
"controller": "ic",
|
|
"totalizer": "q",
|
|
"switch": "s",
|
|
"alarm": "a",
|
|
"interlock-relay": "y",
|
|
"positioner": "z",
|
|
}
|
|
return mapping.get(role, role[:2])
|
|
|
|
|
|
def _role_to_id(first_letter: str, role: str, loop: str) -> str:
|
|
return f"{first_letter}{_role_suffix(role)}-{loop}"
|
|
|
|
|
|
def _link_signal_flow(instruments: list[dict], parent_tag: str, area: str) -> None:
|
|
"""role별 from/to 기본값 채음 (§5.1)."""
|
|
transmitters = [i for i in instruments if i["role"] == "transmitter"]
|
|
valves = [i for i in instruments if i["role"] == "control-valve"]
|
|
|
|
for t in transmitters:
|
|
if t["from"] == "(none)":
|
|
t["from"] = f"process/{area or 'unknown'}-{t['loop']}-inlet"
|
|
if t["to"] == "(none)":
|
|
t["to"] = f"tag/{parent_tag}"
|
|
|
|
for v in valves:
|
|
if v["from"] == "(none)":
|
|
v["from"] = f"tag/{parent_tag}"
|
|
if v["to"] == "(none)":
|
|
v["to"] = f"process/{area or 'unknown'}-{v['loop']}-downstream"
|
|
|
|
|
|
def _score_confidence(inst: dict, dp_set: set) -> str:
|
|
"""data_point 일치도 + prefix 매칭으로 신뢰도 계산."""
|
|
role = inst["role"]
|
|
basis = inst.get("inference_basis", "")
|
|
|
|
if basis == "unmatched_prefix":
|
|
return "low"
|
|
|
|
expected = {
|
|
"transmitter": [".pv"],
|
|
"controller": [".sp", ".op"],
|
|
"totalizer": [".qv"],
|
|
"switch": [".instate0"],
|
|
"shutdown-valve": [".instate0"],
|
|
}
|
|
|
|
checks = expected.get(role, [])
|
|
if not checks:
|
|
return "medium" if basis.startswith("special") else "high"
|
|
|
|
matched = sum(1 for c in checks if c in dp_set)
|
|
if matched == len(checks):
|
|
return "high"
|
|
if matched > 0:
|
|
return "medium"
|
|
return "low"
|
|
|
|
|
|
def _role_to_korean_description(role: str, meas: str) -> str:
|
|
"""role + 측정량 → 한국어 설명 초안."""
|
|
meas_ko = {
|
|
"flow": "유량", "pressure": "압력", "temperature": "온도",
|
|
"level": "위차", "analysis": "분석", "speed": "회전수",
|
|
"weight": "중량", "density": "비중", "power": "전력", "moisture": "함량",
|
|
}
|
|
m = meas_ko.get(meas, meas)
|
|
|
|
role_desc = {
|
|
"transmitter": f"{m} 송신기",
|
|
"controller": f"{m} 제어기",
|
|
"totalizer": f"{m} 적산기",
|
|
"switch": f"{m} 스위치",
|
|
"alarm": f"{m} 알람",
|
|
"control-valve": f"{m} 제어밸브",
|
|
"shutdown-valve": "차단밸브",
|
|
"interlock-relay": "인터록 릴레이",
|
|
"positioner": "포지셔너",
|
|
"motor": "모터",
|
|
"pump": "펌프",
|
|
"compressor": "압축기",
|
|
"agitator": "교반기",
|
|
"blower": "송풍기",
|
|
"fan": "송풍기",
|
|
}
|
|
return role_desc.get(role, role)
|