Files
ExperionCrawler/mcp-server/pid_tracer.py
windpacer 302183c97e feat: P&ID 연결 분석, LLM 에이전트 모드, KB 확장, MCP 서버 리팩토링
- P&ID: 연결 분석 API, Prefix 규칙 관리, 카테고리 분류, DXF 그래프 빌드
- LLM: 대화 요약, tool card 영구 보존, 시계열 차트(uPlot), 에이전트 모드
- KB: 청크 미리보기, Field Instrument Inference, 인증/Qdrant 클라이언트
- MCP: 서버 기능 확장, 파이프라인 수정, timeout 개선
- Frontend: P&ID UI, LLM UI, KB UI, OPC UA Write 탭 추가
- 설정: AGENTS.md, plant_context, README, opencode.json 업데이트
- 정리: 진단 체크리스트 문서 삭제
2026-05-21 23:36:57 +09:00

498 lines
22 KiB
Python

"""
P&ID 흐름 추적 · 랜드마크 From/To 추출 (pid_trace_algorithm.md 구현)
Phase 1(랜드마크+OFFPAGE 소스) → Phase 2(배관망+commodity 투명) →
Phase 3(흐름추적: 방문가드·엘보·TEE·재순환·랜드마크 통과-후-재개) →
Phase 4(라인번호 귀속) → Phase 5(랜드마크 From/To 환원).
출력: <prefix>_connections.json (C# AnalyzeConnectionsAsync 소비 포맷 호환).
usage:
python3 mcp-server/pid_tracer.py [DXF경로] (기본 src/Web/uploads/pid/P10-EQP-BLOCK.dxf)
PID_TRACE_DEBUG=1 python3 mcp-server/pid_tracer.py (경로 좌표 덤프)
"""
import sys, os, re, math, json, collections
import ezdxf
from ezdxf import recover
# Input drawing: first command-line argument if given, otherwise the default path below.
DXF = sys.argv[1] if len(sys.argv) > 1 else "src/Web/uploads/pid/P10-EQP-BLOCK.dxf"
# Debug dump is enabled only when the PID_TRACE_DEBUG environment variable is exactly "1".
DEBUG = os.environ.get("PID_TRACE_DEBUG") == "1"
# Landmark tag patterns / commodity block names (commodities are traced through transparently)
EQP_TAG = re.compile(r'^[A-Z]{1,4}-?\d{3,5}[A-Z]?$')
INSTR_TAG = re.compile(r'^(FCV|FIT|FIC|LIC|LCV|PCV|PIC|TIC|TCV|FV|LV|PV|TV|AT|FT|LT|PT|TT)-?\d', re.I)
# Line number: size code / several hyphen-joined segments (e.g. P-10149-40A-F1A-n, 25Ax32A)
LINE_NO = re.compile(r'^[A-Z]{1,3}-?\d{3,5}-\w+-\w+|^\d+A?x\d+A?$', re.I)
# Substrings (case-insensitive) that mark a block name as a commodity; see is_landmark_block.
COMMODITY = re.compile(
    r'BALL|CHECK|GLOBE|GATE|REDUCER|FLEX|HOSE|TUBE|STRAIN|FLANGE|VALVE_BALL|ON-?OFF|'
    r'DIAPHRAGM|NEEDLE|PLUG|BUTTERFLY|TRAP|EXPAN|CONE', re.I)
# Block names that is_landmark_block rejects outright (exact, case-sensitive match).
JUNK = {"asda", "QQ", "sm", "bv", "GENAXEH", "D660198", "RC11", "HS_BOM",
        "IC", "ORDERNO", "ASDADAS", "1", "F", "SG", "BFT", "EX1"}
# Named symbol blocks = tagged instruments / control valves (the tag is synthesized
# from adjacent bubble TEXT). These are not commodities.
NAMED_INSTRUMENT = {"MASS_FLOW_METER", "CONTROL_VALVE_GLOBE",
                    "FLOW_METER_VARIABLE-AREA"}
