import networkx as nx from shapely.geometry import box, Point, LineString import json from typing import List, Dict, Any, Optional, Tuple class PidTopologyBuilder: def __init__(self, geometric_data: List[Dict[str, Any]], all_extracted_tags: Optional[List[Dict[str, Any]]] = None, config: Optional[Dict[str, float]] = None): """ - geometric_data: Phase 1에서 추출된 기하학적 데이터 (List of dicts) - all_extracted_tags: 통합된 태그 리스트 - config: {'dist_threshold': 50.0, 'tag_threshold': 100.0} 등 설정값 """ self.data = geometric_data self.all_tags = all_extracted_tags if all_extracted_tags else [] # 기본 설정값 default_config = {'dist_threshold': 50.0, 'tag_threshold': 100.0} self.config = config if config else default_config # 동적 스케일링 적용 self._apply_dynamic_scaling() self.G = nx.DiGraph() # 방향성 그래프 생성 def _apply_dynamic_scaling(self): """도면의 전체 크기를 기반으로 임계값을 동적으로 조정""" if not self.data: return # 모든 BBox를 포함하는 전체 영역 계산 all_min_x = min((item['bbox']['min_x'] for item in self.data if 'bbox' in item), default=0) all_max_x = max((item['bbox']['max_x'] for item in self.data if 'bbox' in item), default=1000) all_min_y = min((item['bbox']['min_y'] for item in self.data if 'bbox' in item), default=0) all_max_y = max((item['bbox']['max_y'] for item in self.data if 'bbox' in item), default=1000) drawing_width = all_max_x - all_min_x drawing_height = all_max_y - all_min_y diag = (drawing_width**2 + drawing_height**2)**0.5 if diag == 0: return # 기준 대각선 길이 (예: 5000 units) 대비 현재 도면 크기 비율 계산 # 기준 도면 크기가 5000일 때 dist_threshold=50 (1%) scale_factor = diag / 5000.0 # 임계값 업데이트 (최소/최대 범위 제한) self.config['dist_threshold'] = max(5.0, min(200.0, 50.0 * scale_factor)) self.config['tag_threshold'] = max(20.0, min(500.0, 100.0 * scale_factor)) def build_graph(self): # 1. 모든 객체를 노드로 추가 for item in self.data: bbox_vals = item['bbox'] # BoundingBox 모델의 필드명에 맞춰 추출 (min_x, min_y, max_x, max_y) bbox_geom = box(bbox_vals['min_x'], bbox_vals['min_y'], bbox_vals['max_x'], bbox_vals['max_y']) self.G.add_node(item['entity_id'], type=item['entity_type'], bbox=bbox_geom, value=item.get('clean_value'), layer=item.get('layer')) # 2. 분산 추출된 태그 통합 및 노드 추가 for tag in self.all_tags: bbox_vals = tag['bbox'] bbox_geom = box(bbox_vals['min_x'], bbox_vals['min_y'], bbox_vals['max_x'], bbox_vals['max_y']) self.G.add_node(tag['entity_id'], type='TEXT', bbox=bbox_geom, value=tag.get('clean_value') or tag.get('tagName')) # 3. 태그-설비 논리적 연결 (Association) tags = [n for n, d in self.G.nodes(data=True) if d['type'] == 'TEXT'] equipments = [n for n, d in self.G.nodes(data=True) if d['type'] not in ['TEXT', 'LINE', 'LWPOLYLINE']] for tag in tags: best_match = self._find_nearest_equipment(tag, equipments) if best_match: self.G.add_edge(tag, best_match, relation='associated_with') # 4. 배관 기반 물리적 연결 (Pipe) [개선됨: End-point 기반] lines = [n for n, d in self.G.nodes(data=True) if d['type'] in ['LINE', 'LWPOLYLINE']] for line_id in lines: original_item = next((item for item in self.data if item['entity_id'] == line_id), None) if not original_item or not original_item.get('coordinates'): continue coords = original_item['coordinates'] line_geom = LineString(coords) # 개선: 끝점뿐만 아니라 라인 전체가 설비 BBox와 교차하거나 매우 가까운지 확인 connected_nodes = [] for eq_id in equipments: eq_bbox = self.G.nodes[eq_id]['bbox'] # 1. 라인이 BBox와 교차하는지 확인 (관통 포함) if line_geom.intersects(eq_bbox): connected_nodes.append(eq_id) # 2. 교차하지 않더라도 임계값 이내에 있는지 확인 (근접 연결) elif line_geom.distance(eq_bbox) < self.config['dist_threshold']: connected_nodes.append(eq_id) # 중복 제거 connected_nodes = list(set(connected_nodes)) if len(connected_nodes) >= 2: # 개선: 단순 순서가 아닌, 기하학적 좌표 기반의 흐름 방향 추론 (왼쪽 -> 오른쪽, 위 -> 아래 우선) # 실제 공정 도면의 일반적인 흐름 방향을 반영 node0_bbox = self.G.nodes[connected_nodes[0]]['bbox'] node1_bbox = self.G.nodes[connected_nodes[1]]['bbox'] center0 = ((node0_bbox.bounds[0] + node0_bbox.bounds[2])/2, (node0_bbox.bounds[1] + node0_bbox.bounds[3])/2) center1 = ((node1_bbox.bounds[0] + node1_bbox.bounds[2])/2, (node1_bbox.bounds[1] + node1_bbox.bounds[3])/2) # X축 차이가 Y축 차이보다 크면 X축 기준, 아니면 Y축 기준으로 방향 결정 if abs(center1[0] - center0[0]) > abs(center1[1] - center0[1]): # X축 기준: 왼쪽 -> 오른쪽 if center0[0] < center1[0]: self.G.add_edge(connected_nodes[0], connected_nodes[1], relation='pipe', flow_direction='forward') else: self.G.add_edge(connected_nodes[1], connected_nodes[0], relation='pipe', flow_direction='forward') else: # Y축 기준: 위 -> 아래 (도면 좌표계에 따라 다를 수 있으나 일반적인 관례 적용) if center0[1] > center1[1]: self.G.add_edge(connected_nodes[0], connected_nodes[1], relation='pipe', flow_direction='forward') else: self.G.add_edge(connected_nodes[1], connected_nodes[0], relation='pipe', flow_direction='forward') elif len(connected_nodes) == 1: # 단일 연결의 경우, 일단 엣지를 생성하되 방향은 미정(undirected-like)으로 처리하거나 # 추후 전파 로직에서 결정하도록 함 pass def _find_nearest_equipment(self, tag_id, equipment_ids): """ 단순 거리 기반 매핑에서 위상 기반 가중치 매핑으로 개선. 가중치 = 거리 점수 + 연결성 점수 """ tag_bbox = self.G.nodes[tag_id]['bbox'] best_score = float('inf') nearest = None # 태그 노드와 연결된 배관(LINE/LWPOLYLINE) 확인 connected_pipes = [n for n in self.G.neighbors(tag_id) if self.G.nodes[n]['type'] in ['LINE', 'LWPOLYLINE']] for eq_id in equipment_ids: eq_bbox = self.G.nodes[eq_id]['bbox'] dist = tag_bbox.distance(eq_bbox) if dist > self.config['tag_threshold']: continue # 1. 거리 점수 (낮을수록 좋음) score = dist # 2. 연결성 가중치 (태그와 설비가 동일한 배관에 연결되어 있다면 점수 대폭 감점 = 우선순위 상승) # 태그가 직접 배관에 연결되어 있지는 않지만, 태그 근처의 배관이 설비에 연결되어 있는지 확인 for pipe_id in connected_pipes: if self.G.has_edge(pipe_id, eq_id) or self.G.has_edge(eq_id, pipe_id): score -= self.config['tag_threshold'] * 0.5 # 연결성 보너스 if score < best_score: best_score = score nearest = eq_id return nearest def validate_topology(self): """위상 무결성 검증""" isolated = list(nx.isolates(self.G)) return { "isolated_nodes": isolated, "node_count": self.G.number_of_nodes(), "edge_count": self.G.number_of_edges() } def save_graph(self, output_path: str): """그래프 구조를 JSON 형태로 저장""" from networkx.readwrite import json_graph data = json_graph.node_link_data(self.G) # shapely geometry 객체는 JSON 직렬화가 안 되므로 변환 for node in data['nodes']: if 'bbox' in node: bbox = node['bbox'] node['bbox'] = { 'min_x': bbox.bounds[0], 'min_y': bbox.bounds[1], 'max_x': bbox.bounds[2], 'max_y': bbox.bounds[3] } with open(output_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) return output_path def analyze_impact(graph, start_node): """특정 설비 장애 시 하류(Downstream)에 영향을 받는 모든 노드 추출""" if start_node not in graph: return [] # BFS를 통해 도달 가능한 모든 노드 탐색 impacted_nodes = nx.descendants(graph, start_node) return list(impacted_nodes)