""" P&ID 연결 분석 v2 — 좌표 근접성 기반 ====================================== 원리: C-10111 분석에서 발견한 패턴 - 설비(컬럼/탱크) = 긴 수직 LINE(≥200u) 또는 큰 원(r≥4.5) - 계기(instrument balloon)는 설비와 Y축 겹침 + 수평 200u 이내 - LINE 연결이 없어도 좌표 근접성으로 연결 판단 순서: 1. DXF 로드, LINE/CIRCLE/TEXT 추출 2. instrument balloon 식별 (CIRCLE r≥1.5 + func+num) 3. equipment anchor 식별 (긴 수직 LINE + 큰 원) 4. 좌표 근접성 기반 연결 (Y겹침 + 수평거리 ≤ 200) 5. LINE endpoint 연결과 비교 """ import ezdxf, re, time, math from shapely.geometry import LineString, Point, MultiLineString, box from shapely.strtree import STRtree from shapely.ops import linemerge from collections import defaultdict DXF_PATH = "src/Web/uploads/pid/No-10_Plant_PID.dxf" TAG_RE = re.compile(r'^[A-Z]{1,6}-\d{2,6}(-[A-Z0-9]+)*$', re.I) FUNC_RE = re.compile(r'^[FPLTASHQWVXZBDCRK][ICTREVYSAQGZ]{1,3}$') NUM_RE = re.compile(r'^\d{3,6}[A-Z]?$') PIPE_RE = re.compile(r'^\w+-\d{3,6}-\d+[A-Z]?-[A-Z][A-Z0-9]', re.I) LOOP_RE = re.compile(r'\d{3,6}[A-Z]?') # ── 1-2. DXF 로드 + LINE/CIRCLE/TEXT ────────────────────────────────── t0 = time.time() doc = ezdxf.readfile(DXF_PATH) msp = doc.modelspace() print(f"1. DXF 로드: {time.time()-t0:.2f}s", flush=True) # LINE 추출 lines_raw = [] for e in msp: t = e.dxftype() if t == 'LINE': s = (e.dxf.start.x, e.dxf.start.y) e2 = (e.dxf.end.x, e.dxf.end.y) if s != e2: lines_raw.append(LineString([s, e2])) elif t == 'LWPOLYLINE': pts = [(p[0], p[1]) for p in e.vertices()] if len(pts) >= 2: lines_raw.append(LineString(pts)) # linemerge merged = linemerge(MultiLineString(lines_raw)) merged_list = list(merged.geoms) if merged.geom_type == 'MultiLineString' else [merged] # CIRCLE circles = [(e.dxf.center.x, e.dxf.center.y, e.dxf.radius) for e in msp if e.dxftype() == 'CIRCLE'] # TEXT text_entries = [] for e in msp: if e.dxftype() == 'TEXT': v = e.dxf.text.strip().replace('%%U', '').replace('%%C', 'Φ') if v: text_entries.append((e.dxf.insert.x, e.dxf.insert.y, v)) print(f"2. 추출: LINE={len(lines_raw)}, CIRCLE={len(circles)}, TEXT={len(text_entries)}", flush=True) # ── 3. Instrument balloon 식별 ──────────────────────────────────────── t0 = time.time() # TEXT → CIRCLE 포함 (r≥1.5) text_in_circle = defaultdict(list) for x, y, v in text_entries: for cx, cy, r in circles: if r >= 1.5 and (x-cx)**2 + (y-cy)**2 <= r*r: text_in_circle[v].append((cx, cy, r)) # circle center → func+num circ_data = {} for v, occ in text_in_circle.items(): for cx, cy, r in occ: k = (round(cx, 1), round(cy, 1)) if k not in circ_data: circ_data[k] = {'r': r, 'x': cx, 'y': cy} if FUNC_RE.match(v): circ_data[k]['func'] = v elif NUM_RE.match(v): circ_data[k]['num'] = v elif TAG_RE.match(v): circ_data[k]['tag'] = v instruments = [] for k, d in circ_data.items(): if 'func' in d: tag = f"{d['func']}-{d.get('num', '?')}" # prefer explicit tag if available if 'tag' in d: tag = d['tag'] instruments.append({'x': d['x'], 'y': d['y'], 'tag': tag, 'r': d['r'], 'loop': re.search(r'\d{3,6}', tag).group() if re.search(r'\d{3,6}', tag) else None}) # loop number별 계기 그룹 loop_inst = defaultdict(list) for inst in instruments: if inst['loop']: loop_inst[inst['loop']].append(inst) print(f"3-A. Instrument: {len(instruments)}개") print(f" Loop 번호 그룹: {len(loop_inst)}개") # ── 3-B. 방향표지판 검출 ────────────────────────────────────────── t0 = time.time() # 방향별 분류: h_seg는 raw LINE만 (O(n²) 회피), v/d_seg는 merged+raw h_lines = [] v_lines = [] d_lines = [] for l in lines_raw: coords = list(l.coords) if len(coords) < 2: continue x1, y1 = coords[0] x2, y2 = coords[-1] dx = abs(x2 - x1) dy = abs(y2 - y1) if dy <= 1 and dx >= 3: h_lines.append({'xl': min(x1, x2), 'xr': max(x1, x2), 'y': y1}) elif dx <= 1 and dy >= 2.5: v_lines.append({'x': x1, 'y1': min(y1, y2), 'y2': max(y1, y2)}) elif dx >= 1 and dy >= 1: angle = math.degrees(math.atan2(dy, dx)) if 20 <= angle <= 70 or 110 <= angle <= 160: d_lines.append({'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2}) # merged LINE 보강 (v/d만) - merged LINE은 길어서 relaxed threshold for l in merged_list: coords = list(l.coords) if len(coords) < 2: continue x1, y1 = coords[0] x2, y2 = coords[-1] dx = abs(x2 - x1) dy = abs(y2 - y1) if dx <= 1 and dy >= 2.5: v_lines.append({'x': x1, 'y1': min(y1, y2), 'y2': max(y1, y2)}) elif dx >= 1 and dy >= 1: angle = math.degrees(math.atan2(dy, dx)) if 20 <= angle <= 70 or 110 <= angle <= 160: d_lines.append({'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2}) # 수평선쌍: 같은 xl/xr + y간격 2~8u (xr 기준 버킷으로 O(n²) 회피) h_by_xr = defaultdict(list) for h in h_lines: h_by_xr[round(h['xr'] / 5) * 5].append(h) h_pairs = [] seen_pair = set() # (xl_round, xr_round, y_mid_round) 중복 제거 for bucket in h_by_xr.values(): for i, h1 in enumerate(bucket): for h2 in bucket[i+1:]: if abs(h1['y'] - h2['y']) < 2: continue if abs(h1['xl'] - h2['xl']) <= 3 and abs(h1['xr'] - h2['xr']) <= 3: ylo = min(h1['y'], h2['y']) yhi = max(h1['y'], h2['y']) gap = yhi - ylo if 2 <= gap <= 8: xl = (h1['xl']+h2['xl'])/2 xr = (h1['xr']+h2['xr'])/2 k = (round(xl), round(xr), round((ylo+yhi)/2)) if k not in seen_pair: seen_pair.add(k) h_pairs.append({'xl': xl, 'xr': xr, 'ylo': ylo, 'yhi': yhi, 'y_mid': (ylo+yhi)/2}) def _has_left_vert(hp, vlist, tol=8): xl, ym = hp['xl'], hp['y_mid'] return any(abs(v['x'] - xl) <= tol and v['y1'] <= ym <= v['y2'] for v in vlist) def _count_right_diag(hp, dlist, tol=8): xr, ym = hp['xr'], hp['y_mid'] return sum(1 for d in dlist if abs(d['x1'] - xr) <= tol and abs(d['y1'] - ym) <= tol or abs(d['x2'] - xr) <= tol and abs(d['y2'] - ym) <= tol) def _nearest_tag_to(hp, texts, limit=25): mx, my = (hp['xl']+hp['xr'])/2, hp['y_mid'] best, best_d = None, 999 for tx, ty, tv in texts: d = math.hypot(tx - mx, ty - my) if d < best_d and d <= limit: best_d, best = d, tv return best, best_d print(f" marker filter start... v={len(v_lines)} d={len(d_lines)} texts={len(text_entries)}") markers = [] for i, hp in enumerate(h_pairs): if not _has_left_vert(hp, v_lines): continue cnt_diag = _count_right_diag(hp, d_lines) if cnt_diag < 2: continue tag, tag_dist = _nearest_tag_to(hp, text_entries) if tag is None: continue mx = round((hp['xl']+hp['xr'])/2, 1) my = round(hp['y_mid'], 1) markers.append({'tag': tag, 'mx': mx, 'my': my, 'xl': hp['xl'], 'xr': hp['xr'], 'ylo': hp['ylo'], 'yhi': hp['yhi'], 'dr': cnt_diag}) et = time.time() # 위치 중복 제거 (동일태그 + 동일좌표 → 1개) seen = set() deduped = [] for m in markers: k = (m['tag'], round(m['mx'], 0), round(m['my'], 0)) if k not in seen: seen.add(k) deduped.append(m) markers = deduped print(f"3-B. 방향표지판(마커): {len(markers)}개 (h_pairs={len(h_pairs)}, elapsed={et-t0:.2f}s)") print(f" ROI(y≥5100): {sum(1 for m in markers if m['my'] >= 5100)}개") tag_groups = defaultdict(list) for m in markers: tag_groups[m['tag']].append(m) multi = {t: v for t, v in tag_groups.items() if len(v) >= 2} print(f" 태그 있는 마커: {sum(1 for m in markers if m['tag'])}개") print(f" 동일태그 그룹(≥2): {len(multi)}개") for t, v in sorted(multi.items(), key=lambda x: -len(x[1]))[:10]: poss = ', '.join(f"({m['mx']:.0f},{m['my']:.0f})" for m in v) print(f" {t}: {len(v)}개 [{poss}]") # ── 4. Equipment anchor 식별 ────────────────────────────────────────── t0_4 = time.time() # (A) 긴 수직 LINE (≥200u, almost vertical) vert_lines = [] for l in merged_list: c = list(l.coords) xs = [p[0] for p in c] ys = [p[1] for p in c] dx = max(xs) - min(xs) dy = max(ys) - min(ys) if dy >= 200 and dy > dx * 3: # vertical-ish vert_lines.append({ 'x': round(sum(xs)/len(xs), 1), 'y_min': min(ys), 'y_max': max(ys), 'length': l.length, 'bounds': l.bounds }) # dedup by x vert_grouped = defaultdict(list) for v in vert_lines: vert_grouped[round(v['x'])].append(v) vert_anchors = [] for x, vl in vert_grouped.items(): y_min = min(v['y_min'] for v in vl) y_max = max(v['y_max'] for v in vl) vert_anchors.append({'x': x, 'y_min': y_min, 'y_max': y_max, 'count': len(vl), 'total_len': sum(v['length'] for v in vl)}) # (B) 큰 원 (r≥4.5, empty or with text) large_circs = [(cx, cy, r) for cx, cy, r in circles if r >= 4.5] # Find nearest TAG text eqp_by_circle = [] for cx, cy, r in large_circs: best_tag = None best_d = 999 for tx, ty, v in text_entries: if TAG_RE.match(v): d = math.hypot(tx-cx, ty-cy) if d < best_d and d > r and d < 200: best_d = d best_tag = v eqp_by_circle.append({'x': cx, 'y': cy, 'tag': best_tag or f'CIRCLE@{cx:.0f},{cy:.0f}', 'r': r, 'kind': 'large_circle'}) print(f"\n4. Equipment anchor:") print(f" 수직 LINE(≥200u): {len(vert_anchors)}개") for va in sorted(vert_anchors, key=lambda v: v['x']): print(f" x={va['x']:6.1f}, y=[{va['y_min']:.0f}, {va['y_max']:.0f}], 높이={va['y_max']-va['y_min']:.0f}") print(f" 큰 원(r≥4.5): {len(eqp_by_circle)}개") for eq in eqp_by_circle: print(f" ({eq['x']:6.1f}, {eq['y']:6.1f}) r={eq['r']:.1f} tag={eq['tag']}") # (C) 작은 원 (r<4.5, r≥1.5, exclude instrument balloons) # instrument balloon circles already identified as those containing TEXT balloon_circles = set() for v, occ in text_in_circle.items(): for cx, cy, r in occ: balloon_circles.add((round(cx, 1), round(cy, 1))) small_circs = [(cx, cy, r) for cx, cy, r in circles if 1.5 <= r < 4.5 and (round(cx,1), round(cy,1)) not in balloon_circles] small_eqp = [] for cx, cy, r in small_circs: best_tag = None best_d = 999 for tx, ty, v in text_entries: if TAG_RE.match(v): d = math.hypot(tx-cx, ty-cy) if d < best_d and d > r and d < 100: best_d = d best_tag = v small_eqp.append({'x': cx, 'y': cy, 'tag': best_tag or f'SMALL@{cx:.0f},{cy:.0f}', 'r': r, 'kind': 'small_circle'}) eqp_by_small = small_eqp print(f" 작은 원(1.5≤r<4.5, excl. balloon): {len(eqp_by_small)}개") for eq in eqp_by_small[:10]: print(f" ({eq['x']:6.1f}, {eq['y']:6.1f}) r={eq['r']:.1f} tag={eq['tag']}") # Merge small into _what_at equipment list all_eqp = eqp_by_circle + eqp_by_small # ── 3-C. 방향표지판 LINE endpoint 추적 (2-hop BFS, 직각굽힘 허용) ── t0 = time.time() # Precompute: endpoint spatial index (grid bucketing) # 30u limit → grid cell size 15 so we only check 3×3 neighbors GRID = 15 ep_grid = defaultdict(list) line_ep = {} for li, l in enumerate(merged_list): coords = list(l.coords) if not coords: continue s, e = coords[0], coords[-1] for ep, ei in [(s, 0), (e, -1)]: gx, gy = int(ep[0] // GRID), int(ep[1] // GRID) ep_grid[(gx, gy)].append((li, ei, ep)) line_ep[li] = {'s': s, 'e': e} def _grid_neighbors(pt): gx, gy = int(pt[0] // GRID), int(pt[1] // GRID) return [(gx+dx, gy+dy) for dx in (-1,0,1) for dy in (-1,0,1)] def _find_ep_idx(pt, limit=30): best = None for gk in _grid_neighbors(pt): for li, ei, ep in ep_grid.get(gk, []): d = math.hypot(pt[0]-ep[0], pt[1]-ep[1]) if d <= limit and (best is None or d < best[0]): best = (d, li, ei, ep) return best def _follow_line(li, ei): ep = line_ep[li] return ep['e'] if ei == 0 else ep['s'] def _connected_at(pt, tol=5): result = [] for gk in _grid_neighbors(pt): for li, ei, ep in ep_grid.get(gk, []): d = math.hypot(pt[0]-ep[0], pt[1]-ep[1]) if d <= tol: result.append((li, ei, ep)) return result def _is_origin(kind, tag, pt, origin_tag, origin_pos, span=40): """추적 결과가 출발 마커 자기자신인지 (cycle) 판정.""" if origin_tag is None or kind != 'marker' or tag != origin_tag: return False return math.hypot(pt[0] - origin_pos[0], pt[1] - origin_pos[1]) <= span def _trace_2hop(pt, markers, vert_anchors, all_eqp, limit=30, origin_tag=None, origin_pos=None): """마커 endpoint → 2-hop 추적. 출발 마커로 되돌아오는 사이클 차단 + 중복 제거.""" hop0 = _find_ep_idx(pt, limit) if not hop0: return [] _, li1, ei1, ep1 = hop0 op1 = _follow_line(li1, ei1) best = {} # (kind, tag) → (total_len, path) 최단 1개만 유지 def _offer(kind, tag, total_len, path, at_pt): if kind == 'empty': return if _is_origin(kind, tag, at_pt, origin_tag, origin_pos): return # self-cycle: 출발 마커로 회귀 k = (kind, tag) if k not in best or total_len < best[k][0]: best[k] = (total_len, path) # hop1: 같은 endpoint를 공유하는 세그먼트(중복 li2 제거) seen_li2 = set() for li2, ei2, ep2 in _connected_at(op1): if li2 == li1 or li2 in seen_li2: continue seen_li2.add(li2) op2 = _follow_line(li2, ei2) kind, tag = _what_at_pt(op2, markers, vert_anchors, all_eqp, limit) _offer(kind, tag, merged_list[li1].length + merged_list[li2].length, f"seg#{li1}→seg#{li2}", op2) # hop0 자체의 반대쪽 끝 kind1, tag1 = _what_at_pt(op1, markers, vert_anchors, all_eqp, limit) _offer(kind1, tag1, merged_list[li1].length, f"seg#{li1}", op1) return [(k[0], k[1], v[0], v[1]) for k, v in best.items()] # Precompute marker grid for fast what_at lookup marker_grid = defaultdict(list) for mi, m in enumerate(markers): gx, gy = int(m['mx'] // GRID), int(m['my'] // GRID) marker_grid[(gx, gy)].append(mi) def _what_at_pt(pt, markers, vert_anchors, all_eqp, limit=30): px, py = pt for gk in _grid_neighbors(pt): for mi in marker_grid.get(gk, []): m = markers[mi] if math.hypot(px-m['mx'], py-m['my']) <= limit: return 'marker', m['tag'] for va in vert_anchors: if va['y_min'] <= py <= va['y_max'] and abs(px-va['x']) <= limit: return 'equipment', f"VLINE@{va['x']:.0f}" for eq in all_eqp: if math.hypot(px-eq['x'], py-eq['y']) <= limit: return 'equipment', eq['tag'] return 'empty', '' marker_traces = [] trace_log = [] for m in markers: mx, my, xl, xr, tag = m['mx'], m['my'], m['xl'], m['xr'], m['tag'] for side, pt in [('left', (xl, my)), ('right', (xr, my))]: hops = _trace_2hop(pt, markers, vert_anchors, all_eqp, origin_tag=tag, origin_pos=(mx, my)) for kind, to_tag, total_len, path in hops: marker_traces.append({ 'from': tag, 'from_pos': f"({mx:.0f},{my:.0f})", 'side': side, 'to_kind': kind, 'to_tag': to_tag, 'total_len': total_len, 'path': path }) if tag == 'P-10101': trace_log.append(f" {side} ({pt[0]:.1f},{pt[1]:.1f}) → {path}: {kind}={to_tag} ({total_len:.1f}u)") et = time.time() print(f"3-C. 마커 LINE endpoint 추적 (2-hop BFS, elapsed={et-t0:.2f}s):") print(f" 추적 엣지: {len(marker_traces)}개") lk = defaultdict(int) for e in marker_traces: lk[e['to_kind']] += 1 for k, c in sorted(lk.items()): print(f" → {k}: {c}개") if trace_log: print(f" P-10101 상세 추적:") for line in trace_log: print(line) # ── 3-D. 태그 매칭 연결 ───────────────────────────────────────── t0 = time.time() tag_match_edges = [] for tag, ml in multi.items(): for i in range(len(ml)): for j in range(i+1, len(ml)): tag_match_edges.append({ 'tag': tag, 'from_pos': f"({ml[i]['mx']:.0f},{ml[i]['my']:.0f})", 'to_pos': f"({ml[j]['mx']:.0f},{ml[j]['my']:.0f})", 'span': abs(ml[i]['mx'] - ml[j]['mx']) }) et = time.time() print(f"3-D. 태그 매칭 연결: {len(tag_match_edges)}쌍 (elapsed={et-t0:.2f}s)") for t, ml in sorted(multi.items(), key=lambda x: -len(x[1]))[:10]: xs = [m['mx'] for m in ml] span = max(xs) - min(xs) print(f" {t}: {len(ml)}개 위치, 스팬={span:.0f}u") # ── 5. 좌표 근접성 기반 연결 ────────────────────────────────────────── t0 = time.time() # 각 instrument → 가장 가까운 equipment anchor 찾기 # 기준: Y범위 겹침 + 수평거리 ≤ 200 HORIZONTAL_LIMIT = 200 inst_eqp_connections = [] # (inst, eqp, dist, method) unconnected_inst = [] for inst in instruments: ix, iy = inst['x'], inst['y'] best_eqp = None best_d = 999 # 수직 LINE anchor for va in vert_anchors: if va['y_min'] <= iy <= va['y_max']: d = abs(ix - va['x']) if d < best_d and d <= HORIZONTAL_LIMIT: best_d = d best_eqp = (f"VLINE@{va['x']:.0f}", va, 'vline') # 큰 원 anchor for eq in eqp_by_circle: d = math.hypot(ix - eq['x'], iy - eq['y']) if d < best_d and d <= HORIZONTAL_LIMIT: best_d = d best_eqp = (eq['tag'], eq, 'circle') if best_eqp: inst_eqp_connections.append((inst, best_eqp[0], best_d, best_eqp[2])) else: unconnected_inst.append(inst) print(f"\n5. 좌표 근접 연결 (수평≤{HORIZONTAL_LIMIT}):") print(f" 계기-설비 연결: {len(inst_eqp_connections)}개") print(f" 미연결 계기: {len(unconnected_inst)}개") # 연결 분류 by_method = defaultdict(int) for *_, method in inst_eqp_connections: by_method[method] += 1 for m, c in by_method.items(): print(f" {m}: {c}개") # loop별 연결 분포 loop_connected = defaultdict(list) loop_unconnected = defaultdict(list) for inst, eqp_tag, d, method in inst_eqp_connections: loop_connected[inst['loop']].append((inst, eqp_tag, d)) for inst in unconnected_inst: loop_unconnected[inst['loop']].append(inst) print(f"\n Loop별 연결/미연결:") for loop in sorted([k for k in loop_connected if k is not None], key=int)[:20]: conn = len(loop_connected[loop]) unconn = len(loop_unconnected.get(loop, [])) total = conn + unconn if total >= 3: eqps = set(e for _, e, _ in loop_connected[loop]) print(f" Loop {loop}: {conn}/{total} 연결 → 설비: {', '.join(eqps)}") # ── 6. LINE endpoint 연결과 비교 ────────────────────────────────────── t0 = time.time() # long lines (≥30) long_lines = [l for l in merged_list if l.length >= 30] # build anchor index inst_tree = STRtree([Point(inst['x'], inst['y']) for inst in instruments]) inst_list = instruments BUFFER = 50 def nearest_inst(pt): idxs = inst_tree.query(box(pt.x-BUFFER, pt.y-BUFFER, pt.x+BUFFER, pt.y+BUFFER), predicate='intersects') if len(idxs) == 0: return None best_d = 999 best = None for idx in idxs: p = inst_tree.geometries[idx] d = p.distance(pt) if d < best_d: best_d = d best = (inst_list[idx], best_d) if best and best[1] <= BUFFER: return best return None line_connections = [] inst_seen = set() for l in long_lines: ep1 = Point(l.coords[0]) ep2 = Point(l.coords[-1]) n1 = nearest_inst(ep1) n2 = nearest_inst(ep2) if n1 and n2: i1, d1 = n1 i2, d2 = n2 if i1['tag'] != i2['tag']: line_connections.append((i1, i2, l.length, d1, d2)) print(f"\n6. LINE endpoint 연결 (≥30u, buffer={BUFFER}):") print(f" 계기-계기 연결: {len(line_connections)}개") # 같은 loop 내 연결 vs 다른 loop 연결 same_loop = 0 diff_loop = 0 for i1, i2, ll, d1, d2 in line_connections: if i1['loop'] and i2['loop'] and i1['loop'] == i2['loop']: same_loop += 1 else: diff_loop += 1 print(f" 같은 loop: {same_loop}개") print(f" 다른 loop: {diff_loop}개") # ── 7. Loop 기반 통합 연결 ──────────────────────────────────────────── # loop → 가장 가까운 equipment anchor print(f"\n7. Loop 기반 연결 분석:") for loop in sorted(loop_inst, key=int): insts = loop_inst[loop] # Y범위 ys = [i['y'] for i in insts] y_min, y_max = min(ys), max(ys) xs = [i['x'] for i in insts] x_min, x_max = min(xs), max(xs) # 가장 가까운 equipment 찾기 cx, cy = sum(xs)/len(xs), sum(ys)/len(ys) best_eqp = None best_d = 999 for va in vert_anchors: if va['y_min'] <= cy <= va['y_max']: d = abs(cx - va['x']) if d < best_d and d <= HORIZONTAL_LIMIT: best_d = d best_eqp = f"VLINE@{va['x']:.0f}" for eq in eqp_by_circle: d = math.hypot(cx - eq['x'], cy - eq['y']) if d < best_d and d <= HORIZONTAL_LIMIT: best_d = d best_eqp = eq['tag'] if best_eqp: connected = sum(1 for inst in insts if any(inst['tag'] == ii['tag'] for ii, *_ in loop_connected.get(loop, []))) print(f" Loop {loop}: {len(insts)}개 계기 → {best_eqp} (거리={best_d:.0f}, Y=[{y_min:.0f},{y_max:.0f}], X=[{x_min:.0f},{x_max:.0f}])") # ── 8. 방향성 부여 + 유향 그래프 + JSON 출력 ────────────────────────────── # ▶ 마커(V자=우측)는 국소 흐름이 좌→우. # side='left' 추적대상 = 상류(upstream) → edge: target → marker # side='right' 추적대상 = 하류(downstream)→ edge: marker → target # 태그매칭: 같은 태그가 여러 x위치 = off-page connector 연속. x오름차순 체인(좌→우). import json try: import networkx as _nx _HAVE_NX = True except ImportError: _HAVE_NX = False directed_edges = [] # {from, to, type, basis, weight} _seen_de = set() def _add_de(frm, to, typ, basis, w): if not frm or not to or frm == to: return k = (frm, to, typ) if k in _seen_de: return _seen_de.add(k) directed_edges.append({'from': frm, 'to': to, 'type': typ, 'basis': basis, 'weight': round(w, 1)}) # (1) trace 기반 유향 엣지 (사이클 차단된 marker_traces 사용) for e in marker_traces: if e['to_kind'] == 'empty' or not e['to_tag'] or e['to_tag'] == e['from']: continue if e['side'] == 'left': _add_de(e['to_tag'], e['from'], 'trace', 'marker-left(upstream)', e['total_len']) else: _add_de(e['from'], e['to_tag'], 'trace', 'marker-right(downstream)', e['total_len']) # (2) 태그매칭 유향 엣지 (x오름차순 체인) for tag, ml in multi.items(): ordered = sorted(ml, key=lambda m: m['mx']) for a, b in zip(ordered, ordered[1:]): _add_de(f"{tag}@{a['mx']:.0f},{a['my']:.0f}", f"{tag}@{b['mx']:.0f},{b['my']:.0f}", 'tagmatch', 'offpage-connector(+x)', abs(b['mx'] - a['mx'])) # 유향 그래프 → 약연결 성분 components_out = [] if _HAVE_NX: DG = _nx.DiGraph() for de in directed_edges: DG.add_edge(de['from'], de['to']) for comp in sorted(_nx.weakly_connected_components(DG), key=len, reverse=True): if len(comp) >= 2: components_out.append(sorted(comp)) out = { 'drawing': DXF_PATH, 'stats': { 'instruments': len(instruments), 'markers': len(markers), 'marker_tag_groups': len(multi), 'trace_edges_raw': len(marker_traces), 'directed_edges': len(directed_edges), 'tagmatch_directed': sum(1 for d in directed_edges if d['type'] == 'tagmatch'), 'trace_directed': sum(1 for d in directed_edges if d['type'] == 'trace'), 'components': len(components_out), }, 'markers': [{'tag': m['tag'], 'x': m['mx'], 'y': m['my'], 'dir': 'right', 'diag': m['dr']} for m in markers], 'equipment': [{'tag': eq['tag'], 'kind': eq['kind'], 'x': round(eq['x'], 1), 'y': round(eq['y'], 1)} for eq in all_eqp], 'edges': directed_edges, 'components': components_out, } OUT_JSON = "mcp-server/storage/No-10_connections.json" import os as _os _os.makedirs(_os.path.dirname(OUT_JSON), exist_ok=True) with open(OUT_JSON, 'w', encoding='utf-8') as f: json.dump(out, f, ensure_ascii=False, indent=1) print(f"\n8. 방향성 + JSON 출력:") print(f" 유향 엣지: {len(directed_edges)}개 " f"(태그매칭 {out['stats']['tagmatch_directed']}, trace {out['stats']['trace_directed']})") print(f" 유향 약연결 성분(≥2): {len(components_out)}개") print(f" → {OUT_JSON} ({_os.path.getsize(OUT_JSON)/1024:.0f} KB)") # ── 요약 ────────────────────────────────────────────────────────────────── print(f"\n{'='*60}") print(f"요약") print(f"{'='*60}") print(f"Instruments (balloon): {len(instruments)}개") print(f"방향표지판 (marker): {len(markers)}개 (태그있음={sum(1 for m in markers if m['tag'])}개)") print(f"동일태그 그룹(≥2): {len(multi)}개, 태그매칭 엣지: {len(tag_match_edges)}쌍") print(f"Equipment anchors: 수직LINE {len(vert_anchors)}개 + 큰원 {len(eqp_by_circle)}개 + 작은원 {len(eqp_by_small)}개") print(f"좌표 근접 연결 (수평≤{HORIZONTAL_LIMIT}): {len(inst_eqp_connections)}개 / {len(instruments)}개 ({len(inst_eqp_connections)/max(len(instruments),1)*100:.1f}%)") print(f"LINE endpoint 연결 (balloon-balloon): {len(line_connections)}쌍 (같은 loop {same_loop}개)") print(f"Marker trace 엣지: {len(marker_traces)}개")