Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL. 기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체. - industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버) - src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer) - mcp-server: Python FastMCP (RAG/NL2SQL/P&ID) - 다중 컨트롤러(N-Controller) 지원 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
83 lines
3.4 KiB
Python
83 lines
3.4 KiB
Python
import networkx as nx
|
|
from typing import Dict, List, Optional
|
|
import json
|
|
import os
|
|
|
|
class PidAnalysisEngine:
|
|
def __init__(self, topology_file: str, mapping_file: str):
|
|
self.topology_file = topology_file
|
|
self.mapping_file = mapping_file
|
|
self.graph = nx.DiGraph()
|
|
self.tag_mapping = {}
|
|
self.load_data()
|
|
|
|
def load_data(self):
|
|
"""그래프 및 매핑 데이터 로드"""
|
|
try:
|
|
if os.path.exists(self.topology_file):
|
|
with open(self.topology_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
# NetworkX 그래프 생성 (node_link_data 형식 가정)
|
|
for node in data.get('nodes', []):
|
|
# 'id' 키를 제거하여 중복 키워드 인자 방지
|
|
node_attrs = {k: v for k, v in node.items() if k != 'id'}
|
|
self.graph.add_node(node['id'], **node_attrs)
|
|
for edge in data.get('links', []): # node_link_data는 'links' 사용
|
|
# 'source', 'target' 키를 제거하여 중복 키워드 인자 방지
|
|
edge_attrs = {k: v for k, v in edge.items() if k not in ('source', 'target')}
|
|
self.graph.add_edge(edge['source'], edge['target'], **edge_attrs)
|
|
|
|
if os.path.exists(self.mapping_file):
|
|
with open(self.mapping_file, 'r', encoding='utf-8') as f:
|
|
self.tag_mapping = json.load(f)
|
|
except Exception as e:
|
|
print(f"Error loading analysis data: {e}")
|
|
|
|
def get_propagation_path_with_flow(self, start_node: str):
|
|
"""
|
|
엣지의 방향성(flow_direction)과 상태(valve_status)를 고려한 실제 영향 전파 경로 추출
|
|
"""
|
|
if start_node not in self.graph:
|
|
return {}
|
|
|
|
# 1. 유효한 엣지만 필터링 (방향이 forward이고 밸브가 open인 경로)
|
|
valid_edges = [
|
|
(u, v) for u, v, d in self.graph.edges(data=True)
|
|
if d.get('flow_direction', 'forward') == 'forward'
|
|
and d.get('valve_status', 'open') == 'open'
|
|
]
|
|
|
|
filtered_graph = nx.DiGraph()
|
|
filtered_graph.add_edges_from(valid_edges)
|
|
|
|
# 2. 전파 단계별 노드 추출 (BFS)
|
|
try:
|
|
propagation_levels = nx.single_source_shortest_path_length(filtered_graph, start_node)
|
|
return propagation_levels
|
|
except Exception:
|
|
return {}
|
|
|
|
def analyze_impact(self, node_id: str):
|
|
"""특정 노드 장애 시 영향도 분석 결과 반환"""
|
|
if node_id not in self.graph:
|
|
return {"success": False, "error": f"Node {node_id} not found in topology"}
|
|
|
|
impact_map = self.get_propagation_path_with_flow(node_id)
|
|
|
|
# 경로 추출 (시각화를 위해 모든 영향 노드로의 최단 경로 포함)
|
|
paths = []
|
|
for target in impact_map.keys():
|
|
if target != node_id:
|
|
try:
|
|
path = nx.shortest_path(self.graph, source=node_id, target=target)
|
|
paths.append(path)
|
|
except nx.NetworkXNoPath:
|
|
continue
|
|
|
|
return {
|
|
"success": True,
|
|
"startNode": node_id,
|
|
"impactedNodes": impact_map,
|
|
"paths": paths
|
|
}
|