Files
ExperionCrawler/mcp-server/sim_line_connection.py
windpacer 302183c97e feat: P&ID 연결 분석, LLM 에이전트 모드, KB 확장, MCP 서버 리팩토링
- P&ID: 연결 분석 API, Prefix 규칙 관리, 카테고리 분류, DXF 그래프 빌드
- LLM: 대화 요약, tool card 영구 보존, 시계열 차트(uPlot), 에이전트 모드
- KB: 청크 미리보기, Field Instrument Inference, 인증/Qdrant 클라이언트
- MCP: 서버 기능 확장, 파이프라인 수정, timeout 개선
- Frontend: P&ID UI, LLM UI, KB UI, OPC UA Write 탭 추가
- 설정: AGENTS.md, plant_context, README, opencode.json 업데이트
- 정리: 진단 체크리스트 문서 삭제
2026-05-21 23:36:57 +09:00

718 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
P&ID 연결 분석 v2 — 좌표 근접성 기반
======================================
원리: C-10111 분석에서 발견한 패턴
- 설비(컬럼/탱크) = 긴 수직 LINE(≥200u) 또는 큰 원(r≥4.5)
- 계기(instrument balloon)는 설비와 Y축 겹침 + 수평 200u 이내
- LINE 연결이 없어도 좌표 근접성으로 연결 판단
순서:
1. DXF 로드, LINE/CIRCLE/TEXT 추출
2. instrument balloon 식별 (CIRCLE r≥1.5 + func+num)
3. equipment anchor 식별 (긴 수직 LINE + 큰 원)
4. 좌표 근접성 기반 연결 (Y겹침 + 수평거리 ≤ 200)
5. LINE endpoint 연결과 비교
"""
import ezdxf, re, time, math
from shapely.geometry import LineString, Point, MultiLineString, box
from shapely.strtree import STRtree
from shapely.ops import linemerge
from collections import defaultdict
DXF_PATH = "src/Web/uploads/pid/No-10_Plant_PID.dxf"
TAG_RE = re.compile(r'^[A-Z]{1,6}-\d{2,6}(-[A-Z0-9]+)*$', re.I)
FUNC_RE = re.compile(r'^[FPLTASHQWVXZBDCRK][ICTREVYSAQGZ]{1,3}$')
NUM_RE = re.compile(r'^\d{3,6}[A-Z]?$')
PIPE_RE = re.compile(r'^\w+-\d{3,6}-\d+[A-Z]?-[A-Z][A-Z0-9]', re.I)
LOOP_RE = re.compile(r'\d{3,6}[A-Z]?')
# ── 1-2. DXF 로드 + LINE/CIRCLE/TEXT ──────────────────────────────────
t0 = time.time()
doc = ezdxf.readfile(DXF_PATH)
msp = doc.modelspace()
print(f"1. DXF 로드: {time.time()-t0:.2f}s", flush=True)
# LINE 추출
lines_raw = []
for e in msp:
t = e.dxftype()
if t == 'LINE':
s = (e.dxf.start.x, e.dxf.start.y)
e2 = (e.dxf.end.x, e.dxf.end.y)
if s != e2:
lines_raw.append(LineString([s, e2]))
elif t == 'LWPOLYLINE':
pts = [(p[0], p[1]) for p in e.vertices()]
if len(pts) >= 2:
lines_raw.append(LineString(pts))
# linemerge
merged = linemerge(MultiLineString(lines_raw))
merged_list = list(merged.geoms) if merged.geom_type == 'MultiLineString' else [merged]
# CIRCLE
circles = [(e.dxf.center.x, e.dxf.center.y, e.dxf.radius) for e in msp if e.dxftype() == 'CIRCLE']
# TEXT
text_entries = []
for e in msp:
if e.dxftype() == 'TEXT':
v = e.dxf.text.strip().replace('%%U', '').replace('%%C', 'Φ')
if v:
text_entries.append((e.dxf.insert.x, e.dxf.insert.y, v))
print(f"2. 추출: LINE={len(lines_raw)}, CIRCLE={len(circles)}, TEXT={len(text_entries)}", flush=True)
# ── 3. Instrument balloon 식별 ────────────────────────────────────────
t0 = time.time()
# TEXT → CIRCLE 포함 (r≥1.5)
text_in_circle = defaultdict(list)
for x, y, v in text_entries:
for cx, cy, r in circles:
if r >= 1.5 and (x-cx)**2 + (y-cy)**2 <= r*r:
text_in_circle[v].append((cx, cy, r))
# circle center → func+num
circ_data = {}
for v, occ in text_in_circle.items():
for cx, cy, r in occ:
k = (round(cx, 1), round(cy, 1))
if k not in circ_data:
circ_data[k] = {'r': r, 'x': cx, 'y': cy}
if FUNC_RE.match(v):
circ_data[k]['func'] = v
elif NUM_RE.match(v):
circ_data[k]['num'] = v
elif TAG_RE.match(v):
circ_data[k]['tag'] = v
instruments = []
for k, d in circ_data.items():
if 'func' in d:
tag = f"{d['func']}-{d.get('num', '?')}"
# prefer explicit tag if available
if 'tag' in d:
tag = d['tag']
instruments.append({'x': d['x'], 'y': d['y'], 'tag': tag, 'r': d['r'],
'loop': re.search(r'\d{3,6}', tag).group() if re.search(r'\d{3,6}', tag) else None})
# loop number별 계기 그룹
loop_inst = defaultdict(list)
for inst in instruments:
if inst['loop']:
loop_inst[inst['loop']].append(inst)
print(f"3-A. Instrument: {len(instruments)}")
print(f" Loop 번호 그룹: {len(loop_inst)}")
# ── 3-B. 방향표지판 검출 ──────────────────────────────────────────
t0 = time.time()
# 방향별 분류: h_seg는 raw LINE만 (O(n²) 회피), v/d_seg는 merged+raw
h_lines = []
v_lines = []
d_lines = []
for l in lines_raw:
coords = list(l.coords)
if len(coords) < 2:
continue
x1, y1 = coords[0]
x2, y2 = coords[-1]
dx = abs(x2 - x1)
dy = abs(y2 - y1)
if dy <= 1 and dx >= 3:
h_lines.append({'xl': min(x1, x2), 'xr': max(x1, x2), 'y': y1})
elif dx <= 1 and dy >= 2.5:
v_lines.append({'x': x1, 'y1': min(y1, y2), 'y2': max(y1, y2)})
elif dx >= 1 and dy >= 1:
angle = math.degrees(math.atan2(dy, dx))
if 20 <= angle <= 70 or 110 <= angle <= 160:
d_lines.append({'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2})
# merged LINE 보강 (v/d만) - merged LINE은 길어서 relaxed threshold
for l in merged_list:
coords = list(l.coords)
if len(coords) < 2:
continue
x1, y1 = coords[0]
x2, y2 = coords[-1]
dx = abs(x2 - x1)
dy = abs(y2 - y1)
if dx <= 1 and dy >= 2.5:
v_lines.append({'x': x1, 'y1': min(y1, y2), 'y2': max(y1, y2)})
elif dx >= 1 and dy >= 1:
angle = math.degrees(math.atan2(dy, dx))
if 20 <= angle <= 70 or 110 <= angle <= 160:
d_lines.append({'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2})
# 수평선쌍: 같은 xl/xr + y간격 2~8u (xr 기준 버킷으로 O(n²) 회피)
h_by_xr = defaultdict(list)
for h in h_lines:
h_by_xr[round(h['xr'] / 5) * 5].append(h)
h_pairs = []
seen_pair = set() # (xl_round, xr_round, y_mid_round) 중복 제거
for bucket in h_by_xr.values():
for i, h1 in enumerate(bucket):
for h2 in bucket[i+1:]:
if abs(h1['y'] - h2['y']) < 2:
continue
if abs(h1['xl'] - h2['xl']) <= 3 and abs(h1['xr'] - h2['xr']) <= 3:
ylo = min(h1['y'], h2['y'])
yhi = max(h1['y'], h2['y'])
gap = yhi - ylo
if 2 <= gap <= 8:
xl = (h1['xl']+h2['xl'])/2
xr = (h1['xr']+h2['xr'])/2
k = (round(xl), round(xr), round((ylo+yhi)/2))
if k not in seen_pair:
seen_pair.add(k)
h_pairs.append({'xl': xl, 'xr': xr, 'ylo': ylo, 'yhi': yhi, 'y_mid': (ylo+yhi)/2})
def _has_left_vert(hp, vlist, tol=8):
xl, ym = hp['xl'], hp['y_mid']
return any(abs(v['x'] - xl) <= tol and v['y1'] <= ym <= v['y2'] for v in vlist)
def _count_right_diag(hp, dlist, tol=8):
xr, ym = hp['xr'], hp['y_mid']
return sum(1 for d in dlist
if abs(d['x1'] - xr) <= tol and abs(d['y1'] - ym) <= tol
or abs(d['x2'] - xr) <= tol and abs(d['y2'] - ym) <= tol)
def _nearest_tag_to(hp, texts, limit=25):
mx, my = (hp['xl']+hp['xr'])/2, hp['y_mid']
best, best_d = None, 999
for tx, ty, tv in texts:
d = math.hypot(tx - mx, ty - my)
if d < best_d and d <= limit:
best_d, best = d, tv
return best, best_d
print(f" marker filter start... v={len(v_lines)} d={len(d_lines)} texts={len(text_entries)}")
markers = []
for i, hp in enumerate(h_pairs):
if not _has_left_vert(hp, v_lines):
continue
cnt_diag = _count_right_diag(hp, d_lines)
if cnt_diag < 2:
continue
tag, tag_dist = _nearest_tag_to(hp, text_entries)
if tag is None:
continue
mx = round((hp['xl']+hp['xr'])/2, 1)
my = round(hp['y_mid'], 1)
markers.append({'tag': tag, 'mx': mx, 'my': my,
'xl': hp['xl'], 'xr': hp['xr'],
'ylo': hp['ylo'], 'yhi': hp['yhi'], 'dr': cnt_diag})
et = time.time()
# 위치 중복 제거 (동일태그 + 동일좌표 → 1개)
seen = set()
deduped = []
for m in markers:
k = (m['tag'], round(m['mx'], 0), round(m['my'], 0))
if k not in seen:
seen.add(k)
deduped.append(m)
markers = deduped
print(f"3-B. 방향표지판(마커): {len(markers)}개 (h_pairs={len(h_pairs)}, elapsed={et-t0:.2f}s)")
print(f" ROI(y≥5100): {sum(1 for m in markers if m['my'] >= 5100)}")
tag_groups = defaultdict(list)
for m in markers:
tag_groups[m['tag']].append(m)
multi = {t: v for t, v in tag_groups.items() if len(v) >= 2}
print(f" 태그 있는 마커: {sum(1 for m in markers if m['tag'])}")
print(f" 동일태그 그룹(≥2): {len(multi)}")
for t, v in sorted(multi.items(), key=lambda x: -len(x[1]))[:10]:
poss = ', '.join(f"({m['mx']:.0f},{m['my']:.0f})" for m in v)
print(f" {t}: {len(v)}개 [{poss}]")
# ── 4. Equipment anchor 식별 ──────────────────────────────────────────
t0_4 = time.time()
# (A) 긴 수직 LINE (≥200u, almost vertical)
vert_lines = []
for l in merged_list:
c = list(l.coords)
xs = [p[0] for p in c]
ys = [p[1] for p in c]
dx = max(xs) - min(xs)
dy = max(ys) - min(ys)
if dy >= 200 and dy > dx * 3: # vertical-ish
vert_lines.append({
'x': round(sum(xs)/len(xs), 1),
'y_min': min(ys), 'y_max': max(ys),
'length': l.length, 'bounds': l.bounds
})
# dedup by x
vert_grouped = defaultdict(list)
for v in vert_lines:
vert_grouped[round(v['x'])].append(v)
vert_anchors = []
for x, vl in vert_grouped.items():
y_min = min(v['y_min'] for v in vl)
y_max = max(v['y_max'] for v in vl)
vert_anchors.append({'x': x, 'y_min': y_min, 'y_max': y_max,
'count': len(vl), 'total_len': sum(v['length'] for v in vl)})
# (B) 큰 원 (r≥4.5, empty or with text)
large_circs = [(cx, cy, r) for cx, cy, r in circles if r >= 4.5]
# Find nearest TAG text
eqp_by_circle = []
for cx, cy, r in large_circs:
best_tag = None
best_d = 999
for tx, ty, v in text_entries:
if TAG_RE.match(v):
d = math.hypot(tx-cx, ty-cy)
if d < best_d and d > r and d < 200:
best_d = d
best_tag = v
eqp_by_circle.append({'x': cx, 'y': cy, 'tag': best_tag or f'CIRCLE@{cx:.0f},{cy:.0f}',
'r': r, 'kind': 'large_circle'})
print(f"\n4. Equipment anchor:")
print(f" 수직 LINE(≥200u): {len(vert_anchors)}")
for va in sorted(vert_anchors, key=lambda v: v['x']):
print(f" x={va['x']:6.1f}, y=[{va['y_min']:.0f}, {va['y_max']:.0f}], 높이={va['y_max']-va['y_min']:.0f}")
print(f" 큰 원(r≥4.5): {len(eqp_by_circle)}")
for eq in eqp_by_circle:
print(f" ({eq['x']:6.1f}, {eq['y']:6.1f}) r={eq['r']:.1f} tag={eq['tag']}")
# (C) 작은 원 (r<4.5, r≥1.5, exclude instrument balloons)
# instrument balloon circles already identified as those containing TEXT
balloon_circles = set()
for v, occ in text_in_circle.items():
for cx, cy, r in occ:
balloon_circles.add((round(cx, 1), round(cy, 1)))
small_circs = [(cx, cy, r) for cx, cy, r in circles
if 1.5 <= r < 4.5 and (round(cx,1), round(cy,1)) not in balloon_circles]
small_eqp = []
for cx, cy, r in small_circs:
best_tag = None
best_d = 999
for tx, ty, v in text_entries:
if TAG_RE.match(v):
d = math.hypot(tx-cx, ty-cy)
if d < best_d and d > r and d < 100:
best_d = d
best_tag = v
small_eqp.append({'x': cx, 'y': cy, 'tag': best_tag or f'SMALL@{cx:.0f},{cy:.0f}',
'r': r, 'kind': 'small_circle'})
eqp_by_small = small_eqp
print(f" 작은 원(1.5≤r<4.5, excl. balloon): {len(eqp_by_small)}")
for eq in eqp_by_small[:10]:
print(f" ({eq['x']:6.1f}, {eq['y']:6.1f}) r={eq['r']:.1f} tag={eq['tag']}")
# Merge small into _what_at equipment list
all_eqp = eqp_by_circle + eqp_by_small
# ── 3-C. 방향표지판 LINE endpoint 추적 (2-hop BFS, 직각굽힘 허용) ──
t0 = time.time()
# Precompute: endpoint spatial index (grid bucketing)
# 30u limit → grid cell size 15 so we only check 3×3 neighbors
GRID = 15
ep_grid = defaultdict(list)
line_ep = {}
for li, l in enumerate(merged_list):
coords = list(l.coords)
if not coords:
continue
s, e = coords[0], coords[-1]
for ep, ei in [(s, 0), (e, -1)]:
gx, gy = int(ep[0] // GRID), int(ep[1] // GRID)
ep_grid[(gx, gy)].append((li, ei, ep))
line_ep[li] = {'s': s, 'e': e}
def _grid_neighbors(pt):
gx, gy = int(pt[0] // GRID), int(pt[1] // GRID)
return [(gx+dx, gy+dy) for dx in (-1,0,1) for dy in (-1,0,1)]
def _find_ep_idx(pt, limit=30):
best = None
for gk in _grid_neighbors(pt):
for li, ei, ep in ep_grid.get(gk, []):
d = math.hypot(pt[0]-ep[0], pt[1]-ep[1])
if d <= limit and (best is None or d < best[0]):
best = (d, li, ei, ep)
return best
def _follow_line(li, ei):
ep = line_ep[li]
return ep['e'] if ei == 0 else ep['s']
def _connected_at(pt, tol=5):
result = []
for gk in _grid_neighbors(pt):
for li, ei, ep in ep_grid.get(gk, []):
d = math.hypot(pt[0]-ep[0], pt[1]-ep[1])
if d <= tol:
result.append((li, ei, ep))
return result
def _is_origin(kind, tag, pt, origin_tag, origin_pos, span=40):
"""추적 결과가 출발 마커 자기자신인지 (cycle) 판정."""
if origin_tag is None or kind != 'marker' or tag != origin_tag:
return False
return math.hypot(pt[0] - origin_pos[0], pt[1] - origin_pos[1]) <= span
def _trace_2hop(pt, markers, vert_anchors, all_eqp, limit=30,
origin_tag=None, origin_pos=None):
"""마커 endpoint → 2-hop 추적. 출발 마커로 되돌아오는 사이클 차단 + 중복 제거."""
hop0 = _find_ep_idx(pt, limit)
if not hop0:
return []
_, li1, ei1, ep1 = hop0
op1 = _follow_line(li1, ei1)
best = {} # (kind, tag) → (total_len, path) 최단 1개만 유지
def _offer(kind, tag, total_len, path, at_pt):
if kind == 'empty':
return
if _is_origin(kind, tag, at_pt, origin_tag, origin_pos):
return # self-cycle: 출발 마커로 회귀
k = (kind, tag)
if k not in best or total_len < best[k][0]:
best[k] = (total_len, path)
# hop1: 같은 endpoint를 공유하는 세그먼트(중복 li2 제거)
seen_li2 = set()
for li2, ei2, ep2 in _connected_at(op1):
if li2 == li1 or li2 in seen_li2:
continue
seen_li2.add(li2)
op2 = _follow_line(li2, ei2)
kind, tag = _what_at_pt(op2, markers, vert_anchors, all_eqp, limit)
_offer(kind, tag, merged_list[li1].length + merged_list[li2].length,
f"seg#{li1}→seg#{li2}", op2)
# hop0 자체의 반대쪽 끝
kind1, tag1 = _what_at_pt(op1, markers, vert_anchors, all_eqp, limit)
_offer(kind1, tag1, merged_list[li1].length, f"seg#{li1}", op1)
return [(k[0], k[1], v[0], v[1]) for k, v in best.items()]
# Precompute marker grid for fast what_at lookup
marker_grid = defaultdict(list)
for mi, m in enumerate(markers):
gx, gy = int(m['mx'] // GRID), int(m['my'] // GRID)
marker_grid[(gx, gy)].append(mi)
def _what_at_pt(pt, markers, vert_anchors, all_eqp, limit=30):
px, py = pt
for gk in _grid_neighbors(pt):
for mi in marker_grid.get(gk, []):
m = markers[mi]
if math.hypot(px-m['mx'], py-m['my']) <= limit:
return 'marker', m['tag']
for va in vert_anchors:
if va['y_min'] <= py <= va['y_max'] and abs(px-va['x']) <= limit:
return 'equipment', f"VLINE@{va['x']:.0f}"
for eq in all_eqp:
if math.hypot(px-eq['x'], py-eq['y']) <= limit:
return 'equipment', eq['tag']
return 'empty', ''
marker_traces = []
trace_log = []
for m in markers:
mx, my, xl, xr, tag = m['mx'], m['my'], m['xl'], m['xr'], m['tag']
for side, pt in [('left', (xl, my)), ('right', (xr, my))]:
hops = _trace_2hop(pt, markers, vert_anchors, all_eqp,
origin_tag=tag, origin_pos=(mx, my))
for kind, to_tag, total_len, path in hops:
marker_traces.append({
'from': tag, 'from_pos': f"({mx:.0f},{my:.0f})",
'side': side, 'to_kind': kind, 'to_tag': to_tag,
'total_len': total_len, 'path': path
})
if tag == 'P-10101':
trace_log.append(f" {side} ({pt[0]:.1f},{pt[1]:.1f}) → {path}: {kind}={to_tag} ({total_len:.1f}u)")
et = time.time()
print(f"3-C. 마커 LINE endpoint 추적 (2-hop BFS, elapsed={et-t0:.2f}s):")
print(f" 추적 엣지: {len(marker_traces)}")
lk = defaultdict(int)
for e in marker_traces:
lk[e['to_kind']] += 1
for k, c in sorted(lk.items()):
print(f"{k}: {c}")
if trace_log:
print(f" P-10101 상세 추적:")
for line in trace_log:
print(line)
# ── 3-D. 태그 매칭 연결 ─────────────────────────────────────────
t0 = time.time()
tag_match_edges = []
for tag, ml in multi.items():
for i in range(len(ml)):
for j in range(i+1, len(ml)):
tag_match_edges.append({
'tag': tag,
'from_pos': f"({ml[i]['mx']:.0f},{ml[i]['my']:.0f})",
'to_pos': f"({ml[j]['mx']:.0f},{ml[j]['my']:.0f})",
'span': abs(ml[i]['mx'] - ml[j]['mx'])
})
et = time.time()
print(f"3-D. 태그 매칭 연결: {len(tag_match_edges)}쌍 (elapsed={et-t0:.2f}s)")
for t, ml in sorted(multi.items(), key=lambda x: -len(x[1]))[:10]:
xs = [m['mx'] for m in ml]
span = max(xs) - min(xs)
print(f" {t}: {len(ml)}개 위치, 스팬={span:.0f}u")
# ── 5. 좌표 근접성 기반 연결 ──────────────────────────────────────────
t0 = time.time()
# 각 instrument → 가장 가까운 equipment anchor 찾기
# 기준: Y범위 겹침 + 수평거리 ≤ 200
HORIZONTAL_LIMIT = 200
inst_eqp_connections = [] # (inst, eqp, dist, method)
unconnected_inst = []
for inst in instruments:
ix, iy = inst['x'], inst['y']
best_eqp = None
best_d = 999
# 수직 LINE anchor
for va in vert_anchors:
if va['y_min'] <= iy <= va['y_max']:
d = abs(ix - va['x'])
if d < best_d and d <= HORIZONTAL_LIMIT:
best_d = d
best_eqp = (f"VLINE@{va['x']:.0f}", va, 'vline')
# 큰 원 anchor
for eq in eqp_by_circle:
d = math.hypot(ix - eq['x'], iy - eq['y'])
if d < best_d and d <= HORIZONTAL_LIMIT:
best_d = d
best_eqp = (eq['tag'], eq, 'circle')
if best_eqp:
inst_eqp_connections.append((inst, best_eqp[0], best_d, best_eqp[2]))
else:
unconnected_inst.append(inst)
print(f"\n5. 좌표 근접 연결 (수평≤{HORIZONTAL_LIMIT}):")
print(f" 계기-설비 연결: {len(inst_eqp_connections)}")
print(f" 미연결 계기: {len(unconnected_inst)}")
# 연결 분류
by_method = defaultdict(int)
for *_, method in inst_eqp_connections:
by_method[method] += 1
for m, c in by_method.items():
print(f" {m}: {c}")
# loop별 연결 분포
loop_connected = defaultdict(list)
loop_unconnected = defaultdict(list)
for inst, eqp_tag, d, method in inst_eqp_connections:
loop_connected[inst['loop']].append((inst, eqp_tag, d))
for inst in unconnected_inst:
loop_unconnected[inst['loop']].append(inst)
print(f"\n Loop별 연결/미연결:")
for loop in sorted([k for k in loop_connected if k is not None], key=int)[:20]:
conn = len(loop_connected[loop])
unconn = len(loop_unconnected.get(loop, []))
total = conn + unconn
if total >= 3:
eqps = set(e for _, e, _ in loop_connected[loop])
print(f" Loop {loop}: {conn}/{total} 연결 → 설비: {', '.join(eqps)}")
# ── 6. LINE endpoint 연결과 비교 ──────────────────────────────────────
t0 = time.time()
# long lines (≥30)
long_lines = [l for l in merged_list if l.length >= 30]
# build anchor index
inst_tree = STRtree([Point(inst['x'], inst['y']) for inst in instruments])
inst_list = instruments
BUFFER = 50
def nearest_inst(pt):
idxs = inst_tree.query(box(pt.x-BUFFER, pt.y-BUFFER, pt.x+BUFFER, pt.y+BUFFER), predicate='intersects')
if len(idxs) == 0:
return None
best_d = 999
best = None
for idx in idxs:
p = inst_tree.geometries[idx]
d = p.distance(pt)
if d < best_d:
best_d = d
best = (inst_list[idx], best_d)
if best and best[1] <= BUFFER:
return best
return None
line_connections = []
inst_seen = set()
for l in long_lines:
ep1 = Point(l.coords[0])
ep2 = Point(l.coords[-1])
n1 = nearest_inst(ep1)
n2 = nearest_inst(ep2)
if n1 and n2:
i1, d1 = n1
i2, d2 = n2
if i1['tag'] != i2['tag']:
line_connections.append((i1, i2, l.length, d1, d2))
print(f"\n6. LINE endpoint 연결 (≥30u, buffer={BUFFER}):")
print(f" 계기-계기 연결: {len(line_connections)}")
# 같은 loop 내 연결 vs 다른 loop 연결
same_loop = 0
diff_loop = 0
for i1, i2, ll, d1, d2 in line_connections:
if i1['loop'] and i2['loop'] and i1['loop'] == i2['loop']:
same_loop += 1
else:
diff_loop += 1
print(f" 같은 loop: {same_loop}")
print(f" 다른 loop: {diff_loop}")
# ── 7. Loop 기반 통합 연결 ────────────────────────────────────────────
# loop → 가장 가까운 equipment anchor
print(f"\n7. Loop 기반 연결 분석:")
for loop in sorted(loop_inst, key=int):
insts = loop_inst[loop]
# Y범위
ys = [i['y'] for i in insts]
y_min, y_max = min(ys), max(ys)
xs = [i['x'] for i in insts]
x_min, x_max = min(xs), max(xs)
# 가장 가까운 equipment 찾기
cx, cy = sum(xs)/len(xs), sum(ys)/len(ys)
best_eqp = None
best_d = 999
for va in vert_anchors:
if va['y_min'] <= cy <= va['y_max']:
d = abs(cx - va['x'])
if d < best_d and d <= HORIZONTAL_LIMIT:
best_d = d
best_eqp = f"VLINE@{va['x']:.0f}"
for eq in eqp_by_circle:
d = math.hypot(cx - eq['x'], cy - eq['y'])
if d < best_d and d <= HORIZONTAL_LIMIT:
best_d = d
best_eqp = eq['tag']
if best_eqp:
connected = sum(1 for inst in insts if any(inst['tag'] == ii['tag'] for ii, *_ in loop_connected.get(loop, [])))
print(f" Loop {loop}: {len(insts)}개 계기 → {best_eqp} (거리={best_d:.0f}, Y=[{y_min:.0f},{y_max:.0f}], X=[{x_min:.0f},{x_max:.0f}])")
# ── 8. 방향성 부여 + 유향 그래프 + JSON 출력 ──────────────────────────────
# ▶ 마커(V자=우측)는 국소 흐름이 좌→우.
# side='left' 추적대상 = 상류(upstream) → edge: target → marker
# side='right' 추적대상 = 하류(downstream)→ edge: marker → target
# 태그매칭: 같은 태그가 여러 x위치 = off-page connector 연속. x오름차순 체인(좌→우).
import json
try:
import networkx as _nx
_HAVE_NX = True
except ImportError:
_HAVE_NX = False
directed_edges = [] # {from, to, type, basis, weight}
_seen_de = set()
def _add_de(frm, to, typ, basis, w):
if not frm or not to or frm == to:
return
k = (frm, to, typ)
if k in _seen_de:
return
_seen_de.add(k)
directed_edges.append({'from': frm, 'to': to, 'type': typ,
'basis': basis, 'weight': round(w, 1)})
# (1) trace 기반 유향 엣지 (사이클 차단된 marker_traces 사용)
for e in marker_traces:
if e['to_kind'] == 'empty' or not e['to_tag'] or e['to_tag'] == e['from']:
continue
if e['side'] == 'left':
_add_de(e['to_tag'], e['from'], 'trace', 'marker-left(upstream)', e['total_len'])
else:
_add_de(e['from'], e['to_tag'], 'trace', 'marker-right(downstream)', e['total_len'])
# (2) 태그매칭 유향 엣지 (x오름차순 체인)
for tag, ml in multi.items():
ordered = sorted(ml, key=lambda m: m['mx'])
for a, b in zip(ordered, ordered[1:]):
_add_de(f"{tag}@{a['mx']:.0f},{a['my']:.0f}",
f"{tag}@{b['mx']:.0f},{b['my']:.0f}",
'tagmatch', 'offpage-connector(+x)',
abs(b['mx'] - a['mx']))
# 유향 그래프 → 약연결 성분
components_out = []
if _HAVE_NX:
DG = _nx.DiGraph()
for de in directed_edges:
DG.add_edge(de['from'], de['to'])
for comp in sorted(_nx.weakly_connected_components(DG), key=len, reverse=True):
if len(comp) >= 2:
components_out.append(sorted(comp))
out = {
'drawing': DXF_PATH,
'stats': {
'instruments': len(instruments),
'markers': len(markers),
'marker_tag_groups': len(multi),
'trace_edges_raw': len(marker_traces),
'directed_edges': len(directed_edges),
'tagmatch_directed': sum(1 for d in directed_edges if d['type'] == 'tagmatch'),
'trace_directed': sum(1 for d in directed_edges if d['type'] == 'trace'),
'components': len(components_out),
},
'markers': [{'tag': m['tag'], 'x': m['mx'], 'y': m['my'],
'dir': 'right', 'diag': m['dr']} for m in markers],
'equipment': [{'tag': eq['tag'], 'kind': eq['kind'],
'x': round(eq['x'], 1), 'y': round(eq['y'], 1)}
for eq in all_eqp],
'edges': directed_edges,
'components': components_out,
}
OUT_JSON = "mcp-server/storage/No-10_connections.json"
import os as _os
_os.makedirs(_os.path.dirname(OUT_JSON), exist_ok=True)
with open(OUT_JSON, 'w', encoding='utf-8') as f:
json.dump(out, f, ensure_ascii=False, indent=1)
print(f"\n8. 방향성 + JSON 출력:")
print(f" 유향 엣지: {len(directed_edges)}"
f"(태그매칭 {out['stats']['tagmatch_directed']}, trace {out['stats']['trace_directed']})")
print(f" 유향 약연결 성분(≥2): {len(components_out)}")
print(f"{OUT_JSON} ({_os.path.getsize(OUT_JSON)/1024:.0f} KB)")
# ── 요약 ──────────────────────────────────────────────────────────────────
print(f"\n{'='*60}")
print(f"요약")
print(f"{'='*60}")
print(f"Instruments (balloon): {len(instruments)}")
print(f"방향표지판 (marker): {len(markers)}개 (태그있음={sum(1 for m in markers if m['tag'])}개)")
print(f"동일태그 그룹(≥2): {len(multi)}개, 태그매칭 엣지: {len(tag_match_edges)}")
print(f"Equipment anchors: 수직LINE {len(vert_anchors)}개 + 큰원 {len(eqp_by_circle)}개 + 작은원 {len(eqp_by_small)}")
print(f"좌표 근접 연결 (수평≤{HORIZONTAL_LIMIT}): {len(inst_eqp_connections)}개 / {len(instruments)}개 ({len(inst_eqp_connections)/max(len(instruments),1)*100:.1f}%)")
print(f"LINE endpoint 연결 (balloon-balloon): {len(line_connections)}쌍 (같은 loop {same_loop}개)")
print(f"Marker trace 엣지: {len(marker_traces)}")