- P&ID: 연결 분석 API, Prefix 규칙 관리, 카테고리 분류, DXF 그래프 빌드 - LLM: 대화 요약, tool card 영구 보존, 시계열 차트(uPlot), 에이전트 모드 - KB: 청크 미리보기, Field Instrument Inference, 인증/Qdrant 클라이언트 - MCP: 서버 기능 확장, 파이프라인 수정, timeout 개선 - Frontend: P&ID UI, LLM UI, KB UI, OPC UA Write 탭 추가 - 설정: AGENTS.md, plant_context, README, opencode.json 업데이트 - 정리: 진단 체크리스트 문서 삭제
297 lines
13 KiB
Python
297 lines
13 KiB
Python
import networkx as nx
|
||
from shapely.geometry import box, Point, LineString
|
||
import json
|
||
from typing import List, Dict, Any, Optional, Tuple
|
||
from collections import defaultdict
|
||
|
||
class SpatialGrid:
|
||
"""
|
||
그리드 기반 공간 인덱스.
|
||
셀 크기 = threshold. 각 엔티티를 중심 좌표로 그리드 셀에 할당.
|
||
쿼리 시 인접 셀(3×3)만 검색.
|
||
|
||
기존 O(n) 순회 → O(1) 그리드 셀 조회.
|
||
"""
|
||
def __init__(self, cell_size: float):
|
||
self.cell_size = cell_size
|
||
self.grid = defaultdict(list) # (gx, gy) -> [(node_id, bbox_dict), ...]
|
||
|
||
def add(self, node_id, bbox):
|
||
"""bbox를 그리드에 등록. bbox는 {min_x, min_y, max_x, max_y} 또는 shapely geometry."""
|
||
if hasattr(bbox, 'bounds'):
|
||
min_x, min_y, max_x, max_y = bbox.bounds
|
||
else:
|
||
min_x = bbox['min_x']
|
||
min_y = bbox['min_y']
|
||
max_x = bbox['max_x']
|
||
max_y = bbox['max_y']
|
||
|
||
center_x = (min_x + max_x) / 2
|
||
center_y = (min_y + max_y) / 2
|
||
gx = int(center_x / self.cell_size)
|
||
gy = int(center_y / self.cell_size)
|
||
self.grid[(gx, gy)].append((node_id, min_x, min_y, max_x, max_y))
|
||
|
||
def query(self, query_min_x, query_min_y, query_max_x, query_max_y, threshold) -> list:
|
||
"""
|
||
query BBox와 threshold 이내인 모든 노드를 반환.
|
||
인접 셀(3×3)만 확인.
|
||
"""
|
||
query_center_x = (query_min_x + query_max_x) / 2
|
||
query_center_y = (query_min_y + query_max_y) / 2
|
||
q_gx = int(query_center_x / self.cell_size)
|
||
q_gy = int(query_center_y / self.cell_size)
|
||
|
||
# threshold / cell_size 만큼 확장하여 검색 범위 계산
|
||
radius = max(1, int(threshold / self.cell_size) + 1)
|
||
|
||
candidates = []
|
||
for dx in range(-radius, radius + 1):
|
||
for dy in range(-radius, radius + 1):
|
||
cell = self.grid.get((q_gx + dx, q_gy + dy))
|
||
if cell:
|
||
candidates.extend(cell)
|
||
|
||
# BBox 빠른 필터로 최종 확인
|
||
result = []
|
||
for node_id, min_x, min_y, max_x, max_y in candidates:
|
||
dx = max(0, min_x - query_max_x, query_min_x - max_x)
|
||
dy = max(0, min_y - query_max_y, query_min_y - max_y)
|
||
dist = (dx * dx + dy * dy) ** 0.5
|
||
if dist <= threshold:
|
||
result.append(node_id)
|
||
return result
|
||
|
||
|
||
class PidTopologyBuilder:
|
||
def __init__(self, geometric_data: List[Dict[str, Any]], all_extracted_tags: Optional[List[Dict[str, Any]]] = None, config: Optional[Dict[str, float]] = None):
|
||
"""
|
||
- geometric_data: Phase 1에서 추출된 기하학적 데이터 (List of dicts)
|
||
- all_extracted_tags: 통합된 태그 리스트
|
||
- config: {'dist_threshold': 50.0, 'tag_threshold': 100.0} 등 설정값
|
||
"""
|
||
self.data = geometric_data
|
||
self.all_tags = all_extracted_tags if all_extracted_tags else []
|
||
|
||
# 기본 설정값
|
||
default_config = {'dist_threshold': 50.0, 'tag_threshold': 100.0}
|
||
self.config = config if config else default_config
|
||
|
||
# 동적 스케일링 적용
|
||
self._apply_dynamic_scaling()
|
||
|
||
self.G = nx.DiGraph() # 방향성 그래프 생성
|
||
|
||
def _apply_dynamic_scaling(self):
|
||
"""도면의 전체 크기를 기반으로 임계값을 동적으로 조정"""
|
||
if not self.data:
|
||
return
|
||
|
||
# 모든 BBox를 포함하는 전체 영역 계산
|
||
all_min_x = min((item['bbox']['min_x'] for item in self.data if 'bbox' in item), default=0)
|
||
all_max_x = max((item['bbox']['max_x'] for item in self.data if 'bbox' in item), default=1000)
|
||
all_min_y = min((item['bbox']['min_y'] for item in self.data if 'bbox' in item), default=0)
|
||
all_max_y = max((item['bbox']['max_y'] for item in self.data if 'bbox' in item), default=1000)
|
||
|
||
drawing_width = all_max_x - all_min_x
|
||
drawing_height = all_max_y - all_min_y
|
||
diag = (drawing_width**2 + drawing_height**2)**0.5
|
||
|
||
if diag == 0: return
|
||
|
||
# 기준 대각선 길이 (예: 5000 units) 대비 현재 도면 크기 비율 계산
|
||
# 기준 도면 크기가 5000일 때 dist_threshold=50 (1%)
|
||
scale_factor = diag / 5000.0
|
||
|
||
# 임계값 업데이트 (최소/최대 범위 제한)
|
||
self.config['dist_threshold'] = max(5.0, min(200.0, 50.0 * scale_factor))
|
||
self.config['tag_threshold'] = max(20.0, min(500.0, 100.0 * scale_factor))
|
||
|
||
def _build_spatial_grid(self, node_ids) -> SpatialGrid:
|
||
"""설비 노드들로 SpatialGrid 인덱스 생성."""
|
||
grid = SpatialGrid(cell_size=self.config['dist_threshold'])
|
||
for nid in node_ids:
|
||
grid.add(nid, self.G.nodes[nid]['bbox'])
|
||
return grid
|
||
|
||
# 시그널 레이어 이름 집합 (ELECTRIC SIGNAL, INSTRUMENT signal선 등)
|
||
_SIGNAL_LAYERS = frozenset({'ELECTRIC SIGNAL', 'SIGNAL', 'ELEC', 'CABLE', 'WIRE'})
|
||
|
||
def _relation_for_layer(self, layer: str) -> str:
|
||
return 'signal' if (layer or '').upper() in {s.upper() for s in self._SIGNAL_LAYERS} else 'pipe'
|
||
|
||
def build_graph(self):
|
||
# 1. 모든 객체를 노드로 추가
|
||
for item in self.data:
|
||
bbox_vals = item['bbox']
|
||
# BoundingBox 모델의 필드명에 맞춰 추출 (min_x, min_y, max_x, max_y)
|
||
bbox_geom = box(bbox_vals['min_x'], bbox_vals['min_y'], bbox_vals['max_x'], bbox_vals['max_y'])
|
||
|
||
self.G.add_node(item['entity_id'],
|
||
type=item['entity_type'],
|
||
bbox=bbox_geom,
|
||
value=item.get('clean_value'),
|
||
layer=item.get('layer'))
|
||
|
||
# 2. 분산 추출된 태그 통합 및 노드 추가
|
||
for tag in self.all_tags:
|
||
bbox_vals = tag['bbox']
|
||
bbox_geom = box(bbox_vals['min_x'], bbox_vals['min_y'], bbox_vals['max_x'], bbox_vals['max_y'])
|
||
self.G.add_node(tag['entity_id'],
|
||
type='TEXT',
|
||
bbox=bbox_geom,
|
||
value=tag.get('clean_value') or tag.get('tagName'))
|
||
|
||
# 설비 노드 목록
|
||
equipments = [n for n, d in self.G.nodes(data=True) if d['type'] not in ['TEXT', 'LINE', 'LWPOLYLINE']]
|
||
|
||
# SpatialGrid 인덱스 생성 (O(n) 일회성)
|
||
eq_grid = self._build_spatial_grid(equipments)
|
||
|
||
# 3. 태그-설비 논리적 연결 (Association) — SpatialGrid 사용
|
||
tags = [n for n, d in self.G.nodes(data=True) if d['type'] == 'TEXT']
|
||
|
||
for tag in tags:
|
||
best_match = self._find_nearest_equipment(tag, eq_grid)
|
||
if best_match:
|
||
self.G.add_edge(tag, best_match, relation='associated_with')
|
||
|
||
# 4. 배관/시그널 기반 연결 — SpatialGrid 사용
|
||
lines = [n for n, d in self.G.nodes(data=True) if d['type'] in ['LINE', 'LWPOLYLINE']]
|
||
|
||
for line_id in lines:
|
||
original_item = next((item for item in self.data if item['entity_id'] == line_id), None)
|
||
if not original_item or not original_item.get('coordinates'):
|
||
continue
|
||
|
||
coords = original_item['coordinates']
|
||
line_geom = LineString(coords)
|
||
line_bbox = line_geom.bounds
|
||
layer = original_item.get('layer', '')
|
||
relation = self._relation_for_layer(layer)
|
||
|
||
# SpatialGrid로 후보 집합 조회 (O(1) 그리드 셀 기반)
|
||
nearby_equipment_ids = eq_grid.query(
|
||
line_bbox[0], line_bbox[1], line_bbox[2], line_bbox[3],
|
||
self.config['dist_threshold']
|
||
)
|
||
|
||
# 개선: 끝점뿐만 아니라 라인 전체가 설비 BBox와 교차하거나 매우 가까운지 확인
|
||
connected_nodes = []
|
||
for eq_id in nearby_equipment_ids:
|
||
eq_bbox = self.G.nodes[eq_id]['bbox']
|
||
# 1. 라인이 BBox와 교차하는지 확인 (관통 포함)
|
||
if line_geom.intersects(eq_bbox):
|
||
connected_nodes.append(eq_id)
|
||
# 2. 교차하지 않더라도 임계값 이내에 있는지 확인 (근접 연결)
|
||
elif line_geom.distance(eq_bbox) < self.config['dist_threshold']:
|
||
connected_nodes.append(eq_id)
|
||
|
||
# 중복 제거
|
||
connected_nodes = list(set(connected_nodes))
|
||
|
||
if len(connected_nodes) >= 2:
|
||
node0_bbox = self.G.nodes[connected_nodes[0]]['bbox']
|
||
node1_bbox = self.G.nodes[connected_nodes[1]]['bbox']
|
||
|
||
center0 = ((node0_bbox.bounds[0] + node0_bbox.bounds[2])/2, (node0_bbox.bounds[1] + node0_bbox.bounds[3])/2)
|
||
center1 = ((node1_bbox.bounds[0] + node1_bbox.bounds[2])/2, (node1_bbox.bounds[1] + node1_bbox.bounds[3])/2)
|
||
|
||
if abs(center1[0] - center0[0]) > abs(center1[1] - center0[1]):
|
||
if center0[0] < center1[0]:
|
||
self.G.add_edge(connected_nodes[0], connected_nodes[1], relation=relation, flow_direction='forward')
|
||
else:
|
||
self.G.add_edge(connected_nodes[1], connected_nodes[0], relation=relation, flow_direction='forward')
|
||
else:
|
||
if center0[1] > center1[1]:
|
||
self.G.add_edge(connected_nodes[0], connected_nodes[1], relation=relation, flow_direction='forward')
|
||
else:
|
||
self.G.add_edge(connected_nodes[1], connected_nodes[0], relation=relation, flow_direction='forward')
|
||
elif len(connected_nodes) == 1:
|
||
# 단일 연결의 경우, 일단 엣지를 생성하되 방향은 미정(undirected-like)으로 처리하거나
|
||
# 추후 전파 로직에서 결정하도록 함
|
||
pass
|
||
|
||
def _find_nearest_equipment(self, tag_id, eq_grid):
|
||
"""
|
||
단순 거리 기반 매핑에서 위상 기반 가중치 매핑으로 개선.
|
||
가중치 = 거리 점수 + 연결성 점수
|
||
SpatialGrid로 후보 집합 조회 후 shapely 거리 계산.
|
||
|
||
Args:
|
||
tag_id: 태그 노드 ID
|
||
eq_grid: 설비 노드의 SpatialGrid 인덱스
|
||
"""
|
||
tag_bbox = self.G.nodes[tag_id]['bbox']
|
||
best_score = float('inf')
|
||
nearest = None
|
||
|
||
# 태그 노드와 연결된 배관(LINE/LWPOLYLINE) 확인
|
||
connected_pipes = [n for n in self.G.neighbors(tag_id) if self.G.nodes[n]['type'] in ['LINE', 'LWPOLYLINE']]
|
||
|
||
# SpatialGrid로 후보 집합 조회 (O(1) 그리드 셀 기반)
|
||
tb = tag_bbox.bounds
|
||
nearby_equipment_ids = eq_grid.query(
|
||
tb[0], tb[1], tb[2], tb[3],
|
||
self.config['tag_threshold']
|
||
)
|
||
|
||
for eq_id in nearby_equipment_ids:
|
||
eq_bbox = self.G.nodes[eq_id]['bbox']
|
||
dist = tag_bbox.distance(eq_bbox)
|
||
|
||
if dist > self.config['tag_threshold']:
|
||
continue
|
||
|
||
# 1. 거리 점수 (낮을수록 좋음)
|
||
score = dist
|
||
|
||
# 2. 연결성 가중치 (태그와 설비가 동일한 배관에 연결되어 있다면 점수 대폭 감점 = 우선순위 상승)
|
||
# 태그가 직접 배관에 연결되어 있지는 않지만, 태그 근처의 배관이 설비에 연결되어 있는지 확인
|
||
for pipe_id in connected_pipes:
|
||
if self.G.has_edge(pipe_id, eq_id) or self.G.has_edge(eq_id, pipe_id):
|
||
score -= self.config['tag_threshold'] * 0.5 # 연결성 보너스
|
||
|
||
if score < best_score:
|
||
best_score = score
|
||
nearest = eq_id
|
||
|
||
return nearest
|
||
|
||
def validate_topology(self):
|
||
"""위상 무결성 검증"""
|
||
isolated = list(nx.isolates(self.G))
|
||
return {
|
||
"isolated_nodes": isolated,
|
||
"node_count": self.G.number_of_nodes(),
|
||
"edge_count": self.G.number_of_edges()
|
||
}
|
||
|
||
def save_graph(self, output_path: str):
|
||
"""그래프 구조를 JSON 형태로 저장"""
|
||
from networkx.readwrite import json_graph
|
||
data = json_graph.node_link_data(self.G)
|
||
|
||
# shapely geometry 객체는 JSON 직렬화가 안 되므로 변환
|
||
for node in data['nodes']:
|
||
if 'bbox' in node:
|
||
bbox = node['bbox']
|
||
node['bbox'] = {
|
||
'min_x': bbox.bounds[0],
|
||
'min_y': bbox.bounds[1],
|
||
'max_x': bbox.bounds[2],
|
||
'max_y': bbox.bounds[3]
|
||
}
|
||
|
||
with open(output_path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||
return output_path
|
||
|
||
def analyze_impact(graph, start_node):
|
||
"""특정 설비 장애 시 하류(Downstream)에 영향을 받는 모든 노드 추출"""
|
||
if start_node not in graph:
|
||
return []
|
||
# BFS를 통해 도달 가능한 모든 노드 탐색
|
||
impacted_nodes = nx.descendants(graph, start_node)
|
||
return list(impacted_nodes)
|