opencode 로 바꾸고 작업전 커밋

This commit is contained in:
windpacer
2026-05-08 17:22:10 +09:00
parent 15c17522c8
commit e923aab43b
202 changed files with 1336027 additions and 115 deletions

View File

@@ -29,6 +29,15 @@ class GeometricEntity(BaseModel):
coordinates: List[Union[Tuple[float, float], List[float]]] = Field(default_factory=list)
properties: dict = Field(default_factory=dict)
class DrawingRegion(BaseModel):
"""도면 분할 영역"""
drawing_no: int
x_min: float
x_max: float
y_min: float
y_max: float
entity_count: int = 0
# --- Extractor Implementation ---
class PidGeometricExtractor:
@@ -171,16 +180,281 @@ class PidGeometricExtractor:
return output_path
def split_drawings(
self,
bucket_size: float = 200.0,
threshold_ratio: float = 0.15,
min_sparse_width: float = None
) -> List[DrawingRegion]:
"""
X/Y 밀도 기반 sparse region 감지로 도면 영역 분할.
Returns:
DrawingRegion 목록 (엔티티가 있는 영역만)
"""
# 1. 중심 좌표 수집
centers = []
for entity in self.msp:
try:
if hasattr(entity.dxf, 'insert'):
centers.append((entity.dxf.insert.x, entity.dxf.insert.y))
elif hasattr(entity.dxf, 'start'):
cx = (entity.dxf.start.x + entity.dxf.end.x) / 2
cy = (entity.dxf.start.y + entity.dxf.end.y) / 2
centers.append((cx, cy))
elif hasattr(entity.dxf, 'center'):
centers.append((entity.dxf.center.x, entity.dxf.center.y))
except Exception:
pass
if not centers:
logger.warning("중심 좌표를 수집할 수 없습니다. 전체를 단일 영역으로 반환.")
return [DrawingRegion(drawing_no=1, x_min=0, x_max=1, y_min=0, y_max=1)]
xs = [c[0] for c in centers]
ys = [c[1] for c in centers]
x_range = (min(xs), max(xs))
y_range = (min(ys), max(ys))
# 2. 밀도 히스토그램 계산
x_buckets = self._compute_density_histogram(centers, 'x', bucket_size)
y_buckets = self._compute_density_histogram(centers, 'y', bucket_size)
# 3. sparse region 감지 (밀도 기반 + gap 기반)
if min_sparse_width is None:
min_sparse_width = bucket_size * 1.5
x_sparse = sorted(set(
self._find_sparse_regions(x_buckets, bucket_size, threshold_ratio, min_sparse_width)
+ self._find_gaps_in_buckets(x_buckets, bucket_size)
))
y_sparse = sorted(set(
self._find_sparse_regions(y_buckets, bucket_size, threshold_ratio, min_sparse_width)
+ self._find_gaps_in_buckets(y_buckets, bucket_size)
))
# 4. 도면 영역 계산
regions = self._compute_drawing_regions(
centers, x_sparse, y_sparse, x_range, y_range
)
logger.info(f"도면 분할 완료: {len(regions)}개 영역 감지")
for r in regions:
logger.info(f" 도면 #{r.drawing_no}: X={r.x_min:.0f}~{r.x_max:.0f}, Y={r.y_min:.0f}~{r.y_max:.0f}, 엔티티={r.entity_count}")
return regions
def extract_region(self, region: DrawingRegion) -> List[dict]:
"""
특정 도면 영역 내 엔티티만 추출.
Args:
region: 추출할 도면 영역
Returns:
GeometricEntity 딕셔너리 목록
"""
results = []
region_box = box(region.x_min, region.y_min, region.x_max, region.y_max)
for entity in self.msp:
try:
bbox_obj = self.get_bbox(entity)
if not bbox_obj:
continue
entity_box = box(bbox_obj.min_x, bbox_obj.min_y, bbox_obj.max_x, bbox_obj.max_y)
# 중심점이 region 내에 있는지 확인
if not region_box.contains(Point(bbox_obj.center)):
continue
raw_text = ""
if entity.dxftype() == 'TEXT':
raw_text = entity.dxf.text
elif entity.dxftype() == 'MTEXT':
raw_text = entity.text
coords = []
if hasattr(entity, 'get_points'):
coords = [(p[0], p[1]) for p in entity.get_points()]
elif entity.dxftype() == 'LINE':
coords = [(entity.dxf.start.x, entity.dxf.start.y), (entity.dxf.end.x, entity.dxf.end.y)]
elif entity.dxftype() in ('CIRCLE', 'ARC'):
coords = [(entity.dxf.center.x, entity.dxf.center.y)]
entity_data = GeometricEntity(
entity_id=entity.dxf.handle,
entity_type=entity.dxftype(),
layer=entity.dxf.layer,
bbox=bbox_obj,
raw_value=raw_text if raw_text else None,
clean_value=self.clean_text(raw_text) if raw_text else None,
coordinates=coords,
properties={
"color": entity.dxf.color,
"lineweight": entity.dxf.lineweight if hasattr(entity.dxf, 'lineweight') else None,
}
)
results.append(entity_data.model_dump())
except Exception as e:
logger.error(f"Region extraction error for {entity.dxftype()} ({entity.dxf.handle}): {e}")
continue
logger.info(f"도면 #{region.drawing_no} 추출 완료: {len(results)}개 엔티티")
return results
# --- split_drawings / extract_region용 내부 헬퍼 ---
def _compute_density_histogram(
self,
centers: List[Tuple[float, float]],
axis: str,
bucket_size: float
) -> dict:
if axis == 'x':
coords = [c[0] for c in centers]
else:
coords = [c[1] for c in centers]
if not coords:
return {}
buckets = {}
for coord in coords:
bucket = int(coord / bucket_size) * bucket_size
buckets[bucket] = buckets.get(bucket, 0) + 1
return dict(sorted(buckets.items()))
def _find_sparse_regions(
self,
buckets: dict,
bucket_size: float,
threshold_ratio: float = 0.15,
min_sparse_width: float = None
) -> List[Tuple[float, float]]:
if not buckets:
return []
if min_sparse_width is None:
min_sparse_width = bucket_size * 1.5
counts = list(buckets.values())
avg_count = sum(counts) / len(counts)
threshold = avg_count * threshold_ratio
sorted_keys = sorted(buckets.keys())
sparse_regions = []
in_sparse = False
sparse_start = 0
for key in sorted_keys:
is_sparse = buckets[key] < threshold
if is_sparse and not in_sparse:
sparse_start = key
in_sparse = True
elif not is_sparse and in_sparse:
sparse_end = key
if (sparse_end - sparse_start) >= min_sparse_width:
sparse_regions.append((sparse_start, sparse_end))
in_sparse = False
if in_sparse and len(sorted_keys) > 0:
sparse_end = sorted_keys[-1] + bucket_size
if (sparse_end - sparse_start) >= min_sparse_width:
sparse_regions.append((sparse_start, sparse_end))
return sparse_regions
def _find_gaps_in_buckets(
self,
buckets: dict,
bucket_size: float,
min_gap_buckets: int = 1
) -> List[Tuple[float, float]]:
if not buckets:
return []
sorted_keys = sorted(buckets.keys())
gaps = []
for i in range(len(sorted_keys) - 1):
current = sorted_keys[i]
next_key = sorted_keys[i + 1]
gap_size = next_key - current
if gap_size > bucket_size * (min_gap_buckets + 1):
gaps.append((current, next_key))
return gaps
def _compute_drawing_regions(
self,
centers: List[Tuple[float, float]],
x_sparse: List[Tuple[float, float]],
y_sparse: List[Tuple[float, float]],
x_range: Tuple[float, float],
y_range: Tuple[float, float]
) -> List[DrawingRegion]:
# X 축 분할점 생성
x_boundaries = [x_range[0]]
for start, end in x_sparse:
mid = (start + end) / 2
if mid not in x_boundaries:
x_boundaries.append(mid)
x_boundaries.append(x_range[1])
x_boundaries = sorted(set(x_boundaries))
# Y 축 분할점 생성
y_boundaries = [y_range[0]]
for start, end in y_sparse:
mid = (start + end) / 2
if mid not in y_boundaries:
y_boundaries.append(mid)
y_boundaries.append(y_range[1])
y_boundaries = sorted(set(y_boundaries))
# 2D 영역 생성
regions = []
region_no = 1
for i in range(len(x_boundaries) - 1):
for j in range(len(y_boundaries) - 1):
x_min = x_boundaries[i]
x_max = x_boundaries[i + 1]
y_min = y_boundaries[j]
y_max = y_boundaries[j + 1]
count = sum(
1 for cx, cy in centers
if x_min <= cx < x_max and y_min <= cy < y_max
)
if count > 0:
regions.append(DrawingRegion(
drawing_no=region_no,
x_min=x_min,
x_max=x_max,
y_min=y_min,
y_max=y_max,
entity_count=count
))
region_no += 1
return regions
# --- Proximity Utilities ---
def is_near(bbox_a: BoundingBox, bbox_b: BoundingBox, threshold=5.0) -> bool:
"""
두 Bounding Box 간의 최단 거리가 임계값 이내인지 확인.
shapely box를 사용하여 거리 계산.
shapely 없이 BBox 좌표만으로 O(1) 계산.
"""
box_a = box(bbox_a.min_x, bbox_a.min_y, bbox_a.max_x, bbox_a.max_y)
box_b = box(bbox_b.min_x, bbox_b.min_y, bbox_b.max_x, bbox_b.max_y)
return box_a.distance(box_b) <= threshold
dx = max(0, bbox_b.min_x - bbox_a.max_x, bbox_a.min_x - bbox_b.max_x)
dy = max(0, bbox_b.min_y - bbox_a.max_y, bbox_a.min_y - bbox_b.max_y)
dist = (dx * dx + dy * dy) ** 0.5
return dist <= threshold
def is_inside(point: Tuple[float, float], bbox: BoundingBox) -> bool:
"""

View File

@@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""
P&ID 도면의 레전드 페이지를 파싱하여 계측기 식별 패턴을 추출합니다.
레전드 페이지는 일반적으로 DXF 파일의 첫 번째 도면에 위치하며,
계측기 유형별 패턴(예: FICQ = Flow Indicating Control with Differential)을 정의합니다.
이 모듈은:
1. 레전드 페이지에서 계측기 식별 테이블 추출
2. 계측기 유형별 패턴 정의
3. 실제 도면에서 태그 필터링
"""
import re
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
@dataclass
class InstrumentPattern:
"""계측기 패턴 정의"""
first_letter: str # 첫 글자 (기능)
first_meaning: str # 첫 글자 의미
succeeding_letters: Dict[str, str] # 후속 글자: {글자: 의미}
def matches(self, tag: str) -> bool:
"""태그가 이 패턴과 일치하는지 확인"""
if not tag:
return False
# 첫 글자 확인
if tag[0].upper() != self.first_letter.upper():
return False
return True
class LegendParser:
"""
P&ID 도면의 레전드 페이지를 파싱하여 계측기 식별 패턴을 추출합니다.
"""
# 계측기 식별 테이블 (ISA-5.1 표준 기반)
INSTRUMENT_IDENTIFICATION = {
# 첫 글자 (기능)
'A': 'ANALYSIS, AUTO',
'B': 'BURNER COMBUSTION',
'C': 'CONDUCTIVITY',
'D': 'DENSITY',
'E': 'VOLTAGE(EMF), HEAT ENERGY',
'F': 'FLOW RATE',
'G': 'GAUGE, GAS',
'H': 'HAND, HAND-CONTROLLED SHUT-OFF VALVE',
'I': 'CURRENT(ELECTRICAL)',
'J': 'POWER(MW,MVAR)',
'K': 'TIME',
'L': 'LEVEL',
'M': 'MAN, MOTOR',
'N': 'NUMBER OF OBJECTS',
'P': 'PRESSURE',
'Q': 'WEIGHT',
'R': 'RADIATION, RADIOACTIVITY',
'S': 'SPEED, FREQUENCY',
'T': 'TEMPERATURE',
'U': 'MULTIVARIABLE',
'V': 'VALVE, DAMPER',
'W': 'ACTUATOR',
'X': 'UNSPECIFIED',
'Y': 'RELAY, EVENT',
'Z': 'POSITION, LINEAR',
# 후속 글자 (수식어)
'AA': 'ALARM, AUDIBLE',
'AB': 'ABSORBENCY',
'AC': 'ACCELERATION',
'AD': 'ADHESION',
'AE': 'AREA',
'AF': 'AVG FLOW RATE',
'AG': 'GRAVIMETRIC',
'AH': 'HUMIDITY',
'AI': 'IONIZATION',
'AJ': 'JOULE',
'AK': 'KINETIC',
'AL': 'LEVEL',
'AM': 'MAGNETIC',
'AN': 'NORMAL',
'AO': 'OPTICAL',
'AP': 'PHASE',
'AQ': 'QUANTITY',
'AR': 'RATIO',
'AS': 'SPEED',
'AT': 'TEMPERATURE',
'AU': 'UNSPECIFIED',
'AV': 'VOLTAGE',
'AW': 'WAVELENGTH',
'AX': 'X-RAY',
'AY': 'YIELD',
'AZ': 'Z-POTENTIAL',
}
# 계측기 유형별 그룹
INSTRUMENT_GROUPS = {
'Sensor': ['FT', 'FIT', 'LT', 'PT', 'TE', 'PG', 'LG', 'TG', 'FI', 'PI', 'TI', 'LI'],
'Valve': ['FCV', 'TCV', 'LCV', 'PCV', 'XV', 'FV', 'LV', 'PV', 'TV'],
'Controller': ['FIC', 'TIC', 'PIC', 'LIC', 'FICQ', 'TICA', 'PICA', 'LICA'],
'Indicator': ['FI', 'PI', 'TI', 'LI', 'FIT', 'LIT', 'PIT', 'TIT'],
'Transmitter': ['FT', 'LT', 'PT', 'TT', 'FIT', 'LIT', 'PIT', 'TIT'],
'Gauge': ['PG', 'TG', 'LG'],
'Safety': ['PSV', 'SRV', 'SDV'],
'Pump': ['P-', 'P-1', 'P-2'],
'Compressor': ['C-', 'K-'],
'Heat Exchanger': ['E-'],
'Tank': ['T-', 'D-'],
'Column': ['C-'],
'Filter': ['F-'],
'Separator': ['SP-'],
}
# 배관 번호 패턴
PIPELINE_PATTERN = re.compile(r'[A-Z]{1,6}-\d{3,6}(-[A-Z0-9]+)*')
# 계측기 태그 패턴
INSTRUMENT_TAG_PATTERN = re.compile(r'[A-Z]{1,6}-\d{3,6}(-[A-Z0-9]+)*')
def __init__(self):
self.patterns: List[InstrumentPattern] = []
self._build_patterns()
def _build_patterns(self):
"""계측기 식별 패턴을 빌드합니다."""
for letter, meaning in self.INSTRUMENT_IDENTIFICATION.items():
if len(letter) == 1: # 첫 글자만
self.patterns.append(InstrumentPattern(
first_letter=letter,
first_meaning=meaning,
succeeding_letters={}
))
def extract_instrument_tags(self, text: str) -> List[str]:
"""
텍스트에서 계측기 태그를 추출합니다.
Args:
text: DXF에서 추출한 텍스트
Returns:
계측기 태그 목록
"""
tags = []
for match in self.INSTRUMENT_TAG_PATTERN.finditer(text):
tag = match.group(0)
# 계측기 태그인지 확인
if self._is_instrument_tag(tag):
tags.append(tag)
return tags
def _is_instrument_tag(self, tag: str) -> bool:
"""태그가 계측기 태그인지 확인합니다."""
if not tag or len(tag) < 3:
return False
# 첫 글자가 계측기 식별 테이블에 있는지 확인
first_letter = tag[0].upper()
if first_letter not in self.INSTRUMENT_IDENTIFICATION:
return False
# 숫자 부분이 있는지 확인
if not re.search(r'\d', tag):
return False
return True
def group_by_type(self, tags: List[str]) -> Dict[str, List[str]]:
"""
태그를 유형별로 그룹핑합니다.
Args:
tags: 계측기 태그 목록
Returns:
유형별 태그 그룹 {유형: [태그1, 태그2, ...]}
"""
groups = {group: [] for group in self.INSTRUMENT_GROUPS}
for tag in tags:
for group, patterns in self.INSTRUMENT_GROUPS.items():
for pattern in patterns:
if tag.startswith(pattern):
groups[group].append(tag)
break
# 빈 그룹 제거
return {k: v for k, v in groups.items() if v}
def extract_area_from_tag(self, tag: str) -> Optional[str]:
"""
태그에서 AREA 번호를 추출합니다.
예: FICQ-6113 → "6" (6호 플랜트)
FICQ-10113 → "10" (10호 플랜트)
Args:
tag: 계측기 태그
Returns:
AREA 번호 또는 None
"""
match = re.search(r'-([1-9]\d*)\d+', tag)
if match:
return match.group(1)
return None
def parse_legend_page(self, dxf_filepath: str) -> Dict:
"""
레전드 페이지를 파싱하여 계측기 식별 패턴을 추출합니다.
Args:
dxf_filepath: DXF 파일 경로
Returns:
레전드 페이지 정보
"""
import ezdxf
from ezdxf.tools.text import plain_mtext
doc = ezdxf.readfile(dxf_filepath)
msp = doc.modelspace()
# 레전드 페이지 영역 (X: -176 ~ 2000)
legend_texts = []
for entity in msp:
if entity.dxftype() in ('TEXT', 'MTEXT'):
x = entity.dxf.insert.x
if -176 <= x <= 2000:
if entity.dxftype() == 'TEXT':
text = entity.dxf.text
else:
text = plain_mtext(entity.dxf.text) if hasattr(entity.dxf, 'text') else entity.text
if text.strip():
legend_texts.append((x, entity.dxf.insert.y, text))
return {
'legend_texts': legend_texts,
'instrument_identification': self.INSTRUMENT_IDENTIFICATION,
'instrument_groups': self.INSTRUMENT_GROUPS,
}
# 단일 인스턴스 (싱글톤)
legend_parser = LegendParser()