# Instrument function codes; the whole string must match (case-sensitive).
FUNC_CODE = re.compile(
    r'^(FIT|FCV|FIC|FE|FQ|FQI|FI|FT|LIC|LCV|LI|LT|LSL|LSH|PIC|PCV|PI|PT|PSV|PSH|'
    r'TIC|TCV|TI|TT|AT|AIT|FV|LV|PV|TV)$')
# Numeric bubble text: 3 to 6 digits with an optional trailing capital letter.
NUM_TXT = re.compile(r'^\d{3,6}[A-Z]?$')
def load():
    """Open the drawing at ``DXF`` and return the ezdxf document.

    A normal ``ezdxf.readfile`` is attempted first. If it raises
    ``ezdxf.DXFStructureError`` the file is re-read through
    ``ezdxf.recover.readfile`` and the first element of its result is returned.
    Any other exception propagates to the caller.
    """
    try:
        document = ezdxf.readfile(DXF)
    except ezdxf.DXFStructureError:
        # Structurally damaged file: fall back to the recover reader and
        # discard the second element of the returned pair.
        document, _unused = recover.readfile(DXF)
    return document
def world_extent(insert):
    """Return the bounding box ``(x_min, y_min, x_max, y_max)`` of an INSERT.

    The box is collected from the entities yielded by
    ``insert.virtual_entities()``:

    * LINE    - both end points
    * CIRCLE / ARC - centre +/- radius on each axis (a missing radius counts as 0.0)
    * ELLIPSE - centre point only
    * SOLID   - every ``vtx0`` .. ``vtx3`` attribute that is present

    Other entity types are ignored. Collection is best-effort: any exception
    raised while iterating stops the scan and whatever was gathered so far is
    used. If no x coordinate was gathered, the insert's own insertion point is
    returned as a degenerate box.
    """
    xs = []
    ys = []
    try:
        for entity in insert.virtual_entities():
            kind = entity.dxftype()
            attrs = entity.dxf
            if kind == "LINE":
                xs.extend((attrs.start.x, attrs.end.x))
                ys.extend((attrs.start.y, attrs.end.y))
            elif kind == "CIRCLE" or kind == "ARC":
                radius = getattr(attrs, "radius", 0.0)
                xs.extend((attrs.center.x - radius, attrs.center.x + radius))
                ys.extend((attrs.center.y - radius, attrs.center.y + radius))
            elif kind == "ELLIPSE":
                xs.append(attrs.center.x)
                ys.append(attrs.center.y)
            elif kind == "SOLID":
                for vertex_name in ("vtx0", "vtx1", "vtx2", "vtx3"):
                    vertex = getattr(attrs, vertex_name, None)
                    if vertex is None:
                        continue
                    xs.append(vertex.x)
                    ys.append(vertex.y)
    except Exception:
        # Deliberate best-effort: keep whatever coordinates were collected.
        pass
    if xs:
        return (min(xs), min(ys), max(xs), max(ys))
    origin = insert.dxf.insert
    return (origin.x, origin.y, origin.x, origin.y)
def is_landmark_block(name):
    """Decide whether a block name denotes a landmark node.

    Rejected outright: names starting with ``A$``, ``*``, ``OFFPAGE`` or
    ``FLOW_DIRECTION``, names listed in ``JUNK``, and names containing a
    ``COMMODITY`` keyword.

    Accepted: the two special names ``"IBC TANK"`` and ``"3-10203"``, names
    whose part before ``-SAME`` (stripped) matches ``EQP_TAG``, and names
    matching ``INSTR_TAG``.
    """
    rejected_prefixes = ("A$", "*", "OFFPAGE", "FLOW_DIRECTION")
    if name in JUNK or name.startswith(rejected_prefixes):
        return False
    if COMMODITY.search(name) is not None:
        return False
    if name in ("IBC TANK", "3-10203"):
        return True
    # "T-3210-SAME..." style names are judged by the part before "-SAME".
    stem = name.split("-SAME")[0].strip()
    if EQP_TAG.match(stem) is not None:
        return True
    return INSTR_TAG.match(name) is not None
