Files
HC900-Crawler/mcp-server/verifier/validators.py
windpacer 16fc7a2598 Initial commit: HC900 Crawler
Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL.
기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체.

- industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버)
- src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer)
- mcp-server: Python FastMCP (RAG/NL2SQL/P&ID)
- 다중 컨트롤러(N-Controller) 지원

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:28:14 +09:00

106 lines
4.2 KiB
Python

"""Phase B Verifier MVP — tool 인자 결정적 검증.
룰: R1(tag-existence), R2(area-format), R4(trace_connections 보강).
응답 텍스트 검증(R3, R5)은 Phase B.2 — stub만.
"""
from __future__ import annotations
import json, re, time, pathlib
from typing import Optional, Any
# Tag syntax (applied to the lower-cased input): a letter, optional
# letters/digits, a hyphen, digits, an optional single trailing letter and an
# optional ".suffix" of letters/digits -- e.g. "ficq-6113.pv", "p-6102".
TAG_RE = re.compile(
    r'^[a-z][a-z0-9]*-\d+[a-z]?(\.[a-z0-9]+)?$'
)

# Area syntax (applied to the upper-cased input): a letter followed by at
# least one letter/digit, with an optional "-<digits>" sub-area -- e.g. "P6",
# "P6-1".
AREA_RE = re.compile(
    r'^[A-Z][A-Z0-9]+(-\d+)?$'
)

# Base area codes accepted by validate_area (there is no "P7").
VALID_AREAS = {
    "P1", "P2", "P3", "P4", "P5", "P6",
    "P8", "P9", "P10",
    "UTIL", "PACKING",
}

# Direction values accepted by validate_direction.
VALID_DIRECTIONS = {
    "upstream",
    "downstream",
}

# Directory, next to this module, that receives the rejection logs.
_LOG_DIR = pathlib.Path(__file__).parent.joinpath("logs")
class VerifierError(Exception):
    """Structured rejection produced by a verifier rule.

    Attributes:
        rule: Rule identifier, e.g. "R1".
        code: Error code within that rule, e.g. "tag_not_found".
        hint: Human-readable explanation of the rejection.
        extra: Any additional keyword fields; merged into the dict returned
            by to_dict().
    """

    def __init__(self, rule: str, code: str, hint: str, **extra):
        # Initialise the Exception base so that str(err), repr(err) and
        # tracebacks show the rule, code and hint instead of an empty message.
        super().__init__(f"{rule}.{code}: {hint}")
        self.rule = rule
        self.code = code
        self.hint = hint
        self.extra = extra

    def to_dict(self) -> dict:
        """Return the error as a plain dict.

        The result has "verifier_error" ("<rule>.<code>") and "hint" keys,
        followed by every entry of ``extra``.
        """
        return {
            "verifier_error": f"{self.rule}.{self.code}",
            "hint": self.hint,
            **self.extra,
        }
# ── Tag cache ──
# Module-level cache of the known-tag set; read and refreshed by _load_tag_set().
# _tag_cache holds the cached set (None until the first successful load),
# _tag_cache_at the time.time() value recorded when it was last filled, and
# _TAG_CACHE_TTL the maximum age in seconds before the cache is reloaded.
_tag_cache: set[str] | None = None
_tag_cache_at: float = 0.0
_TAG_CACHE_TTL: float = 300.0 # 5 minutes
def _load_tag_set(get_conn) -> set[str]:
    """Return the set of known tag names, lower-cased, using the module cache.

    The cached set is returned as long as it is younger than _TAG_CACHE_TTL
    seconds. Otherwise the set is rebuilt from the distinct ``base_tag``
    values of ``tag_metadata`` and the distinct ``tag_no`` values of
    ``pid_equipment``, then stored in the module-level cache.

    Args:
        get_conn: Called with no arguments to obtain a connection. The
            connection must offer ``cursor()`` (usable as a context manager)
            and ``close()``; rows are read by position.

    Returns:
        The (shared, cached) set of lower-cased tag names.
    """
    global _tag_cache, _tag_cache_at

    cached = _tag_cache
    if cached is not None and time.time() - _tag_cache_at < _TAG_CACHE_TTL:
        return cached

    queries = (
        "SELECT DISTINCT base_tag FROM tag_metadata WHERE base_tag IS NOT NULL",
        "SELECT DISTINCT tag_no FROM pid_equipment WHERE tag_no IS NOT NULL",
    )
    known: set[str] = set()
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            for sql in queries:
                cur.execute(sql)
                # Skip empty values as well as NULLs.
                known.update(row[0].lower() for row in cur.fetchall() if row[0])
    finally:
        # The connection is closed whether or not the queries succeeded.
        conn.close()

    _tag_cache = known
    _tag_cache_at = time.time()
    return known