View File

@@ -95,23 +95,32 @@ class IntelligentMapper:
return MappingResult(resolved_tag="UNKNOWN", reason=f"Error: {str(e)}", confidence=0.0)
# --- 전문화된 Worker 함수들 ---
# vLLM 컨커런시 제한으로 인해 배치 처리 (한 번에 15개 요청)
_BATCH_SIZE = 15
async def _batch_gather(self, node_ids: List[str], prompt: str) -> Dict[str, MappingResult]:
"""노드를 배치로 나누어 순차적으로 LLM 요청.
모든 노드를 동시에 요청하면 vLLM 큐가 가득 차 타임아웃 발생.
"""
all_results = {}
for i in range(0, len(node_ids), self._BATCH_SIZE):
batch = node_ids[i : i + self._BATCH_SIZE]
tasks = [self._resolve_generic(nid, prompt) for nid in batch]
batch_results = await asyncio.gather(*tasks)
all_results.update(dict(zip(batch, batch_results)))
return all_results
async def extract_transmitters(self, node_ids: List[str]) -> Dict[str, MappingResult]:
prompt = "당신은 계측기 전문 엔지니어입니다. 특히 Pressure/Flow/Level Transmitter 매핑에 특화되어 있습니다."
tasks = [self._resolve_generic(nid, prompt) for nid in node_ids]
results = await asyncio.gather(*tasks)
return dict(zip(node_ids, results))
return await self._batch_gather(node_ids, prompt)
async def extract_valves(self, node_ids: List[str]) -> Dict[str, MappingResult]:
prompt = "당신은 밸브 및 액추에이터 전문 엔지니어입니다. 밸브의 개폐 상태 및 제어 태그 매핑에 특화되어 있습니다."
tasks = [self._resolve_generic(nid, prompt) for nid in node_ids]
results = await asyncio.gather(*tasks)
return dict(zip(node_ids, results))
return await self._batch_gather(node_ids, prompt)
async def extract_equipment(self, node_ids: List[str]) -> Dict[str, MappingResult]:
prompt = "당신은 공정 설비 전문 엔지니어입니다. 펌프, 탱크, 열교환기 등의 메인 설비 태그 매핑에 특화되어 있습니다."
tasks = [self._resolve_generic(nid, prompt) for nid in node_ids]
results = await asyncio.gather(*tasks)
return dict(zip(node_ids, results))
return await self._batch_gather(node_ids, prompt)
def validate_mapping(resolved_tag: str, symbol_type: str, tag_metadata: Dict[str, Any]) -> Tuple[bool, str]:
"""심볼 타입과 실제 태그 메타데이터의 엄격한 일치 여부 검증"""

View File

