# File listing metadata: 464 lines, 17 KiB, Python.
import ezdxf
|
|
import re
|
|
import json
|
|
import logging
|
|
from typing import List, Optional, Tuple, Union
|
|
from pydantic import BaseModel, Field
|
|
from shapely.geometry import box, Point
|
|
|
|
# Logging configuration: INFO-level records with timestamp and level on the
# root logger, plus a module-scoped logger used throughout this file.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
# --- Data Models ---
|
|
|
|
class BoundingBox(BaseModel):
    """Axis-aligned 2D bounding box together with its center point."""

    # Minimum extents of the box.
    min_x: float
    min_y: float
    # Maximum extents of the box.
    max_x: float
    max_y: float
    # (x, y) of the box center; supplied by the caller, not derived here.
    center: Tuple[float, float]
|
|
|
|
class GeometricEntity(BaseModel):
    """Serializable record for one DXF entity: identity, extents, text and geometry."""

    # Identifier of the entity (the extractor in this file passes the DXF handle).
    entity_id: str
    # DXF type name: TEXT, MTEXT, LINE, LWPOLYLINE, CIRCLE, ARC
    entity_type: str
    layer: str
    bbox: BoundingBox
    # Text content as stored in the drawing; None for non-text entities.
    raw_value: Optional[str] = Field(default=None)
    # Text content after formatting codes were stripped; None for non-text entities.
    clean_value: Optional[str] = Field(default=None)
    # 2D points describing the geometry (vertices, end points or a center).
    coordinates: List[Union[Tuple[float, float], List[float]]] = Field(default_factory=list)
    # Free-form extra attributes (the extractor stores color and lineweight).
    properties: dict = Field(default_factory=dict)
|
|
|
|
class DrawingRegion(BaseModel):
    """Rectangular region of the modelspace that holds one drawing sheet."""

    # Sequence number of the region (the splitter in this file starts at 1).
    drawing_no: int
    # Horizontal extent of the region.
    x_min: float
    x_max: float
    # Vertical extent of the region.
    y_min: float
    y_max: float
    # Number of entity centers counted inside the region.
    entity_count: int = Field(default=0)
|
|
|
|
# --- Extractor Implementation ---
|
|
|
|
class PidGeometricExtractor:
|
|
def __init__(self, file_path: str):
    """Load a DXF document and keep references to it and its modelspace.

    Args:
        file_path: Path of the DXF file to read.

    Raises:
        IOError: If the file cannot be read or parsed. The underlying
            exception is attached as ``__cause__``.
    """
    try:
        self.doc = ezdxf.readfile(file_path)
        self.msp = self.doc.modelspace()
    except Exception as e:
        # Chain the original error so the traceback still shows the real
        # cause instead of only the wrapped message.
        raise IOError(f"Failed to load DXF file: {e}") from e
|
|
|
|
def clean_text(self, text: str) -> str:
    """Strip DXF control codes and MTEXT inline formatting from ``text``.

    Args:
        text: Raw TEXT/MTEXT content; may be empty or None.

    Returns:
        The text with formatting codes removed, whitespace runs collapsed
        to single spaces and both ends trimmed. Empty input yields "".
    """
    if not text:
        return ""

    # 1. Stacked text (\S<upper>/<lower>;): keep the stacked content, drop
    #    only the code and its terminator.
    text = re.sub(r'\\S([^;\\]*);', r' \1 ', text)

    # 2. Codes that carry a numeric argument (\A, \C, \H, \T, \W). The
    #    argument may be signed or decimal and may end in "x;" or ";",
    #    e.g. \H2.5x; or \C1;.
    text = re.sub(r'\\[ACHTW][-+\d.]*(?:x?;)?', ' ', text)

    # 3. Codes without an argument (\P paragraph break, \L underline) and
    #    any unterminated \S. Digits that follow belong to the text itself
    #    (e.g. "FIC\P101") and must be kept.
    text = re.sub(r'\\[PLS]', ' ', text)

    # 4. Grouping braces.
    text = re.sub(r'[{}]', ' ', text)

    # 5. %% control codes (U, O, S, R), upper or lower case.
    text = re.sub(r'%%[UOSR]', ' ', text, flags=re.IGNORECASE)

    # 6. Collapse whitespace left behind by the removals.
    return re.sub(r'\s+', ' ', text).strip()
