Files
HC900-Crawler/mcp-server/instrument_inference/infer.py
windpacer 16fc7a2598 Initial commit: HC900 Crawler
Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL.
기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체.

- industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버)
- src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer)
- mcp-server: Python FastMCP (RAG/NL2SQL/P&ID)
- 다중 컨트롤러(N-Controller) 지원

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:28:14 +09:00

330 lines
10 KiB
Python

"""DCS base_tag → 현장 계기 자동 유추 알고리즘."""
from __future__ import annotations
import re
from .rules import get_measurement, get_modifier, get_special_prefix
def split_tag(base_tag: str) -> tuple[str, str]:
"""'ficq-6101' → ('ficq', '6101') / 'xv-6124' → ('xv', '6124')."""
m = re.match(r"^([a-zA-Z]+)-?(.*)", base_tag)
if not m:
return base_tag, ""
return m.group(1).lower(), m.group(2)
def infer_instruments_for_base_tag(
base_tag: str,
data_points: list[str],
area: str,
) -> list[dict]:
"""
단일 base_tag에 대해 현장 계기 목록을 유추.
Returns:
instrument dict 리스트 (§3.1 스키마와 일치).
매칭 실패 시 1행 포함 (confidence=low, needs_review=True).
"""
head, loop = split_tag(base_tag)
if not loop:
loop = "unknown"
dp_set = set(data_points) if data_points else set()
# 규칙에서 어긋난 태그 → unmatched
# loop에 '_' 포함 (fica-3102_op 등) 또는 'esd' 포함 (lt-9113-lo-esd 등 시스템 포인트)
if "_" in loop or "esd" in loop.lower():
return _build_unmatched(base_tag, area, head, loop)
# 1. 특수 prefix 우선 (정확한 전체 일치)
sp = get_special_prefix(head)
if sp:
return _build_special(head, loop, area, sp, dp_set)
# 2. 첫 글자 = 측정량
first = head[0]
meas = get_measurement(first)
if not meas:
return _build_unmatched(base_tag, area, head, loop)
instruments = []
has_transmitter = False
has_controller = False
# DCS 내부 기능: controller, totalizer, alarm, switch는 현장 계기가 아님
_dcs_internal_roles = {"controller", "totalizer", "alarm", "switch"}
# 3. 수식어 글자별로 계기 생성
for letter in head[1:]:
mod = get_modifier(letter)
if not mod:
continue
if mod.get("virtual"):
continue # I, R은 가상
role = mod["role"]
# DCS 내부 기능은 instruments에서 제외
if role in _dcs_internal_roles:
if role == "controller":
has_controller = True
continue
inst = _build_instrument(first, meas, role, loop, area, mod, dp_set, base_tag)
instruments.append(inst)
if role == "transmitter":
has_transmitter = True
# 4. T 글자가 명시 안 됐어도 컨트롤러가 있으면 송신기 암시
if not has_transmitter and has_controller:
inst = _build_implicit_transmitter(first, meas, loop, area, dp_set, base_tag)
instruments.insert(0, inst)
has_transmitter = True
# 5. 컨트롤러 → 제어밸브 자동 생성 (auto_pair)
if has_controller:
inst = _build_paired_valve(first, meas, loop, area, dp_set, base_tag)
instruments.append(inst)
# [H-4 수정] 빈 리스트일 때 implicit transmitter 강제 추가
# pi-XXXX/ti-XXXX 등 "측정량 + I(virtual)"만 있는 경우 instruments가 비어 있음
if not instruments:
inst = _build_implicit_transmitter(first, meas, loop, area, dp_set, base_tag)
instruments.append(inst)
# 6. from/to 채우기
_link_signal_flow(instruments, base_tag, area)
# 7. confidence 계산
for inst in instruments:
inst["confidence"] = _score_confidence(inst, dp_set)
inst["needs_review"] = inst["confidence"] == "low"
return instruments
def _build_instrument(
first_letter: str, meas: str, role: str, loop: str, area: str, mod: dict, dp_set: set, parent_tag: str
) -> dict:
role_id = _role_to_id(first_letter, role, loop)
display = f"{first_letter.upper()}{_role_suffix(role).upper()}-{loop.upper()}"
dps = mod.get("data_points", [])
matched_dps = [d for d in dps if d in dp_set]
return {
"instrument_id": role_id,
"display_name": display,
"parent_base_tag": parent_tag,
"role": role,
"loop": loop,
"area": area or "(none)",
"measures": meas,
"data_points": ",".join(matched_dps) if matched_dps else "(none)",
"from": "(none)",
"to": "(none)",
"description": "",
"confidence": "medium",
"needs_review": False,
"inference_basis": f"{first_letter.upper()}+{mod.get('role','?')}",
"operator_notes": "",
"delete": False,
}
def _build_special(head: str, loop: str, area: str, sp: dict, dp_set: set) -> list[dict]:
role = sp["role"]
meas = sp.get("measures")
equip_type = sp.get("equipment_type")
role_id = f"{head}-{loop}"
display = f"{head.upper()}-{loop.upper()}"
dp_map = {
"shutdown-valve": [".instate0", ".instate1"],
"interlock-relay": [".instate0", ".instate1"],
"positioner": [".op"],
"power_equipment": [".run", ".fault"],
}
expected_dps = dp_map.get(role, [])
matched_dps = [d for d in expected_dps if d in dp_set]
inst = {
"instrument_id": role_id,
"display_name": display,
"parent_base_tag": f"{head}-{loop}",
"role": role,
"loop": loop,
"area": area or "(none)",
"measures": meas or "(none)",
"data_points": ",".join(matched_dps) if matched_dps else "(none)",
"from": "(none)",
"to": "(none)",
"description": "",
"confidence": "medium",
"needs_review": False,
"inference_basis": f"special_prefix:{head}",
"operator_notes": "",
"delete": False,
}
if role == "power_equipment" and equip_type:
inst["equipment_type"] = equip_type
return [inst]
def _build_unmatched(base_tag: str, area: str, head: str, loop: str) -> list[dict]:
return [{
"instrument_id": f"{head}-{loop}",
"display_name": f"{head.upper()}-{loop.upper()}",
"parent_base_tag": base_tag,
"role": "equipment",
"loop": loop,
"area": area or "(none)",
"measures": "(none)",
"data_points": "(none)",
"from": "(none)",
"to": "(none)",
"description": "",
"confidence": "low",
"needs_review": True,
"inference_basis": "unmatched_prefix",
"operator_notes": "",
"delete": False,
}]
def _build_implicit_transmitter(first: str, meas: str, loop: str, area: str, dp_set: set, parent: str) -> dict:
role_id = f"{first}t-{loop}"
return {
"instrument_id": role_id,
"display_name": f"{first.upper()}T-{loop.upper()}",
"parent_base_tag": parent,
"role": "transmitter",
"loop": loop,
"area": area or "(none)",
"measures": meas,
"data_points": ".pv" if ".pv" in dp_set else "(none)",
"from": f"process/{area or 'unknown'}-{loop}-inlet",
"to": f"tag/{parent}",
"description": "",
"confidence": "medium",
"needs_review": False,
"inference_basis": f"{first.upper()}+(implied T)",
"operator_notes": "",
"delete": False,
}
def _build_paired_valve(first: str, meas: str, loop: str, area: str, dp_set: set, parent: str) -> dict:
role_id = f"{first}cv-{loop}"
return {
"instrument_id": role_id,
"display_name": f"{first.upper()}CV-{loop.upper()}",
"parent_base_tag": parent,
"role": "control-valve",
"loop": loop,
"area": area or "(none)",
"measures": "(none)",
"data_points": ".op" if ".op" in dp_set else "(none)",
"from": f"tag/{first}ic-{loop}",
"to": f"process/{area or 'unknown'}-{loop}-downstream",
"description": "",
"confidence": "high",
"needs_review": False,
"inference_basis": "C -> CV auto_pair",
"operator_notes": "",
"delete": False,
}
def _role_suffix(role: str) -> str:
mapping = {
"transmitter": "t",
"controller": "ic",
"totalizer": "q",
"switch": "s",
"alarm": "a",
"interlock-relay": "y",
"positioner": "z",
}
return mapping.get(role, role[:2])
def _role_to_id(first_letter: str, role: str, loop: str) -> str:
return f"{first_letter}{_role_suffix(role)}-{loop}"
def _link_signal_flow(instruments: list[dict], parent_tag: str, area: str) -> None:
"""role별 from/to 기본값 채음 (§5.1)."""
transmitters = [i for i in instruments if i["role"] == "transmitter"]
valves = [i for i in instruments if i["role"] == "control-valve"]
for t in transmitters:
if t["from"] == "(none)":
t["from"] = f"process/{area or 'unknown'}-{t['loop']}-inlet"
if t["to"] == "(none)":
t["to"] = f"tag/{parent_tag}"
for v in valves:
if v["from"] == "(none)":
v["from"] = f"tag/{parent_tag}"
if v["to"] == "(none)":
v["to"] = f"process/{area or 'unknown'}-{v['loop']}-downstream"
def _score_confidence(inst: dict, dp_set: set) -> str:
"""data_point 일치도 + prefix 매칭으로 신뢰도 계산."""
role = inst["role"]
basis = inst.get("inference_basis", "")
if basis == "unmatched_prefix":
return "low"
expected = {
"transmitter": [".pv"],
"controller": [".sp", ".op"],
"totalizer": [".qv"],
"switch": [".instate0"],
"shutdown-valve": [".instate0"],
}
checks = expected.get(role, [])
if not checks:
return "medium" if basis.startswith("special") else "high"
matched = sum(1 for c in checks if c in dp_set)
if matched == len(checks):
return "high"
if matched > 0:
return "medium"
return "low"
def _role_to_korean_description(role: str, meas: str) -> str:
"""role + 측정량 → 한국어 설명 초안."""
meas_ko = {
"flow": "유량", "pressure": "압력", "temperature": "온도",
"level": "위차", "analysis": "분석", "speed": "회전수",
"weight": "중량", "density": "비중", "power": "전력", "moisture": "함량",
}
m = meas_ko.get(meas, meas)
role_desc = {
"transmitter": f"{m} 송신기",
"controller": f"{m} 제어기",
"totalizer": f"{m} 적산기",
"switch": f"{m} 스위치",
"alarm": f"{m} 알람",
"control-valve": f"{m} 제어밸브",
"shutdown-valve": "차단밸브",
"interlock-relay": "인터록 릴레이",
"positioner": "포지셔너",
"motor": "모터",
"pump": "펌프",
"compressor": "압축기",
"agitator": "교반기",
"blower": "송풍기",
"fan": "송풍기",
}
return role_desc.get(role, role)