# ── R1 ──
def validate_tag(tag: str | None, get_conn) -> Optional[VerifierError]:
    """R1 (tag-existence): check a tag argument's format and existence.

    The tag is lower-cased, checked against TAG_RE, and its base part (the
    text before the first '.') is looked up in the known-tag set.

    Args:
        tag: Tag supplied to the tool, e.g. "ficq-6113.pv". A falsy value
            (None or "") is not validated.
        get_conn: Connection factory, forwarded to _load_tag_set.

    Returns:
        None when the tag is absent or valid; otherwise a VerifierError
        (returned, not raised) with code "invalid_tag_format" or
        "tag_not_found". The latter carries up to three ``suggested`` tags.
    """
    if not tag:
        return None

    t = tag.lower()
    # fullmatch rather than match: the pattern's `$` also matches just before
    # a trailing newline, which would let a tag such as "p-6102.pv\n" through.
    if not TAG_RE.fullmatch(t):
        return VerifierError(
            "R1", "invalid_tag_format",
            hint=f"태그 형식 비정상: '{tag}'. 예시: ficq-6113.pv, p-6102",
        )

    base = t.split('.')[0]
    tags = _load_tag_set(get_conn)
    if base in tags:
        return None

    # Suggest known tags that contain any hyphen-separated piece of the base
    # longer than two characters; keep the first three in sorted order.
    toks = [p for p in base.split('-') if len(p) > 2]
    suggested = sorted(x for x in tags if any(p in x for p in toks))[:3]
    return VerifierError(
        "R1", "tag_not_found",
        hint=f"태그 '{tag}' 는 DB에 존재하지 않습니다. find_tags(query=..., sub_area=...) 로 먼저 검색하세요.",
        suggested=suggested,
    )
# ── R2 ──
def validate_area(area: str | None, field: str = "area") -> Optional[VerifierError]:
    """R2 (area-format): check an area argument's format and base area code.

    The value is upper-cased, checked against AREA_RE, and its base part
    (the text before the first '-') must be a member of VALID_AREAS.

    Args:
        area: Area supplied to the tool, e.g. "P6" or "P6-1". A falsy value
            (None or "") is not validated.
        field: Parameter name to show in the hint text.

    Returns:
        None when the area is absent or valid; otherwise a VerifierError
        (returned, not raised) with code "invalid_area_format" or
        "unknown_area".
    """
    if not area:
        return None

    area = area.upper()
    # fullmatch rather than match: the pattern's `$` also matches just before
    # a trailing newline, which would let a value such as "P6-1\n" through.
    if not AREA_RE.fullmatch(area):
        return VerifierError(
            "R2", "invalid_area_format",
            hint=f"{field}='{area}' 형식 오류. 'P6' 또는 'P6-1' 형식 사용.",
            valid_areas=sorted(VALID_AREAS),
        )

    base = area.split('-')[0]
    if base not in VALID_AREAS:
        return VerifierError(
            "R2", "unknown_area",
            hint=f"{field}='{area}' 미존재. valid: {sorted(VALID_AREAS)} (P7 없음)",
        )
    return None
# ── R4 ──
def validate_direction(d: str | None) -> Optional[VerifierError]:
    """R4: check that a direction argument is one of VALID_DIRECTIONS.

    Args:
        d: Direction supplied to the tool. A falsy value (None or "") is not
            validated. The comparison is exact (case-sensitive).

    Returns:
        None when the direction is absent or allowed; otherwise a
        VerifierError (returned, not raised) with code "invalid_direction".
    """
    if not d or d in VALID_DIRECTIONS:
        return None
    return VerifierError(
        "R4", "invalid_direction",
        hint=f"direction='{d}' 잘못. 'upstream' 또는 'downstream' 만 허용",
    )
def validate_max_depth(n: Any) -> Optional[VerifierError]:
    """R4: check that max_depth is an integer between 1 and 50 inclusive.

    Args:
        n: Value supplied to the tool. None is not validated. Anything
            ``int()`` accepts (e.g. 5, "10", 3.0) is allowed, except floats
            with a fractional part.

    Returns:
        None when the value is absent or valid; otherwise a VerifierError
        (returned, not raised) with code "invalid_max_depth" or
        "max_depth_out_of_range".
    """
    if n is None:
        return None

    try:
        v = int(n)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type; ValueError: non-numeric text or NaN;
        # OverflowError: infinity.
        v = None

    # int() silently truncates floats (int(2.9) == 2), so a non-integral float
    # would otherwise pass a check that demands an integer.
    if v is None or (isinstance(n, float) and v != n):
        return VerifierError("R4", "invalid_max_depth", hint=f"max_depth='{n}' 은 정수여야 함")

    if not 1 <= v <= 50:
        return VerifierError("R4", "max_depth_out_of_range", hint=f"max_depth={v} 범위 외 (1~50)")
    return None
# ── R3, R5 stub (Phase B.2) ──
def validate_response_text(text: str) -> Optional[VerifierError]:
    """Placeholder for response-text validation (rules R3 and R5).

    Not implemented yet (planned for Phase B.2): every input is accepted.

    Args:
        text: Response text; currently ignored.

    Returns:
        Always None.
    """
    return None
# ── Rejection log (input for Phase C LoRA) ──
def log_rejection(tool: str, params: dict, err: VerifierError) -> None:
    """Append one rejected tool call to today's JSONL file under _LOG_DIR.

    The log directory is created if missing. The file is named after the
    local date ("YYYY-MM-DD.jsonl") and receives one JSON object per line
    with the keys "ts", "tool", "params" and "verifier_error".

    Args:
        tool: Name of the tool whose call was rejected.
        params: Arguments the tool was called with; must be JSON-serializable.
        err: The rejection; stored via ``err.to_dict()``.
    """
    _LOG_DIR.mkdir(parents=True, exist_ok=True)
    log_path = _LOG_DIR / (time.strftime("%Y-%m-%d") + ".jsonl")
    record = {
        "ts": time.time(),
        "tool": tool,
        "params": params,
        "verifier_error": err.to_dict(),
    }
    with log_path.open("a", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII hint text readable in the log.
        line = json.dumps(record, ensure_ascii=False)
        f.write(line + "\n")