Files
windpacer 16fc7a2598 Initial commit: HC900 Crawler
Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL.
기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체.

- industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버)
- src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer)
- mcp-server: Python FastMCP (RAG/NL2SQL/P&ID)
- 다중 컨트롤러(N-Controller) 지원

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:28:14 +09:00

464 lines
17 KiB
Python

import json
import logging
import math
import re
from typing import List, Optional, Tuple, Union

import ezdxf
from pydantic import BaseModel, Field
from shapely.geometry import box, Point
# Logging setup: configure the root logger at import time and create the
# module-level logger used throughout this file.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# --- Data Models ---
class BoundingBox(BaseModel):
    """Rectangle described by its minimum/maximum X and Y plus a centre point.

    The model only stores the values; it does not check that ``center`` is
    consistent with the four corner coordinates.
    """

    # Lower-left corner.
    min_x: float
    min_y: float
    # Upper-right corner.
    max_x: float
    max_y: float
    # (x, y) pair for the centre of the rectangle.
    center: Tuple[float, float]
class GeometricEntity(BaseModel):
    """One extracted drawing entity together with its geometry and text."""

    # Identifier of the entity (the extractor stores the DXF handle here).
    entity_id: str
    # DXF entity type: TEXT, MTEXT, LINE, LWPOLYLINE, CIRCLE, ARC
    entity_type: str
    # Name of the layer the entity belongs to.
    layer: str
    # Bounding box of the entity.
    bbox: BoundingBox
    # Text content as read from the entity; None when there is no text.
    raw_value: Optional[str] = None
    # Text content after clean-up; None when there is no text.
    clean_value: Optional[str] = None
    # Characteristic points of the entity as (x, y) pairs or float lists.
    coordinates: List[Union[Tuple[float, float], List[float]]] = Field(
        default_factory=list
    )
    # Free-form additional attributes (e.g. colour, lineweight).
    properties: dict = Field(default_factory=dict)
class DrawingRegion(BaseModel):
    """Rectangular sub-area of the modelspace holding one split-off drawing."""

    # Sequential number assigned to the region.
    drawing_no: int
    # Horizontal extent of the region.
    x_min: float
    x_max: float
    # Vertical extent of the region.
    y_min: float
    y_max: float
    # Number of entities counted inside the region (defaults to 0).
    entity_count: int = 0
