""" P&ID 흐름 추적 · 랜드마크 From/To 추출 (pid_trace_algorithm.md 구현) Phase 1(랜드마크+OFFPAGE 소스) → Phase 2(배관망+commodity 투명) → Phase 3(흐름추적: 방문가드·엘보·TEE·재순환·랜드마크 통과-후-재개) → Phase 4(라인번호 귀속) → Phase 5(랜드마크 From/To 환원). 출력: _connections.json (C# AnalyzeConnectionsAsync 소비 포맷 호환). usage: python3 mcp-server/pid_tracer.py [DXF경로] (기본 src/Web/uploads/pid/P10-EQP-BLOCK.dxf) PID_TRACE_DEBUG=1 python3 mcp-server/pid_tracer.py (경로 좌표 덤프) """ import sys, os, re, math, json, collections import ezdxf from ezdxf import recover DXF = sys.argv[1] if len(sys.argv) > 1 else "src/Web/uploads/pid/P10-EQP-BLOCK.dxf" DEBUG = os.environ.get("PID_TRACE_DEBUG") == "1" # 랜드마크 태그 패턴 / commodity 블록명(투명 통과) EQP_TAG = re.compile(r'^[A-Z]{1,4}-?\d{3,5}[A-Z]?$') INSTR_TAG = re.compile(r'^(FCV|FIT|FIC|LIC|LCV|PCV|PIC|TIC|TCV|FV|LV|PV|TV|AT|FT|LT|PT|TT)-?\d', re.I) # 라인번호: 사이즈코드/연속세그먼트 다수 (예 P-10149-40A-F1A-n, 25Ax32A) LINE_NO = re.compile(r'^[A-Z]{1,3}-?\d{3,5}-\w+-\w+|^\d+A?x\d+A?$', re.I) COMMODITY = re.compile( r'BALL|CHECK|GLOBE|GATE|REDUCER|FLEX|HOSE|TUBE|STRAIN|FLANGE|VALVE_BALL|ON-?OFF|' r'DIAPHRAGM|NEEDLE|PLUG|BUTTERFLY|TRAP|EXPAN|CONE', re.I) JUNK = {"asda", "QQ", "sm", "bv", "GENAXEH", "D660198", "RC11", "HS_BOM", "IC", "ORDERNO", "ASDADAS", "1", "F", "SG", "BFT", "EX1"} # 명명 심볼 블록 = 태그계기/컨트롤밸브 (인접 버블 TEXT 로 태그 합성). commodity 아님. NAMED_INSTRUMENT = {"MASS_FLOW_METER", "CONTROL_VALVE_GLOBE", "FLOW_METER_VARIABLE-AREA"} FUNC_CODE = re.compile( r'^(FIT|FCV|FIC|FE|FQ|FQI|FI|FT|LIC|LCV|LI|LT|LSL|LSH|PIC|PCV|PI|PT|PSV|PSH|' r'TIC|TCV|TI|TT|AT|AIT|FV|LV|PV|TV)$') NUM_TXT = re.compile(r'^\d{3,6}[A-Z]?$') def load(): try: return ezdxf.readfile(DXF) except ezdxf.DXFStructureError: d, _ = recover.readfile(DXF) return d def world_extent(insert): """SOLID/ARC/CIRCLE 반경까지 포함한 월드 bbox (블록 실체 범위).""" xs, ys = [], [] try: for ve in insert.virtual_entities(): t = ve.dxftype() if t == "LINE": xs += [ve.dxf.start.x, ve.dxf.end.x] ys += [ve.dxf.start.y, ve.dxf.end.y] elif t in ("CIRCLE", "ARC"): r = getattr(ve.dxf, "radius", 0.0) xs += [ve.dxf.center.x - r, ve.dxf.center.x + r] ys += [ve.dxf.center.y - r, ve.dxf.center.y + r] elif t == "ELLIPSE": xs += [ve.dxf.center.x] ys += [ve.dxf.center.y] elif t == "SOLID": for k in ("vtx0", "vtx1", "vtx2", "vtx3"): p = getattr(ve.dxf, k, None) if p is not None: xs.append(p.x) ys.append(p.y) except Exception: pass if not xs: ip = insert.dxf.insert return (ip.x, ip.y, ip.x, ip.y) return (min(xs), min(ys), max(xs), max(ys)) def is_landmark_block(name): if name.startswith(("A$", "*")) or name in JUNK: return False if name.startswith("OFFPAGE") or name.startswith("FLOW_DIRECTION"): return False if COMMODITY.search(name): return False base = name.split("-SAME")[0].strip() return bool(EQP_TAG.match(base)) or name in ("IBC TANK", "3-10203") or bool(INSTR_TAG.match(name)) def main(): doc = load() msp = doc.modelspace() all_text = [] for e in msp: if e.dxftype() in ("TEXT", "MTEXT"): v = (e.plain_text() if e.dxftype() == "MTEXT" else e.dxf.text).strip() if v: all_text.append((v, e.dxf.insert.x, e.dxf.insert.y)) # ── Phase 1: 랜드마크 노드 (INSERT 블록 + 태그계기 TEXT) ─────────── landmarks = {} # tag -> (cx, cy) lm_block = {} # tag -> (cx, cy, (x0,y0,x1,y1)) INSERT 실체 범위 for e in msp: if e.dxftype() != "INSERT": continue nm = e.dxf.name if not is_landmark_block(nm): continue tag = nm.split("-SAME")[0].strip() # T-3210-SAME* → T-3210 병합 x0, y0, x1, y1 = world_extent(e) cx, cy = (x0 + x1) / 2, (y0 + y1) / 2 landmarks.setdefault(tag, (cx, cy)) lm_block.setdefault(tag, (cx, cy, (x0, y0, x1, y1))) # 태그계기: 인접 TEXT 가 FCV-/FIT- 인 commodity-형상 심볼 (월드 텍스트로 근사) for v, tx, ty in all_text: vk = v.split()[0].replace(" ", "") if INSTR_TAG.match(vk) and not LINE_NO.match(vk): landmarks.setdefault(vk, (tx, ty)) # 명명 심볼 블록(MASS_FLOW_METER 등) = 태그계기. 인접 함수코드+번호 버블로 # 태그 합성(FIT+10101 → FIT-10101). 블록명은 종류힌트, 태그는 인접 TEXT. func_txt = [(v, x, y) for v, x, y in all_text if FUNC_CODE.match(v.strip())] num_txt = [(v, x, y) for v, x, y in all_text if NUM_TXT.match(v.strip())] for e in msp: if e.dxftype() != "INSERT" or e.dxf.name not in NAMED_INSTRUMENT: continue x0, y0, x1, y1 = world_extent(e) cx, cy = (x0 + x1) / 2, (y0 + y1) / 2 code = min(((math.hypot(fx - cx, fy - cy), fv, fx, fy) for fv, fx, fy in func_txt), default=None) if not code or code[0] > 12: continue _, fv, fx, fy = code num = min(((math.hypot(nx - fx, ny - fy), nv) for nv, nx, ny in num_txt if math.hypot(nx - fx, ny - fy) <= 4.5), default=None) if not num: continue tag = f"{fv.strip()}-{num[1].strip()}" landmarks.setdefault(tag, (cx, cy)) lm_block.setdefault(tag, (cx, cy, (x0, y0, x1, y1))) # ── Phase 1b: OFFPAGE_CONNECTOR 발원 소스 시드 ─────────────────── # 본체 라인 → apex(수렴 정점) 와 흐름방향(centroid→apex, 월드기하 권위). # 본체 근접 TEXT → 출처설비태그(EQP/INSTR) + 라인번호(LINE_NO). offpage_seeds = [] # (src_tag, (ax,ay), (dx,dy), line_no, conn_name) for e in msp: if e.dxftype() != "INSERT" or not e.dxf.name.startswith("OFFPAGE"): continue pts = [] for ve in e.virtual_entities(): if ve.dxftype() == "LINE": pts += [(ve.dxf.start.x, ve.dxf.start.y), (ve.dxf.end.x, ve.dxf.end.y)] if not pts: continue cxp = sum(p[0] for p in pts) / len(pts) cyp = sum(p[1] for p in pts) / len(pts) bx0, by0 = min(p[0] for p in pts), min(p[1] for p in pts) bx1, by1 = max(p[0] for p in pts), max(p[1] for p in pts) # apex = 본체 주축(긴 변)의 두 극단 중 '뾰족한' 쪽(정점 1개) — 평평한 # 뒷변(정점 2개)과 구분. 방향 = 뒷변 중점 → apex (월드기하 권위). ax_is_x = (bx1 - bx0) >= (by1 - by0) key = (lambda p: p[0]) if ax_is_x else (lambda p: p[1]) lo, hi = min(pts, key=key), max(pts, key=key) lo_n = sum(1 for p in pts if abs(key(p) - key(lo)) <= 0.5) hi_n = sum(1 for p in pts if abs(key(p) - key(hi)) <= 0.5) apex_pt, back_pts = (hi, [p for p in pts if abs(key(p) - key(lo)) <= 0.5]) \ if hi_n <= lo_n else (lo, [p for p in pts if abs(key(p) - key(hi)) <= 0.5]) ax, ay = apex_pt bmx = sum(p[0] for p in back_pts) / len(back_pts) bmy = sum(p[1] for p in back_pts) / len(back_pts) dvx, dvy = ax - bmx, ay - bmy dn = math.hypot(dvx, dvy) or 1.0 dvx, dvy = dvx / dn, dvy / dn src_tag, line_no = None, None best_src = 9e9 for v, tx, ty in all_text: if not (bx0 - 4 <= tx <= bx1 + 4 and by0 - 4 <= ty <= by1 + 4): continue vk = v.split()[0].replace(" ", "") if LINE_NO.match(vk) or re.search(r'\d+A?x\d+A?', vk): if line_no is None: line_no = v elif EQP_TAG.match(vk) or INSTR_TAG.match(vk): d = math.hypot(tx - cxp, ty - cyp) if d < best_src: best_src, src_tag = d, vk if src_tag: landmarks.setdefault(src_tag, (cxp, cyp)) # 발원 소스 노드 등록 offpage_seeds.append((src_tag, (ax, ay), (dvx, dvy), line_no, e.dxf.name)) # ── Phase 2: 배관망 + commodity 투명 앵커 ────────────────────────── raw = [] for e in msp: if e.dxftype() == "LINE": s, en = e.dxf.start, e.dxf.end if math.hypot(en.x - s.x, en.y - s.y) > 0.05: raw.append((round(s.x, 2), round(s.y, 2), round(en.x, 2), round(en.y, 2))) elif e.dxftype() == "LWPOLYLINE": p = [(round(a, 2), round(b, 2)) for a, b in e.get_points("xy")] for i in range(len(p) - 1): raw.append((p[i][0], p[i][1], p[i + 1][0], p[i + 1][1])) # commodity 볼앵커: 소형 빈 원 + 동반 bowtie/레버 단선 2개 이상 (과포함 정밀화) circ = [(x.dxf.center.x, x.dxf.center.y, round(x.dxf.radius, 3)) for x in msp if x.dxftype() == "CIRCLE"] seg_mid = collections.defaultdict(list) for x1, y1, x2, y2 in raw: L = math.hypot(x2 - x1, y2 - y1) if L <= 2.6: mx, my = (x1 + x2) / 2, (y1 + y2) / 2 seg_mid[(round(mx), round(my))].append((mx, my)) def companion_count(cx, cy, R=0.8): n = 0 for gx in (int(cx) - 1, int(cx), int(cx) + 1): for gy in (int(cy) - 1, int(cy), int(cy) + 1): for mx, my in seg_mid.get((gx, gy), ()): if math.hypot(mx - cx, my - cy) <= R: n += 1 return n ball_anchors = [(x, y) for x, y, r in circ if 0.28 <= r <= 0.46 and companion_count(x, y) >= 2] Q = 0.6 node = collections.defaultdict(list) for idx, (x1, y1, x2, y2) in enumerate(raw): L = math.hypot(x2 - x1, y2 - y1) node[(round(x1 / Q), round(y1 / Q))].append((idx, (x1, y1), (x2, y2), L)) node[(round(x2 / Q), round(y2 / Q))].append((idx, (x2, y2), (x1, y1), L)) def conn(p): out = [] k = (round(p[0] / Q), round(p[1] / Q)) for i in (-1, 0, 1): for j in (-1, 0, 1): for idx, a, b, L in node.get((k[0] + i, k[1] + j), ()): if math.hypot(a[0] - p[0], a[1] - p[1]) <= 0.8: out.append((idx, a, b, L)) return out def ball_ahead(p, dv, ml=14): best, bd = None, ml for x, y in ball_anchors: v = (x - p[0], y - p[1]) d = math.hypot(*v) if 0.5 < d <= ml and (v[0] * dv[0] + v[1] * dv[1]) / d > 0.65 and d < bd: bd, best = d, (x, y) return best lm_pts = list(landmarks.items()) def nearest_landmark(p, r=10): best, bd = None, r for t, (lx, ly) in lm_pts: d = math.hypot(lx - p[0], ly - p[1]) if d < bd: bd, best = d, t return best def resume_nozzle(lm_tag, pt, flow, visited): """랜드마크 블록 통과 후 하류 노즐에서 흐름 재개점 산출. 블록 실체 범위 밖, 안정적 흐름축(flow) 투영 최대인 미방문 배관 끝점. (직전 세그먼트 came 이 아닌 시드 흐름축을 써 병렬·수직 홉 노이즈 흡수.)""" if lm_tag not in lm_block: return None cx, cy, (bx0, by0, bx1, by1) = lm_block[lm_tag] half = max(bx1 - bx0, by1 - by0) / 2 rng = max(half + 8.0, 10.0) best, bscore = None, 0.6 for idx, (x1, y1, x2, y2) in enumerate(raw): if idx in visited: continue for ex, ey, ox, oy in ((x1, y1, x2, y2), (x2, y2, x1, y1)): # 블록 실체 내부 끝점 제외(노이즈), 중심 기준 하류측만 if bx0 - 0.5 <= ex <= bx1 + 0.5 and by0 - 0.5 <= ey <= by1 + 0.5: continue vx, vy = ex - cx, ey - cy d = math.hypot(vx, vy) if not (half * 0.4 < d <= rng): continue proj = (vx * flow[0] + vy * flow[1]) / d if proj > bscore: bscore, best = proj, (idx, (ex, ey), (ox, oy)) return best # ── Phase 3: 흐름 추적 ──────────────────────────────────────────── def trace(start, came, max_steps=400): flow_axis = came # 안정적 시드 흐름축(통과 재개 기준) pt = start visited = set() path = [] # [(kind, detail, (x,y))] hit = collections.OrderedDict() # 만난 랜드마크 순서 exited = set() # 통과-후-재개 완료한 랜드마크 recirc = None for _ in range(max_steps): lm = nearest_landmark(pt, 10) if lm: if lm in hit and lm not in exited and len(hit) > 2: # 이미 방문 랜드마크 복귀 = 재순환(킥백) 루프 → 분기 종료 recirc = lm path.append(("recirculation", lm, pt)) break hit.setdefault(lm, pt) if lm not in exited: exited.add(lm) nz = resume_nozzle(lm, pt, flow_axis, visited) if nz: idx, ep, op = nz visited.add(idx) d = (op[0] - ep[0], op[1] - ep[1]) n = math.hypot(*d) or 1 came = (d[0] / n, d[1] / n) pt = (op[0], op[1]) path.append(("through", lm, pt)) continue cand = [(idx, a, b, L) for idx, a, b, L in conn(pt) if idx not in visited] if cand: cand.sort(key=lambda c: -((c[2][0] - c[1][0]) * came[0] + (c[2][1] - c[1][1]) * came[1]) / (math.hypot(c[2][0] - c[1][0], c[2][1] - c[1][1]) or 1)) idx, a, b, L = cand[0] visited.add(idx) d = (b[0] - a[0], b[1] - a[1]) n = math.hypot(*d) or 1 came = (d[0] / n, d[1] / n) pt = (b[0], b[1]) path.append(("pipe", round(L, 1), pt)) continue bv = ball_ahead(pt, came) if bv: best, bd = None, 18 for i, (x1, y1, x2, y2) in enumerate(raw): if i in visited: continue for ex, ey in ((x1, y1), (x2, y2)): v = (ex - pt[0], ey - pt[1]) dist = math.hypot(*v) if 1.0 < dist < bd and (v[0] * came[0] + v[1] * came[1]) / dist > 0.7: bd, best = dist, (ex, ey) path.append(("commodity_passthrough", None, pt)) if best: pt = best continue # 일반 전방 gap-브리지: commodity/작도 끊김을 흐름방향 콘으로 점프. # 안정적 흐름축(flow_axis) 콘 사용 — through/병렬 홉 직후 came 역행 억제. # (좁은 콘 dot>0.85 + 수직오프셋 최소 → 밀집부 누수 억제) cone = flow_axis if (path and path[-1][0] in ("through", "gap_bridge")) else came gb, gdist = None, 12.0 for i, (x1, y1, x2, y2) in enumerate(raw): if i in visited: continue for ex, ey, ox, oy in ((x1, y1, x2, y2), (x2, y2, x1, y1)): vx, vy = ex - pt[0], ey - pt[1] dist = math.hypot(vx, vy) if not (0.8 < dist <= gdist): continue al = (vx * cone[0] + vy * cone[1]) / dist if al <= 0.85: continue perp = abs(vx * cone[1] - vy * cone[0]) score = dist + perp * 4 if score < gdist: gdist, gb = score, (i, (ex, ey), (ox, oy)) if gb: idx, ep, op = gb # 착지점(ep)으로 이동, 흐름방향(cone) 유지 — 이후 정상 conn 으로 전진. # (먼 끝 op 로 강제 점프 시 역행 가능 → 착지 후 재선택) jl = math.hypot(ep[0] - pt[0], ep[1] - pt[1]) came = cone pt = (ep[0], ep[1]) path.append(("gap_bridge", round(jl, 1), pt)) continue break return list(hit.keys()), path, recirc # 시드: OFFPAGE 발원 소스 + 검증된 ground-truth 진입점 SEEDS = [] for src_tag, apex, dv, line_no, cn in offpage_seeds: SEEDS.append((f"OFFPAGE {src_tag}→({cn})", apex, dv, src_tag, line_no)) SEEDS.append(("P-10101 discharge", (1702.52, 5217.69), (1, 0), "P-10101", None)) results = [] for label, sp, dv, src_tag, line_no in SEEDS: lms, path, recirc = trace(sp, dv) if src_tag and (not lms or lms[0] != src_tag): lms = [src_tag] + lms pc = sum(1 for k, _, _ in path if k == "pipe") cm = sum(1 for k, _, _ in path if k == "commodity_passthrough") th = sum(1 for k, _, _ in path if k == "through") gb_n = sum(1 for k, _, _ in path if k == "gap_bridge") results.append({"seed": label, "start": [round(sp[0], 2), round(sp[1], 2)], "source_tag": src_tag, "line_number": line_no, "landmarks_in_order": lms, "pipe_segments": pc, "commodity_passed": cm, "blocks_through": th, "gap_bridges": gb_n, "recirculation": recirc}) if DEBUG: print(f"\n[DEBUG {label}] steps={len(path)}") for k, det, xy in path: if k in ("through", "recirculation", "gap_bridge"): print(f" {k:<22} {det} @({xy[0]:.1f},{xy[1]:.1f})") # ── Phase 4: 라인번호 ↔ 배관 귀속 (직접배치 + 지시선 tip) ────────── # SOLID 화살촉 tip ↔ 배관 끝점 sub-0.5u 매칭, 또는 텍스트 복도 근접. solids = [] for e in msp: if e.dxftype() == "SOLID": vs = [getattr(e.dxf, k, None) for k in ("vtx0", "vtx1", "vtx2", "vtx3")] vs = [(p.x, p.y) for p in vs if p is not None] if len(vs) >= 3: solids.append(vs) pipe_ends = [] for idx, (x1, y1, x2, y2) in enumerate(raw): pipe_ends.append((idx, x1, y1)) pipe_ends.append((idx, x2, y2)) def pipe_near(px, py, tol): best, bd = None, tol for idx, ex, ey in pipe_ends: d = math.hypot(ex - px, ey - py) if d < bd: bd, best = d, idx return best line_no_attr = [] # {line_number, pipe_idx, mode} for v, tx, ty in all_text: vk = v.split()[0].replace(" ", "") if not (LINE_NO.match(vk) or re.search(r'\d+A?x\d+A?', vk)): continue # 케이스 B: 텍스트 근방 SOLID 화살촉 → tip 이 닿는 배관 attached = None for vs in solids: sc = (sum(p[0] for p in vs) / len(vs), sum(p[1] for p in vs) / len(vs)) if math.hypot(sc[0] - tx, sc[1] - ty) > 6.0: continue for tipx, tipy in vs: pi = pipe_near(tipx, tipy, 0.5) if pi is not None: attached = (pi, "leader") break if attached: break # 케이스 A: 직접배치 — 텍스트 박스 근접 배관 if attached is None: pi = pipe_near(tx, ty, 3.0) if pi is not None: attached = (pi, "direct") if attached: line_no_attr.append({"line_number": v, "pipe_idx": attached[0], "mode": attached[1]}) # ── Phase 5: 랜드마크 From/To 엣지 환원 ─────────────────────────── edges = [] for r in results: lm = r["landmarks_in_order"] for i in range(len(lm) - 1): edges.append({"from": lm[i], "to": lm[i + 1], "type": "process", "via_seed": r["seed"], "line_number": r["line_number"] if i == 0 else None}) if r["recirculation"] and lm: edges.append({"from": lm[-1], "to": r["recirculation"], "type": "recirculation", "via_seed": r["seed"], "line_number": None}) out = { "drawing": os.path.basename(DXF), "stats": {"landmarks": len(landmarks), "offpage_seeds": len(offpage_seeds), "ball_anchors": len(ball_anchors), "raw_segments": len(raw), "traces": len(results), "edges": len(edges), "line_number_attributions": len(line_no_attr)}, "landmarks": [{"tag": t, "x": round(xy[0], 1), "y": round(xy[1], 1)} for t, xy in sorted(landmarks.items())], "traces": results, "edges": edges, } prefix = os.path.basename(DXF).split("_")[0].split(".")[0] op = os.path.join("mcp-server", "storage", f"{prefix}_connections.json") os.makedirs(os.path.dirname(op), exist_ok=True) json.dump(out, open(op, "w", encoding="utf-8"), ensure_ascii=False, indent=1) print(f"landmarks={len(landmarks)} offpage_seeds={len(offpage_seeds)} " f"ball_anchors={len(ball_anchors)} raw_seg={len(raw)} " f"edges={len(edges)} lineno_attr={len(line_no_attr)}") for r in results: print(f"\n[{r['seed']}] pipe={r['pipe_segments']} " f"through={r['blocks_through']} commodity={r['commodity_passed']}" f"{' recirc=' + r['recirculation'] if r['recirculation'] else ''}") print(" 랜드마크 순서:", " → ".join(r["landmarks_in_order"]) or "(없음)") print(f"\n→ {op}") if __name__ == "__main__": main()