Files

83 lines
3.4 KiB
Python

import networkx as nx
from typing import Dict, List, Optional
import json
import os
class PidAnalysisEngine:
def __init__(self, topology_file: str, mapping_file: str):
self.topology_file = topology_file
self.mapping_file = mapping_file
self.graph = nx.DiGraph()
self.tag_mapping = {}
self.load_data()
def load_data(self):
"""그래프 및 매핑 데이터 로드"""
try:
if os.path.exists(self.topology_file):
with open(self.topology_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# NetworkX 그래프 생성 (node_link_data 형식 가정)
for node in data.get('nodes', []):
# 'id' 키를 제거하여 중복 키워드 인자 방지
node_attrs = {k: v for k, v in node.items() if k != 'id'}
self.graph.add_node(node['id'], **node_attrs)
for edge in data.get('links', []): # node_link_data는 'links' 사용
# 'source', 'target' 키를 제거하여 중복 키워드 인자 방지
edge_attrs = {k: v for k, v in edge.items() if k not in ('source', 'target')}
self.graph.add_edge(edge['source'], edge['target'], **edge_attrs)
if os.path.exists(self.mapping_file):
with open(self.mapping_file, 'r', encoding='utf-8') as f:
self.tag_mapping = json.load(f)
except Exception as e:
print(f"Error loading analysis data: {e}")
def get_propagation_path_with_flow(self, start_node: str):
"""
엣지의 방향성(flow_direction)과 상태(valve_status)를 고려한 실제 영향 전파 경로 추출
"""
if start_node not in self.graph:
return {}
# 1. 유효한 엣지만 필터링 (방향이 forward이고 밸브가 open인 경로)
valid_edges = [
(u, v) for u, v, d in self.graph.edges(data=True)
if d.get('flow_direction', 'forward') == 'forward'
and d.get('valve_status', 'open') == 'open'
]
filtered_graph = nx.DiGraph()
filtered_graph.add_edges_from(valid_edges)
# 2. 전파 단계별 노드 추출 (BFS)
try:
propagation_levels = nx.single_source_shortest_path_length(filtered_graph, start_node)
return propagation_levels
except Exception:
return {}
def analyze_impact(self, node_id: str):
"""특정 노드 장애 시 영향도 분석 결과 반환"""
if node_id not in self.graph:
return {"success": False, "error": f"Node {node_id} not found in topology"}
impact_map = self.get_propagation_path_with_flow(node_id)
# 경로 추출 (시각화를 위해 모든 영향 노드로의 최단 경로 포함)
paths = []
for target in impact_map.keys():
if target != node_id:
try:
path = nx.shortest_path(self.graph, source=node_id, target=target)
paths.append(path)
except nx.NetworkXNoPath:
continue
return {
"success": True,
"startNode": node_id,
"impactedNodes": impact_map,
"paths": paths
}