Files
ExperionCrawler/.rooBackup/2026-05-02_0448/Graph_Pipeline_Phase2.md

6.7 KiB

🕸️ Graph Pipeline Phase 2: 위상 모델링 (Topology Modeling)

이 문서는 P&ID Graph Pipeline의 두 번째 단계인 위상 모델링의 상세 구현 계획을 다룹니다. 1단계에서 추출한 기하학적 객체(좌표, BBox)를 기반으로, 설비 간의 **연결성(Connectivity)**과 **흐름(Flow)**을 정의하는 지식 그래프(Knowledge Graph)를 구축하는 것이 목표입니다.


📦 1. 필수 패키지 및 환경 설정

1.1 Python 패키지

패키지 용도 비고
networkx 그래프 데이터 구조 생성 및 알고리즘 분석 핵심 라이브러리
shapely 객체 간 거리 계산 및 포함 관계 분석 1단계와 연계
scikit-learn (선택) KD-Tree를 이용한 고속 근접 이웃 검색 대규모 도면 최적화
matplotlib 생성된 그래프의 위상 구조 시각화 검증 디버깅용

1.2 설치 명령어

pip install networkx shapely scikit-learn matplotlib

📐 2. 상세 설계 구조

2.1 그래프 정의 (Graph Definition)

  • 노드 (Nodes):
    • Equipment: 펌프, 탱크, 열교환기 등 (속성: ID, 타입, BBox)
    • Instrument: 전송기, 밸브, 게이지 등 (속성: ID, 타입, BBox)
    • Tag: 텍스트 기반 태그 (속성: TagName, Value)
  • 엣지 (Edges):
    • Pipe: 설비-설비, 설비-계기 간의 물리적 연결 (속성: LineNumber, 방향성)
    • Association: 태그-설비 간의 논리적 연결 (속성: 관계 타입 - 예: 'belongs_to')

2.2 위상 추론 로직 (Topology Inference)

  1. 태그-설비 결합 (Tag-to-Entity Binding):
    • 태그 텍스트의 BBox와 가장 가까운 심볼(Equipment/Instrument)을 찾아 Association 엣지를 생성합니다.
  2. 배관 연결성 분석 (Line Connectivity):
    • LINE 또는 POLYLINE의 끝점이 특정 설비의 BBox 내부에 있거나 임계 거리(\epsilon) 이내에 있으면 두 노드를 Pipe 엣지로 연결합니다.
  3. 흐름 방향성 부여 (Flow Direction):
    • 화살표 심볼의 방향 또는 공정 흐름 규칙을 분석하여 엣지에 source \rightarrow target 방향을 설정합니다.

💻 3. 실제 구현 코딩 가이드 (Example)

3.1 그래프 구축 핵심 코드

import networkx as nx
from shapely.geometry import box, Point

class PidTopologyBuilder:
    def __init__(self, geometric_data, all_extracted_tags=None):
        """
        Phase 5 병렬 아키텍처 반영:
        - geometric_data: Phase 1에서 추출된 기하학적 데이터
        - all_extracted_tags: 여러 Worker(Phase 3)가 분산 추출한 태그 리스트의 통합본 (flatten_results 결과)
        """
        self.data = geometric_data
        self.all_tags = all_extracted_tags if all_extracted_tags else []
        self.G = nx.DiGraph()      # 방향성 그래프 생성

    def build_graph(self):
        # 1. 모든 객체를 노드로 추가
        for item in self.data:
            self.G.add_node(item['id'],
                           type=item['type'],
                           bbox=box(*item['bbox'].values()),
                           value=item.get('value'))

        # 2. 분산 추출된 태그 통합 및 노드 추가 (Phase 5 반영)
        for tag in self.all_tags:
            # tag: { "id": "...", "tagName": "...", "bbox": {...}, "type": "TEXT" }
            self.G.add_node(tag['id'],
                           type='TEXT',
                           bbox=box(*tag['bbox'].values()),
                           value=tag.get('tagName'))

        # 3. 태그-설비 논리적 연결 (Association)
        tags = [n for n, d in self.G.nodes(data=True) if d['type'] == 'TEXT']
        equipments = [n for n, d in self.G.nodes(data=True) if d['type'] != 'TEXT']

        for tag in tags:
            best_match = self._find_nearest_equipment(tag, equipments)
            if best_match:
                self.G.add_edge(tag, best_match, relation='associated_with')

        # 3. 배관 기반 물리적 연결 (Pipe)
        lines = [n for n, d in self.G.nodes(data=True) if d['type'] in ['LINE', 'POLYLINE']]
        for line in lines:
            connected_nodes = self._find_connected_nodes(line, equipments)
            if len(connected_nodes) >= 2:
                # 라인을 통해 연결된 두 설비 간 엣지 생성
                self.G.add_edge(connected_nodes[0], connected_nodes[1], relation='pipe')

    def _find_nearest_equipment(self, tag_id, equipment_ids):
        tag_bbox = self.G.nodes[tag_id]['bbox']
        min_dist = float('inf')
        nearest = None
        for eq_id in equipment_ids:
            eq_bbox = self.G.nodes[eq_id]['bbox']
            dist = tag_bbox.distance(eq_bbox)
            if dist < min_dist:
                min_dist = dist
                nearest = eq_id
        return nearest if min_dist < 50.0 else None # 임계값 50.0

    def _find_connected_nodes(self, line_id, equipment_ids):
        # 라인의 시작/끝점이 어떤 설비 BBox에 포함되는지 확인
        # (실제 구현 시 line의 coordinates 활용)
        return [eq for eq in equipment_ids if self.G.nodes[eq]['bbox'].intersects(self.G.nodes[line_id]['bbox'])]

# 실행 (Phase 5 Orchestrator 관점)
# 1. Phase 1 결과 로드
# 2. Phase 3 Worker들의 결과를 flatten_results()로 통합
all_tags = flatten_results([worker1_res, worker2_res, worker3_res, worker4_res, worker5_res])

builder = PidTopologyBuilder(geometric_data, all_extracted_tags=all_tags)
builder.build_graph()
graph = builder.G

3.2 위상 분석 유틸리티: 영향도 분석 (Impact Analysis)

def analyze_impact(graph, start_node):
    """특정 설비 장애 시 하류(Downstream)에 영향을 받는 모든 노드 추출"""
    # BFS를 통해 도달 가능한 모든 노드 탐색
    impacted_nodes = nx.descendants(graph, start_node)
    return list(impacted_nodes)

# 예: P-101 펌프 고장 시 영향 분석
affected = analyze_impact(graph, "node_P101")
print(f"Impacted Equipment: {affected}")

🚀 4. Phase 2 완료 기준 (Definition of Done)

  • 모든 설비와 계기가 그래프의 **노드(Node)**로 변환되었는가?
  • 분산 추출된 태그 리스트가 flatten_results를 통해 통합되어 그래프에 반영되었는가? (Phase 5 반영)
  • 태그와 설비 간의 **논리적 연결(Association)**이 정확하게 매핑되었는가?
  • 배관(Line)을 통해 설비 간의 **물리적 연결(Pipe Edge)**이 생성되었는가?
  • nx.descendants 등을 통해 특정 노드로부터의 **흐름 추적(Flow Tracing)**이 가능한가?
  • 생성된 그래프 구조가 JSON(GraphML 등) 형태로 저장되어 Phase 3로 전달 가능한가?