def main():
    """Trace P&ID flow in the drawing at ``DXF`` and write ``<prefix>_connections.json``.

    Pipeline (all state is local to this function):

    * Phase 1  - collect landmark nodes from INSERT blocks accepted by
      ``is_landmark_block``, from TEXT/MTEXT matching ``INSTR_TAG``, and from
      ``NAMED_INSTRUMENT`` blocks whose tag is synthesized from nearby
      function-code and number text.
    * Phase 1b - derive trace seeds (apex point, unit direction, source tag,
      line number) from ``OFFPAGE*`` connector blocks.
    * Phase 2  - build the pipe network from LINE / LWPOLYLINE segments and
      detect ball-valve anchors (small circles with nearby short segments).
    * Phase 3  - walk the network from every seed, recording the landmarks met.
    * Phase 4  - attribute line-number texts to pipe segments (only the count
      is reported in the output).
    * Phase 5  - reduce each trace to landmark From/To edges.

    Side effects: creates ``mcp-server/storage`` relative to the current
    working directory if needed, writes the JSON result there, and prints a
    summary to stdout (plus per-step details when ``DEBUG`` is set).
    """
    doc = load()
    msp = doc.modelspace()
    # Every non-empty TEXT/MTEXT as (text, insert_x, insert_y).
    all_text = []
    for e in msp:
        if e.dxftype() in ("TEXT", "MTEXT"):
            v = (e.plain_text() if e.dxftype() == "MTEXT" else e.dxf.text).strip()
            if v:
                all_text.append((v, e.dxf.insert.x, e.dxf.insert.y))

    # ── Phase 1: landmark nodes (INSERT blocks + tagged-instrument TEXT) ──
    landmarks = {}  # tag -> (cx, cy)
    lm_block = {}   # tag -> (cx, cy, (x0, y0, x1, y1)) extent of the INSERT
    for e in msp:
        if e.dxftype() != "INSERT":
            continue
        nm = e.dxf.name
        if not is_landmark_block(nm):
            continue
        tag = nm.split("-SAME")[0].strip()  # T-3210-SAME* is merged into T-3210
        x0, y0, x1, y1 = world_extent(e)
        cx, cy = (x0 + x1) / 2, (y0 + y1) / 2
        # setdefault: the first occurrence of a tag wins.
        landmarks.setdefault(tag, (cx, cy))
        lm_block.setdefault(tag, (cx, cy, (x0, y0, x1, y1)))
    # Tagged instruments: TEXT such as FCV-/FIT- placed next to a commodity-shaped
    # symbol (approximated by the world position of the text itself).
    for v, tx, ty in all_text:
        vk = v.split()[0].replace(" ", "")
        if INSTR_TAG.match(vk) and not LINE_NO.match(vk):
            landmarks.setdefault(vk, (tx, ty))
    # Named symbol blocks (MASS_FLOW_METER etc.) are tagged instruments. The tag
    # is synthesized from an adjacent function code + number bubble
    # (FIT + 10101 -> FIT-10101); the block name is only a hint of the kind.
    func_txt = [(v, x, y) for v, x, y in all_text if FUNC_CODE.match(v.strip())]
    num_txt = [(v, x, y) for v, x, y in all_text if NUM_TXT.match(v.strip())]
    for e in msp:
        if e.dxftype() != "INSERT" or e.dxf.name not in NAMED_INSTRUMENT:
            continue
        x0, y0, x1, y1 = world_extent(e)
        cx, cy = (x0 + x1) / 2, (y0 + y1) / 2
        # Nearest function-code text to the block centre, at most 12 units away.
        code = min(((math.hypot(fx - cx, fy - cy), fv, fx, fy)
                    for fv, fx, fy in func_txt), default=None)
        if not code or code[0] > 12:
            continue
        _, fv, fx, fy = code
        # Nearest number text within 4.5 units of that function-code text.
        num = min(((math.hypot(nx - fx, ny - fy), nv)
                   for nv, nx, ny in num_txt
                   if math.hypot(nx - fx, ny - fy) <= 4.5), default=None)
        if not num:
            continue
        tag = f"{fv.strip()}-{num[1].strip()}"
        landmarks.setdefault(tag, (cx, cy))
        lm_block.setdefault(tag, (cx, cy, (x0, y0, x1, y1)))

    # ── Phase 1b: OFFPAGE connector source seeds ──────────────────────────
    # Connector body lines -> apex (converging vertex) and flow direction
    # (back edge midpoint -> apex, taken from world geometry).
    # TEXT near the body -> source equipment tag (EQP/INSTR) + line number.
    offpage_seeds = []  # (src_tag, (ax, ay), (dx, dy), line_no, conn_name)
    for e in msp:
        if e.dxftype() != "INSERT" or not e.dxf.name.startswith("OFFPAGE"):
            continue
        pts = []
        for ve in e.virtual_entities():
            if ve.dxftype() == "LINE":
                pts += [(ve.dxf.start.x, ve.dxf.start.y), (ve.dxf.end.x, ve.dxf.end.y)]
        if not pts:
            continue
        cxp = sum(p[0] for p in pts) / len(pts)
        cyp = sum(p[1] for p in pts) / len(pts)
        bx0, by0 = min(p[0] for p in pts), min(p[1] for p in pts)
        bx1, by1 = max(p[0] for p in pts), max(p[1] for p in pts)
        # apex = whichever extreme of the long axis is "pointed" (fewer points
        # at that extreme) as opposed to the flat back edge.
        # Direction = back edge midpoint -> apex.
        ax_is_x = (bx1 - bx0) >= (by1 - by0)
        key = (lambda p: p[0]) if ax_is_x else (lambda p: p[1])
        lo, hi = min(pts, key=key), max(pts, key=key)
        lo_n = sum(1 for p in pts if abs(key(p) - key(lo)) <= 0.5)
        hi_n = sum(1 for p in pts if abs(key(p) - key(hi)) <= 0.5)
        if hi_n <= lo_n:
            apex_pt = hi
            back_pts = [p for p in pts if abs(key(p) - key(lo)) <= 0.5]
        else:
            apex_pt = lo
            back_pts = [p for p in pts if abs(key(p) - key(hi)) <= 0.5]
        ax, ay = apex_pt
        bmx = sum(p[0] for p in back_pts) / len(back_pts)
        bmy = sum(p[1] for p in back_pts) / len(back_pts)
        dvx, dvy = ax - bmx, ay - bmy
        dn = math.hypot(dvx, dvy) or 1.0  # avoid division by zero
        dvx, dvy = dvx / dn, dvy / dn
        src_tag, line_no = None, None
        best_src = 9e9
        for v, tx, ty in all_text:
            # Only text inside the connector bbox grown by 4 units.
            if not (bx0 - 4 <= tx <= bx1 + 4 and by0 - 4 <= ty <= by1 + 4):
                continue
            vk = v.split()[0].replace(" ", "")
            if LINE_NO.match(vk) or re.search(r'\d+A?x\d+A?', vk):
                if line_no is None:
                    line_no = v  # first line-number text encountered wins
            elif EQP_TAG.match(vk) or INSTR_TAG.match(vk):
                d = math.hypot(tx - cxp, ty - cyp)
                if d < best_src:
                    best_src, src_tag = d, vk
        # NOTE(review): connectors without a recognizable source tag are not
        # seeded here - confirm this is the intended behaviour.
        if src_tag:
            landmarks.setdefault(src_tag, (cxp, cyp))  # register the source node
            offpage_seeds.append((src_tag, (ax, ay), (dvx, dvy), line_no, e.dxf.name))

    # ── Phase 2: pipe network + transparent commodity anchors ─────────────
    raw = []  # (x1, y1, x2, y2), coordinates rounded to 2 decimals
    for e in msp:
        if e.dxftype() == "LINE":
            s, en = e.dxf.start, e.dxf.end
            if math.hypot(en.x - s.x, en.y - s.y) > 0.05:
                raw.append((round(s.x, 2), round(s.y, 2), round(en.x, 2), round(en.y, 2)))
        elif e.dxftype() == "LWPOLYLINE":
            verts = [(round(a, 2), round(b, 2)) for a, b in e.get_points("xy")]
            for (px, py), (qx, qy) in zip(verts, verts[1:]):
                # Same degenerate-length filter as LINE: a zero-length segment
                # would give the tracer a (0, 0) direction vector.
                if math.hypot(qx - px, qy - py) > 0.05:
                    raw.append((px, py, qx, qy))
    # Commodity ball anchor: small circle accompanied by at least two short
    # segments (bowtie / lever strokes), to limit over-inclusion.
    circ = [(x.dxf.center.x, x.dxf.center.y, round(x.dxf.radius, 3))
            for x in msp if x.dxftype() == "CIRCLE"]
    # Midpoints of short segments (length <= 2.6), bucketed by rounded coordinates.
    seg_mid = collections.defaultdict(list)
    for x1, y1, x2, y2 in raw:
        L = math.hypot(x2 - x1, y2 - y1)
        if L <= 2.6:
            mx, my = (x1 + x2) / 2, (y1 + y2) / 2
            seg_mid[(round(mx), round(my))].append((mx, my))

    def companion_count(cx, cy, R=0.8):
        """Count short-segment midpoints within distance ``R`` of ``(cx, cy)``."""
        # Buckets are keyed with round(), so the query cell must be computed
        # with round() as well; int() truncates and can miss midpoints that
        # are within R but land two cells away (and misbehaves for negatives).
        gx0, gy0 = round(cx), round(cy)
        reach = int(R) + 1  # max bucket-index difference for points within R
        n = 0
        for gx in range(gx0 - reach, gx0 + reach + 1):
            for gy in range(gy0 - reach, gy0 + reach + 1):
                for mx, my in seg_mid.get((gx, gy), ()):
                    if math.hypot(mx - cx, my - cy) <= R:
                        n += 1
        return n

    ball_anchors = [(x, y) for x, y, r in circ
                    if 0.28 <= r <= 0.46 and companion_count(x, y) >= 2]
    Q = 0.6          # grid cell size of the endpoint index
    CONN_TOL = 0.8   # endpoints within this distance count as connected
    # Endpoints within CONN_TOL can be this many cells apart after rounding.
    conn_reach = int(CONN_TOL / Q) + 1
    # Endpoint index: cell -> (segment idx, this end, other end, length).
    node = collections.defaultdict(list)
    for idx, (x1, y1, x2, y2) in enumerate(raw):
        L = math.hypot(x2 - x1, y2 - y1)
        node[(round(x1 / Q), round(y1 / Q))].append((idx, (x1, y1), (x2, y2), L))
        node[(round(x2 / Q), round(y2 / Q))].append((idx, (x2, y2), (x1, y1), L))

    def conn(p):
        """Return ``(idx, near_end, far_end, length)`` for segments ending within CONN_TOL of ``p``."""
        out = []
        k = (round(p[0] / Q), round(p[1] / Q))
        # Scan far enough that no endpoint within CONN_TOL is skipped; the
        # exact distance test below still decides membership.
        for i in range(-conn_reach, conn_reach + 1):
            for j in range(-conn_reach, conn_reach + 1):
                for idx, a, b, L in node.get((k[0] + i, k[1] + j), ()):
                    if math.hypot(a[0] - p[0], a[1] - p[1]) <= CONN_TOL:
                        out.append((idx, a, b, L))
        return out

    def ball_ahead(p, dv, ml=14):
        """Return the nearest ball anchor within ``ml`` of ``p`` lying ahead along ``dv``, or None."""
        best, bd = None, ml
        for x, y in ball_anchors:
            v = (x - p[0], y - p[1])
            d = math.hypot(*v)
            # "Ahead" = cosine between v and dv greater than 0.65.
            if 0.5 < d <= ml and (v[0] * dv[0] + v[1] * dv[1]) / d > 0.65 and d < bd:
                bd, best = d, (x, y)
        return best

    lm_pts = list(landmarks.items())

    def nearest_landmark(p, r=10):
        """Return the tag of the closest landmark strictly within ``r`` of ``p``, or None."""
        best, bd = None, r
        for t, (lx, ly) in lm_pts:
            d = math.hypot(lx - p[0], ly - p[1])
            if d < bd:
                bd, best = d, t
        return best

    def resume_nozzle(lm_tag, pt, flow, visited):
        """Pick the pipe end where flow resumes after passing through a landmark block.

        Candidates are endpoints of unvisited segments that lie outside the
        block extent (grown by 0.5), between ``0.4 * half`` and ``rng`` from
        the block centre; the one whose direction from the centre is best
        aligned with ``flow`` (cosine > 0.6) wins. ``flow`` is the stable seed
        flow axis rather than the direction of the last segment, to absorb
        noise from parallel / perpendicular hops.

        Returns ``(idx, (ex, ey), (ox, oy))`` or None. ``pt`` is not used.
        """
        if lm_tag not in lm_block:
            return None
        cx, cy, (bx0, by0, bx1, by1) = lm_block[lm_tag]
        half = max(bx1 - bx0, by1 - by0) / 2
        rng = max(half + 8.0, 10.0)
        best, bscore = None, 0.6
        for idx, (x1, y1, x2, y2) in enumerate(raw):
            if idx in visited:
                continue
            for ex, ey, ox, oy in ((x1, y1, x2, y2), (x2, y2, x1, y1)):
                # Skip endpoints inside the block extent (noise).
                if bx0 - 0.5 <= ex <= bx1 + 0.5 and by0 - 0.5 <= ey <= by1 + 0.5:
                    continue
                vx, vy = ex - cx, ey - cy
                d = math.hypot(vx, vy)
                if not (half * 0.4 < d <= rng):
                    continue
                proj = (vx * flow[0] + vy * flow[1]) / d
                if proj > bscore:
                    bscore, best = proj, (idx, (ex, ey), (ox, oy))
        return best

    # ── Phase 3: flow tracing ─────────────────────────────────────────────
    def trace(start, came, max_steps=400):
        """Walk the pipe network from ``start`` heading along ``came``.

        Returns ``(landmark tags in first-hit order, path, recirc)`` where
        ``path`` is a list of ``(kind, detail, (x, y))`` steps.
        """
        flow_axis = came  # stable seed flow axis (reference for resuming)
        pt = start
        visited = set()
        path = []  # [(kind, detail, (x, y))]
        hit = collections.OrderedDict()  # landmarks in the order they were met
        exited = set()  # landmarks already handled by the pass-through step
        recirc = None
        for _ in range(max_steps):
            lm = nearest_landmark(pt, 10)
            if lm:
                # Returning to an already visited landmark = recirculation
                # (kick-back) loop -> end this branch.
                # NOTE(review): every landmark added to `hit` below is added to
                # `exited` in the same iteration, so this condition can never
                # be true as written and `recirc` stays None - confirm intent.
                if lm in hit and lm not in exited and len(hit) > 2:
                    recirc = lm
                    path.append(("recirculation", lm, pt))
                    break
                hit.setdefault(lm, pt)
                if lm not in exited:
                    exited.add(lm)
                    nz = resume_nozzle(lm, pt, flow_axis, visited)
                    if nz:
                        idx, ep, op = nz
                        visited.add(idx)
                        d = (op[0] - ep[0], op[1] - ep[1])
                        n = math.hypot(*d) or 1
                        came = (d[0] / n, d[1] / n)
                        pt = (op[0], op[1])
                        path.append(("through", lm, pt))
                        continue
            # Ordinary step: follow the connected unvisited segment that is
            # best aligned with the incoming direction.
            cand = [(idx, a, b, L) for idx, a, b, L in conn(pt) if idx not in visited]
            if cand:
                cand.sort(key=lambda c: -((c[2][0] - c[1][0]) * came[0] + (c[2][1] - c[1][1]) * came[1])
                          / (math.hypot(c[2][0] - c[1][0], c[2][1] - c[1][1]) or 1))
                idx, a, b, L = cand[0]
                visited.add(idx)
                d = (b[0] - a[0], b[1] - a[1])
                n = math.hypot(*d) or 1
                came = (d[0] / n, d[1] / n)
                pt = (b[0], b[1])
                path.append(("pipe", round(L, 1), pt))
                continue
            # Dead end with a ball anchor ahead: jump to the nearest unvisited
            # segment endpoint in the forward cone (cosine > 0.7, within 18).
            bv = ball_ahead(pt, came)
            if bv:
                best, bd = None, 18
                for i, (x1, y1, x2, y2) in enumerate(raw):
                    if i in visited:
                        continue
                    for ex, ey in ((x1, y1), (x2, y2)):
                        v = (ex - pt[0], ey - pt[1])
                        dist = math.hypot(*v)
                        if 1.0 < dist < bd and (v[0] * came[0] + v[1] * came[1]) / dist > 0.7:
                            bd, best = dist, (ex, ey)
                path.append(("commodity_passthrough", None, pt))
                if best:
                    pt = best
                    continue
            # Generic forward gap bridge: jump across commodity / drafting gaps
            # inside a cone along the flow direction. The stable flow_axis is
            # used right after a through / gap_bridge step to suppress
            # backtracking. A narrow cone (cosine > 0.85) plus a penalty on the
            # perpendicular offset limits leakage in dense areas.
            cone = flow_axis if (path and path[-1][0] in ("through", "gap_bridge")) else came
            # gdist doubles as the search radius and the best score so far.
            gb, gdist = None, 12.0
            for i, (x1, y1, x2, y2) in enumerate(raw):
                if i in visited:
                    continue
                for ex, ey, ox, oy in ((x1, y1, x2, y2), (x2, y2, x1, y1)):
                    vx, vy = ex - pt[0], ey - pt[1]
                    dist = math.hypot(vx, vy)
                    if not (0.8 < dist <= gdist):
                        continue
                    al = (vx * cone[0] + vy * cone[1]) / dist
                    if al <= 0.85:
                        continue
                    perp = abs(vx * cone[1] - vy * cone[0])
                    score = dist + perp * 4
                    if score < gdist:
                        gdist, gb = score, (i, (ex, ey), (ox, oy))
            if gb:
                idx, ep, op = gb
                # Move to the landing point (ep) and keep the cone direction;
                # the next iteration advances through the normal conn() step.
                # (Jumping straight to the far end op could go backwards.)
                jl = math.hypot(ep[0] - pt[0], ep[1] - pt[1])
                came = cone
                pt = (ep[0], ep[1])
                path.append(("gap_bridge", round(jl, 1), pt))
                continue
            break
        return list(hit.keys()), path, recirc

    # Seeds: OFFPAGE sources + a verified ground-truth entry point.
    SEEDS = []
    for src_tag, apex, dv, line_no, cn in offpage_seeds:
        SEEDS.append((f"OFFPAGE {src_tag}→({cn})", apex, dv, src_tag, line_no))
    # NOTE(review): this seed uses fixed coordinates and is added for every
    # drawing, not only the default one - confirm against other inputs.
    SEEDS.append(("P-10101 discharge", (1702.52, 5217.69), (1, 0), "P-10101", None))
    results = []
    for label, sp, dv, src_tag, line_no in SEEDS:
        lms, path, recirc = trace(sp, dv)
        # Make sure the seed's source tag heads the landmark list.
        if src_tag and (not lms or lms[0] != src_tag):
            lms = [src_tag] + lms
        pc = sum(1 for k, _, _ in path if k == "pipe")
        cm = sum(1 for k, _, _ in path if k == "commodity_passthrough")
        th = sum(1 for k, _, _ in path if k == "through")
        gb_n = sum(1 for k, _, _ in path if k == "gap_bridge")
        results.append({"seed": label, "start": [round(sp[0], 2), round(sp[1], 2)],
                        "source_tag": src_tag, "line_number": line_no,
                        "landmarks_in_order": lms,
                        "pipe_segments": pc, "commodity_passed": cm,
                        "blocks_through": th, "gap_bridges": gb_n,
                        "recirculation": recirc})
        if DEBUG:
            print(f"\n[DEBUG {label}] steps={len(path)}")
            for k, det, xy in path:
                if k in ("through", "recirculation", "gap_bridge"):
                    print(f" {k:<22} {det} @({xy[0]:.1f},{xy[1]:.1f})")

    # ── Phase 4: line number <-> pipe attribution (direct + leader tip) ────
    # SOLID arrowhead tip matched to a pipe endpoint within 0.5 units, or a
    # pipe endpoint within 3.0 units of the text itself.
    solids = []
    for e in msp:
        if e.dxftype() == "SOLID":
            vs = [getattr(e.dxf, k, None) for k in ("vtx0", "vtx1", "vtx2", "vtx3")]
            vs = [(p.x, p.y) for p in vs if p is not None]
            if len(vs) >= 3:
                solids.append(vs)
    pipe_ends = []
    for idx, (x1, y1, x2, y2) in enumerate(raw):
        pipe_ends.append((idx, x1, y1))
        pipe_ends.append((idx, x2, y2))

    def pipe_near(px, py, tol):
        """Return the index of the segment with the closest endpoint strictly within ``tol``, or None."""
        best, bd = None, tol
        for idx, ex, ey in pipe_ends:
            d = math.hypot(ex - px, ey - py)
            if d < bd:
                bd, best = d, idx
        return best

    line_no_attr = []  # {line_number, pipe_idx, mode}
    for v, tx, ty in all_text:
        vk = v.split()[0].replace(" ", "")
        if not (LINE_NO.match(vk) or re.search(r'\d+A?x\d+A?', vk)):
            continue
        # Case B: SOLID arrowhead near the text -> the pipe its tip touches.
        attached = None
        for vs in solids:
            sc = (sum(p[0] for p in vs) / len(vs), sum(p[1] for p in vs) / len(vs))
            if math.hypot(sc[0] - tx, sc[1] - ty) > 6.0:
                continue
            for tipx, tipy in vs:
                pi = pipe_near(tipx, tipy, 0.5)
                if pi is not None:
                    attached = (pi, "leader")
                    break
            if attached:
                break
        # Case A: direct placement - a pipe endpoint close to the text.
        if attached is None:
            pi = pipe_near(tx, ty, 3.0)
            if pi is not None:
                attached = (pi, "direct")
        if attached:
            line_no_attr.append({"line_number": v, "pipe_idx": attached[0],
                                 "mode": attached[1]})

    # ── Phase 5: reduce traces to landmark From/To edges ──────────────────
    edges = []
    for r in results:
        lm = r["landmarks_in_order"]
        for i in range(len(lm) - 1):
            # The seed's line number is attached to the first edge only.
            edges.append({"from": lm[i], "to": lm[i + 1], "type": "process",
                          "via_seed": r["seed"],
                          "line_number": r["line_number"] if i == 0 else None})
        if r["recirculation"] and lm:
            edges.append({"from": lm[-1], "to": r["recirculation"],
                          "type": "recirculation", "via_seed": r["seed"],
                          "line_number": None})
    out = {
        "drawing": os.path.basename(DXF),
        "stats": {"landmarks": len(landmarks),
                  "offpage_seeds": len(offpage_seeds),
                  "ball_anchors": len(ball_anchors),
                  "raw_segments": len(raw),
                  "traces": len(results),
                  "edges": len(edges),
                  "line_number_attributions": len(line_no_attr)},
        "landmarks": [{"tag": t, "x": round(xy[0], 1), "y": round(xy[1], 1)}
                      for t, xy in sorted(landmarks.items())],
        "traces": results,
        "edges": edges,
    }
    # Output prefix = drawing file name up to the first "_" or ".".
    prefix = os.path.basename(DXF).split("_")[0].split(".")[0]
    op = os.path.join("mcp-server", "storage", f"{prefix}_connections.json")
    os.makedirs(os.path.dirname(op), exist_ok=True)
    # Context manager so the file is flushed and closed deterministically.
    with open(op, "w", encoding="utf-8") as fh:
        json.dump(out, fh, ensure_ascii=False, indent=1)
    print(f"landmarks={len(landmarks)} offpage_seeds={len(offpage_seeds)} "
          f"ball_anchors={len(ball_anchors)} raw_seg={len(raw)} "
          f"edges={len(edges)} lineno_attr={len(line_no_attr)}")
    for r in results:
        print(f"\n[{r['seed']}] pipe={r['pipe_segments']} "
              f"through={r['blocks_through']} commodity={r['commodity_passed']}"
              f"{' recirc=' + r['recirculation'] if r['recirculation'] else ''}")
        # Separate the tags so the order is readable (they were previously
        # concatenated with no delimiter).
        print(" 랜드마크 순서:", " → ".join(r["landmarks_in_order"]) or "(없음)")
    print(f"\n{op}")
# Run the tracer only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()