Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL. 기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체. - industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버) - src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer) - mcp-server: Python FastMCP (RAG/NL2SQL/P&ID) - 다중 컨트롤러(N-Controller) 지원 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
155 lines
7.0 KiB
Python
155 lines
7.0 KiB
Python
import networkx as nx
|
|
import asyncio
|
|
import json
|
|
from typing import List, Optional, Dict, Any, Tuple
|
|
from pydantic import BaseModel, Field
|
|
from rapidfuzz import process, fuzz
|
|
from openai import AsyncOpenAI
|
|
|
|
# --- 응답 구조화를 위한 Pydantic 모델 ---
|
|
class MappingResult(BaseModel):
|
|
resolved_tag: str = Field(..., description="The final mapped system tag")
|
|
reason: str = Field(..., description="Reason for this mapping based on context")
|
|
confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score from 0 to 1")
|
|
|
|
class IntelligentMapper:
|
|
def __init__(self, graph: nx.Graph, system_tags: List[str], api_client: Optional[AsyncOpenAI] = None, model_name: str = "Qwen3.6-35B-A3B-FP8"):
|
|
self.graph = graph
|
|
self.system_tags = system_tags
|
|
self.client = api_client
|
|
self.model_name = model_name
|
|
|
|
def get_node_context(self, node_id: str) -> str:
|
|
"""노드의 주변 위상 정보를 텍스트로 변환 (확장된 컨텍스트 제공)"""
|
|
if not self.graph.has_node(node_id):
|
|
return "Node not found in graph"
|
|
|
|
node_attr = self.graph.nodes[node_id]
|
|
node_type = node_attr.get('type', 'Unknown')
|
|
node_val = node_attr.get('value', 'Unknown')
|
|
|
|
# 1. 직접 연결된 이웃 노드 정보
|
|
neighbors = list(self.graph.neighbors(node_id))
|
|
neighbor_info = []
|
|
for n in neighbors:
|
|
attr = self.graph.nodes[n]
|
|
val = attr.get('value', n)
|
|
typ = attr.get('type', 'Unknown')
|
|
# 엣지 속성(관계) 추가
|
|
rel = self.graph.get_edge_data(node_id, n).get('relation', 'connected')
|
|
neighbor_info.append(f"[{rel}] {val} (Type: {typ})")
|
|
|
|
# 2. 2-hop 연결 정보 (더 넓은 맥락 파악)
|
|
extended_context = []
|
|
for n in neighbors:
|
|
second_neighbors = list(self.graph.neighbors(n))
|
|
for sn in second_neighbors:
|
|
if sn == node_id: continue
|
|
s_attr = self.graph.nodes[sn]
|
|
extended_context.append(f"Indirectly connected via {self.graph.nodes[n].get('value', n)} to {s_attr.get('value', sn)} (Type: {s_attr.get('type', 'Unknown')})")
|
|
|
|
context_str = (
|
|
f"Target Node: {node_val} (Type: {node_type})\n"
|
|
f"Direct Neighbors: {', '.join(neighbor_info) if neighbor_info else 'None'}\n"
|
|
f"Extended Context: {', '.join(extended_context[:10]) if extended_context else 'None'}"
|
|
)
|
|
|
|
return context_str
|
|
|
|
async def _resolve_generic(self, node_id: str, category_prompt: str) -> MappingResult:
|
|
"""공통 매핑 로직 (비동기 + 구조화 응답)"""
|
|
if not self.client:
|
|
return MappingResult(resolved_tag="UNKNOWN", reason="API Client not provided", confidence=0.0)
|
|
|
|
# Phase 2에서 'value'에 clean_value가 저장됨
|
|
node_data = self.graph.nodes.get(node_id, {})
|
|
tag_text = node_data.get('value', '')
|
|
|
|
# 1차 후보 추출 (RapidFuzz)
|
|
candidates = process.extract(tag_text, self.system_tags, scorer=fuzz.WRatio, limit=5)
|
|
context = self.get_node_context(node_id)
|
|
|
|
prompt = f"""
|
|
{category_prompt}
|
|
P&ID 도면의 태그 '{tag_text}'를 실제 시스템 태그와 매핑해야 합니다.
|
|
위상 맥락: {context}
|
|
후보 리스트: {candidates}
|
|
|
|
반드시 다음 JSON 형식으로만 응답하세요:
|
|
{{
|
|
"resolved_tag": "태그명 또는 UNKNOWN",
|
|
"reason": "매핑 이유",
|
|
"confidence": 0.0~1.0
|
|
}}
|
|
"""
|
|
|
|
try:
|
|
response = await self.client.chat.completions.create(
|
|
model=model_name,
|
|
messages=[{"role": "user", "content": prompt}],
|
|
response_format={ "type": "json_object" }
|
|
)
|
|
raw_content = response.choices[0].message.content
|
|
return MappingResult.model_validate_json(raw_content)
|
|
except Exception as e:
|
|
print(f"Error resolving node {node_id}: {e}")
|
|
return MappingResult(resolved_tag="UNKNOWN", reason=f"Error: {str(e)}", confidence=0.0)
|
|
|
|
# --- 전문화된 Worker 함수들 ---
|
|
# vLLM 컨커런시 제한으로 인해 배치 처리 (한 번에 15개 요청)
|
|
_BATCH_SIZE = 15
|
|
|
|
async def _batch_gather(self, node_ids: List[str], prompt: str) -> Dict[str, MappingResult]:
|
|
"""노드를 배치로 나누어 순차적으로 LLM 요청.
|
|
모든 노드를 동시에 요청하면 vLLM 큐가 가득 차 타임아웃 발생.
|
|
"""
|
|
all_results = {}
|
|
for i in range(0, len(node_ids), self._BATCH_SIZE):
|
|
batch = node_ids[i : i + self._BATCH_SIZE]
|
|
tasks = [self._resolve_generic(nid, prompt) for nid in batch]
|
|
batch_results = await asyncio.gather(*tasks)
|
|
all_results.update(dict(zip(batch, batch_results)))
|
|
return all_results
|
|
|
|
async def extract_transmitters(self, node_ids: List[str]) -> Dict[str, MappingResult]:
|
|
prompt = "당신은 계측기 전문 엔지니어입니다. 특히 Pressure/Flow/Level Transmitter 매핑에 특화되어 있습니다."
|
|
return await self._batch_gather(node_ids, prompt)
|
|
|
|
async def extract_valves(self, node_ids: List[str]) -> Dict[str, MappingResult]:
|
|
prompt = "당신은 밸브 및 액추에이터 전문 엔지니어입니다. 밸브의 개폐 상태 및 제어 태그 매핑에 특화되어 있습니다."
|
|
return await self._batch_gather(node_ids, prompt)
|
|
|
|
async def extract_equipment(self, node_ids: List[str]) -> Dict[str, MappingResult]:
|
|
prompt = "당신은 공정 설비 전문 엔지니어입니다. 펌프, 탱크, 열교환기 등의 메인 설비 태그 매핑에 특화되어 있습니다."
|
|
return await self._batch_gather(node_ids, prompt)
|
|
|
|
def validate_mapping(resolved_tag: str, symbol_type: str, tag_metadata: Dict[str, Any]) -> Tuple[bool, str]:
|
|
"""심볼 타입과 실제 태그 메타데이터의 엄격한 일치 여부 검증"""
|
|
if resolved_tag == "UNKNOWN":
|
|
return False, "Tag not resolved"
|
|
|
|
unit_map = {
|
|
"Pressure Transmitter": ["bar", "psi", "kPa", "Pa", "kg/cm2"],
|
|
"Flow Meter": ["m3/h", "lpm", "kg/h"],
|
|
"Temperature Sensor": ["°C", "C", "K", "°F"]
|
|
}
|
|
|
|
actual_unit = tag_metadata.get('unit', '').strip()
|
|
allowed_units = unit_map.get(symbol_type, [])
|
|
|
|
if actual_unit and actual_unit in allowed_units:
|
|
return True, "Unit Match"
|
|
|
|
actual_desc = tag_metadata.get('description', '').lower()
|
|
expected_keywords = {
|
|
"Pressure Transmitter": ["pressure", "press"],
|
|
"Flow Meter": ["flow", "flowrate"],
|
|
"Temperature Sensor": ["temp", "temperature"]
|
|
}
|
|
|
|
keywords = expected_keywords.get(symbol_type, [])
|
|
if any(kw in actual_desc for kw in keywords):
|
|
return True, "Description Match (Unit Missing)"
|
|
|
|
return False, "Mismatch: Symbol type and Tag metadata do not align"
|