# --- Extractor Implementation ---
class PidGeometricExtractor:
    """Extract geometric entities from the modelspace of a DXF drawing.

    The extractor loads a DXF file with ezdxf, converts the supported entity
    types (TEXT, MTEXT, LINE, LWPOLYLINE, CIRCLE, ARC) into
    ``GeometricEntity`` dictionaries, and can split a modelspace into
    ``DrawingRegion`` rectangles based on the density of entity positions.
    """

    # Estimated glyph width as a fraction of the text height; used when a text
    # entity carries no usable explicit width.
    _CHAR_WIDTH_RATIO = 0.6
    # Character height assumed for MTEXT when the attribute is not available.
    _DEFAULT_MTEXT_HEIGHT = 2.5

    # Stacked text, e.g. \S1/2; -- the stacked characters are user data and
    # must survive the clean-up.
    _MTEXT_STACK_RE = re.compile(r'\\S([^;\\]*);')
    # Codes with a numeric argument, e.g. \H2.5x; \W0.8; \C1; \A1; \T1.2;
    # First alternative: properly ';'-terminated form.  Second alternative:
    # unterminated code followed by digits (kept for backward compatibility).
    _MTEXT_NUMERIC_CODE_RE = re.compile(r'\\[ACcHQTW](?:[-+0-9.]*x?;|[0-9.]*)')
    # Font / paragraph codes with a free-form argument, e.g. \fArial|b0|i0;
    _MTEXT_FONT_CODE_RE = re.compile(r'\\[Ffp][^;\\{}]*;')
    # Argument-less codes (paragraph break, underline/overline/strike toggles,
    # non-breaking space) plus any leftover unterminated \S.  These must never
    # consume the characters that follow: "FT\P101" is "FT" + "101".
    _MTEXT_SIMPLE_CODE_RE = re.compile(r'\\[PNLlOoKkXS~]')
    _BRACES_RE = re.compile(r'[{}]')
    # %%-style toggle codes, matched in both upper and lower case.
    _DXF_TOGGLE_RE = re.compile(r'%%[UuOoKkSsRr]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, file_path: str):
        """Load the DXF file and keep its document and modelspace.

        Args:
            file_path: Path of the DXF file to read.

        Raises:
            IOError: If the file cannot be loaded; the underlying exception
                is attached as ``__cause__``.
        """
        try:
            self.doc = ezdxf.readfile(file_path)
            self.msp = self.doc.modelspace()
        except Exception as e:
            raise IOError(f"Failed to load DXF file: {e}") from e

    def clean_text(self, text: str) -> str:
        r"""Strip DXF control codes and MTEXT inline formatting from *text*.

        Formatting codes are replaced by a space and runs of whitespace are
        collapsed afterwards, so ``FT\P101`` becomes ``FT 101`` and
        ``{\H2.5x;PUMP}`` becomes ``PUMP``.

        Args:
            text: Raw TEXT/MTEXT content; may be empty or None.

        Returns:
            The cleaned text, or an empty string for empty input.
        """
        if not text:
            return ""
        # 1. Stacked text: drop the code and terminator, keep the content.
        text = self._MTEXT_STACK_RE.sub(r' \1 ', text)
        # 2. Codes that carry an argument (height, width, colour, font, ...).
        text = self._MTEXT_NUMERIC_CODE_RE.sub(' ', text)
        text = self._MTEXT_FONT_CODE_RE.sub(' ', text)
        # 3. Argument-less codes such as \P (paragraph break) and \L (underline).
        text = self._MTEXT_SIMPLE_CODE_RE.sub(' ', text)
        # 4. Grouping braces.
        text = self._BRACES_RE.sub(' ', text)
        # 5. %%U / %%O / %%K / %%S / %%R toggle codes.
        text = self._DXF_TOGGLE_RE.sub(' ', text)
        # 6. Collapse the whitespace introduced by the substitutions above.
        return self._WHITESPACE_RE.sub(' ', text).strip()

    def get_bbox(self, entity) -> Optional[BoundingBox]:
        """Compute an approximate bounding box for a supported entity.

        Text boxes are estimates: the width is derived from the character
        count and rotation is not taken into account.  ARC entities use the
        box of their full circle.

        Args:
            entity: An ezdxf entity from the modelspace.

        Returns:
            The bounding box, or None for unsupported entity types, empty
            polylines, or when the computation fails (the error is logged).
        """
        try:
            kind = entity.dxftype()
            dxf = entity.dxf

            if kind == 'TEXT':
                origin = dxf.insert
                height = dxf.height
                # Rough width: character count * height * glyph ratio.
                width = len(dxf.text) * height * self._CHAR_WIDTH_RATIO
                return self._create_bbox(
                    origin.x, origin.y, origin.x + width, origin.y + height
                )

            if kind == 'MTEXT':
                origin = dxf.insert
                if hasattr(dxf, 'char_height'):
                    height = dxf.char_height
                else:
                    height = self._DEFAULT_MTEXT_HEIGHT
                # The width attribute may be unset (None) or 0; in both cases
                # fall back to the character-count estimate.
                declared_width = getattr(dxf, 'width', None)
                if declared_width and declared_width > 0:
                    width = declared_width
                else:
                    width = len(entity.text) * height * self._CHAR_WIDTH_RATIO
                return self._create_bbox(
                    origin.x, origin.y, origin.x + width, origin.y + height
                )

            if kind == 'LINE':
                start, end = dxf.start, dxf.end
                return self._create_bbox(
                    min(start.x, end.x), min(start.y, end.y),
                    max(start.x, end.x), max(start.y, end.y)
                )

            if kind == 'LWPOLYLINE':
                points = list(entity.get_points())
                if not points:
                    return None
                xs = [p[0] for p in points]
                ys = [p[1] for p in points]
                return self._create_bbox(min(xs), min(ys), max(xs), max(ys))

            if kind in ('CIRCLE', 'ARC'):
                center = dxf.center
                radius = dxf.radius
                return self._create_bbox(
                    center.x - radius, center.y - radius,
                    center.x + radius, center.y + radius
                )
        except Exception as e:
            logger.error(f"Error calculating bbox for {entity.dxftype()} ({entity.dxf.handle}): {e}", exc_info=True)
        return None

    def _create_bbox(self, min_x, min_y, max_x, max_y) -> BoundingBox:
        """Build a BoundingBox whose centre is the midpoint of the corners."""
        return BoundingBox(
            min_x=min_x,
            min_y=min_y,
            max_x=max_x,
            max_y=max_y,
            center=((min_x + max_x) / 2, (min_y + max_y) / 2)
        )

    def _build_record(self, entity, bbox_obj: BoundingBox) -> dict:
        """Convert one entity and its bounding box into a GeometricEntity dict."""
        kind = entity.dxftype()
        dxf = entity.dxf

        if kind == 'TEXT':
            raw_text = dxf.text
        elif kind == 'MTEXT':
            raw_text = entity.text
        else:
            raw_text = ""

        # Characteristic points, reduced to 2D (x, y).
        if hasattr(entity, 'get_points'):
            # Only the first two components of each point are used.
            coords = [(p[0], p[1]) for p in entity.get_points()]
        elif kind == 'LINE':
            coords = [(dxf.start.x, dxf.start.y), (dxf.end.x, dxf.end.y)]
        elif kind in ('CIRCLE', 'ARC'):
            coords = [(dxf.center.x, dxf.center.y)]
        else:
            coords = []

        record = GeometricEntity(
            entity_id=dxf.handle,
            entity_type=kind,
            layer=dxf.layer,
            bbox=bbox_obj,
            raw_value=raw_text if raw_text else None,
            clean_value=self.clean_text(raw_text) if raw_text else None,
            coordinates=coords,
            properties={
                "color": dxf.color,
                "lineweight": dxf.lineweight if hasattr(dxf, 'lineweight') else None,
            }
        )
        return record.model_dump()

    def extract_and_save(self, output_path: str) -> str:
        """Extract every supported modelspace entity and write them as JSON.

        Entities without a bounding box are skipped; entities that fail to
        convert are logged and skipped.

        Args:
            output_path: Destination path of the JSON file.

        Returns:
            ``output_path``.

        Raises:
            Exception: Any error raised while writing the file is logged and
                re-raised.
        """
        results = []
        logger.info(f"Starting DXF extraction from {self.doc.filename if hasattr(self.doc, 'filename') else 'unknown file'}")
        for entity in self.msp:
            try:
                bbox_obj = self.get_bbox(entity)
                if not bbox_obj:
                    continue
                results.append(self._build_record(entity, bbox_obj))
            except Exception as e:
                logger.error(f"Unexpected error processing entity {entity.dxftype()} ({entity.dxf.handle}): {e}")
                continue
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=4)
            logger.info(f"Successfully saved {len(results)} entities to {output_path}")
        except Exception as e:
            logger.error(f"Failed to save extraction results to {output_path}: {e}")
            raise
        return output_path

    @staticmethod
    def _entity_center(entity) -> Optional[Tuple[float, float]]:
        """Return the representative (x, y) point used for density analysis.

        Uses the insert point, else the midpoint of start/end, else the
        centre attribute; returns None when the entity has none of these.
        """
        dxf = entity.dxf
        if hasattr(dxf, 'insert'):
            return (dxf.insert.x, dxf.insert.y)
        if hasattr(dxf, 'start'):
            return ((dxf.start.x + dxf.end.x) / 2, (dxf.start.y + dxf.end.y) / 2)
        if hasattr(dxf, 'center'):
            return (dxf.center.x, dxf.center.y)
        return None

    def split_drawings(
        self,
        bucket_size: float = 200.0,
        threshold_ratio: float = 0.15,
        min_sparse_width: Optional[float] = None
    ) -> List[DrawingRegion]:
        """Split the modelspace into drawing regions using X/Y density.

        Entity positions are histogrammed along each axis; sparse stretches
        and empty gaps between buckets become split lines.

        Args:
            bucket_size: Width of one histogram bucket; must be positive.
            threshold_ratio: A bucket is sparse when its count is below
                ``average count * threshold_ratio``.
            min_sparse_width: Minimum width of a sparse stretch to count as
                a separator; defaults to ``bucket_size * 1.5``.

        Returns:
            Regions that contain at least one entity position.  When no
            position can be collected, a single placeholder region
            (0..1 on both axes) is returned.

        Raises:
            ValueError: If ``bucket_size`` is not positive.
        """
        if bucket_size <= 0:
            raise ValueError(f"bucket_size must be positive, got {bucket_size}")

        # 1. Collect one representative point per entity.
        # NOTE(review): entities without insert/start/center attributes
        # (e.g. LWPOLYLINE) do not contribute here -- confirm this is intended.
        centers = []
        for entity in self.msp:
            try:
                center = self._entity_center(entity)
            except Exception as e:
                # Best effort: a single malformed entity must not abort the split.
                logger.debug("Skipping entity while collecting centers: %s", e)
                continue
            if center is not None:
                centers.append(center)
        if not centers:
            logger.warning("중심 좌표를 수집할 수 없습니다. 전체를 단일 영역으로 반환.")
            return [DrawingRegion(drawing_no=1, x_min=0, x_max=1, y_min=0, y_max=1)]

        xs = [c[0] for c in centers]
        ys = [c[1] for c in centers]
        x_range = (min(xs), max(xs))
        y_range = (min(ys), max(ys))

        # 2. Density histograms per axis.
        x_buckets = self._compute_density_histogram(centers, 'x', bucket_size)
        y_buckets = self._compute_density_histogram(centers, 'y', bucket_size)

        # 3. Separators: low-density stretches plus completely empty gaps.
        if min_sparse_width is None:
            min_sparse_width = bucket_size * 1.5
        x_sparse = sorted(set(
            self._find_sparse_regions(x_buckets, bucket_size, threshold_ratio, min_sparse_width)
            + self._find_gaps_in_buckets(x_buckets, bucket_size)
        ))
        y_sparse = sorted(set(
            self._find_sparse_regions(y_buckets, bucket_size, threshold_ratio, min_sparse_width)
            + self._find_gaps_in_buckets(y_buckets, bucket_size)
        ))

        # 4. Build the region grid.
        regions = self._compute_drawing_regions(
            centers, x_sparse, y_sparse, x_range, y_range
        )
        logger.info(f"도면 분할 완료: {len(regions)}개 영역 감지")
        for r in regions:
            logger.info(f"  도면 #{r.drawing_no}: X={r.x_min:.0f}~{r.x_max:.0f}, Y={r.y_min:.0f}~{r.y_max:.0f}, 엔티티={r.entity_count}")
        return regions

    def extract_region(self, region: DrawingRegion) -> List[dict]:
        """Extract only the entities whose bounding-box centre is in *region*.

        The region bounds are inclusive, so an entity sitting exactly on a
        region edge is kept (an entity on an edge shared by two regions is
        reported for both).

        Args:
            region: The drawing region to extract.

        Returns:
            List of GeometricEntity dictionaries.
        """
        results = []
        for entity in self.msp:
            try:
                bbox_obj = self.get_bbox(entity)
                if not bbox_obj:
                    continue
                cx, cy = bbox_obj.center
                inside = (
                    region.x_min <= cx <= region.x_max
                    and region.y_min <= cy <= region.y_max
                )
                if not inside:
                    continue
                results.append(self._build_record(entity, bbox_obj))
            except Exception as e:
                logger.error(f"Region extraction error for {entity.dxftype()} ({entity.dxf.handle}): {e}")
                continue
        logger.info(f"도면 #{region.drawing_no} 추출 완료: {len(results)}개 엔티티")
        return results

    # --- Internal helpers for split_drawings / extract_region ---
    def _compute_density_histogram(
        self,
        centers: List[Tuple[float, float]],
        axis: str,
        bucket_size: float
    ) -> dict:
        """Count points per bucket along one axis.

        Args:
            centers: (x, y) points.
            axis: ``'x'`` selects the first component; anything else the second.
            bucket_size: Bucket width; must be positive.

        Returns:
            Dict mapping each bucket's lower edge to its point count, ordered
            by key.  Empty buckets are not present.

        Raises:
            ValueError: If ``bucket_size`` is not positive.
        """
        if bucket_size <= 0:
            raise ValueError(f"bucket_size must be positive, got {bucket_size}")
        component = 0 if axis == 'x' else 1
        buckets = {}
        for center in centers:
            # floor (not int truncation) keeps buckets uniform for negative
            # coordinates; truncation would merge (-size, 0) into bucket 0.
            bucket = math.floor(center[component] / bucket_size) * bucket_size
            buckets[bucket] = buckets.get(bucket, 0) + 1
        return dict(sorted(buckets.items()))

    def _find_sparse_regions(
        self,
        buckets: dict,
        bucket_size: float,
        threshold_ratio: float = 0.15,
        min_sparse_width: Optional[float] = None
    ) -> List[Tuple[float, float]]:
        """Find runs of low-count buckets that are wide enough to separate drawings.

        A bucket is sparse when its count is below the average bucket count
        times ``threshold_ratio``.  A run spans from its first sparse key to
        the next non-sparse key (or one bucket past the last key) and is kept
        when at least ``min_sparse_width`` wide (default ``bucket_size * 1.5``).

        Returns:
            List of (start, end) pairs.
        """
        if not buckets:
            return []
        if min_sparse_width is None:
            min_sparse_width = bucket_size * 1.5

        counts = list(buckets.values())
        threshold = (sum(counts) / len(counts)) * threshold_ratio

        ordered_keys = sorted(buckets)
        sparse_regions = []
        run_start = None
        for key in ordered_keys:
            if buckets[key] < threshold:
                if run_start is None:
                    run_start = key
            elif run_start is not None:
                if (key - run_start) >= min_sparse_width:
                    sparse_regions.append((run_start, key))
                run_start = None
        if run_start is not None:
            # A run still open at the end extends one bucket past the last key.
            run_end = ordered_keys[-1] + bucket_size
            if (run_end - run_start) >= min_sparse_width:
                sparse_regions.append((run_start, run_end))
        return sparse_regions

    def _find_gaps_in_buckets(
        self,
        buckets: dict,
        bucket_size: float,
        min_gap_buckets: int = 1
    ) -> List[Tuple[float, float]]:
        """Find gaps between consecutive occupied buckets.

        Two neighbouring keys form a gap when their distance is strictly
        greater than ``bucket_size * (min_gap_buckets + 1)``.

        Returns:
            List of (lower_key, upper_key) pairs.
        """
        if not buckets:
            return []
        ordered_keys = sorted(buckets)
        limit = bucket_size * (min_gap_buckets + 1)
        return [
            (current, following)
            for current, following in zip(ordered_keys, ordered_keys[1:])
            if following - current > limit
        ]

    @staticmethod
    def _axis_edges(
        axis_range: Tuple[float, float],
        sparse: List[Tuple[float, float]]
    ) -> List[float]:
        """Return sorted split positions: range ends plus sparse-region midpoints."""
        lo, hi = axis_range
        edges = sorted({lo, hi, *((start + end) / 2 for start, end in sparse)})
        if len(edges) == 1:
            # Degenerate range (all points share one coordinate): keep a
            # single zero-width interval so the points still get a region.
            edges = [lo, hi]
        return edges

    @staticmethod
    def _in_interval(value: float, lo: float, hi: float, closed: bool) -> bool:
        """Test ``lo <= value < hi``; the upper bound is inclusive when *closed*."""
        if closed:
            return lo <= value <= hi
        return lo <= value < hi

    def _compute_drawing_regions(
        self,
        centers: List[Tuple[float, float]],
        x_sparse: List[Tuple[float, float]],
        y_sparse: List[Tuple[float, float]],
        x_range: Tuple[float, float],
        y_range: Tuple[float, float]
    ) -> List[DrawingRegion]:
        """Build the grid of regions and keep the cells that contain points.

        Cells are half-open (lower bound inclusive) except for the last cell
        on each axis, which also includes its upper bound so that points at
        the maximum coordinate are counted.

        Returns:
            Non-empty regions numbered from 1, iterating X first, then Y.
        """
        x_edges = self._axis_edges(x_range, x_sparse)
        y_edges = self._axis_edges(y_range, y_sparse)
        last_x = len(x_edges) - 2
        last_y = len(y_edges) - 2

        regions = []
        region_no = 1
        for i, (x_min, x_max) in enumerate(zip(x_edges, x_edges[1:])):
            for j, (y_min, y_max) in enumerate(zip(y_edges, y_edges[1:])):
                count = sum(
                    1 for cx, cy in centers
                    if self._in_interval(cx, x_min, x_max, i == last_x)
                    and self._in_interval(cy, y_min, y_max, j == last_y)
                )
                if count > 0:
                    regions.append(DrawingRegion(
                        drawing_no=region_no,
                        x_min=x_min,
                        x_max=x_max,
                        y_min=y_min,
                        y_max=y_max,
                        entity_count=count
                    ))
                    region_no += 1
        return regions
