Files
windpacer 16fc7a2598 Initial commit: HC900 Crawler
Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL.
기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체.

- industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버)
- src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer)
- mcp-server: Python FastMCP (RAG/NL2SQL/P&ID)
- 다중 컨트롤러(N-Controller) 지원

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:28:14 +09:00

297 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import networkx as nx
from shapely.geometry import box, Point, LineString
import json
from typing import List, Dict, Any, Optional, Tuple
from collections import defaultdict
class SpatialGrid:
"""
그리드 기반 공간 인덱스.
셀 크기 = threshold. 각 엔티티를 중심 좌표로 그리드 셀에 할당.
쿼리 시 인접 셀(3×3)만 검색.
기존 O(n) 순회 → O(1) 그리드 셀 조회.
"""
def __init__(self, cell_size: float):
self.cell_size = cell_size
self.grid = defaultdict(list) # (gx, gy) -> [(node_id, bbox_dict), ...]
def add(self, node_id, bbox):
"""bbox를 그리드에 등록. bbox는 {min_x, min_y, max_x, max_y} 또는 shapely geometry."""
if hasattr(bbox, 'bounds'):
min_x, min_y, max_x, max_y = bbox.bounds
else:
min_x = bbox['min_x']
min_y = bbox['min_y']
max_x = bbox['max_x']
max_y = bbox['max_y']
center_x = (min_x + max_x) / 2
center_y = (min_y + max_y) / 2
gx = int(center_x / self.cell_size)
gy = int(center_y / self.cell_size)
self.grid[(gx, gy)].append((node_id, min_x, min_y, max_x, max_y))
def query(self, query_min_x, query_min_y, query_max_x, query_max_y, threshold) -> list:
"""
query BBox와 threshold 이내인 모든 노드를 반환.
인접 셀(3×3)만 확인.
"""
query_center_x = (query_min_x + query_max_x) / 2
query_center_y = (query_min_y + query_max_y) / 2
q_gx = int(query_center_x / self.cell_size)
q_gy = int(query_center_y / self.cell_size)
# threshold / cell_size 만큼 확장하여 검색 범위 계산
radius = max(1, int(threshold / self.cell_size) + 1)
candidates = []
for dx in range(-radius, radius + 1):
for dy in range(-radius, radius + 1):
cell = self.grid.get((q_gx + dx, q_gy + dy))
if cell:
candidates.extend(cell)
# BBox 빠른 필터로 최종 확인
result = []
for node_id, min_x, min_y, max_x, max_y in candidates:
dx = max(0, min_x - query_max_x, query_min_x - max_x)
dy = max(0, min_y - query_max_y, query_min_y - max_y)
dist = (dx * dx + dy * dy) ** 0.5
if dist <= threshold:
result.append(node_id)
return result
class PidTopologyBuilder:
def __init__(self, geometric_data: List[Dict[str, Any]], all_extracted_tags: Optional[List[Dict[str, Any]]] = None, config: Optional[Dict[str, float]] = None):
"""
- geometric_data: Phase 1에서 추출된 기하학적 데이터 (List of dicts)
- all_extracted_tags: 통합된 태그 리스트
- config: {'dist_threshold': 50.0, 'tag_threshold': 100.0} 등 설정값
"""
self.data = geometric_data
self.all_tags = all_extracted_tags if all_extracted_tags else []
# 기본 설정값
default_config = {'dist_threshold': 50.0, 'tag_threshold': 100.0}
self.config = config if config else default_config
# 동적 스케일링 적용
self._apply_dynamic_scaling()
self.G = nx.DiGraph() # 방향성 그래프 생성
def _apply_dynamic_scaling(self):
"""도면의 전체 크기를 기반으로 임계값을 동적으로 조정"""
if not self.data:
return
# 모든 BBox를 포함하는 전체 영역 계산
all_min_x = min((item['bbox']['min_x'] for item in self.data if 'bbox' in item), default=0)
all_max_x = max((item['bbox']['max_x'] for item in self.data if 'bbox' in item), default=1000)
all_min_y = min((item['bbox']['min_y'] for item in self.data if 'bbox' in item), default=0)
all_max_y = max((item['bbox']['max_y'] for item in self.data if 'bbox' in item), default=1000)
drawing_width = all_max_x - all_min_x
drawing_height = all_max_y - all_min_y
diag = (drawing_width**2 + drawing_height**2)**0.5
if diag == 0: return
# 기준 대각선 길이 (예: 5000 units) 대비 현재 도면 크기 비율 계산
# 기준 도면 크기가 5000일 때 dist_threshold=50 (1%)
scale_factor = diag / 5000.0
# 임계값 업데이트 (최소/최대 범위 제한)
self.config['dist_threshold'] = max(5.0, min(200.0, 50.0 * scale_factor))
self.config['tag_threshold'] = max(20.0, min(500.0, 100.0 * scale_factor))
def _build_spatial_grid(self, node_ids) -> SpatialGrid:
"""설비 노드들로 SpatialGrid 인덱스 생성."""
grid = SpatialGrid(cell_size=self.config['dist_threshold'])
for nid in node_ids:
grid.add(nid, self.G.nodes[nid]['bbox'])
return grid
# 시그널 레이어 이름 집합 (ELECTRIC SIGNAL, INSTRUMENT signal선 등)
_SIGNAL_LAYERS = frozenset({'ELECTRIC SIGNAL', 'SIGNAL', 'ELEC', 'CABLE', 'WIRE'})
def _relation_for_layer(self, layer: str) -> str:
return 'signal' if (layer or '').upper() in {s.upper() for s in self._SIGNAL_LAYERS} else 'pipe'
def build_graph(self):
# 1. 모든 객체를 노드로 추가
for item in self.data:
bbox_vals = item['bbox']
# BoundingBox 모델의 필드명에 맞춰 추출 (min_x, min_y, max_x, max_y)
bbox_geom = box(bbox_vals['min_x'], bbox_vals['min_y'], bbox_vals['max_x'], bbox_vals['max_y'])
self.G.add_node(item['entity_id'],
type=item['entity_type'],
bbox=bbox_geom,
value=item.get('clean_value'),
layer=item.get('layer'))
# 2. 분산 추출된 태그 통합 및 노드 추가
for tag in self.all_tags:
bbox_vals = tag['bbox']
bbox_geom = box(bbox_vals['min_x'], bbox_vals['min_y'], bbox_vals['max_x'], bbox_vals['max_y'])
self.G.add_node(tag['entity_id'],
type='TEXT',
bbox=bbox_geom,
value=tag.get('clean_value') or tag.get('tagName'))
# 설비 노드 목록
equipments = [n for n, d in self.G.nodes(data=True) if d['type'] not in ['TEXT', 'LINE', 'LWPOLYLINE']]
# SpatialGrid 인덱스 생성 (O(n) 일회성)
eq_grid = self._build_spatial_grid(equipments)
# 3. 태그-설비 논리적 연결 (Association) — SpatialGrid 사용
tags = [n for n, d in self.G.nodes(data=True) if d['type'] == 'TEXT']
for tag in tags:
best_match = self._find_nearest_equipment(tag, eq_grid)
if best_match:
self.G.add_edge(tag, best_match, relation='associated_with')
# 4. 배관/시그널 기반 연결 — SpatialGrid 사용
lines = [n for n, d in self.G.nodes(data=True) if d['type'] in ['LINE', 'LWPOLYLINE']]
for line_id in lines:
original_item = next((item for item in self.data if item['entity_id'] == line_id), None)
if not original_item or not original_item.get('coordinates'):
continue
coords = original_item['coordinates']
line_geom = LineString(coords)
line_bbox = line_geom.bounds
layer = original_item.get('layer', '')
relation = self._relation_for_layer(layer)
# SpatialGrid로 후보 집합 조회 (O(1) 그리드 셀 기반)
nearby_equipment_ids = eq_grid.query(
line_bbox[0], line_bbox[1], line_bbox[2], line_bbox[3],
self.config['dist_threshold']
)
# 개선: 끝점뿐만 아니라 라인 전체가 설비 BBox와 교차하거나 매우 가까운지 확인
connected_nodes = []
for eq_id in nearby_equipment_ids:
eq_bbox = self.G.nodes[eq_id]['bbox']
# 1. 라인이 BBox와 교차하는지 확인 (관통 포함)
if line_geom.intersects(eq_bbox):
connected_nodes.append(eq_id)
# 2. 교차하지 않더라도 임계값 이내에 있는지 확인 (근접 연결)
elif line_geom.distance(eq_bbox) < self.config['dist_threshold']:
connected_nodes.append(eq_id)
# 중복 제거
connected_nodes = list(set(connected_nodes))
if len(connected_nodes) >= 2:
node0_bbox = self.G.nodes[connected_nodes[0]]['bbox']
node1_bbox = self.G.nodes[connected_nodes[1]]['bbox']
center0 = ((node0_bbox.bounds[0] + node0_bbox.bounds[2])/2, (node0_bbox.bounds[1] + node0_bbox.bounds[3])/2)
center1 = ((node1_bbox.bounds[0] + node1_bbox.bounds[2])/2, (node1_bbox.bounds[1] + node1_bbox.bounds[3])/2)
if abs(center1[0] - center0[0]) > abs(center1[1] - center0[1]):
if center0[0] < center1[0]:
self.G.add_edge(connected_nodes[0], connected_nodes[1], relation=relation, flow_direction='forward')
else:
self.G.add_edge(connected_nodes[1], connected_nodes[0], relation=relation, flow_direction='forward')
else:
if center0[1] > center1[1]:
self.G.add_edge(connected_nodes[0], connected_nodes[1], relation=relation, flow_direction='forward')
else:
self.G.add_edge(connected_nodes[1], connected_nodes[0], relation=relation, flow_direction='forward')
elif len(connected_nodes) == 1:
# 단일 연결의 경우, 일단 엣지를 생성하되 방향은 미정(undirected-like)으로 처리하거나
# 추후 전파 로직에서 결정하도록 함
pass
def _find_nearest_equipment(self, tag_id, eq_grid):
"""
단순 거리 기반 매핑에서 위상 기반 가중치 매핑으로 개선.
가중치 = 거리 점수 + 연결성 점수
SpatialGrid로 후보 집합 조회 후 shapely 거리 계산.
Args:
tag_id: 태그 노드 ID
eq_grid: 설비 노드의 SpatialGrid 인덱스
"""
tag_bbox = self.G.nodes[tag_id]['bbox']
best_score = float('inf')
nearest = None
# 태그 노드와 연결된 배관(LINE/LWPOLYLINE) 확인
connected_pipes = [n for n in self.G.neighbors(tag_id) if self.G.nodes[n]['type'] in ['LINE', 'LWPOLYLINE']]
# SpatialGrid로 후보 집합 조회 (O(1) 그리드 셀 기반)
tb = tag_bbox.bounds
nearby_equipment_ids = eq_grid.query(
tb[0], tb[1], tb[2], tb[3],
self.config['tag_threshold']
)
for eq_id in nearby_equipment_ids:
eq_bbox = self.G.nodes[eq_id]['bbox']
dist = tag_bbox.distance(eq_bbox)
if dist > self.config['tag_threshold']:
continue
# 1. 거리 점수 (낮을수록 좋음)
score = dist
# 2. 연결성 가중치 (태그와 설비가 동일한 배관에 연결되어 있다면 점수 대폭 감점 = 우선순위 상승)
# 태그가 직접 배관에 연결되어 있지는 않지만, 태그 근처의 배관이 설비에 연결되어 있는지 확인
for pipe_id in connected_pipes:
if self.G.has_edge(pipe_id, eq_id) or self.G.has_edge(eq_id, pipe_id):
score -= self.config['tag_threshold'] * 0.5 # 연결성 보너스
if score < best_score:
best_score = score
nearest = eq_id
return nearest
def validate_topology(self):
"""위상 무결성 검증"""
isolated = list(nx.isolates(self.G))
return {
"isolated_nodes": isolated,
"node_count": self.G.number_of_nodes(),
"edge_count": self.G.number_of_edges()
}
def save_graph(self, output_path: str):
"""그래프 구조를 JSON 형태로 저장"""
from networkx.readwrite import json_graph
data = json_graph.node_link_data(self.G)
# shapely geometry 객체는 JSON 직렬화가 안 되므로 변환
for node in data['nodes']:
if 'bbox' in node:
bbox = node['bbox']
node['bbox'] = {
'min_x': bbox.bounds[0],
'min_y': bbox.bounds[1],
'max_x': bbox.bounds[2],
'max_y': bbox.bounds[3]
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return output_path
def analyze_impact(graph, start_node):
"""특정 설비 장애 시 하류(Downstream)에 영향을 받는 모든 노드 추출"""
if start_node not in graph:
return []
# BFS를 통해 도달 가능한 모든 노드 탐색
impacted_nodes = nx.descendants(graph, start_node)
return list(impacted_nodes)