@@ -2,6 +2,66 @@ import networkx as nx
from shapely.geometry import box, Point, LineString
import json
from typing import List, Dict, Any, Optional, Tuple
from collections import defaultdict
class SpatialGrid:
"""
그리드 기반 공간 인덱스.
셀 크기 = threshold. 각 엔티티를 중심 좌표로 그리드 셀에 할당.
쿼리 시 인접 셀(3×3)만 검색.
기존 O(n) 순회 → O(1) 그리드 셀 조회.
"""
def __init__(self, cell_size: float):
self.cell_size = cell_size
self.grid = defaultdict(list) # (gx, gy) -> [(node_id, bbox_dict), ...]
def add(self, node_id, bbox):
"""bbox를 그리드에 등록. bbox는 {min_x, min_y, max_x, max_y} 또는 shapely geometry."""
if hasattr(bbox, 'bounds'):
min_x, min_y, max_x, max_y = bbox.bounds
else:
min_x = bbox['min_x']
min_y = bbox['min_y']
max_x = bbox['max_x']
max_y = bbox['max_y']
center_x = (min_x + max_x) / 2
center_y = (min_y + max_y) / 2
gx = int(center_x / self.cell_size)
gy = int(center_y / self.cell_size)
self.grid[(gx, gy)].append((node_id, min_x, min_y, max_x, max_y))
def query(self, query_min_x, query_min_y, query_max_x, query_max_y, threshold) -> list:
"""
query BBox와 threshold 이내인 모든 노드를 반환.
인접 셀(3×3)만 확인.
"""
query_center_x = (query_min_x + query_max_x) / 2
query_center_y = (query_min_y + query_max_y) / 2
q_gx = int(query_center_x / self.cell_size)
q_gy = int(query_center_y / self.cell_size)
# threshold / cell_size 만큼 확장하여 검색 범위 계산
radius = max(1, int(threshold / self.cell_size) + 1)
candidates = []
for dx in range(-radius, radius + 1):
for dy in range(-radius, radius + 1):
cell = self.grid.get((q_gx + dx, q_gy + dy))
if cell:
candidates.extend(cell)
# BBox 빠른 필터로 최종 확인
result = []
for node_id, min_x, min_y, max_x, max_y in candidates:
dx = max(0, min_x - query_max_x, query_min_x - max_x)
dy = max(0, min_y - query_max_y, query_min_y - max_y)
dist = (dx * dx + dy * dy) ** 0.5
if dist <= threshold:
result.append(node_id)
return result
class PidTopologyBuilder:
def __init__(self, geometric_data: List[Dict[str, Any]], all_extracted_tags: Optional[List[Dict[str, Any]]] = None, config: Optional[Dict[str, float]] = None):
@@ -47,6 +107,13 @@ class PidTopologyBuilder:
self.config['dist_threshold'] = max(5.0, min(200.0, 50.0 * scale_factor))
self.config['tag_threshold'] = max(20.0, min(500.0, 100.0 * scale_factor))
def _build_spatial_grid(self, node_ids) -> SpatialGrid:
"""설비 노드들로 SpatialGrid 인덱스 생성."""
grid = SpatialGrid(cell_size=self.config['dist_threshold'])
for nid in node_ids:
grid.add(nid, self.G.nodes[nid]['bbox'])
return grid
def build_graph(self):
# 1. 모든 객체를 노드로 추가
for item in self.data:
@@ -69,17 +136,23 @@ class PidTopologyBuilder:
bbox=bbox_geom,
value=tag.get('clean_value') or tag.get('tagName'))
# 3. 태그-설비 논리적 연결 (Association)
tags = [n for n, d in self.G.nodes(data=True) if d['type'] == 'TEXT']
# 설비 노드 목록
equipments = [n for n, d in self.G.nodes(data=True) if d['type'] not in ['TEXT', 'LINE', 'LWPOLYLINE']]
# SpatialGrid 인덱스 생성 (O(n) 일회성)
eq_grid = self._build_spatial_grid(equipments)
# 3. 태그-설비 논리적 연결 (Association) — SpatialGrid 사용
tags = [n for n, d in self.G.nodes(data=True) if d['type'] == 'TEXT']
for tag in tags:
best_match = self._find_nearest_equipment(tag, equipments)
best_match = self._find_nearest_equipment(tag, eq_grid)
if best_match:
self.G.add_edge(tag, best_match, relation='associated_with')
# 4. 배관 기반 물리적 연결 (Pipe) [개선됨: End-point 기반]
# 4. 배관 기반 물리적 연결 (Pipe) — SpatialGrid 사용
lines = [n for n, d in self.G.nodes(data=True) if d['type'] in ['LINE', 'LWPOLYLINE']]
for line_id in lines:
original_item = next((item for item in self.data if item['entity_id'] == line_id), None)
if not original_item or not original_item.get('coordinates'):
@@ -87,9 +160,17 @@ class PidTopologyBuilder:
coords = original_item['coordinates']
line_geom = LineString(coords)
line_bbox = line_geom.bounds
# SpatialGrid로 후보 집합 조회 (O(1) 그리드 셀 기반)
nearby_equipment_ids = eq_grid.query(
line_bbox[0], line_bbox[1], line_bbox[2], line_bbox[3],
self.config['dist_threshold']
)
# 개선: 끝점뿐만 아니라 라인 전체가 설비 BBox와 교차하거나 매우 가까운지 확인
connected_nodes = []
for eq_id in equipments:
for eq_id in nearby_equipment_ids:
eq_bbox = self.G.nodes[eq_id]['bbox']
# 1. 라인이 BBox와 교차하는지 확인 (관통 포함)
if line_geom.intersects(eq_bbox):
@@ -128,10 +209,15 @@ class PidTopologyBuilder:
# 추후 전파 로직에서 결정하도록 함
pass
def _find_nearest_equipment(self, tag_id, equipment_ids):
def _find_nearest_equipment(self, tag_id, eq_grid):
"""
단순 거리 기반 매핑에서 위상 기반 가중치 매핑으로 개선.
가중치 = 거리 점수 + 연결성 점수
SpatialGrid로 후보 집합 조회 후 shapely 거리 계산.
Args:
tag_id: 태그 노드 ID
eq_grid: 설비 노드의 SpatialGrid 인덱스
"""
tag_bbox = self.G.nodes[tag_id]['bbox']
best_score = float('inf')
@@ -140,7 +226,14 @@ class PidTopologyBuilder:
# 태그 노드와 연결된 배관(LINE/LWPOLYLINE) 확인
connected_pipes = [n for n in self.G.neighbors(tag_id) if self.G.nodes[n]['type'] in ['LINE', 'LWPOLYLINE']]
for eq_id in equipment_ids:
# SpatialGrid로 후보 집합 조회 (O(1) 그리드 셀 기반)
tb = tag_bbox.bounds
nearby_equipment_ids = eq_grid.query(
tb[0], tb[1], tb[2], tb[3],
self.config['tag_threshold']
)
for eq_id in nearby_equipment_ids:
eq_bbox = self.G.nodes[eq_id]['bbox']
dist = tag_bbox.distance(eq_bbox)

View File