# --- Proximity Utilities ---
def is_near(bbox_a: BoundingBox, bbox_b: BoundingBox, threshold=5.0) -> bool:
    """
    Report whether the shortest distance between two bounding boxes is at
    most *threshold*.

    Works directly on the box coordinates (no shapely geometry needed) with
    a fixed amount of arithmetic. Overlapping or touching boxes have
    distance 0.
    """
    def _axis_gap(lo_a, hi_a, lo_b, hi_b):
        # Separation along one axis; 0 when the two extents overlap.
        return max(0, lo_b - hi_a, lo_a - hi_b)

    gap_x = _axis_gap(bbox_a.min_x, bbox_a.max_x, bbox_b.min_x, bbox_b.max_x)
    gap_y = _axis_gap(bbox_a.min_y, bbox_a.max_y, bbox_b.min_y, bbox_b.max_y)
    separation = (gap_x * gap_x + gap_y * gap_y) ** 0.5
    return separation <= threshold
def is_inside(point: Tuple[float, float], bbox: BoundingBox) -> bool:
    """
    Report whether *point* (x, y) lies within *bbox*; points exactly on the
    box edges count as inside.
    """
    if not (bbox.min_x <= point[0] <= bbox.max_x):
        return False
    return bbox.min_y <= point[1] <= bbox.max_y