Files
HC900-Crawler/mcp-server/eval/run_eval.py
windpacer 16fc7a2598 Initial commit: HC900 Crawler
Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL.
기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체.

- industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버)
- src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer)
- mcp-server: Python FastMCP (RAG/NL2SQL/P&ID)
- 다중 컨트롤러(N-Controller) 지원

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:28:14 +09:00

369 lines
16 KiB
Python

#!/usr/bin/env python3
"""Phase 0 평가 러너 — 골든셋으로 모델(사다리) 평가.
byOPUS 플랜 Phase 0. 카테고리별 채점 + fabrication rate + 회귀(baseline) 비교.
LLM이 안 떠 있어도 `--lint` 로 골든셋 구조 검증 가능(오프라인).
사용:
# 오프라인 구조검증 (LLM 불필요)
python run_eval.py --lint
# 단일 모델 평가 (vLLM 떠 있어야 함)
python run_eval.py --base-url http://localhost:8001/v1 --model Qwen3-8B
# 모델 사다리 (models.json 의 reachable 모델 전부)
python run_eval.py --models models.json
# 회귀 비교
python run_eval.py --model Qwen3-8B --baseline results/qwen3-8b_xxx.json
채점은 production 프롬프트를 그대로 사용:
- nl2sql : worker/sql_prompt.SQL_SYSTEM_PROMPT (단일 소스)
- 그 외 : 본 파일의 TOOL_ROUTING/GROUNDING/SCAFFOLD 프롬프트
(= Phase 2 Verifier·Rule 코퍼스의 초안)
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import os
import re
import sys
from pathlib import Path
HERE = Path(__file__).resolve().parent
MCP = HERE.parent
sys.path.insert(0, str(MCP / "worker"))
# production NL2SQL 프롬프트 (단일 소스). import 실패해도 lint 는 동작.
try:
from sql_prompt import SQL_SYSTEM_PROMPT # type: ignore
except Exception: # pragma: no cover
SQL_SYSTEM_PROMPT = None
# ── 평가용 프롬프트 (Phase 2 Verifier/Rule 초안) ──────────────────────────────
TOOL_LIST = [
("active_alarms", "현재 활성 알람/트립 조회"),
("query_events", "기간 이벤트 로그 조회"),
("summarize_events", "기간 이벤트 요약(특정 area, 어제/지난 N시간 등)"),
("generate_status_report", "교대 보고서/운전 상태 종합 보고"),
("find_tags", "태그 검색(어떤 태그가 있는지, 종류별 찾기)"),
("query_pv_history", "특정 태그 과거 PV 추이"),
("query_with_nl", "자연어로 임의 SQL 결과"),
("run_sql", "직접 SQL 실행"),
("search_kb", "절차서/매뉴얼/설계서 등 문서 검색"),
("trace_connections", "유체/공급/계통 경로 추적(어디서 공급, 흐름)"),
]
KNOWN_TOOLS = [t for t, _ in TOOL_LIST]
TOOL_ROUTING_PROMPT = (
"너는 산업 플랜트 운전 어시스턴트의 의도 라우터다.\n"
"사용자 질문에 가장 알맞은 도구 하나를 아래 목록에서 고른다.\n\n"
+ "\n".join(f"- {name}: {desc}" for name, desc in TOOL_LIST)
+ "\n\n규칙: 설명/문장 없이 도구 이름 하나만 정확히 출력하라(예: active_alarms)."
)
GROUNDING_PROMPT = (
"너는 산업 플랜트 운전 어시스턴트다. 안전이 중요하므로 사실을 지어내지 않는다.\n"
"원칙:\n"
"- 주어진 컨텍스트/데이터에 있는 사실만 사용한다.\n"
"- 근거가 없으면 '확인 불가' 또는 '정보 없음'이라고 답하고, 추정·창작하지 않는다.\n"
"- description/값이 비어있으면 '없음/미등록'으로 표시하고 용도를 지어내지 않는다.\n"
"- 존재하지 않는 태그/플랜트/설비에 대해서는 존재하지 않는다고 답한다.\n"
)
SCAFFOLD_PROMPT = (
GROUNDING_PROMPT
+ "\n제어/상태 판단 질문은 반드시 아래 5단계를 이 순서의 라벨로 답하라:\n"
" 제어변수: ...\n 현재값: ...\n 설정치: ...\n 제약: ...\n 판단: ...\n"
"데이터가 없는 항목은 라벨을 유지하고 '미제공'이라고 쓴다.\n"
)
REFUSE_MARKERS = [
"확인 불가", "판정 불가", "판단 불가", "정보 없음", "정보가 없", "데이터가 없", "데이터 없",
"없습니다", "미등록", "모름", "알 수 없", "존재하지 않", "제공되지 않", "예측할 수 없",
"예측은 불가", "확인할 수 없", "조회되지 않",
]
# ── 채점기 ────────────────────────────────────────────────────────────────────
def _norm(s: str) -> str:
s = s or ""
# 마크다운 펜스 제거
s = re.sub(r"```[a-zA-Z]*", "", s).replace("```", "")
return re.sub(r"\s+", " ", s).strip().lower()
def score_nl2sql(out: str, exp: dict) -> tuple[bool, str]:
n = _norm(out)
missing = [s for s in exp.get("sql_contains", []) if s.lower() not in n]
bad = [s for s in exp.get("must_not", []) if s.lower() in n]
ok = not missing and not bad
detail = []
if missing:
detail.append(f"누락:{missing}")
if bad:
detail.append(f"금지포함:{bad}")
return ok, "; ".join(detail) or "ok"
def score_tool(out: str, exp: dict) -> tuple[bool, str]:
n = _norm(out)
picked = next((t for t in KNOWN_TOOLS if t in n), "")
targets = exp.get("tool_any") or ([exp["tool"]] if "tool" in exp else [])
ok = picked in [t.lower() for t in targets]
return ok, f"선택={picked or '(없음)'} 기대={targets}"
def score_abstain(out: str, exp: dict) -> tuple[bool, str]:
n = _norm(out)
refused = any(m.lower() in n for m in REFUSE_MARKERS)
fabricated = [s for s in exp.get("must_not", []) if s.lower() in n]
ok = refused and not fabricated
detail = []
if not refused:
detail.append("거부표현 없음(헛소리 위험)")
if fabricated:
detail.append(f"창작포함:{fabricated}")
return ok, "; ".join(detail) or "거부 ok"
def score_scaffold(out: str, exp: dict) -> tuple[bool, str]:
n = _norm(out)
steps = [s.lower() for s in exp.get("steps", [])]
pos, last, missing, order_ok = [], -1, [], True
for s in steps:
i = n.find(s)
if i < 0:
missing.append(s)
else:
if exp.get("order") and i < last:
order_ok = False
last = i
pos.append(s)
ok = not missing and order_ok
detail = []
if missing:
detail.append(f"누락단계:{missing}")
if not order_ok:
detail.append("순서 어긋남")
return ok, "; ".join(detail) or "절차 ok"
def score_grounding(out: str, exp: dict) -> tuple[bool, str]:
n = _norm(out)
missing = [s for s in exp.get("answer_contains", []) if s.lower() not in n]
ok = not missing
return ok, (f"누락:{missing}" if missing else "ok")
SCORERS = {
"nl2sql": score_nl2sql,
"tool_call": score_tool,
"abstain": score_abstain,
"scaffold": score_scaffold,
"grounding": score_grounding,
}
# ── 프롬프트 빌드 ─────────────────────────────────────────────────────────────
def build_messages(item: dict) -> list[dict]:
cat, q = item["category"], item["question"]
ctx = item.get("context", "")
user = (f"[컨텍스트]\n{ctx}\n\n[질문]\n{q}" if ctx else q)
if cat == "nl2sql":
if not SQL_SYSTEM_PROMPT:
raise RuntimeError("sql_prompt.SQL_SYSTEM_PROMPT import 실패")
return [{"role": "system", "content": SQL_SYSTEM_PROMPT}, {"role": "user", "content": q}]
if cat == "tool_call":
return [{"role": "system", "content": TOOL_ROUTING_PROMPT}, {"role": "user", "content": q}]
if cat == "scaffold":
return [{"role": "system", "content": SCAFFOLD_PROMPT}, {"role": "user", "content": user}]
# abstain, grounding
return [{"role": "system", "content": GROUNDING_PROMPT}, {"role": "user", "content": user}]
# ── 골든셋 로드 + lint ────────────────────────────────────────────────────────
REQUIRED_EXPECT = {
"nl2sql": ["sql_contains"],
"tool_call": [], # tool 또는 tool_any
"abstain": ["refuse"],
"scaffold": ["steps"],
"grounding": ["answer_contains"],
}
def load_golden(path: Path) -> list[dict]:
items = []
for i, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1):
line = line.strip()
if not line:
continue
try:
items.append(json.loads(line))
except json.JSONDecodeError as e:
raise SystemExit(f"[lint] {path.name}:{i} JSON 파싱 실패: {e}")
return items
def lint(items: list[dict]) -> int:
errs, counts = [], {}
ids = set()
for it in items:
cat = it.get("category")
counts[cat] = counts.get(cat, 0) + 1
for f in ("id", "category", "question", "expect"):
if f not in it:
errs.append(f"{it.get('id','?')}: 필드 누락 '{f}'")
if it.get("id") in ids:
errs.append(f"중복 id: {it['id']}")
ids.add(it.get("id"))
if cat not in SCORERS:
errs.append(f"{it.get('id')}: 알 수 없는 category '{cat}'")
continue
exp = it.get("expect", {})
for req in REQUIRED_EXPECT[cat]:
if req not in exp:
errs.append(f"{it.get('id')}: expect.{req} 누락")
if cat == "tool_call" and not (exp.get("tool") or exp.get("tool_any")):
errs.append(f"{it.get('id')}: tool 또는 tool_any 필요")
if cat == "tool_call":
for t in (exp.get("tool_any") or [exp.get("tool")]):
if t and t not in KNOWN_TOOLS:
errs.append(f"{it.get('id')}: 미지의 도구 '{t}'")
print(f"골든셋 {len(items)}문항 " + " ".join(f"{k}={v}" for k, v in sorted(counts.items())))
if errs:
print("\n".join("" + e for e in errs))
return 1
print("✓ lint 통과")
return 0
# ── 모델 호출 + 실행 ──────────────────────────────────────────────────────────
def make_client(base_url: str):
from openai import OpenAI
return OpenAI(base_url=base_url, api_key=os.environ.get("VLLM_API_KEY", "dummy"))
def run_model(name: str, base_url: str, model: str, items: list[dict], args) -> dict:
client = make_client(base_url)
results, cat_pass, cat_tot = [], {}, {}
print(f"\n=== [{name}] {model} @ {base_url} ===")
for it in items:
cat = it["category"]
cat_tot[cat] = cat_tot.get(cat, 0) + 1
try:
kw = dict(model=model, messages=build_messages(it),
max_tokens=args.max_tokens, temperature=args.temperature)
if args.seed is not None:
kw["seed"] = args.seed
if args.no_think:
kw["extra_body"] = {"chat_template_kwargs": {"enable_thinking": False}}
resp = client.chat.completions.create(**kw)
out = resp.choices[0].message.content or ""
ok, detail = SCORERS[cat](out, it["expect"])
except Exception as e:
ok, detail, out = False, f"ERROR: {e}", ""
cat_pass[cat] = cat_pass.get(cat, 0) + (1 if ok else 0)
results.append({"id": it["id"], "category": cat, "passed": ok, "detail": detail})
print(f" [{'PASS' if ok else 'FAIL'}] {it['id']:<12} {detail}")
total = len(results)
passed = sum(r["passed"] for r in results)
ab_tot = cat_tot.get("abstain", 0)
ab_pass = cat_pass.get("abstain", 0)
fab_rate = round((ab_tot - ab_pass) / ab_tot, 3) if ab_tot else None
per_cat = {c: {"pass": cat_pass.get(c, 0), "total": cat_tot[c],
"pct": round(cat_pass.get(c, 0) / cat_tot[c], 3)} for c in sorted(cat_tot)}
print(" ── 카테고리별 ──")
for c, v in per_cat.items():
print(f" {c:<11} {v['pass']:>2}/{v['total']:<2} {v['pct']*100:5.1f}%")
print(f" 전체 {passed}/{total} ({passed/total*100:.1f}%) "
f"fabrication_rate={fab_rate if fab_rate is not None else 'n/a'}")
return {"name": name, "model": model, "base_url": base_url,
"timestamp": dt.datetime.now().isoformat(timespec="seconds"),
"overall": {"pass": passed, "total": total, "pct": round(passed / total, 3)},
"fabrication_rate": fab_rate, "per_category": per_cat, "items": results}
def diff_baseline(cur: dict, baseline_path: Path):
base = json.loads(baseline_path.read_text(encoding="utf-8"))
base_map = {r["id"]: r["passed"] for r in base.get("items", [])}
regr = [r["id"] for r in cur["items"] if base_map.get(r["id"]) and not r["passed"]]
fixed = [r["id"] for r in cur["items"] if base_map.get(r["id"]) is False and r["passed"]]
print(f"\n── 회귀 비교 vs {baseline_path.name} ──")
print(f" 회귀(PASS→FAIL): {regr or '없음'}")
print(f" 개선(FAIL→PASS): {fixed or '없음'}")
return 1 if regr else 0
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--golden", default=str(HERE / "golden.jsonl"))
ap.add_argument("--models", help="모델 사다리 JSON (models.json)")
ap.add_argument("--base-url", default=os.environ.get("VLLM_BASE_URL", "http://localhost:8001/v1"))
ap.add_argument("--model", default=os.environ.get("VLLM_MODEL", ""))
ap.add_argument("--out", default=str(HERE / "results"))
ap.add_argument("--baseline", help="회귀 비교용 이전 결과 JSON")
ap.add_argument("--categories", help="콤마구분 카테고리 필터")
ap.add_argument("--limit", type=int, default=0, help="앞에서 N문항만")
ap.add_argument("--max-tokens", type=int, default=1024)
ap.add_argument("--temperature", type=float, default=0.0)
ap.add_argument("--seed", type=int, default=42)
ap.add_argument("--lint", action="store_true", help="LLM 없이 골든셋 구조검증만")
ap.add_argument("--no-think", action="store_true", help="Qwen3 등 thinking 모델: enable_thinking=false")
args = ap.parse_args()
items = load_golden(Path(args.golden))
if args.categories:
cats = set(args.categories.split(","))
items = [it for it in items if it.get("category") in cats]
if args.limit:
items = items[: args.limit]
if args.lint:
return lint(items)
rc = lint(items)
if rc:
print("lint 실패 — 평가 중단")
return rc
# 모델 목록 결정
if args.models:
cfg = json.loads(Path(args.models).read_text(encoding="utf-8"))
rungs = cfg["models"]
elif args.model:
rungs = [{"name": args.model, "base_url": args.base_url, "model": args.model}]
else:
print("모델 미지정: --model 또는 --models 필요 (또는 VLLM_MODEL 환경변수)")
return 2
outdir = Path(args.out)
outdir.mkdir(parents=True, exist_ok=True)
summaries, rc = [], 0
for r in rungs:
res = run_model(r["name"], r.get("base_url", args.base_url), r["model"], items, args)
summaries.append(res)
stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
fp = outdir / f"{r['name']}_{stamp}.json"
fp.write_text(json.dumps(res, ensure_ascii=False, indent=2), encoding="utf-8")
print(f" → 저장 {fp}")
if args.baseline:
rc |= diff_baseline(res, Path(args.baseline))
if len(summaries) > 1:
print("\n=== 모델 사다리 비교 (합격선 넘는 最小 모델 선택) ===")
print(f" {'model':<22} {'overall':>8} {'fab_rate':>9} per-category")
for s in summaries:
pc = " ".join(f"{c}:{v['pct']*100:.0f}%" for c, v in s["per_category"].items())
print(f" {s['name']:<22} {s['overall']['pct']*100:6.1f}% "
f"{(s['fabrication_rate'] if s['fabrication_rate'] is not None else 0)*100:7.1f}% {pc}")
return rc
if __name__ == "__main__":
raise SystemExit(main())