@@ -442,6 +442,39 @@ PostgreSQL 시계열 데이터베이스 스키마
livevalue TEXT - 현재값
timestamp TIMESTAMPTZ - 최종 갱신 시각
테이블: tag_metadata (태그 메타데이터 - 변경 드묾)
base_tag TEXT - 기본 태그명 (예: 'ficq-6101', 'xv-6124')
attribute TEXT - 속성명 ('desc', 'area', 'state0descriptor', ...)
value TEXT - 메타데이터 값
node_id TEXT - OPC UA 노드 ID
loaded_at TIMESTAMPTZ - 마지막 로드 시각
뷰: v_tag_summary (실시간값 + 메타데이터 통합 뷰)
base_tag TEXT - 기본 태그명
pv TEXT - 현재 프로세스 값
sp TEXT - 설정값
op TEXT - 출력값
instate0 TEXT - 상태 비트 0 (true/false)
instate1 TEXT - 상태 비트 1 (true/false)
instate2 TEXT - 상태 비트 2 (true/false)
description TEXT - 장비 설명 (tag_metadata.desc)
area TEXT - 소속 플랜트 (tag_metadata.area)
state0_descriptor TEXT - 비트 0 의미 (예: "Run/Stop")
state1_descriptor TEXT - 비트 1 의미 (예: "Remote/Local")
state2_descriptor TEXT - 비트 2 의미 (예: "Trip/Normal")
새로운 태그 타입:
- 아날로그: ficq-6101.pv/sp/op (Double)
- 디지털 XV: xv-6124.pv/op (Int32), xv-6124.instate0~7 (Boolean)
- Pump: p-6102.pv/op (Int32), p-6102.instate0~7 (Boolean)
- 메타데이터: desc (String), area (Enum), state0descriptor~7 (String)
BCD 상태 조회 팁:
- instate0~7은 Boolean (true/false)
- state0descriptor~7은 해당 비트의 의미 설명
- instate0=true이고 state0descriptor="Run/Stop"이면 → "Run" 상태
- v_tag_summary 뷰를 사용하면 실시간값+메타데이터 한 번에 조회 가능
N분 간격 집계 공식 (time_bucket 금지, date_trunc 사용):
1분 버킷: date_trunc('minute', recorded_at) AS bucket
2분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120) AS bucket
@@ -514,7 +547,7 @@ def ask_iiot_llm(question: str, context: str = "") -> str:
)
user_msg = f"컨텍스트:\n{context}\n\n질문: {question}" if context else question
resp = _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
@@ -733,7 +766,7 @@ async def query_with_nl(question: str) -> str:
try:
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": question},
@@ -820,7 +853,7 @@ async def extract_pid_tags(text: str, source_type: str) -> str:
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Source: {source_type}\n\nText:\n{truncated_text}"},
@@ -926,7 +959,7 @@ async def match_pid_tags(pid_tags: list[str], experion_tags: list[str]) -> str:
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"P&ID Tags:\n{pid_str}\n\nExperion Tags:\n{experion_str}"},
@@ -1017,7 +1050,7 @@ async def parse_pid_dxf(filepath: str) -> str:
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Source: dxf\n\nText:\n{truncated_text}"},
@@ -1130,7 +1163,7 @@ async def parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
def _call_llm():
return _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Source: pdf\n\nText:\n{truncated_text}"},
@@ -1356,7 +1389,7 @@ async def _forward_request(port: int, tool_name: str, params: dict, one_shot: bo
params: 요청 파라미터
one_shot: True일 경우 요청 완료 후 워커 종료
"""
async with httpx.AsyncClient(timeout=60) as client: # 1분 타임아웃
async with httpx.AsyncClient(timeout=600) as client: # 5분 타임아웃 (대용량 DXF 처리용)
endpoint = "/execute/one_shot" if one_shot else "/execute"
response = await client.post(
f"http://localhost:{port}{endpoint}",

View File

@@ -73,6 +73,39 @@ PostgreSQL 시계열 데이터베이스 스키마
livevalue TEXT - 현재값
timestamp TIMESTAMPTZ - 최종 갱신 시각
테이블: tag_metadata (태그 메타데이터 - 변경 드묾)
base_tag TEXT - 기본 태그명 (예: 'ficq-6101', 'xv-6124')
attribute TEXT - 속성명 ('desc', 'area', 'state0descriptor', ...)
value TEXT - 메타데이터 값
node_id TEXT - OPC UA 노드 ID
loaded_at TIMESTAMPTZ - 마지막 로드 시각
뷰: v_tag_summary (실시간값 + 메타데이터 통합 뷰)
base_tag TEXT - 기본 태그명
pv TEXT - 현재 프로세스 값
sp TEXT - 설정값
op TEXT - 출력값
instate0 TEXT - 상태 비트 0 (true/false)
instate1 TEXT - 상태 비트 1 (true/false)
instate2 TEXT - 상태 비트 2 (true/false)
description TEXT - 장비 설명 (tag_metadata.desc)
area TEXT - 소속 플랜트 (tag_metadata.area)
state0_descriptor TEXT - 비트 0 의미 (예: "Run/Stop")
state1_descriptor TEXT - 비트 1 의미 (예: "Remote/Local")
state2_descriptor TEXT - 비트 2 의미 (예: "Trip/Normal")
새로운 태그 타입:
- 아날로그: ficq-6101.pv/sp/op (Double)
- 디지털 XV: xv-6124.pv/op (Int32), xv-6124.instate0~7 (Boolean)
- Pump: p-6102.pv/op (Int32), p-6102.instate0~7 (Boolean)
- 메타데이터: desc (String), area (Enum), state0descriptor~7 (String)
BCD 상태 조회 팁:
- instate0~7은 Boolean (true/false)
- state0descriptor~7은 해당 비트의 의미 설명
- instate0=true이고 state0descriptor="Run/Stop"이면 → "Run" 상태
- v_tag_summary 뷰를 사용하면 실시간값+메타데이터 한 번에 조회 가능
N분 간격 집계 공식 (time_bucket 금지, date_trunc 사용):
1분 버킷: date_trunc('minute', recorded_at) AS bucket
2분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120) AS bucket
@@ -121,7 +154,7 @@ async def _generate_sql(natural_language: str) -> str:
)
response = await client.chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": natural_language},

View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
"""P&ID 게이지 추출기
PG, TG, LG 등 게이지 전용 추출.
사용법:
python pid_extract_gauge.py --input full_text.txt --output gauge.json
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from pid_extract_template import call_llm
from pid_extract_prompts import _GAUGE_PROMPT
import argparse
import json
import logging
import time
logger = logging.getLogger("pid_extractor.gauge")
def extract(input_text: str, max_tokens: int = 65536) -> list:
"""게이지 태그 추출."""
return call_llm(_GAUGE_PROMPT, input_text, max_tokens=max_tokens)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="P&ID 게이지 추출기")
parser.add_argument("--input", required=True, help="입력 텍스트 파일 경로")
parser.add_argument("--output", required=True, help="출력 JSON 파일 경로")
parser.add_argument("--max-tokens", type=int, default=65536, help="최대 토큰 수")
args = parser.parse_args()
with open(args.input, "r", encoding="utf-8") as f:
input_text = f.read()
logger.info(f"입력 파일 읽기 완료: {len(input_text)}")
t0 = time.time()
tags = extract(input_text, max_tokens=args.max_tokens)
elapsed = time.time() - t0
logger.info(f"추출 완료: {len(tags)}개 태그, 소요 시간: {elapsed:.1f}")
output_dir = os.path.dirname(args.output)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
result = {
"success": True,
"count": len(tags),
"tags": tags,
"processing_time_sec": round(elapsed, 1),
}
with open(args.output, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
logger.info(f"결과 저장 완료: {args.output}")
print(json.dumps({"success": True, "count": len(tags), "time": round(elapsed, 1)}, ensure_ascii=False))

View File

@@ -0,0 +1,78 @@
"""P&ID 추출기용 계측기 유형별 프롬프트 정의"""
# 공통 프롬프트 헤더
_PROMPT_HEADER = """You are a P&ID (Piping and Instrumentation Diagram) expert.
Extract ONLY the specified instrument types from the provided DXF text.
Return ONLY a valid JSON array. Each element must have exactly these fields:
{"tagNo":"FCV-101","equipmentName":null,"instrumentType":"FCV","lineNumber":null,"pidDrawingNo":null,"confidence":0.95}
Rules:
- tagNo: any token matching [LETTERS]-[DIGITS] or [LETTERS]-[DIGITS]-[SUFFIX]
- instrumentType: leading letters of tagNo
- equipmentName: descriptive name if present near tag, else null
- lineNumber/pidDrawingNo: null unless explicitly associated
- confidence: 0.95 for clear tags, lower for ambiguous
- Output ONLY the JSON array, no markdown, no explanation.
- If no tags found, return: []
"""
# 센서/계측기: FT, FIT, LT, PT, TE, PG, LG, TG
_SENSOR_PROMPT = _PROMPT_HEADER + """
Extract ONLY flow transmitters (FT/FIT), level transmitters (LT),
pressure transmitters (PT), temperature elements (TE),
pressure gauges (PG), level gauges (LG), temperature gauges (TG).
Target instrument types: FT, FIT, FIC, LIC, PIC, TIC, LT, PT, TE, PG, LG, TG,
and their variants (e.g., FIT-XXXX, FT-XXXX).
Examples: FT-101, FIT-10115, PT-201, LT-301, TE-401, PG-501, LG-601, TG-701
"""
# 밸브: FCV, TCV, LCV, PCV, XV, FV, LV, PV, TV
_VALVE_PROMPT = _PROMPT_HEADER + """
Extract ONLY control valves and on/off valves.
Target instrument types: FCV, TCV, LCV, PCV, XV, FV, LV, PV, TV,
BCV, GV, and their variants (e.g., FCV-XXXX, PCV-XXXX, XV-XXXX).
Examples: FCV-101, TCV-201, LCV-301, PCV-401, XV-501, FV-601, LV-701, PV-801
"""
# 시스템/제어기: LI, PI, TI, FIQ, FICQ, TICA, PICA, LICA
_SYSTEM_PROMPT = _PROMPT_HEADER + """
Extract ONLY indicating instruments, recorders, and controllers.
Target instrument types: LI, PI, TI, SI, HI, FIQ, FICQ, TICA, PICA, LICA,
FIC, LIC, PIC, TIC, and their variants.
Examples: LI-101, PI-201, TI-301, FIQ-401, FICQ-501, TICA-601, PICA-701, LICA-801
"""
# 게이지: PG, TG, LG
_GAUGE_PROMPT = _PROMPT_HEADER + """
Extract ONLY gauges (pressure, temperature, level).
Target instrument types: PG, TG, LG, SG, HG, and their variants.
Examples: PG-101, TG-201, LG-301, PG-10101, TG-10201
"""
# 펌프: P-10101, VP-10117, DP-10101 등
_PUMP_PROMPT = _PROMPT_HEADER + """
Extract ONLY pumps and compressors.
Target equipment types: P (pump), VP (vertical pump), DP (dual pump),
C (compressor), CP (centrifugal pump), BP (booster pump),
and their variants.
Examples: P-10101, VP-10117, DP-10101, C-10201, CP-10301, BP-10401
"""
# 프롬프트 매핑
PROMPTS = {
"sensor": _SENSOR_PROMPT,
"valve": _VALVE_PROMPT,
"system": _SYSTEM_PROMPT,
"gauge": _GAUGE_PROMPT,
"pump": _PUMP_PROMPT,
}

View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
"""P&ID 펌프 추출기
P-10101, VP-10117, DP-10101 등 펌프/압축기 전용 추출.
사용법:
python pid_extract_pump.py --input full_text.txt --output pump.json
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from pid_extract_template import call_llm
from pid_extract_prompts import _PUMP_PROMPT
import argparse
import json
import logging
import time
logger = logging.getLogger("pid_extractor.pump")
def extract(input_text: str, max_tokens: int = 65536) -> list:
"""펌프/압축기 태그 추출."""
return call_llm(_PUMP_PROMPT, input_text, max_tokens=max_tokens)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="P&ID 펌프 추출기")
parser.add_argument("--input", required=True, help="입력 텍스트 파일 경로")
parser.add_argument("--output", required=True, help="출력 JSON 파일 경로")
parser.add_argument("--max-tokens", type=int, default=65536, help="최대 토큰 수")
args = parser.parse_args()
with open(args.input, "r", encoding="utf-8") as f:
input_text = f.read()
logger.info(f"입력 파일 읽기 완료: {len(input_text)}")
t0 = time.time()
tags = extract(input_text, max_tokens=args.max_tokens)
elapsed = time.time() - t0
logger.info(f"추출 완료: {len(tags)}개 태그, 소요 시간: {elapsed:.1f}")
output_dir = os.path.dirname(args.output)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
result = {
"success": True,
"count": len(tags),
"tags": tags,
"processing_time_sec": round(elapsed, 1),
}
with open(args.output, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
logger.info(f"결과 저장 완료: {args.output}")
print(json.dumps({"success": True, "count": len(tags), "time": round(elapsed, 1)}, ensure_ascii=False))

View File

@@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""P&ID 센서/계측기 추출기
FT, FIT, LT, PT, TE, PG, LG, TG 등 센서/계측기 전용 추출.
사용법:
python pid_extract_sensor.py --input full_text.txt --output sensor.json
"""
import sys
import os
# mcp-server/worker 디렉토리를 경로에 추가
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from pid_extract_template import parse_json_array, call_llm, main as template_main
from pid_extract_prompts import _SENSOR_PROMPT
import argparse
import json
import logging
import time
logger = logging.getLogger("pid_extractor.sensor")
def extract(input_text: str, max_tokens: int = 65536) -> list:
"""센서/계측기 태그 추출."""
return call_llm(_SENSOR_PROMPT, input_text, max_tokens=max_tokens)
if __name__ == "__main__":
# --prompt를 자동으로 _SENSOR_PROMPT로 설정
parser = argparse.ArgumentParser(description="P&ID 센서/계측기 추출기")
parser.add_argument("--input", required=True, help="입력 텍스트 파일 경로")
parser.add_argument("--output", required=True, help="출력 JSON 파일 경로")
parser.add_argument("--max-tokens", type=int, default=65536, help="최대 토큰 수")
args = parser.parse_args()
# 입력 읽기
with open(args.input, "r", encoding="utf-8") as f:
input_text = f.read()
logger.info(f"입력 파일 읽기 완료: {len(input_text)}")
# LLM 호출
t0 = time.time()
tags = extract(input_text, max_tokens=args.max_tokens)
elapsed = time.time() - t0
logger.info(f"추출 완료: {len(tags)}개 태그, 소요 시간: {elapsed:.1f}")
# 결과 저장
output_dir = os.path.dirname(args.output)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
result = {
"success": True,
"count": len(tags),
"tags": tags,
"processing_time_sec": round(elapsed, 1),
}
with open(args.output, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
logger.info(f"결과 저장 완료: {args.output}")
print(json.dumps({"success": True, "count": len(tags), "time": round(elapsed, 1)}, ensure_ascii=False))

View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
"""P&ID 시스템/제어기 추출기
LI, PI, TI, FIQ, FICQ, TICA, PICA, LICA 등 시스템/제어기 전용 추출.
사용법:
python pid_extract_system.py --input full_text.txt --output system.json
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from pid_extract_template import call_llm
from pid_extract_prompts import _SYSTEM_PROMPT
import argparse
import json
import logging
import time
logger = logging.getLogger("pid_extractor.system")
def extract(input_text: str, max_tokens: int = 65536) -> list:
"""시스템/제어기 태그 추출."""
return call_llm(_SYSTEM_PROMPT, input_text, max_tokens=max_tokens)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="P&ID 시스템/제어기 추출기")
parser.add_argument("--input", required=True, help="입력 텍스트 파일 경로")
parser.add_argument("--output", required=True, help="출력 JSON 파일 경로")
parser.add_argument("--max-tokens", type=int, default=65536, help="최대 토큰 수")
args = parser.parse_args()
with open(args.input, "r", encoding="utf-8") as f:
input_text = f.read()
logger.info(f"입력 파일 읽기 완료: {len(input_text)}")
t0 = time.time()
tags = extract(input_text, max_tokens=args.max_tokens)
elapsed = time.time() - t0
logger.info(f"추출 완료: {len(tags)}개 태그, 소요 시간: {elapsed:.1f}")
output_dir = os.path.dirname(args.output)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
result = {
"success": True,
"count": len(tags),
"tags": tags,
"processing_time_sec": round(elapsed, 1),
}
with open(args.output, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
logger.info(f"결과 저장 완료: {args.output}")
print(json.dumps({"success": True, "count": len(tags), "time": round(elapsed, 1)}, ensure_ascii=False))

View File

@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""P&ID 태그 추출기 공통 템플릿
독립 프로세스로서 CLI에서 실행되며,
입력 텍스트 파일에서 P&ID 태그를 추출하여 JSON 파일로 출력합니다.
사용법:
python pid_extract_template.py --input full_text.txt --output result.json --prompt "system prompt text"
python pid_extract_template.py --input full_text.txt --output result.json --prompt-file prompt.txt
환경 변수:
VLLM_BASE_URL: vLLM 엔드포인트 (기본: http://localhost:8000/v1)
VLLM_MODEL: 모델명 (기본: Qwen3.6-27B-FP8)
"""
import argparse
import json
import logging
import os
import re
import sys
import time
from typing import List
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
logger = logging.getLogger("pid_extractor")
def parse_json_array(raw: str, finish_reason: str = "") -> list:
"""LLM 출력에서 JSON 배열 추출. finish_reason=length 잘림 복구 포함."""
if raw.startswith("```"):
lines = raw.splitlines()
raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()
if finish_reason == "length":
last_close = raw.rfind("}")
if last_close != -1:
raw = raw[:last_close + 1] + "]"
# 가장 긴 균형 잡힌 [...] 추출
depth = 0
start = -1
best = ""
for i, c in enumerate(raw):
if c == "[":
if depth == 0:
start = i
depth += 1
elif c == "]":
depth -= 1
if depth == 0 and start >= 0:
cand = raw[start:i + 1]
if len(cand) > len(best):
best = cand
raw = best if best else "[]"
try:
return json.loads(raw)
except json.JSONDecodeError:
data = []
for obj in re.findall(r"\{[^{}]*\}", raw, re.DOTALL):
try:
data.append(json.loads(obj))
except json.JSONDecodeError:
pass
return data
def call_llm(system_prompt: str, user_text: str, max_tokens: int = 65536) -> List[dict]:
"""
vLLM에 LLM 호출하여 태그 목록 추출.
Args:
system_prompt: 시스템 프롬프트
user_text: 입력 텍스트
max_tokens: 최대 토큰 수
Returns:
추출된 태그 목록 (JSON 배열)
"""
from openai import OpenAI
base_url = os.environ.get("VLLM_BASE_URL", "http://localhost:8000/v1")
model = os.environ.get("VLLM_MODEL", "Qwen3.6-27B-FP8")
client = OpenAI(base_url=base_url, api_key="dummy")
logger.info(f"vLLM 호출: {base_url}, 모델: {model}, max_tokens: {max_tokens}")
logger.info(f"입력 텍스트 길이: {len(user_text)}")
resp = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_text},
],
max_tokens=max_tokens,
temperature=0.1,
extra_body={"chat_template_kwargs": {"enable_thinking": False}},
)
raw = (resp.choices[0].message.content or "").strip()
finish_reason = resp.choices[0].finish_reason
logger.info(f"LLM 응답: finish_reason={finish_reason}, 응답 길이={len(raw)}")
data = parse_json_array(raw, finish_reason)
if finish_reason == "length":
logger.warning(f"finish_reason=length: 응답이 잘렸습니다. 복구 시도됨. 추출된 태그 수: {len(data)}")
return data
def main():
parser = argparse.ArgumentParser(description="P&ID 태그 추출기")
parser.add_argument("--input", required=True, help="입력 텍스트 파일 경로")
parser.add_argument("--output", required=True, help="출력 JSON 파일 경로")
parser.add_argument("--prompt", type=str, default=None, help="시스템 프롬프트 (인라인)")
parser.add_argument("--prompt-file", type=str, default=None, help="시스템 프롬프트 파일 경로")
parser.add_argument("--max-tokens", type=int, default=65536, help="최대 토큰 수 (기본: 65536)")
args = parser.parse_args()
# 1. 입력 텍스트 읽기
if not os.path.exists(args.input):
logger.error(f"입력 파일을 찾을 수 없습니다: {args.input}")
sys.exit(1)
with open(args.input, "r", encoding="utf-8") as f:
input_text = f.read()
logger.info(f"입력 파일 읽기 완료: {len(input_text)}")
# 2. 시스템 프롬프트 읽기
system_prompt = None
if args.prompt:
system_prompt = args.prompt
elif args.prompt_file:
if not os.path.exists(args.prompt_file):
logger.error(f"프롬프트 파일을 찾을 수 없습니다: {args.prompt_file}")
sys.exit(1)
with open(args.prompt_file, "r", encoding="utf-8") as f:
system_prompt = f.read()
else:
logger.error("--prompt 또는 --prompt-file 중 하나를 지정해야 합니다.")
sys.exit(1)
logger.info(f"시스템 프롬프트: {len(system_prompt)}")
# 3. LLM 호출
t0 = time.time()
tags = call_llm(system_prompt, input_text, max_tokens=args.max_tokens)
elapsed = time.time() - t0
logger.info(f"추출 완료: {len(tags)}개 태그, 소요 시간: {elapsed:.1f}")
# 4. 결과 JSON 쓰기
output_dir = os.path.dirname(args.output)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
result = {
"success": True,
"count": len(tags),
"tags": tags,
"processing_time_sec": round(elapsed, 1),
}
with open(args.output, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
logger.info(f"결과 저장 완료: {args.output}")
# 5. 요약 출력
print(json.dumps({
"success": True,
"count": len(tags),
"time": round(elapsed, 1)
}, ensure_ascii=False))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
"""P&ID 밸브 추출기
FCV, TCV, LCV, PCV, XV, FV, LV, PV, TV 등 밸브 전용 추출.
사용법:
python pid_extract_valve.py --input full_text.txt --output valve.json
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from pid_extract_template import call_llm
from pid_extract_prompts import _VALVE_PROMPT
import argparse
import json
import logging
import time
logger = logging.getLogger("pid_extractor.valve")
def extract(input_text: str, max_tokens: int = 65536) -> list:
"""밸브 태그 추출."""
return call_llm(_VALVE_PROMPT, input_text, max_tokens=max_tokens)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="P&ID 밸브 추출기")
parser.add_argument("--input", required=True, help="입력 텍스트 파일 경로")
parser.add_argument("--output", required=True, help="출력 JSON 파일 경로")
parser.add_argument("--max-tokens", type=int, default=65536, help="최대 토큰 수")
args = parser.parse_args()
with open(args.input, "r", encoding="utf-8") as f:
input_text = f.read()
logger.info(f"입력 파일 읽기 완료: {len(input_text)}")
t0 = time.time()
tags = extract(input_text, max_tokens=args.max_tokens)
elapsed = time.time() - t0
logger.info(f"추출 완료: {len(tags)}개 태그, 소요 시간: {elapsed:.1f}")
output_dir = os.path.dirname(args.output)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
result = {
"success": True,
"count": len(tags),
"tags": tags,
"processing_time_sec": round(elapsed, 1),
}
with open(args.output, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
logger.info(f"결과 저장 완료: {args.output}")
print(json.dumps({"success": True, "count": len(tags), "time": round(elapsed, 1)}, ensure_ascii=False))

View File

@@ -173,7 +173,7 @@ def _extract_pid_tags(text: str, source_type: str) -> str:
)
truncated = text[:100000]
resp = _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Source: {source_type}\n\nText:\n{truncated}"},
@@ -202,7 +202,7 @@ def _match_pid_tags(pid_tags: list, experion_tags: list) -> str:
"- Output ONLY the JSON array.\n"
)
resp = _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": (
@@ -247,7 +247,7 @@ def _parse_pid_dxf(filepath: str) -> str:
ensure_ascii=False, indent=2)
resp = _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": _TAG_EXTRACT_SYSTEM},
{"role": "user", "content": f"Source: dxf\n\nText:\n{text[:8000]}"},
@@ -273,7 +273,7 @@ def _parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
ensure_ascii=False, indent=2)
resp = _llm().chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": _TAG_EXTRACT_SYSTEM},
{"role": "user", "content": f"Source: pdf\n\nText:\n{text[:12000]}"},
@@ -313,92 +313,211 @@ def _parse_pid_drawing(filepath: str) -> str:
# ── 그래프 도구 ───────────────────────────────────────────────────────────────
import time
async def _build_pid_graph_parallel(filepath: str) -> str:
"""
P&ID 그래프 빌드 — 독립 프로세스 병렬 아키텍처.
Phase 1: 도면 분할 + 기하 추출
Phase 2: 전체 텍스트 1회 추출
Phase 3: 5개 독립 추출 프로세스 병렬 실행 (subprocess.Popen)
Phase 4: 결과 통합 + tagNo 기준 중복 제거
Phase 5: 위상 그래프 빌드 + 저장
"""
from pipeline.extractor import PidGeometricExtractor
from pipeline.topology import PidTopologyBuilder
from pipeline.mapper import IntelligentMapper
from openai import AsyncOpenAI
import subprocess
import tempfile
import shutil
os.makedirs(STORAGE_DIR, exist_ok=True)
t0 = time.time()
basename = os.path.basename(filepath)
worker_dir = os.path.dirname(os.path.abspath(__file__))
logging.info(f"[{basename}] === 독립 프로세스 병렬 아키텍처 시작 ===")
# Phase 1: 기하 추출
# ── Phase 1: 도면 분할 + 기하 추출 ──────────────────────────────
logging.info(f"[{basename}] Phase 1: 도면 분할 + 기하 추출 시작")
extractor = PidGeometricExtractor(filepath)
geo_data_path = os.path.join(STORAGE_DIR, os.path.basename(filepath) + "_geo.json")
regions = extractor.split_drawings()
logging.info(f"[{basename}] 도면 분할: {len(regions)}개 영역")
geo_data_path = os.path.join(STORAGE_DIR, basename + "_geo.json")
extractor.extract_and_save(geo_data_path)
with open(geo_data_path, "r", encoding="utf-8") as f:
geo_data = json.load(f)
logging.info(f"[{basename}] Phase 1 완료 ({time.time()-t0:.1f}s) - {len(geo_data)}개 엔티티")
# 시스템 태그 조회
system_tags: list[str] = []
try:
conn = _get_db_connection()
with conn.cursor() as cur:
cur.execute("SELECT tagname FROM realtime_table")
system_tags = [r[0] for r in cur.fetchall()]
except Exception as e:
logging.warning(f"시스템 태그 조회 실패: {e}")
# ── Phase 2: 전체 텍스트 1회 추출 ───────────────────────────────
t2 = time.time()
logging.info(f"[{basename}] Phase 2: 전체 텍스트 1회 추출")
full_text = _extract_text_from_dxf(filepath)
# 임시 디렉토리 생성 (프로세스 간 통신용)
temp_dir = tempfile.mkdtemp(prefix=f"pid_{basename.replace('.dxf', '')}_")
text_path = os.path.join(temp_dir, "full_text.txt")
with open(text_path, "w", encoding="utf-8") as f:
f.write(full_text)
logging.info(f"[{basename}] Phase 2 완료 ({time.time()-t2:.1f}s) - {len(full_text)}")
# ── Phase 3: 5개 독립 추출 프로세스 병렬 실행 ───────────────────
t3 = time.time()
logging.info(f"[{basename}] Phase 3: 5개 독립 추출 프로세스 병렬 실행")
extractors = [
("sensor", "pid_extract_sensor.py"),
("valve", "pid_extract_valve.py"),
("system", "pid_extract_system.py"),
("gauge", "pid_extract_gauge.py"),
("pump", "pid_extract_pump.py"),
]
results_dir = os.path.join(temp_dir, "results")
os.makedirs(results_dir, exist_ok=True)
processes = []
for name, script in extractors:
output_path = os.path.join(results_dir, f"{name}.json")
script_path = os.path.join(worker_dir, script)
cmd = [
sys.executable, script_path,
"--input", text_path,
"--output", output_path,
]
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
processes.append((name, proc, output_path))
logging.info(f"[{basename}] 시작: {name} (pid={proc.pid})")
except Exception as e:
logging.error(f"[{basename}] 실행 실패 {name}: {e}")
# 모든 프로세스 대기 (timeout=300초)
for name, proc, output_path in processes:
try:
stdout, stderr = proc.communicate(timeout=300)
if proc.returncode != 0:
logging.error(f"[{basename}] {name} 실패 (rc={proc.returncode}): {stderr[:200]}")
else:
logging.info(f"[{basename}] {name} 완료: {stdout.strip()[:100]}")
except subprocess.TimeoutExpired:
proc.kill()
logging.error(f"[{basename}] {name} 타임아웃 (300초)")
logging.info(f"[{basename}] Phase 3 완료 ({time.time()-t3:.1f}s)")
# ── Phase 4: 결과 JSON 통합 + tagNo 기준 중복 제거 ──────────────
t4 = time.time()
logging.info(f"[{basename}] Phase 4: 결과 통합 + 중복 제거")
all_tags = []
seen_tagnos = set()
for name, _, output_path in processes:
if not os.path.exists(output_path):
logging.warning(f"[{basename}] 결과 없음: {name}")
continue
try:
with open(output_path, "r", encoding="utf-8") as f:
result = json.load(f)
tags = result.get("tags", [])
count_new = 0
for tag in tags:
tag_no = tag.get("tagNo", "")
if tag_no and tag_no not in seen_tagnos:
seen_tagnos.add(tag_no)
all_tags.append(tag)
count_new += 1
logging.info(f"[{basename}] {name}: {len(tags)}개 중 {count_new}개 신규")
except Exception as e:
logging.error(f"[{basename}] 결과 로드 실패 {name}: {e}")
logging.info(f"[{basename}] Phase 4 완료 ({time.time()-t4:.1f}s) - 총 {len(all_tags)}개 태그")
# ── Phase 5: 위상 그래프 빌드 + 저장 ────────────────────────────
t5 = time.time()
logging.info(f"[{basename}] Phase 5: 위상 그래프 빌드")
# Phase 2: 1차 위상 빌더 (Mapper용 그래프)
builder = PidTopologyBuilder(geo_data)
builder.build_graph()
# Phase 3: 병렬 LLM 매핑
api_client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
mapper = IntelligentMapper(builder.G, system_tags, api_client=api_client)
# 추출된 태그를 그래프에 추가
from shapely.geometry import box as shapely_box
for tag in all_tags:
tag_no = tag.get("tagNo", "UNKNOWN")
eq_name = tag.get("equipmentName")
inst_type = tag.get("instrumentType")
confidence = tag.get("confidence", 0.5)
transmitter_nodes = [
n for n, d in builder.G.nodes(data=True)
if (d.get("value") or "").upper() in {"FIT", "FT", "LT", "PT", "TE"}
]
valve_nodes = [
n for n, d in builder.G.nodes(data=True)
if (d.get("value") or "").upper() in {"FCV", "LCV", "TCV", "PCV", "XV"}
]
equipment_nodes = [
n for n, d in builder.G.nodes(data=True)
if d.get("type") not in {"TEXT", "LINE", "LWPOLYLINE"}
]
# 해당 태그의 bbox 찾기 (geo_data에서 clean_value 매칭)
matched_bbox = None
for entity in geo_data:
if entity.get("clean_value", "").upper() == tag_no.upper():
bbox = entity.get("bbox", {})
matched_bbox = (
bbox.get("min_x"), bbox.get("min_y"),
bbox.get("max_x"), bbox.get("max_y")
)
break
extracted_results = await asyncio.gather(
mapper.extract_transmitters(transmitter_nodes),
mapper.extract_valves(valve_nodes),
mapper.extract_equipment(equipment_nodes),
)
node_id = f"tag_{tag_no}"
if matched_bbox:
bbox_geom = shapely_box(matched_bbox[0], matched_bbox[1],
matched_bbox[2], matched_bbox[3])
builder.G.add_node(node_id,
type="TEXT",
bbox=bbox_geom,
value=tag_no,
equipment_name=eq_name,
instrument_type=inst_type,
confidence=confidence)
else:
builder.G.add_node(node_id,
type="TEXT",
bbox=shapely_box(0, 0, 1, 1),
value=tag_no,
equipment_name=eq_name,
instrument_type=inst_type,
confidence=confidence)
# 매핑 결과 통합
all_mapped_tags = []
for res_dict in extracted_results:
for node_id, mapping in res_dict.items():
if mapping.resolved_tag != "UNKNOWN":
node_data = builder.G.nodes[node_id]
all_mapped_tags.append({
"entity_id": node_id,
"tagName": mapping.resolved_tag,
"bbox": (
node_data["bbox"].bounds
if hasattr(node_data["bbox"], "bounds")
else node_data["bbox"]
),
"clean_value": mapping.resolved_tag,
})
# 태그-설비 연결
equipments = [n for n, d in builder.G.nodes(data=True)
if d.get("type") not in ("TEXT", "LINE", "LWPOLYLINE")]
if equipments:
eq_grid = builder._build_spatial_grid(equipments)
tag_ids = [f"tag_{t.get('tagNo', '')}" for t in all_tags]
for tag_id in tag_ids:
if tag_id in builder.G:
best_match = builder._find_nearest_equipment(tag_id, eq_grid)
if best_match:
builder.G.add_edge(tag_id, best_match, relation="associated_with")
# Phase 4: 최종 위상 모델링 + 저장
final_builder = PidTopologyBuilder(geo_data, all_extracted_tags=all_mapped_tags)
final_builder.build_graph()
graph_id = os.path.basename(filepath).replace(".dxf", "_graph.json")
graph_id = basename.replace(".dxf", "_graph.json")
graph_path = os.path.join(STORAGE_DIR, graph_id)
final_builder.save_graph(graph_path)
builder.save_graph(graph_path)
logging.info(f"build_pid_graph_parallel graph_id={graph_id} "
f"nodes={final_builder.G.number_of_nodes()} "
f"edges={final_builder.G.number_of_edges()}")
# 임시 디렉토리 정리
shutil.rmtree(temp_dir, ignore_errors=True)
total_time = time.time() - t0
logging.info(f"[{basename}] 전체 완료 ({total_time:.1f}s) - graph_id={graph_id} "
f"nodes={builder.G.number_of_nodes()} "
f"edges={builder.G.number_of_edges()} "
f"tags={len(all_tags)}")
return json.dumps({
"success": True,
"graph_id": graph_id,
"graph_path": graph_path,
"nodes": final_builder.G.number_of_nodes(),
"edges": final_builder.G.number_of_edges(),
"nodes": builder.G.number_of_nodes(),
"edges": builder.G.number_of_edges(),
"tags_extracted": len(all_tags),
"processing_time_sec": round(total_time, 1)
}, ensure_ascii=False)

View File

@@ -0,0 +1,609 @@
#!/usr/bin/env python3
"""P&ID 파싱 전용 워커 프로세스 (테스트용 - 절대 경로 지원)
Usage: python pid_worker_test.py <port>
담당 도구:
extract_pid_tags, match_pid_tags,
parse_pid_dxf, parse_pid_pdf, parse_pid_drawing,
build_pid_graph_parallel, analyze_pid_impact
"""
from __future__ import annotations
import sys
import os
# 프로젝트 루트를 Python 경로에 추가
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, PROJECT_ROOT)
import io
import json
import asyncio
import signal
import logging
import re
from functools import lru_cache
from fastapi import FastAPI, Request, BackgroundTasks
import uvicorn
# ── 설정 ─────────────────────────────────────────────────────────────────────
VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8000/v1")
VLLM_MODEL = os.environ.get("VLLM_MODEL", "Qwen3.6-27B-FP8")
DB_CONNECTION_STRING = os.environ.get("DB_CONNECTION_STRING", "postgresql://postgres:postgres@localhost:5432/iiot_platform")
DB_TIMEOUT = int(os.environ.get("DB_TIMEOUT", "10"))
_SERVER_DIR = PROJECT_ROOT
STORAGE_DIR = os.path.join(_SERVER_DIR, "storage")
logging.basicConfig(
level=logging.INFO,
stream=sys.stderr,
format="%(asctime)s [pid_worker] %(levelname)s %(message)s",
)
app = FastAPI()
# ── 싱글톤 ───────────────────────────────────────────────────────────────────
@lru_cache(maxsize=1)
def _llm():
from openai import OpenAI
return OpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
@lru_cache(maxsize=1)
def _ocr():
from paddleocr import PaddleOCR
use_gpu = os.environ.get("PADDLE_USE_GPU", "true").lower() == "true"
try:
return PaddleOCR(use_angle_cls=True, lang="korean", use_gpu=use_gpu, show_log=False)
except Exception:
if use_gpu:
os.environ["PADDLE_USE_GPU"] = "false"
return _ocr()
raise
# ── DB ───────────────────────────────────────────────────────────────────────
def _get_db_connection():
import psycopg
return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
# ── 텍스트 추출 ──────────────────────────────────────────────────────────────
def _extract_text_from_dxf(filepath: str) -> str:
import ezdxf
from ezdxf.tools.text import plain_mtext
doc = ezdxf.readfile(filepath)
msp = doc.modelspace()
texts = []
for entity in msp:
if entity.dxftype() == "TEXT":
texts.append(entity.dxf.text)
elif entity.dxftype() == "MTEXT":
try:
plain = plain_mtext(entity.dxf.text)
if plain.strip():
texts.append(plain)
except Exception:
pass
return "\n".join(texts)
def _extract_text_from_pdf(filepath: str) -> str:
import fitz
doc = fitz.open(filepath)
return "\n".join(page.get_text() for page in doc)
def _extract_text_from_pdf_ocr(filepath: str) -> str:
import fitz
from PIL import Image
import numpy as np
doc = fitz.open(filepath)
all_texts = []
for page in doc:
mat = fitz.Matrix(300 / 72)
pix = page.get_pixmap(matrix=mat)
img = Image.open(io.BytesIO(pix.tobytes("png")))
result = _ocr().ocr(np.array(img), cls=True)
if result and result[0]:
all_texts.extend(line[1][0] for line in result[0])
return "\n".join(all_texts)
# ── JSON 배열 파싱 유틸 ───────────────────────────────────────────────────────
def _parse_json_array(raw: str, finish_reason: str = "") -> list:
"""LLM 출력에서 JSON 배열 추출. finish_reason=length 잘림 복구 포함."""
if raw.startswith("```"):
lines = raw.splitlines()
raw = "\n".join(lines[1:-1] if lines and lines[-1].strip() == "```" else lines[1:]).strip()
if finish_reason == "length":
last_close = raw.rfind("}")
if last_close != -1:
raw = raw[:last_close + 1] + "]"
# 가장 긴 균형 잡힌 [...] 추출
depth = 0; start = -1; best = ""
for i, c in enumerate(raw):
if c == "[":
if depth == 0:
start = i
depth += 1
elif c == "]":
depth -= 1
if depth == 0 and start >= 0:
cand = raw[start:i + 1]
if len(cand) > len(best):
best = cand
raw = best if best else "[]"
try:
return json.loads(raw)
except json.JSONDecodeError:
data = []
for obj in re.findall(r"\{[^{}]*\}", raw, re.DOTALL):
try:
data.append(json.loads(obj))
except json.JSONDecodeError:
pass
return data
# ── 태그 추출/매핑 도구 ───────────────────────────────────────────────────────
def _extract_pid_tags(text: str, source_type: str) -> str:
system = (
"You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
"Extract all instrument and equipment tags from the provided text.\n"
"Return ONLY a valid JSON array. Each element must have exactly these fields:\n"
'{"tagNo":"FCV-101","equipmentName":null,"instrumentType":"FCV",'
'"lineNumber":null,"pidDrawingNo":null,"confidence":0.95}\n'
"Rules:\n"
"- tagNo: any token matching [LETTERS]-[DIGITS] or [LETTERS]-[DIGITS]-[SUFFIX]\n"
" Examples: FCV-101, P-10101, T-10100, VG-6203-15A-F1A-n, BT-6200, DP-10101\n"
"- instrumentType: leading letters of tagNo\n"
"- equipmentName: descriptive name if present near tag, else null\n"
"- lineNumber/pidDrawingNo: null unless explicitly associated\n"
"- confidence: 0.95 for clear tags, lower for ambiguous\n"
"- Output ONLY the JSON array, no markdown, no explanation.\n"
"- If no tags found, return: []\n"
)
truncated = text[:100000]
resp = _llm().chat.completions.create(
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": f"Source: {source_type}\n\nText:\n{truncated}"},
],
max_tokens=32768,
temperature=0.1,
extra_body={"chat_template_kwargs": {"enable_thinking": False}},
)
raw = (resp.choices[0].message.content or "").strip()
data = _parse_json_array(raw, resp.choices[0].finish_reason)
logging.info(f"extract_pid_tags source={source_type} count={len(data)}")
return json.dumps({
"success": True,
"data": {"count": len(data), "tags": data},
"message": "태그 추출 완료"
}, ensure_ascii=False, indent=2)
def _match_pid_tags(pid_tags: list, experion_tags: list) -> str:
system = (
"You are a P&ID to Experion tag matching expert.\n"
"Match P&ID tags to Experion tags based on similarity.\n"
"Return ONLY a JSON array:\n"
'[{"pidTag":"FT-101","experionTag":"ft-101.pv","confidence":0.92},...]\n'
"- If no good match: confidence < 0.5, experionTag null\n"
"- Output ONLY the JSON array.\n"
)
resp = _llm().chat.completions.create(
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": (
f"P&ID Tags:\n{chr(10).join(pid_tags)}\n\n"
f"Experion Tags:\n{chr(10).join(experion_tags)}"
)},
],
max_tokens=16384,
temperature=0.1,
extra_body={"chat_template_kwargs": {"enable_thinking": False}},
)
raw = (resp.choices[0].message.content or "").strip()
data = _parse_json_array(raw, resp.choices[0].finish_reason)
return json.dumps({
"success": True,
"data": {"count": len(data), "mappings": data},
"message": "태그 매핑 완료"
}, ensure_ascii=False, indent=2)
# ── 도면 파싱 도구 ────────────────────────────────────────────────────────────
_TAG_EXTRACT_SYSTEM = (
"You are a P&ID (Piping and Instrumentation Diagram) expert.\n"
"Extract instrument and equipment tags from the provided text.\n"
"Return ONLY a JSON array:\n"
'[{"tagNo":"FIT-10115","equipmentName":"Flow Transmitter","instrumentType":"FIT",'
'"lineNumber":"L-101","pidDrawingNo":"P&ID-001","confidence":0.95},...]\n'
"Rules:\n"
"- tagNo: Instrument [Function]-[Number], Equipment [Type]-[Number]\n"
"- instrumentType: first 2-4 letters of tagNo\n"
"- equipmentName/lineNumber/pidDrawingNo: null if not present\n"
"- confidence: 0.0 to 1.0\n"
"- Output ONLY the JSON array, no markdown.\n"
"- If no tags found, return: []\n"
)
def _parse_pid_dxf(filepath: str) -> str:
text = _extract_text_from_dxf(filepath)
if not text.strip():
return json.dumps({"success": True, "text": "", "count": 0, "tags": []},
ensure_ascii=False, indent=2)
resp = _llm().chat.completions.create(
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": _TAG_EXTRACT_SYSTEM},
{"role": "user", "content": f"Source: dxf\n\nText:\n{text[:8000]}"},
],
max_tokens=8192,
temperature=0.1,
)
raw = (resp.choices[0].message.content or "").strip()
data = _parse_json_array(raw, resp.choices[0].finish_reason)
if not isinstance(data, list):
data = []
return json.dumps({
"success": True,
"data": {"text": text[:10000], "count": len(data), "tags": data},
"message": "DXF 파싱 완료"
}, ensure_ascii=False, indent=2)
def _parse_pid_pdf(filepath: str, use_ocr: bool = True) -> str:
text = _extract_text_from_pdf_ocr(filepath) if use_ocr else _extract_text_from_pdf(filepath)
if not text.strip():
return json.dumps({"success": True, "text": "", "count": 0, "tags": []},
ensure_ascii=False, indent=2)
resp = _llm().chat.completions.create(
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": _TAG_EXTRACT_SYSTEM},
{"role": "user", "content": f"Source: pdf\n\nText:\n{text[:12000]}"},
],
max_tokens=4096,
temperature=0.1,
)
raw = (resp.choices[0].message.content or "").strip()
data = _parse_json_array(raw, resp.choices[0].finish_reason)
if not isinstance(data, list):
data = []
return json.dumps({
"success": True,
"data": {"text": text[:10000], "count": len(data), "tags": data},
"message": "PDF 파싱 완료"
}, ensure_ascii=False, indent=2)
def _parse_pid_drawing(filepath: str) -> str:
ext = os.path.splitext(filepath)[1].lower()
if ext == ".dxf":
return _parse_pid_dxf(filepath)
elif ext == ".pdf":
return _parse_pid_pdf(filepath)
elif ext == ".dwg":
return json.dumps({
"success": False,
"data": None,
"error": "DWG 파일은 직접 파싱할 수 없습니다. DXF로 변환 후 사용하세요.",
"message": "지원하지 않는 파일 형식"
}, ensure_ascii=False)
else:
return json.dumps({
"success": False,
"error": f"지원하지 않는 형식: {ext}. 지원: .dxf, .pdf",
}, ensure_ascii=False)
# ── 그래프 도구 ───────────────────────────────────────────────────────────────
import time
async def _build_pid_graph_parallel(filepath: str) -> str:
"""
P&ID 그래프 빌드 — 독립 프로세스 병렬 아키텍처.
Phase 1: 도면 분할 + 기하 추출
Phase 2: 전체 텍스트 1회 추출
Phase 3: 5개 독립 추출 프로세스 병렬 실행 (subprocess.Popen)
Phase 4: 결과 통합 + tagNo 기준 중복 제거
Phase 5: 위상 그래프 빌드 + 저장
"""
from pipeline.extractor import PidGeometricExtractor
from pipeline.topology import PidTopologyBuilder
import subprocess
import tempfile
import shutil
os.makedirs(STORAGE_DIR, exist_ok=True)
t0 = time.time()
# 절대 경로로 변환하여 파일 경로 문제 해결
if not os.path.isabs(filepath):
filepath = os.path.join(PROJECT_ROOT, filepath)
filepath = os.path.abspath(filepath)
basename = os.path.basename(filepath)
worker_dir = os.path.join(PROJECT_ROOT, "mcp-server", "worker")
logging.info(f"[{basename}] === 독립 프로세스 병렬 아키텍처 시작 ===")
# ── Phase 1: 도면 분할 + 기하 추출 ──────────────────────────────
logging.info(f"[{basename}] Phase 1: 도면 분할 + 기하 추출 시작")
extractor = PidGeometricExtractor(filepath)
regions = extractor.split_drawings()
logging.info(f"[{basename}] 도면 분할: {len(regions)}개 영역")
geo_data_path = os.path.join(STORAGE_DIR, basename + "_geo.json")
extractor.extract_and_save(geo_data_path)
with open(geo_data_path, "r", encoding="utf-8") as f:
geo_data = json.load(f)
logging.info(f"[{basename}] Phase 1 완료 ({time.time()-t0:.1f}s) - {len(geo_data)}개 엔티티")
# ── Phase 2: 전체 텍스트 1회 추출 ───────────────────────────────
t2 = time.time()
logging.info(f"[{basename}] Phase 2: 전체 텍스트 1회 추출")
full_text = _extract_text_from_dxf(filepath)
# 임시 디렉토리 생성 (프로세스 간 통신용)
temp_dir = tempfile.mkdtemp(prefix=f"pid_{basename.replace('.dxf', '')}_")
text_path = os.path.join(temp_dir, "full_text.txt")
with open(text_path, "w", encoding="utf-8") as f:
f.write(full_text)
logging.info(f"[{basename}] Phase 2 완료 ({time.time()-t2:.1f}s) - {len(full_text)}")
# ── Phase 3: 5개 독립 추출 프로세스 병렬 실행 ───────────────────
t3 = time.time()
logging.info(f"[{basename}] Phase 3: 5개 독립 추출 프로세스 병렬 실행")
extractors = [
("sensor", "pid_extract_sensor.py"),
("valve", "pid_extract_valve.py"),
("system", "pid_extract_system.py"),
("gauge", "pid_extract_gauge.py"),
("pump", "pid_extract_pump.py"),
]
results_dir = os.path.join(temp_dir, "results")
os.makedirs(results_dir, exist_ok=True)
processes = []
for name, script in extractors:
output_path = os.path.join(results_dir, f"{name}.json")
script_path = os.path.join(worker_dir, script)
cmd = [
sys.executable, script_path,
"--input", text_path,
"--output", output_path,
]
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
processes.append((name, proc, output_path))
logging.info(f"[{basename}] 시작: {name} (pid={proc.pid})")
except Exception as e:
logging.error(f"[{basename}] 실행 실패 {name}: {e}")
# 모든 프로세스 대기 (timeout=300초)
for name, proc, output_path in processes:
try:
stdout, stderr = proc.communicate(timeout=300)
if proc.returncode != 0:
logging.error(f"[{basename}] {name} 실패 (rc={proc.returncode}): {stderr[:200]}")
else:
logging.info(f"[{basename}] {name} 완료: {stdout.strip()[:100]}")
except subprocess.TimeoutExpired:
proc.kill()
logging.error(f"[{basename}] {name} 타임아웃 (300초)")
logging.info(f"[{basename}] Phase 3 완료 ({time.time()-t3:.1f}s)")
# ── Phase 4: 결과 JSON 통합 + tagNo 기준 중복 제거 ──────────────
t4 = time.time()
logging.info(f"[{basename}] Phase 4: 결과 통합 + 중복 제거")
all_tags = []
seen_tagnos = set()
for name, _, output_path in processes:
if not os.path.exists(output_path):
logging.warning(f"[{basename}] 결과 없음: {name}")
continue
try:
with open(output_path, "r", encoding="utf-8") as f:
result = json.load(f)
tags = result.get("tags", [])
count_new = 0
for tag in tags:
tag_no = tag.get("tagNo", "")
if tag_no and tag_no not in seen_tagnos:
seen_tagnos.add(tag_no)
all_tags.append(tag)
count_new += 1
logging.info(f"[{basename}] {name}: {len(tags)}개 중 {count_new}개 신규")
except Exception as e:
logging.error(f"[{basename}] 결과 로드 실패 {name}: {e}")
logging.info(f"[{basename}] Phase 4 완료 ({time.time()-t4:.1f}s) - 총 {len(all_tags)}개 태그")
# ── Phase 5: 위상 그래프 빌드 + 저장 ────────────────────────────
t5 = time.time()
logging.info(f"[{basename}] Phase 5: 위상 그래프 빌드")
builder = PidTopologyBuilder(geo_data)
builder.build_graph()
# 추출된 태그를 그래프에 추가
from shapely.geometry import box as shapely_box
for tag in all_tags:
tag_no = tag.get("tagNo", "UNKNOWN")
eq_name = tag.get("equipmentName")
inst_type = tag.get("instrumentType")
confidence = tag.get("confidence", 0.5)
# 해당 태그의 bbox 찾기 (geo_data에서 clean_value 매칭)
matched_bbox = None
for entity in geo_data:
if entity.get("clean_value", "").upper() == tag_no.upper():
bbox = entity.get("bbox", {})
matched_bbox = (
bbox.get("min_x"), bbox.get("min_y"),
bbox.get("max_x"), bbox.get("max_y")
)
break
node_id = f"tag_{tag_no}"
if matched_bbox:
bbox_geom = shapely_box(matched_bbox[0], matched_bbox[1],
matched_bbox[2], matched_bbox[3])
builder.G.add_node(node_id,
type="TEXT",
bbox=bbox_geom,
value=tag_no,
equipment_name=eq_name,
instrument_type=inst_type,
confidence=confidence)
else:
builder.G.add_node(node_id,
type="TEXT",
bbox=shapely_box(0, 0, 1, 1),
value=tag_no,
equipment_name=eq_name,
instrument_type=inst_type,
confidence=confidence)
# 태그-설비 연결
equipments = [n for n, d in builder.G.nodes(data=True)
if d.get("type") not in ("TEXT", "LINE", "LWPOLYLINE")]
if equipments:
eq_grid = builder._build_spatial_grid(equipments)
tag_ids = [f"tag_{t.get('tagNo', '')}" for t in all_tags]
for tag_id in tag_ids:
if tag_id in builder.G:
best_match = builder._find_nearest_equipment(tag_id, eq_grid)
if best_match:
builder.G.add_edge(tag_id, best_match, relation="associated_with")
graph_id = basename.replace(".dxf", "_graph.json")
graph_path = os.path.join(STORAGE_DIR, graph_id)
builder.save_graph(graph_path)
# 임시 디렉토리 정리
shutil.rmtree(temp_dir, ignore_errors=True)
total_time = time.time() - t0
logging.info(f"[{basename}] 전체 완료 ({total_time:.1f}s) - graph_id={graph_id} "
f"nodes={builder.G.number_of_nodes()} "
f"edges={builder.G.number_of_edges()} "
f"tags={len(all_tags)}")
return json.dumps({
"success": True,
"graph_id": graph_id,
"graph_path": graph_path,
"nodes": builder.G.number_of_nodes(),
"edges": builder.G.number_of_edges(),
"tags_extracted": len(all_tags),
"processing_time_sec": round(total_time, 1)
}, ensure_ascii=False)
def _analyze_pid_impact(graph_id: str, start_node_id: str) -> str:
from pipeline.analyzer import PidAnalysisEngine
graph_path = os.path.join(STORAGE_DIR, graph_id)
mapping_path = graph_path.replace("_graph.json", "_mapping.json")
analyzer = PidAnalysisEngine(graph_path, mapping_path)
result = analyzer.analyze_impact(start_node_id)
return json.dumps(result, ensure_ascii=False, indent=2)
# ── 요청 디스패처 ─────────────────────────────────────────────────────────────
async def _dispatch(tool: str, params: dict) -> str:
try:
match tool:
# blocking 함수는 asyncio.to_thread로 스레드풀 오프로드
case "extract_pid_tags":
return await asyncio.to_thread(_extract_pid_tags, **params)
case "match_pid_tags":
return await asyncio.to_thread(_match_pid_tags, **params)
case "parse_pid_dxf":
return await asyncio.to_thread(_parse_pid_dxf, **params)
case "parse_pid_pdf":
return await asyncio.to_thread(_parse_pid_pdf, **params)
case "parse_pid_drawing":
return await asyncio.to_thread(_parse_pid_drawing, **params)
case "analyze_pid_impact":
return await asyncio.to_thread(_analyze_pid_impact, **params)
# 이미 async — 직접 await
case "build_pid_graph_parallel":
return await _build_pid_graph_parallel(**params)
case _:
return json.dumps({"success": False, "error": f"알 수 없는 도구: {tool}"},
ensure_ascii=False)
except Exception as e:
logging.error(f"dispatch error tool={tool}: {e}", exc_info=True)
return json.dumps({"success": False, "error": str(e)}, ensure_ascii=False)
# ── 종료 예약 ─────────────────────────────────────────────────────────────────
def _schedule_shutdown():
"""
응답 전송 완료 후 프로세스 종료 예약.
FastAPI의 BackgroundTasks를 사용하여 응답이 완전히 전송된 후 종료되도록 유도함.
"""
async def _do():
# 네트워크 전송 및 커넥션 정리를 위한 최소한의 대기 시간
await asyncio.sleep(1.0)
logging.info("One-shot worker shutting down...")
os.kill(os.getpid(), signal.SIGTERM)
asyncio.create_task(_do())
# ── HTTP 엔드포인트 ───────────────────────────────────────────────────────────
@app.get("/health")
async def health():
return {"status": "ok"}
@app.post("/execute")
async def execute(request: Request):
body = await request.json()
return await _dispatch(body["tool"], body["params"])
@app.post("/execute/one_shot")
async def execute_one_shot(request: Request, background_tasks: BackgroundTasks):
"""요청 처리 후 프로세스 자동 종료 (P&ID 워커 전용)."""
body = await request.json()
result = await _dispatch(body["tool"], body["params"])
# BackgroundTasks에 등록하여 응답 전송이 완료된 후 _schedule_shutdown이 실행되도록 함
background_tasks.add_task(_schedule_shutdown)
return result
# ── 진입점 ───────────────────────────────────────────────────────────────────
if __name__ == "__main__":
port = int(sys.argv[1]) if len(sys.argv) > 1 else 5005
os.makedirs(STORAGE_DIR, exist_ok=True)
uvicorn.run(app, host="0.0.0.0", port=port, log_level="warning")

View File

@@ -105,7 +105,7 @@ async def _ask_llm(question: str, context: str = "") -> str:
prompt = question
response = await client.chat.completions.create(
model=VLLM_MODEL,
model="Qwen3.6-27B-FP8",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt},