106 lines
4.2 KiB
Python
106 lines
4.2 KiB
Python
"""Phase B Verifier MVP — tool 인자 결정적 검증.
|
|
|
|
Rules: R1 (tag-existence), R2 (area-format), R4 (trace_connections hardening).
Response-text validation (R3, R5) is Phase B.2 — stub only.
|
|
"""
|
|
from __future__ import annotations
|
|
import json, re, time, pathlib
|
|
from typing import Optional, Any
|
|
|
|
# Tag syntax (matched against the lower-cased tag): a letter, further
# letters/digits, '-', digits, an optional single trailing letter and an
# optional '.suffix' -- e.g. "ficq-6113.pv", "p-6102".
TAG_RE = re.compile(r'^[a-z][a-z0-9]*-\d+[a-z]?(\.[a-z0-9]+)?$')

# Area syntax (matched against the upper-cased area): a letter followed by
# letters/digits, with an optional '-<digits>' sub-area -- e.g. "P6", "P6-1".
AREA_RE = re.compile(r'^[A-Z][A-Z0-9]+(-\d+)?$')

# Base areas accepted by rule R2 (the part before any '-').
VALID_AREAS = {
    "P1", "P2", "P3", "P4", "P5", "P6",
    "P8", "P9", "P10",
    "UTIL", "PACKING",
}

# Directions accepted by rule R4.
VALID_DIRECTIONS = {
    "upstream",
    "downstream",
}

# Rejection logs are written to a "logs" directory next to this module.
_LOG_DIR = pathlib.Path(__file__).parent.joinpath("logs")
|
|
|
|
class VerifierError(Exception):
    """Structured rejection produced by a verifier rule.

    Attributes:
        rule: Rule identifier, e.g. "R1".
        code: Error code within the rule, e.g. "tag_not_found".
        hint: Human-readable explanation of the rejection.
        extra: Any additional keyword arguments given to the constructor.
    """

    def __init__(self, rule: str, code: str, hint: str, **extra):
        # Populate Exception.args so str(err), logging output and tracebacks
        # carry the rule, code and hint instead of an empty message.
        super().__init__(f"{rule}.{code}: {hint}")
        self.rule = rule
        self.code = code
        self.hint = hint
        self.extra = extra

    def to_dict(self) -> dict:
        """Return the error as a plain dict.

        The result holds "verifier_error" ("<rule>.<code>") and "hint",
        followed by every entry of ``extra``; because ``extra`` is unpacked
        last, an extra key with one of those names replaces it.
        """
        return {
            "verifier_error": f"{self.rule}.{self.code}",
            "hint": self.hint,
            **self.extra,
        }
|
|
|
|
# ── Tag cache ──
_tag_cache: set[str] | None = None
_tag_cache_at: float = 0.0  # time.monotonic() reading taken when the cache was filled
_TAG_CACHE_TTL: float = 300.0  # 5 minutes


def _load_tag_set(get_conn) -> set[str]:
    """Return the set of known tags, lower-cased, using a module-level cache.

    The set is the union of ``tag_metadata.base_tag`` and
    ``pid_equipment.tag_no``; NULL/empty values are skipped. A cached set
    younger than ``_TAG_CACHE_TTL`` seconds is returned without touching the
    database.

    Args:
        get_conn: Zero-argument callable returning a DB connection that
            offers ``cursor()`` (usable as a context manager) and ``close()``.

    Returns:
        The cached set object itself (not a copy).
    """
    global _tag_cache, _tag_cache_at

    # Age is measured with the monotonic clock: wall-clock adjustments (NTP
    # step, manual change) must neither pin a stale cache nor expire it early.
    if _tag_cache is not None and (time.monotonic() - _tag_cache_at) < _TAG_CACHE_TTL:
        return _tag_cache

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT DISTINCT base_tag FROM tag_metadata WHERE base_tag IS NOT NULL")
            tags = {row[0].lower() for row in cur.fetchall() if row[0]}
            cur.execute("SELECT DISTINCT tag_no FROM pid_equipment WHERE tag_no IS NOT NULL")
            tags |= {row[0].lower() for row in cur.fetchall() if row[0]}
    finally:
        # The connection is closed whether or not the queries succeeded.
        conn.close()

    # Only a fully successful load replaces the cache.
    _tag_cache, _tag_cache_at = tags, time.monotonic()
    return tags
|
|
|
|
# ── R1 ──
|
|
def validate_tag(tag: str | None, get_conn) -> Optional[VerifierError]:
    """R1: check that a tag is well-formed and present in the known-tag set.

    Args:
        tag: Tag as supplied by the caller; a falsy value means "not given"
            and is accepted. Matching is case-insensitive and any ".suffix"
            is ignored for the existence lookup.
        get_conn: Connection factory forwarded to ``_load_tag_set``.

    Returns:
        None when the tag is absent or valid; otherwise a VerifierError with
        code "invalid_tag_format" or "tag_not_found". The latter carries
        ``suggested``: up to three known tags (alphabetical) containing any
        hyphen-separated piece of the base tag longer than two characters.
    """
    if not tag:
        return None

    # A truthy non-string argument (e.g. a number) is a format error rather
    # than an AttributeError on .lower().
    lowered = tag.lower() if isinstance(tag, str) else None

    # fullmatch() instead of match(): with match(), the pattern's '$' also
    # matches just before a trailing newline, so "p-6102.pv\n" was accepted.
    if lowered is None or not TAG_RE.fullmatch(lowered):
        return VerifierError("R1", "invalid_tag_format",
                             hint=f"태그 형식 비정상: '{tag}'. 예시: ficq-6113.pv, p-6102")

    base = lowered.split('.')[0]
    known = _load_tag_set(get_conn)
    if base in known:
        return None

    pieces = [part for part in base.split('-') if len(part) > 2]
    suggested = sorted(name for name in known if any(part in name for part in pieces))[:3]
    return VerifierError("R1", "tag_not_found",
                         hint=f"태그 '{tag}' 는 DB에 존재하지 않습니다. find_tags(query=..., sub_area=...) 로 먼저 검색하세요.",
                         suggested=suggested)
|
|
|
|
# ── R2 ──
|
|
def validate_area(area: str | None, field: str = "area") -> Optional[VerifierError]:
    """R2: check that an area argument is well-formed and names a known area.

    Args:
        area: Area as supplied by the caller; a falsy value means "not given"
            and is accepted. Strings are upper-cased before checking.
        field: Parameter name to show in the hint.

    Returns:
        None when the area is absent or valid; otherwise a VerifierError with
        code "invalid_area_format" (also carrying ``valid_areas``) or
        "unknown_area" when the part before '-' is not in VALID_AREAS.
    """
    if not area:
        return None

    # A truthy non-string argument is a format error rather than an
    # AttributeError on .upper().
    is_text = isinstance(area, str)
    if is_text:
        area = area.upper()

    # fullmatch() instead of match(): with match(), the pattern's '$' also
    # matches just before a trailing newline, so "P6-1\n" was accepted.
    if not is_text or not AREA_RE.fullmatch(area):
        return VerifierError("R2", "invalid_area_format",
                             hint=f"{field}='{area}' 형식 오류. 'P6' 또는 'P6-1' 형식 사용.",
                             valid_areas=sorted(VALID_AREAS))

    base = area.split('-')[0]
    if base not in VALID_AREAS:
        return VerifierError("R2", "unknown_area",
                             hint=f"{field}='{area}' 미존재. valid: {sorted(VALID_AREAS)} (P7 없음)")
    return None
|
|
|
|
# ── R4 ──
|
|
def validate_direction(d: str | None) -> Optional[VerifierError]:
    """R4: check the ``direction`` argument against VALID_DIRECTIONS.

    Args:
        d: Direction as supplied by the caller; a falsy value means
            "not given" and is accepted. The comparison is exact.

    Returns:
        None when the direction is absent or allowed; otherwise a
        VerifierError with code "invalid_direction".
    """
    if not d:
        return None
    if d in VALID_DIRECTIONS:
        return None
    return VerifierError(
        "R4",
        "invalid_direction",
        hint=f"direction='{d}' 잘못. 'upstream' 또는 'downstream' 만 허용",
    )
|
|
|
|
def validate_max_depth(n: Any) -> Optional[VerifierError]:
    """R4: check that ``max_depth`` is an integer between 1 and 50 inclusive.

    Args:
        n: Depth as supplied by the caller. None means "not given" and is
            accepted. Ints, integer-valued floats (5.0) and strings that
            ``int()`` can parse ("7") are accepted as integers.

    Returns:
        None when the value is absent or valid; otherwise a VerifierError
        with code "invalid_max_depth" (not an integer) or
        "max_depth_out_of_range".
    """
    if n is None:
        return None

    depth = None
    # bool is an int subclass, but True/False is not a meaningful depth.
    if not isinstance(n, bool):
        try:
            depth = int(n)
        except (TypeError, ValueError, OverflowError):
            # Unparseable text, unsupported type, NaN, or +/-infinity.
            depth = None
        else:
            # int() truncates floats; a fractional value such as 3.7 is not
            # an integer and must not be silently validated as 3.
            if isinstance(n, float) and depth != n:
                depth = None

    if depth is None:
        return VerifierError("R4", "invalid_max_depth", hint=f"max_depth='{n}' 은 정수여야 함")

    v = depth
    if not (1 <= v <= 50):
        return VerifierError("R4", "max_depth_out_of_range", hint=f"max_depth={v} 범위 외 (1~50)")
    return None
|
|
|
|
# ── R3, R5 stub (Phase B.2) ──
|
|
def validate_response_text(text: str) -> Optional[VerifierError]:
    """R3/R5 placeholder: response-text validation is not implemented yet.

    Args:
        text: Response text; currently ignored.

    Returns:
        Always None.
    """
    return None  # to be implemented in Phase B.2
|
|
|
|
# ── Rejection logging (Phase C LoRA input) ──
|
|
def log_rejection(tool: str, params: dict, err: VerifierError) -> None:
    """Append one rejection record to today's JSONL file under ``_LOG_DIR``.

    The directory is created if missing. The file is named after the local
    date ("YYYY-MM-DD.jsonl"), and each record is one JSON line holding
    "ts" (``time.time()``), "tool", "params" and "verifier_error"
    (``err.to_dict()``), written as UTF-8 without ASCII escaping.

    Args:
        tool: Name of the tool whose call was rejected.
        params: Arguments of the rejected call; must be JSON-serializable.
        err: The rejection to record.
    """
    _LOG_DIR.mkdir(parents=True, exist_ok=True)

    log_path = _LOG_DIR / (time.strftime("%Y-%m-%d") + ".jsonl")
    record = {
        "ts": time.time(),
        "tool": tool,
        "params": params,
        "verifier_error": err.to_dict(),
    }

    with log_path.open("a", encoding="utf-8") as fh:
        line = json.dumps(record, ensure_ascii=False)
        fh.write(line + "\n")
|