- P&ID: 연결 분석 API, Prefix 규칙 관리, 카테고리 분류, DXF 그래프 빌드 - LLM: 대화 요약, tool card 영구 보존, 시계열 차트(uPlot), 에이전트 모드 - KB: 청크 미리보기, Field Instrument Inference, 인증/Qdrant 클라이언트 - MCP: 서버 기능 확장, 파이프라인 수정, timeout 개선 - Frontend: P&ID UI, LLM UI, KB UI, OPC UA Write 탭 추가 - 설정: AGENTS.md, plant_context, README, opencode.json 업데이트 - 정리: 진단 체크리스트 문서 삭제
718 lines
27 KiB
Python
718 lines
27 KiB
Python
"""
|
||
P&ID 연결 분석 v2 — 좌표 근접성 기반
|
||
======================================
|
||
|
||
원리: C-10111 분석에서 발견한 패턴
|
||
- 설비(컬럼/탱크) = 긴 수직 LINE(≥200u) 또는 큰 원(r≥4.5)
|
||
- 계기(instrument balloon)는 설비와 Y축 겹침 + 수평 200u 이내
|
||
- LINE 연결이 없어도 좌표 근접성으로 연결 판단
|
||
|
||
순서:
|
||
1. DXF 로드, LINE/CIRCLE/TEXT 추출
|
||
2. instrument balloon 식별 (CIRCLE r≥1.5 + func+num)
|
||
3. equipment anchor 식별 (긴 수직 LINE + 큰 원)
|
||
4. 좌표 근접성 기반 연결 (Y겹침 + 수평거리 ≤ 200)
|
||
5. LINE endpoint 연결과 비교
|
||
"""
|
||
|
||
import ezdxf, re, time, math
|
||
from shapely.geometry import LineString, Point, MultiLineString, box
|
||
from shapely.strtree import STRtree
|
||
from shapely.ops import linemerge
|
||
from collections import defaultdict
|
||
|
||
DXF_PATH = "src/Web/uploads/pid/No-10_Plant_PID.dxf"
|
||
TAG_RE = re.compile(r'^[A-Z]{1,6}-\d{2,6}(-[A-Z0-9]+)*$', re.I)
|
||
FUNC_RE = re.compile(r'^[FPLTASHQWVXZBDCRK][ICTREVYSAQGZ]{1,3}$')
|
||
NUM_RE = re.compile(r'^\d{3,6}[A-Z]?$')
|
||
PIPE_RE = re.compile(r'^\w+-\d{3,6}-\d+[A-Z]?-[A-Z][A-Z0-9]', re.I)
|
||
LOOP_RE = re.compile(r'\d{3,6}[A-Z]?')
|
||
|
||
# ── 1-2. DXF 로드 + LINE/CIRCLE/TEXT ──────────────────────────────────
|
||
t0 = time.time()
|
||
doc = ezdxf.readfile(DXF_PATH)
|
||
msp = doc.modelspace()
|
||
print(f"1. DXF 로드: {time.time()-t0:.2f}s", flush=True)
|
||
|
||
# LINE 추출
|
||
lines_raw = []
|
||
for e in msp:
|
||
t = e.dxftype()
|
||
if t == 'LINE':
|
||
s = (e.dxf.start.x, e.dxf.start.y)
|
||
e2 = (e.dxf.end.x, e.dxf.end.y)
|
||
if s != e2:
|
||
lines_raw.append(LineString([s, e2]))
|
||
elif t == 'LWPOLYLINE':
|
||
pts = [(p[0], p[1]) for p in e.vertices()]
|
||
if len(pts) >= 2:
|
||
lines_raw.append(LineString(pts))
|
||
|
||
# linemerge
|
||
merged = linemerge(MultiLineString(lines_raw))
|
||
merged_list = list(merged.geoms) if merged.geom_type == 'MultiLineString' else [merged]
|
||
|
||
# CIRCLE
|
||
circles = [(e.dxf.center.x, e.dxf.center.y, e.dxf.radius) for e in msp if e.dxftype() == 'CIRCLE']
|
||
|
||
# TEXT
|
||
text_entries = []
|
||
for e in msp:
|
||
if e.dxftype() == 'TEXT':
|
||
v = e.dxf.text.strip().replace('%%U', '').replace('%%C', 'Φ')
|
||
if v:
|
||
text_entries.append((e.dxf.insert.x, e.dxf.insert.y, v))
|
||
|
||
print(f"2. 추출: LINE={len(lines_raw)}, CIRCLE={len(circles)}, TEXT={len(text_entries)}", flush=True)
|
||
|
||
# ── 3. Instrument balloon 식별 ────────────────────────────────────────
|
||
t0 = time.time()
|
||
|
||
# TEXT → CIRCLE 포함 (r≥1.5)
|
||
text_in_circle = defaultdict(list)
|
||
for x, y, v in text_entries:
|
||
for cx, cy, r in circles:
|
||
if r >= 1.5 and (x-cx)**2 + (y-cy)**2 <= r*r:
|
||
text_in_circle[v].append((cx, cy, r))
|
||
|
||
# circle center → func+num
|
||
circ_data = {}
|
||
for v, occ in text_in_circle.items():
|
||
for cx, cy, r in occ:
|
||
k = (round(cx, 1), round(cy, 1))
|
||
if k not in circ_data:
|
||
circ_data[k] = {'r': r, 'x': cx, 'y': cy}
|
||
if FUNC_RE.match(v):
|
||
circ_data[k]['func'] = v
|
||
elif NUM_RE.match(v):
|
||
circ_data[k]['num'] = v
|
||
elif TAG_RE.match(v):
|
||
circ_data[k]['tag'] = v
|
||
|
||
instruments = []
|
||
for k, d in circ_data.items():
|
||
if 'func' in d:
|
||
tag = f"{d['func']}-{d.get('num', '?')}"
|
||
# prefer explicit tag if available
|
||
if 'tag' in d:
|
||
tag = d['tag']
|
||
instruments.append({'x': d['x'], 'y': d['y'], 'tag': tag, 'r': d['r'],
|
||
'loop': re.search(r'\d{3,6}', tag).group() if re.search(r'\d{3,6}', tag) else None})
|
||
|
||
# loop number별 계기 그룹
|
||
loop_inst = defaultdict(list)
|
||
for inst in instruments:
|
||
if inst['loop']:
|
||
loop_inst[inst['loop']].append(inst)
|
||
|
||
print(f"3-A. Instrument: {len(instruments)}개")
|
||
print(f" Loop 번호 그룹: {len(loop_inst)}개")
|
||
|
||
# ── 3-B. 방향표지판 검출 ──────────────────────────────────────────
|
||
t0 = time.time()
|
||
|
||
# 방향별 분류: h_seg는 raw LINE만 (O(n²) 회피), v/d_seg는 merged+raw
|
||
h_lines = []
|
||
v_lines = []
|
||
d_lines = []
|
||
for l in lines_raw:
|
||
coords = list(l.coords)
|
||
if len(coords) < 2:
|
||
continue
|
||
x1, y1 = coords[0]
|
||
x2, y2 = coords[-1]
|
||
dx = abs(x2 - x1)
|
||
dy = abs(y2 - y1)
|
||
if dy <= 1 and dx >= 3:
|
||
h_lines.append({'xl': min(x1, x2), 'xr': max(x1, x2), 'y': y1})
|
||
elif dx <= 1 and dy >= 2.5:
|
||
v_lines.append({'x': x1, 'y1': min(y1, y2), 'y2': max(y1, y2)})
|
||
elif dx >= 1 and dy >= 1:
|
||
angle = math.degrees(math.atan2(dy, dx))
|
||
if 20 <= angle <= 70 or 110 <= angle <= 160:
|
||
d_lines.append({'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2})
|
||
# merged LINE 보강 (v/d만) - merged LINE은 길어서 relaxed threshold
|
||
for l in merged_list:
|
||
coords = list(l.coords)
|
||
if len(coords) < 2:
|
||
continue
|
||
x1, y1 = coords[0]
|
||
x2, y2 = coords[-1]
|
||
dx = abs(x2 - x1)
|
||
dy = abs(y2 - y1)
|
||
if dx <= 1 and dy >= 2.5:
|
||
v_lines.append({'x': x1, 'y1': min(y1, y2), 'y2': max(y1, y2)})
|
||
elif dx >= 1 and dy >= 1:
|
||
angle = math.degrees(math.atan2(dy, dx))
|
||
if 20 <= angle <= 70 or 110 <= angle <= 160:
|
||
d_lines.append({'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2})
|
||
|
||
# 수평선쌍: 같은 xl/xr + y간격 2~8u (xr 기준 버킷으로 O(n²) 회피)
|
||
h_by_xr = defaultdict(list)
|
||
for h in h_lines:
|
||
h_by_xr[round(h['xr'] / 5) * 5].append(h)
|
||
h_pairs = []
|
||
seen_pair = set() # (xl_round, xr_round, y_mid_round) 중복 제거
|
||
for bucket in h_by_xr.values():
|
||
for i, h1 in enumerate(bucket):
|
||
for h2 in bucket[i+1:]:
|
||
if abs(h1['y'] - h2['y']) < 2:
|
||
continue
|
||
if abs(h1['xl'] - h2['xl']) <= 3 and abs(h1['xr'] - h2['xr']) <= 3:
|
||
ylo = min(h1['y'], h2['y'])
|
||
yhi = max(h1['y'], h2['y'])
|
||
gap = yhi - ylo
|
||
if 2 <= gap <= 8:
|
||
xl = (h1['xl']+h2['xl'])/2
|
||
xr = (h1['xr']+h2['xr'])/2
|
||
k = (round(xl), round(xr), round((ylo+yhi)/2))
|
||
if k not in seen_pair:
|
||
seen_pair.add(k)
|
||
h_pairs.append({'xl': xl, 'xr': xr, 'ylo': ylo, 'yhi': yhi, 'y_mid': (ylo+yhi)/2})
|
||
|
||
def _has_left_vert(hp, vlist, tol=8):
|
||
xl, ym = hp['xl'], hp['y_mid']
|
||
return any(abs(v['x'] - xl) <= tol and v['y1'] <= ym <= v['y2'] for v in vlist)
|
||
|
||
def _count_right_diag(hp, dlist, tol=8):
|
||
xr, ym = hp['xr'], hp['y_mid']
|
||
return sum(1 for d in dlist
|
||
if abs(d['x1'] - xr) <= tol and abs(d['y1'] - ym) <= tol
|
||
or abs(d['x2'] - xr) <= tol and abs(d['y2'] - ym) <= tol)
|
||
|
||
def _nearest_tag_to(hp, texts, limit=25):
|
||
mx, my = (hp['xl']+hp['xr'])/2, hp['y_mid']
|
||
best, best_d = None, 999
|
||
for tx, ty, tv in texts:
|
||
d = math.hypot(tx - mx, ty - my)
|
||
if d < best_d and d <= limit:
|
||
best_d, best = d, tv
|
||
return best, best_d
|
||
|
||
print(f" marker filter start... v={len(v_lines)} d={len(d_lines)} texts={len(text_entries)}")
|
||
markers = []
|
||
for i, hp in enumerate(h_pairs):
|
||
if not _has_left_vert(hp, v_lines):
|
||
continue
|
||
cnt_diag = _count_right_diag(hp, d_lines)
|
||
if cnt_diag < 2:
|
||
continue
|
||
tag, tag_dist = _nearest_tag_to(hp, text_entries)
|
||
if tag is None:
|
||
continue
|
||
mx = round((hp['xl']+hp['xr'])/2, 1)
|
||
my = round(hp['y_mid'], 1)
|
||
markers.append({'tag': tag, 'mx': mx, 'my': my,
|
||
'xl': hp['xl'], 'xr': hp['xr'],
|
||
'ylo': hp['ylo'], 'yhi': hp['yhi'], 'dr': cnt_diag})
|
||
|
||
et = time.time()
|
||
# 위치 중복 제거 (동일태그 + 동일좌표 → 1개)
|
||
seen = set()
|
||
deduped = []
|
||
for m in markers:
|
||
k = (m['tag'], round(m['mx'], 0), round(m['my'], 0))
|
||
if k not in seen:
|
||
seen.add(k)
|
||
deduped.append(m)
|
||
markers = deduped
|
||
|
||
print(f"3-B. 방향표지판(마커): {len(markers)}개 (h_pairs={len(h_pairs)}, elapsed={et-t0:.2f}s)")
|
||
print(f" ROI(y≥5100): {sum(1 for m in markers if m['my'] >= 5100)}개")
|
||
tag_groups = defaultdict(list)
|
||
for m in markers:
|
||
tag_groups[m['tag']].append(m)
|
||
multi = {t: v for t, v in tag_groups.items() if len(v) >= 2}
|
||
print(f" 태그 있는 마커: {sum(1 for m in markers if m['tag'])}개")
|
||
print(f" 동일태그 그룹(≥2): {len(multi)}개")
|
||
for t, v in sorted(multi.items(), key=lambda x: -len(x[1]))[:10]:
|
||
poss = ', '.join(f"({m['mx']:.0f},{m['my']:.0f})" for m in v)
|
||
print(f" {t}: {len(v)}개 [{poss}]")
|
||
|
||
# ── 4. Equipment anchor 식별 ──────────────────────────────────────────
|
||
t0_4 = time.time()
|
||
|
||
# (A) 긴 수직 LINE (≥200u, almost vertical)
|
||
vert_lines = []
|
||
for l in merged_list:
|
||
c = list(l.coords)
|
||
xs = [p[0] for p in c]
|
||
ys = [p[1] for p in c]
|
||
dx = max(xs) - min(xs)
|
||
dy = max(ys) - min(ys)
|
||
if dy >= 200 and dy > dx * 3: # vertical-ish
|
||
vert_lines.append({
|
||
'x': round(sum(xs)/len(xs), 1),
|
||
'y_min': min(ys), 'y_max': max(ys),
|
||
'length': l.length, 'bounds': l.bounds
|
||
})
|
||
|
||
# dedup by x
|
||
vert_grouped = defaultdict(list)
|
||
for v in vert_lines:
|
||
vert_grouped[round(v['x'])].append(v)
|
||
|
||
vert_anchors = []
|
||
for x, vl in vert_grouped.items():
|
||
y_min = min(v['y_min'] for v in vl)
|
||
y_max = max(v['y_max'] for v in vl)
|
||
vert_anchors.append({'x': x, 'y_min': y_min, 'y_max': y_max,
|
||
'count': len(vl), 'total_len': sum(v['length'] for v in vl)})
|
||
|
||
# (B) 큰 원 (r≥4.5, empty or with text)
|
||
large_circs = [(cx, cy, r) for cx, cy, r in circles if r >= 4.5]
|
||
# Find nearest TAG text
|
||
eqp_by_circle = []
|
||
for cx, cy, r in large_circs:
|
||
best_tag = None
|
||
best_d = 999
|
||
for tx, ty, v in text_entries:
|
||
if TAG_RE.match(v):
|
||
d = math.hypot(tx-cx, ty-cy)
|
||
if d < best_d and d > r and d < 200:
|
||
best_d = d
|
||
best_tag = v
|
||
eqp_by_circle.append({'x': cx, 'y': cy, 'tag': best_tag or f'CIRCLE@{cx:.0f},{cy:.0f}',
|
||
'r': r, 'kind': 'large_circle'})
|
||
|
||
print(f"\n4. Equipment anchor:")
|
||
print(f" 수직 LINE(≥200u): {len(vert_anchors)}개")
|
||
for va in sorted(vert_anchors, key=lambda v: v['x']):
|
||
print(f" x={va['x']:6.1f}, y=[{va['y_min']:.0f}, {va['y_max']:.0f}], 높이={va['y_max']-va['y_min']:.0f}")
|
||
print(f" 큰 원(r≥4.5): {len(eqp_by_circle)}개")
|
||
for eq in eqp_by_circle:
|
||
print(f" ({eq['x']:6.1f}, {eq['y']:6.1f}) r={eq['r']:.1f} tag={eq['tag']}")
|
||
|
||
# (C) 작은 원 (r<4.5, r≥1.5, exclude instrument balloons)
|
||
# instrument balloon circles already identified as those containing TEXT
|
||
balloon_circles = set()
|
||
for v, occ in text_in_circle.items():
|
||
for cx, cy, r in occ:
|
||
balloon_circles.add((round(cx, 1), round(cy, 1)))
|
||
small_circs = [(cx, cy, r) for cx, cy, r in circles
|
||
if 1.5 <= r < 4.5 and (round(cx,1), round(cy,1)) not in balloon_circles]
|
||
small_eqp = []
|
||
for cx, cy, r in small_circs:
|
||
best_tag = None
|
||
best_d = 999
|
||
for tx, ty, v in text_entries:
|
||
if TAG_RE.match(v):
|
||
d = math.hypot(tx-cx, ty-cy)
|
||
if d < best_d and d > r and d < 100:
|
||
best_d = d
|
||
best_tag = v
|
||
small_eqp.append({'x': cx, 'y': cy, 'tag': best_tag or f'SMALL@{cx:.0f},{cy:.0f}',
|
||
'r': r, 'kind': 'small_circle'})
|
||
eqp_by_small = small_eqp
|
||
|
||
print(f" 작은 원(1.5≤r<4.5, excl. balloon): {len(eqp_by_small)}개")
|
||
for eq in eqp_by_small[:10]:
|
||
print(f" ({eq['x']:6.1f}, {eq['y']:6.1f}) r={eq['r']:.1f} tag={eq['tag']}")
|
||
|
||
# Merge small into _what_at equipment list
|
||
all_eqp = eqp_by_circle + eqp_by_small
|
||
|
||
# ── 3-C. 방향표지판 LINE endpoint 추적 (2-hop BFS, 직각굽힘 허용) ──
|
||
t0 = time.time()
|
||
|
||
# Precompute: endpoint spatial index (grid bucketing)
|
||
# 30u limit → grid cell size 15 so we only check 3×3 neighbors
|
||
GRID = 15
|
||
ep_grid = defaultdict(list)
|
||
line_ep = {}
|
||
for li, l in enumerate(merged_list):
|
||
coords = list(l.coords)
|
||
if not coords:
|
||
continue
|
||
s, e = coords[0], coords[-1]
|
||
for ep, ei in [(s, 0), (e, -1)]:
|
||
gx, gy = int(ep[0] // GRID), int(ep[1] // GRID)
|
||
ep_grid[(gx, gy)].append((li, ei, ep))
|
||
line_ep[li] = {'s': s, 'e': e}
|
||
|
||
def _grid_neighbors(pt):
|
||
gx, gy = int(pt[0] // GRID), int(pt[1] // GRID)
|
||
return [(gx+dx, gy+dy) for dx in (-1,0,1) for dy in (-1,0,1)]
|
||
|
||
def _find_ep_idx(pt, limit=30):
|
||
best = None
|
||
for gk in _grid_neighbors(pt):
|
||
for li, ei, ep in ep_grid.get(gk, []):
|
||
d = math.hypot(pt[0]-ep[0], pt[1]-ep[1])
|
||
if d <= limit and (best is None or d < best[0]):
|
||
best = (d, li, ei, ep)
|
||
return best
|
||
|
||
def _follow_line(li, ei):
|
||
ep = line_ep[li]
|
||
return ep['e'] if ei == 0 else ep['s']
|
||
|
||
def _connected_at(pt, tol=5):
|
||
result = []
|
||
for gk in _grid_neighbors(pt):
|
||
for li, ei, ep in ep_grid.get(gk, []):
|
||
d = math.hypot(pt[0]-ep[0], pt[1]-ep[1])
|
||
if d <= tol:
|
||
result.append((li, ei, ep))
|
||
return result
|
||
|
||
def _is_origin(kind, tag, pt, origin_tag, origin_pos, span=40):
|
||
"""추적 결과가 출발 마커 자기자신인지 (cycle) 판정."""
|
||
if origin_tag is None or kind != 'marker' or tag != origin_tag:
|
||
return False
|
||
return math.hypot(pt[0] - origin_pos[0], pt[1] - origin_pos[1]) <= span
|
||
|
||
|
||
def _trace_2hop(pt, markers, vert_anchors, all_eqp, limit=30,
|
||
origin_tag=None, origin_pos=None):
|
||
"""마커 endpoint → 2-hop 추적. 출발 마커로 되돌아오는 사이클 차단 + 중복 제거."""
|
||
hop0 = _find_ep_idx(pt, limit)
|
||
if not hop0:
|
||
return []
|
||
_, li1, ei1, ep1 = hop0
|
||
op1 = _follow_line(li1, ei1)
|
||
best = {} # (kind, tag) → (total_len, path) 최단 1개만 유지
|
||
|
||
def _offer(kind, tag, total_len, path, at_pt):
|
||
if kind == 'empty':
|
||
return
|
||
if _is_origin(kind, tag, at_pt, origin_tag, origin_pos):
|
||
return # self-cycle: 출발 마커로 회귀
|
||
k = (kind, tag)
|
||
if k not in best or total_len < best[k][0]:
|
||
best[k] = (total_len, path)
|
||
|
||
# hop1: 같은 endpoint를 공유하는 세그먼트(중복 li2 제거)
|
||
seen_li2 = set()
|
||
for li2, ei2, ep2 in _connected_at(op1):
|
||
if li2 == li1 or li2 in seen_li2:
|
||
continue
|
||
seen_li2.add(li2)
|
||
op2 = _follow_line(li2, ei2)
|
||
kind, tag = _what_at_pt(op2, markers, vert_anchors, all_eqp, limit)
|
||
_offer(kind, tag, merged_list[li1].length + merged_list[li2].length,
|
||
f"seg#{li1}→seg#{li2}", op2)
|
||
|
||
# hop0 자체의 반대쪽 끝
|
||
kind1, tag1 = _what_at_pt(op1, markers, vert_anchors, all_eqp, limit)
|
||
_offer(kind1, tag1, merged_list[li1].length, f"seg#{li1}", op1)
|
||
|
||
return [(k[0], k[1], v[0], v[1]) for k, v in best.items()]
|
||
|
||
# Precompute marker grid for fast what_at lookup
|
||
marker_grid = defaultdict(list)
|
||
for mi, m in enumerate(markers):
|
||
gx, gy = int(m['mx'] // GRID), int(m['my'] // GRID)
|
||
marker_grid[(gx, gy)].append(mi)
|
||
|
||
def _what_at_pt(pt, markers, vert_anchors, all_eqp, limit=30):
|
||
px, py = pt
|
||
for gk in _grid_neighbors(pt):
|
||
for mi in marker_grid.get(gk, []):
|
||
m = markers[mi]
|
||
if math.hypot(px-m['mx'], py-m['my']) <= limit:
|
||
return 'marker', m['tag']
|
||
for va in vert_anchors:
|
||
if va['y_min'] <= py <= va['y_max'] and abs(px-va['x']) <= limit:
|
||
return 'equipment', f"VLINE@{va['x']:.0f}"
|
||
for eq in all_eqp:
|
||
if math.hypot(px-eq['x'], py-eq['y']) <= limit:
|
||
return 'equipment', eq['tag']
|
||
return 'empty', ''
|
||
|
||
marker_traces = []
|
||
trace_log = []
|
||
for m in markers:
|
||
mx, my, xl, xr, tag = m['mx'], m['my'], m['xl'], m['xr'], m['tag']
|
||
for side, pt in [('left', (xl, my)), ('right', (xr, my))]:
|
||
hops = _trace_2hop(pt, markers, vert_anchors, all_eqp,
|
||
origin_tag=tag, origin_pos=(mx, my))
|
||
for kind, to_tag, total_len, path in hops:
|
||
marker_traces.append({
|
||
'from': tag, 'from_pos': f"({mx:.0f},{my:.0f})",
|
||
'side': side, 'to_kind': kind, 'to_tag': to_tag,
|
||
'total_len': total_len, 'path': path
|
||
})
|
||
if tag == 'P-10101':
|
||
trace_log.append(f" {side} ({pt[0]:.1f},{pt[1]:.1f}) → {path}: {kind}={to_tag} ({total_len:.1f}u)")
|
||
|
||
et = time.time()
|
||
print(f"3-C. 마커 LINE endpoint 추적 (2-hop BFS, elapsed={et-t0:.2f}s):")
|
||
print(f" 추적 엣지: {len(marker_traces)}개")
|
||
lk = defaultdict(int)
|
||
for e in marker_traces:
|
||
lk[e['to_kind']] += 1
|
||
for k, c in sorted(lk.items()):
|
||
print(f" → {k}: {c}개")
|
||
if trace_log:
|
||
print(f" P-10101 상세 추적:")
|
||
for line in trace_log:
|
||
print(line)
|
||
|
||
# ── 3-D. 태그 매칭 연결 ─────────────────────────────────────────
|
||
t0 = time.time()
|
||
|
||
tag_match_edges = []
|
||
for tag, ml in multi.items():
|
||
for i in range(len(ml)):
|
||
for j in range(i+1, len(ml)):
|
||
tag_match_edges.append({
|
||
'tag': tag,
|
||
'from_pos': f"({ml[i]['mx']:.0f},{ml[i]['my']:.0f})",
|
||
'to_pos': f"({ml[j]['mx']:.0f},{ml[j]['my']:.0f})",
|
||
'span': abs(ml[i]['mx'] - ml[j]['mx'])
|
||
})
|
||
|
||
et = time.time()
|
||
print(f"3-D. 태그 매칭 연결: {len(tag_match_edges)}쌍 (elapsed={et-t0:.2f}s)")
|
||
for t, ml in sorted(multi.items(), key=lambda x: -len(x[1]))[:10]:
|
||
xs = [m['mx'] for m in ml]
|
||
span = max(xs) - min(xs)
|
||
print(f" {t}: {len(ml)}개 위치, 스팬={span:.0f}u")
|
||
|
||
# ── 5. 좌표 근접성 기반 연결 ──────────────────────────────────────────
|
||
t0 = time.time()
|
||
|
||
# 각 instrument → 가장 가까운 equipment anchor 찾기
|
||
# 기준: Y범위 겹침 + 수평거리 ≤ 200
|
||
HORIZONTAL_LIMIT = 200
|
||
|
||
inst_eqp_connections = [] # (inst, eqp, dist, method)
|
||
unconnected_inst = []
|
||
|
||
for inst in instruments:
|
||
ix, iy = inst['x'], inst['y']
|
||
best_eqp = None
|
||
best_d = 999
|
||
|
||
# 수직 LINE anchor
|
||
for va in vert_anchors:
|
||
if va['y_min'] <= iy <= va['y_max']:
|
||
d = abs(ix - va['x'])
|
||
if d < best_d and d <= HORIZONTAL_LIMIT:
|
||
best_d = d
|
||
best_eqp = (f"VLINE@{va['x']:.0f}", va, 'vline')
|
||
|
||
# 큰 원 anchor
|
||
for eq in eqp_by_circle:
|
||
d = math.hypot(ix - eq['x'], iy - eq['y'])
|
||
if d < best_d and d <= HORIZONTAL_LIMIT:
|
||
best_d = d
|
||
best_eqp = (eq['tag'], eq, 'circle')
|
||
|
||
if best_eqp:
|
||
inst_eqp_connections.append((inst, best_eqp[0], best_d, best_eqp[2]))
|
||
else:
|
||
unconnected_inst.append(inst)
|
||
|
||
print(f"\n5. 좌표 근접 연결 (수평≤{HORIZONTAL_LIMIT}):")
|
||
print(f" 계기-설비 연결: {len(inst_eqp_connections)}개")
|
||
print(f" 미연결 계기: {len(unconnected_inst)}개")
|
||
|
||
# 연결 분류
|
||
by_method = defaultdict(int)
|
||
for *_, method in inst_eqp_connections:
|
||
by_method[method] += 1
|
||
for m, c in by_method.items():
|
||
print(f" {m}: {c}개")
|
||
|
||
# loop별 연결 분포
|
||
loop_connected = defaultdict(list)
|
||
loop_unconnected = defaultdict(list)
|
||
for inst, eqp_tag, d, method in inst_eqp_connections:
|
||
loop_connected[inst['loop']].append((inst, eqp_tag, d))
|
||
for inst in unconnected_inst:
|
||
loop_unconnected[inst['loop']].append(inst)
|
||
|
||
print(f"\n Loop별 연결/미연결:")
|
||
for loop in sorted([k for k in loop_connected if k is not None], key=int)[:20]:
|
||
conn = len(loop_connected[loop])
|
||
unconn = len(loop_unconnected.get(loop, []))
|
||
total = conn + unconn
|
||
if total >= 3:
|
||
eqps = set(e for _, e, _ in loop_connected[loop])
|
||
print(f" Loop {loop}: {conn}/{total} 연결 → 설비: {', '.join(eqps)}")
|
||
|
||
# ── 6. LINE endpoint 연결과 비교 ──────────────────────────────────────
|
||
t0 = time.time()
|
||
|
||
# long lines (≥30)
|
||
long_lines = [l for l in merged_list if l.length >= 30]
|
||
|
||
# build anchor index
|
||
inst_tree = STRtree([Point(inst['x'], inst['y']) for inst in instruments])
|
||
inst_list = instruments
|
||
|
||
BUFFER = 50
|
||
|
||
def nearest_inst(pt):
|
||
idxs = inst_tree.query(box(pt.x-BUFFER, pt.y-BUFFER, pt.x+BUFFER, pt.y+BUFFER), predicate='intersects')
|
||
if len(idxs) == 0:
|
||
return None
|
||
best_d = 999
|
||
best = None
|
||
for idx in idxs:
|
||
p = inst_tree.geometries[idx]
|
||
d = p.distance(pt)
|
||
if d < best_d:
|
||
best_d = d
|
||
best = (inst_list[idx], best_d)
|
||
if best and best[1] <= BUFFER:
|
||
return best
|
||
return None
|
||
|
||
line_connections = []
|
||
inst_seen = set()
|
||
for l in long_lines:
|
||
ep1 = Point(l.coords[0])
|
||
ep2 = Point(l.coords[-1])
|
||
n1 = nearest_inst(ep1)
|
||
n2 = nearest_inst(ep2)
|
||
if n1 and n2:
|
||
i1, d1 = n1
|
||
i2, d2 = n2
|
||
if i1['tag'] != i2['tag']:
|
||
line_connections.append((i1, i2, l.length, d1, d2))
|
||
|
||
print(f"\n6. LINE endpoint 연결 (≥30u, buffer={BUFFER}):")
|
||
print(f" 계기-계기 연결: {len(line_connections)}개")
|
||
|
||
# 같은 loop 내 연결 vs 다른 loop 연결
|
||
same_loop = 0
|
||
diff_loop = 0
|
||
for i1, i2, ll, d1, d2 in line_connections:
|
||
if i1['loop'] and i2['loop'] and i1['loop'] == i2['loop']:
|
||
same_loop += 1
|
||
else:
|
||
diff_loop += 1
|
||
print(f" 같은 loop: {same_loop}개")
|
||
print(f" 다른 loop: {diff_loop}개")
|
||
|
||
# ── 7. Loop 기반 통합 연결 ────────────────────────────────────────────
|
||
# loop → 가장 가까운 equipment anchor
|
||
print(f"\n7. Loop 기반 연결 분석:")
|
||
for loop in sorted(loop_inst, key=int):
|
||
insts = loop_inst[loop]
|
||
# Y범위
|
||
ys = [i['y'] for i in insts]
|
||
y_min, y_max = min(ys), max(ys)
|
||
xs = [i['x'] for i in insts]
|
||
x_min, x_max = min(xs), max(xs)
|
||
|
||
# 가장 가까운 equipment 찾기
|
||
cx, cy = sum(xs)/len(xs), sum(ys)/len(ys)
|
||
best_eqp = None
|
||
best_d = 999
|
||
|
||
for va in vert_anchors:
|
||
if va['y_min'] <= cy <= va['y_max']:
|
||
d = abs(cx - va['x'])
|
||
if d < best_d and d <= HORIZONTAL_LIMIT:
|
||
best_d = d
|
||
best_eqp = f"VLINE@{va['x']:.0f}"
|
||
|
||
for eq in eqp_by_circle:
|
||
d = math.hypot(cx - eq['x'], cy - eq['y'])
|
||
if d < best_d and d <= HORIZONTAL_LIMIT:
|
||
best_d = d
|
||
best_eqp = eq['tag']
|
||
|
||
if best_eqp:
|
||
connected = sum(1 for inst in insts if any(inst['tag'] == ii['tag'] for ii, *_ in loop_connected.get(loop, [])))
|
||
print(f" Loop {loop}: {len(insts)}개 계기 → {best_eqp} (거리={best_d:.0f}, Y=[{y_min:.0f},{y_max:.0f}], X=[{x_min:.0f},{x_max:.0f}])")
|
||
|
||
# ── 8. 방향성 부여 + 유향 그래프 + JSON 출력 ──────────────────────────────
|
||
# ▶ 마커(V자=우측)는 국소 흐름이 좌→우.
|
||
# side='left' 추적대상 = 상류(upstream) → edge: target → marker
|
||
# side='right' 추적대상 = 하류(downstream)→ edge: marker → target
|
||
# 태그매칭: 같은 태그가 여러 x위치 = off-page connector 연속. x오름차순 체인(좌→우).
|
||
import json
|
||
try:
|
||
import networkx as _nx
|
||
_HAVE_NX = True
|
||
except ImportError:
|
||
_HAVE_NX = False
|
||
|
||
directed_edges = [] # {from, to, type, basis, weight}
|
||
_seen_de = set()
|
||
def _add_de(frm, to, typ, basis, w):
|
||
if not frm or not to or frm == to:
|
||
return
|
||
k = (frm, to, typ)
|
||
if k in _seen_de:
|
||
return
|
||
_seen_de.add(k)
|
||
directed_edges.append({'from': frm, 'to': to, 'type': typ,
|
||
'basis': basis, 'weight': round(w, 1)})
|
||
|
||
# (1) trace 기반 유향 엣지 (사이클 차단된 marker_traces 사용)
|
||
for e in marker_traces:
|
||
if e['to_kind'] == 'empty' or not e['to_tag'] or e['to_tag'] == e['from']:
|
||
continue
|
||
if e['side'] == 'left':
|
||
_add_de(e['to_tag'], e['from'], 'trace', 'marker-left(upstream)', e['total_len'])
|
||
else:
|
||
_add_de(e['from'], e['to_tag'], 'trace', 'marker-right(downstream)', e['total_len'])
|
||
|
||
# (2) 태그매칭 유향 엣지 (x오름차순 체인)
|
||
for tag, ml in multi.items():
|
||
ordered = sorted(ml, key=lambda m: m['mx'])
|
||
for a, b in zip(ordered, ordered[1:]):
|
||
_add_de(f"{tag}@{a['mx']:.0f},{a['my']:.0f}",
|
||
f"{tag}@{b['mx']:.0f},{b['my']:.0f}",
|
||
'tagmatch', 'offpage-connector(+x)',
|
||
abs(b['mx'] - a['mx']))
|
||
|
||
# 유향 그래프 → 약연결 성분
|
||
components_out = []
|
||
if _HAVE_NX:
|
||
DG = _nx.DiGraph()
|
||
for de in directed_edges:
|
||
DG.add_edge(de['from'], de['to'])
|
||
for comp in sorted(_nx.weakly_connected_components(DG), key=len, reverse=True):
|
||
if len(comp) >= 2:
|
||
components_out.append(sorted(comp))
|
||
|
||
out = {
|
||
'drawing': DXF_PATH,
|
||
'stats': {
|
||
'instruments': len(instruments),
|
||
'markers': len(markers),
|
||
'marker_tag_groups': len(multi),
|
||
'trace_edges_raw': len(marker_traces),
|
||
'directed_edges': len(directed_edges),
|
||
'tagmatch_directed': sum(1 for d in directed_edges if d['type'] == 'tagmatch'),
|
||
'trace_directed': sum(1 for d in directed_edges if d['type'] == 'trace'),
|
||
'components': len(components_out),
|
||
},
|
||
'markers': [{'tag': m['tag'], 'x': m['mx'], 'y': m['my'],
|
||
'dir': 'right', 'diag': m['dr']} for m in markers],
|
||
'equipment': [{'tag': eq['tag'], 'kind': eq['kind'],
|
||
'x': round(eq['x'], 1), 'y': round(eq['y'], 1)}
|
||
for eq in all_eqp],
|
||
'edges': directed_edges,
|
||
'components': components_out,
|
||
}
|
||
OUT_JSON = "mcp-server/storage/No-10_connections.json"
|
||
import os as _os
|
||
_os.makedirs(_os.path.dirname(OUT_JSON), exist_ok=True)
|
||
with open(OUT_JSON, 'w', encoding='utf-8') as f:
|
||
json.dump(out, f, ensure_ascii=False, indent=1)
|
||
print(f"\n8. 방향성 + JSON 출력:")
|
||
print(f" 유향 엣지: {len(directed_edges)}개 "
|
||
f"(태그매칭 {out['stats']['tagmatch_directed']}, trace {out['stats']['trace_directed']})")
|
||
print(f" 유향 약연결 성분(≥2): {len(components_out)}개")
|
||
print(f" → {OUT_JSON} ({_os.path.getsize(OUT_JSON)/1024:.0f} KB)")
|
||
|
||
# ── 요약 ──────────────────────────────────────────────────────────────────
|
||
print(f"\n{'='*60}")
|
||
print(f"요약")
|
||
print(f"{'='*60}")
|
||
print(f"Instruments (balloon): {len(instruments)}개")
|
||
print(f"방향표지판 (marker): {len(markers)}개 (태그있음={sum(1 for m in markers if m['tag'])}개)")
|
||
print(f"동일태그 그룹(≥2): {len(multi)}개, 태그매칭 엣지: {len(tag_match_edges)}쌍")
|
||
print(f"Equipment anchors: 수직LINE {len(vert_anchors)}개 + 큰원 {len(eqp_by_circle)}개 + 작은원 {len(eqp_by_small)}개")
|
||
print(f"좌표 근접 연결 (수평≤{HORIZONTAL_LIMIT}): {len(inst_eqp_connections)}개 / {len(instruments)}개 ({len(inst_eqp_connections)/max(len(instruments),1)*100:.1f}%)")
|
||
print(f"LINE endpoint 연결 (balloon-balloon): {len(line_connections)}쌍 (같은 loop {same_loop}개)")
|
||
print(f"Marker trace 엣지: {len(marker_traces)}개")
|