|
|
|
|
def get_bbox(self, entity) -> Optional[BoundingBox]:
    """Compute an approximate bounding box for a supported DXF entity.

    Supported types are TEXT, MTEXT, LINE, LWPOLYLINE, CIRCLE and ARC.

    Args:
        entity: An entity taken from the modelspace.

    Returns:
        The bounding box, or None when the entity type is not supported,
        a polyline has no vertices, or the computation failed (the failure
        is logged).
    """
    try:
        kind = entity.dxftype()

        if kind == 'TEXT':
            p = entity.dxf.insert
            h = entity.dxf.height
            # Rough width estimate: character count * height * 0.6.
            width = len(entity.dxf.text) * h * 0.6
            return self._create_bbox(p.x, p.y, p.x + width, p.y + h)

        if kind == 'MTEXT':
            p = entity.dxf.insert
            h = entity.dxf.get('char_height', 2.5)
            # The reference width is optional and may be unset; comparing
            # an unset value with 0 would raise and drop the entity, so
            # treat a missing width as 0 and fall back to the estimate.
            ref_width = entity.dxf.get('width', 0) or 0
            w = ref_width if ref_width > 0 else len(entity.text) * h * 0.6
            # NOTE(review): the box covers one text line starting at the
            # insert point; attachment point and line count are ignored.
            return self._create_bbox(p.x, p.y, p.x + w, p.y + h)

        if kind == 'LINE':
            start = entity.dxf.start
            end = entity.dxf.end
            return self._create_bbox(
                min(start.x, end.x), min(start.y, end.y),
                max(start.x, end.x), max(start.y, end.y)
            )

        if kind == 'LWPOLYLINE':
            points = entity.get_points()
            if not points:
                return None
            xs = [p[0] for p in points]
            ys = [p[1] for p in points]
            return self._create_bbox(min(xs), min(ys), max(xs), max(ys))

        if kind in ('CIRCLE', 'ARC'):
            # An ARC is boxed like its full circle (start/end angles are
            # not taken into account).
            center = entity.dxf.center
            radius = entity.dxf.radius
            return self._create_bbox(
                center.x - radius, center.y - radius,
                center.x + radius, center.y + radius
            )

    except Exception as e:
        logger.error(f"Error calculating bbox for {entity.dxftype()} ({entity.dxf.handle}): {e}", exc_info=True)

    # Unsupported entity type, or the computation failed above.
    return None
|
|
|
|
def _create_bbox(self, min_x, min_y, max_x, max_y) -> BoundingBox:
    """Build a BoundingBox from its extents, using the midpoint as center."""
    mid_x = (min_x + max_x) / 2
    mid_y = (min_y + max_y) / 2
    return BoundingBox(
        min_x=min_x,
        min_y=min_y,
        max_x=max_x,
        max_y=max_y,
        center=(mid_x, mid_y),
    )
|
|
|
|
def extract_and_save(self, output_path: str):
    """Extract geometric data for every supported entity and write it as JSON.

    Entities for which no bounding box can be computed are skipped, and an
    entity that raises while being processed is logged and skipped.

    Args:
        output_path: Destination path of the JSON file.

    Returns:
        ``output_path`` once the file has been written.

    Raises:
        Exception: Any error raised while writing the file is logged and
            re-raised unchanged.
    """
    source_name = getattr(self.doc, 'filename', 'unknown file')
    logger.info(f"Starting DXF extraction from {source_name}")

    results = []
    for entity in self.msp:
        try:
            bbox_obj = self.get_bbox(entity)
            if not bbox_obj:
                continue

            kind = entity.dxftype()

            # Only TEXT and MTEXT carry a text payload.
            if kind == 'TEXT':
                raw_text = entity.dxf.text
            elif kind == 'MTEXT':
                raw_text = entity.text
            else:
                raw_text = ""

            # Geometry as 2D points: only the first two components (x, y)
            # of each vertex are kept.
            if hasattr(entity, 'get_points'):
                coords = [(pt[0], pt[1]) for pt in entity.get_points()]
            elif kind == 'LINE':
                start, end = entity.dxf.start, entity.dxf.end
                coords = [(start.x, start.y), (end.x, end.y)]
            elif kind in ('CIRCLE', 'ARC'):
                coords = [(entity.dxf.center.x, entity.dxf.center.y)]
            else:
                coords = []

            record = GeometricEntity(
                entity_id=entity.dxf.handle,
                entity_type=kind,
                layer=entity.dxf.layer,
                bbox=bbox_obj,
                raw_value=raw_text or None,
                clean_value=self.clean_text(raw_text) if raw_text else None,
                coordinates=coords,
                properties={
                    "color": entity.dxf.color,
                    "lineweight": getattr(entity.dxf, 'lineweight', None),
                },
            )
            results.append(record.model_dump())
        except Exception as e:
            logger.error(f"Unexpected error processing entity {entity.dxftype()} ({entity.dxf.handle}): {e}")

    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=4)
        logger.info(f"Successfully saved {len(results)} entities to {output_path}")
    except Exception as e:
        logger.error(f"Failed to save extraction results to {output_path}: {e}")
        raise

    return output_path
|
|
|
|
def split_drawings(
    self,
    bucket_size: float = 200.0,
    threshold_ratio: float = 0.15,
    min_sparse_width: Optional[float] = None
) -> List[DrawingRegion]:
    """Split the modelspace into drawing regions by detecting sparse X/Y bands.

    Args:
        bucket_size: Width of one histogram bucket along each axis.
        threshold_ratio: A bucket counts as sparse when its count is below
            ``average count * threshold_ratio``.
        min_sparse_width: Minimum width of a sparse band; defaults to
            ``bucket_size * 1.5`` when None.

    Returns:
        The regions that contain at least one entity center. When no
        center can be collected, a single placeholder region
        (0..1 on both axes) is returned.
    """
    # 1. Collect one representative point per entity.
    centers = []
    for entity in self.msp:
        try:
            if hasattr(entity.dxf, 'insert'):
                centers.append((entity.dxf.insert.x, entity.dxf.insert.y))
            elif hasattr(entity.dxf, 'start'):
                cx = (entity.dxf.start.x + entity.dxf.end.x) / 2
                cy = (entity.dxf.start.y + entity.dxf.end.y) / 2
                centers.append((cx, cy))
            elif hasattr(entity.dxf, 'center'):
                centers.append((entity.dxf.center.x, entity.dxf.center.y))
        except Exception as exc:
            # Best effort: an entity without usable coordinates must not
            # abort the split, but leave a trace instead of hiding it.
            logger.debug("Skipping entity while collecting centers: %s", exc)

    if not centers:
        logger.warning("중심 좌표를 수집할 수 없습니다. 전체를 단일 영역으로 반환.")
        return [DrawingRegion(drawing_no=1, x_min=0, x_max=1, y_min=0, y_max=1)]

    xs = [c[0] for c in centers]
    ys = [c[1] for c in centers]
    x_range = (min(xs), max(xs))
    y_range = (min(ys), max(ys))

    # 2. Density histograms along both axes.
    x_buckets = self._compute_density_histogram(centers, 'x', bucket_size)
    y_buckets = self._compute_density_histogram(centers, 'y', bucket_size)

    # 3. Sparse bands: low-density runs plus gaps between occupied buckets.
    if min_sparse_width is None:
        min_sparse_width = bucket_size * 1.5

    x_sparse = sorted(set(
        self._find_sparse_regions(x_buckets, bucket_size, threshold_ratio, min_sparse_width)
        + self._find_gaps_in_buckets(x_buckets, bucket_size)
    ))
    y_sparse = sorted(set(
        self._find_sparse_regions(y_buckets, bucket_size, threshold_ratio, min_sparse_width)
        + self._find_gaps_in_buckets(y_buckets, bucket_size)
    ))

    # 4. Turn the sparse bands into rectangular regions.
    regions = self._compute_drawing_regions(
        centers, x_sparse, y_sparse, x_range, y_range
    )

    logger.info(f"도면 분할 완료: {len(regions)}개 영역 감지")
    for r in regions:
        logger.info(f" 도면 #{r.drawing_no}: X={r.x_min:.0f}~{r.x_max:.0f}, Y={r.y_min:.0f}~{r.y_max:.0f}, 엔티티={r.entity_count}")

    return regions
|
|
|
|
def extract_region(self, region: DrawingRegion) -> List[dict]:
    """Extract only the entities whose bounding-box center lies inside ``region``.

    Args:
        region: The drawing region to extract.

    Returns:
        A list of GeometricEntity dictionaries. Entities without a
        bounding box are skipped, and an entity that raises while being
        processed is logged and skipped.
    """
    results = []
    region_box = box(region.x_min, region.y_min, region.x_max, region.y_max)

    for entity in self.msp:
        try:
            bbox_obj = self.get_bbox(entity)
            if not bbox_obj:
                continue

            # Membership is decided by the bbox center alone. ``contains``
            # is strict: a center lying exactly on the region border is
            # treated as outside.
            if not region_box.contains(Point(bbox_obj.center)):
                continue

            raw_text = ""
            if entity.dxftype() == 'TEXT':
                raw_text = entity.dxf.text
            elif entity.dxftype() == 'MTEXT':
                raw_text = entity.text

            # Geometry as 2D points: only the first two components (x, y)
            # of each vertex are kept.
            coords = []
            if hasattr(entity, 'get_points'):
                coords = [(p[0], p[1]) for p in entity.get_points()]
            elif entity.dxftype() == 'LINE':
                coords = [(entity.dxf.start.x, entity.dxf.start.y), (entity.dxf.end.x, entity.dxf.end.y)]
            elif entity.dxftype() in ('CIRCLE', 'ARC'):
                coords = [(entity.dxf.center.x, entity.dxf.center.y)]

            entity_data = GeometricEntity(
                entity_id=entity.dxf.handle,
                entity_type=entity.dxftype(),
                layer=entity.dxf.layer,
                bbox=bbox_obj,
                raw_value=raw_text if raw_text else None,
                clean_value=self.clean_text(raw_text) if raw_text else None,
                coordinates=coords,
                properties={
                    "color": entity.dxf.color,
                    "lineweight": entity.dxf.lineweight if hasattr(entity.dxf, 'lineweight') else None,
                }
            )
            results.append(entity_data.model_dump())
        except Exception as e:
            logger.error(f"Region extraction error for {entity.dxftype()} ({entity.dxf.handle}): {e}")
            continue

    logger.info(f"도면 #{region.drawing_no} 추출 완료: {len(results)}개 엔티티")
    return results
|
|
|
|
# --- Internal helpers for split_drawings / extract_region ---
|
|
|
|
def _compute_density_histogram(
|
|
self,
|
|
centers: List[Tuple[float, float]],
|
|
axis: str,
|
|
bucket_size: float
|
|
) -> dict:
|
|
if axis == 'x':
|
|
coords = [c[0] for c in centers]
|
|
else:
|
|
coords = [c[1] for c in centers]
|
|
|
|
if not coords:
|
|
return {}
|
|
|
|
buckets = {}
|
|
for coord in coords:
|
|
bucket = int(coord / bucket_size) * bucket_size
|
|
buckets[bucket] = buckets.get(bucket, 0) + 1
|
|
|
|
return dict(sorted(buckets.items()))
|
|
|
|
def _find_sparse_regions(
|
|
self,
|
|
buckets: dict,
|
|
bucket_size: float,
|
|
threshold_ratio: float = 0.15,
|
|
min_sparse_width: float = None
|
|
) -> List[Tuple[float, float]]:
|
|
if not buckets:
|
|
return []
|
|
|
|
if min_sparse_width is None:
|
|
min_sparse_width = bucket_size * 1.5
|
|
|
|
counts = list(buckets.values())
|
|
avg_count = sum(counts) / len(counts)
|
|
threshold = avg_count * threshold_ratio
|
|
|
|
sorted_keys = sorted(buckets.keys())
|
|
sparse_regions = []
|
|
in_sparse = False
|
|
sparse_start = 0
|
|
|
|
for key in sorted_keys:
|
|
is_sparse = buckets[key] < threshold
|
|
|
|
if is_sparse and not in_sparse:
|
|
sparse_start = key
|
|
in_sparse = True
|
|
elif not is_sparse and in_sparse:
|
|
sparse_end = key
|
|
if (sparse_end - sparse_start) >= min_sparse_width:
|
|
sparse_regions.append((sparse_start, sparse_end))
|
|
in_sparse = False
|
|
|
|
if in_sparse and len(sorted_keys) > 0:
|
|
sparse_end = sorted_keys[-1] + bucket_size
|
|
if (sparse_end - sparse_start) >= min_sparse_width:
|
|
sparse_regions.append((sparse_start, sparse_end))
|
|
|
|
return sparse_regions
|
|
|
|
def _find_gaps_in_buckets(
|
|
self,
|
|
buckets: dict,
|
|
bucket_size: float,
|
|
min_gap_buckets: int = 1
|
|
) -> List[Tuple[float, float]]:
|
|
if not buckets:
|
|
return []
|
|
|
|
sorted_keys = sorted(buckets.keys())
|
|
gaps = []
|
|
|
|
for i in range(len(sorted_keys) - 1):
|
|
current = sorted_keys[i]
|
|
next_key = sorted_keys[i + 1]
|
|
gap_size = next_key - current
|
|
|
|
if gap_size > bucket_size * (min_gap_buckets + 1):
|
|
gaps.append((current, next_key))
|
|
|
|
return gaps
|
|
|
|
def _compute_drawing_regions(
    self,
    centers: List[Tuple[float, float]],
    x_sparse: List[Tuple[float, float]],
    y_sparse: List[Tuple[float, float]],
    x_range: Tuple[float, float],
    y_range: Tuple[float, float]
) -> List[DrawingRegion]:
    """Cut the drawing extent into a grid and keep the occupied cells.

    Each axis is split at the midpoint of every sparse band. Cells are
    half-open (``lo <= v < hi``) except the last cell on each axis, which
    also includes its upper bound so that centers lying exactly on the
    maximum coordinate are counted. An axis whose range collapses to a
    single value yields one zero-width cell.

    Args:
        centers: (x, y) points used to count entities per cell.
        x_sparse: Sparse (start, end) bands along X.
        y_sparse: Sparse (start, end) bands along Y.
        x_range: (min, max) extent along X.
        y_range: (min, max) extent along Y.

    Returns:
        One DrawingRegion per cell containing at least one center,
        numbered from 1 in X-major, Y-minor order.
    """

    def axis_cells(axis_range, sparse):
        # Split points: both range ends plus the midpoint of each band.
        bounds = sorted({
            axis_range[0],
            axis_range[1],
            *((start + end) / 2 for start, end in sparse),
        })
        if len(bounds) == 1:
            # Degenerate extent: a single closed, zero-width cell.
            return [(bounds[0], bounds[0], True)]
        final = len(bounds) - 2
        return [
            (lo, hi, idx == final)
            for idx, (lo, hi) in enumerate(zip(bounds, bounds[1:]))
        ]

    def in_cell(value, lo, hi, closed):
        return lo <= value <= hi if closed else lo <= value < hi

    x_cells = axis_cells(x_range, x_sparse)
    y_cells = axis_cells(y_range, y_sparse)

    regions = []
    for x_min, x_max, x_closed in x_cells:
        for y_min, y_max, y_closed in y_cells:
            count = sum(
                1 for cx, cy in centers
                if in_cell(cx, x_min, x_max, x_closed)
                and in_cell(cy, y_min, y_max, y_closed)
            )
            if count > 0:
                regions.append(DrawingRegion(
                    drawing_no=len(regions) + 1,
                    x_min=x_min,
                    x_max=x_max,
                    y_min=y_min,
                    y_max=y_max,
                    entity_count=count
                ))

    return regions
|
|
|
|
# --- Proximity Utilities ---
|
|
|
|
def is_near(bbox_a: BoundingBox, bbox_b: BoundingBox, threshold=5.0) -> bool:
    """Tell whether the shortest distance between two boxes is within ``threshold``.

    Works on the box extents only; overlapping or touching boxes have a
    distance of zero.
    """

    def axis_gap(lo_a, hi_a, lo_b, hi_b):
        # Separation along one axis; 0 when the two intervals overlap.
        return max(0, lo_b - hi_a, lo_a - hi_b)

    gap_x = axis_gap(bbox_a.min_x, bbox_a.max_x, bbox_b.min_x, bbox_b.max_x)
    gap_y = axis_gap(bbox_a.min_y, bbox_a.max_y, bbox_b.min_y, bbox_b.max_y)
    return (gap_x * gap_x + gap_y * gap_y) ** 0.5 <= threshold
|
|
|
|
def is_inside(point: Tuple[float, float], bbox: BoundingBox) -> bool:
    """Tell whether ``point`` lies within ``bbox``, borders included."""
    # Check the horizontal extent first; only then look at the vertical one.
    if not (bbox.min_x <= point[0] <= bbox.max_x):
        return False
    return bbox.min_y <= point[1] <= bbox.max_y
|