"""Phase B Verifier MVP — tool 인자 결정적 검증. 룰: R1(tag-existence), R2(area-format), R4(trace_connections 보강). 응답 텍스트 검증(R3, R5)은 Phase B.2 — stub만. """ from __future__ import annotations import json, re, time, pathlib from typing import Optional, Any TAG_RE = re.compile(r'^[a-z][a-z0-9]*-\d+[a-z]?(\.[a-z0-9]+)?$') AREA_RE = re.compile(r'^[A-Z][A-Z0-9]+(-\d+)?$') VALID_AREAS = {"P1","P2","P3","P4","P5","P6","P8","P9","P10","UTIL","PACKING"} VALID_DIRECTIONS = {"upstream","downstream"} _LOG_DIR = pathlib.Path(__file__).parent / "logs" class VerifierError(Exception): def __init__(self, rule: str, code: str, hint: str, **extra): self.rule, self.code, self.hint, self.extra = rule, code, hint, extra def to_dict(self) -> dict: return {"verifier_error": f"{self.rule}.{self.code}", "hint": self.hint, **self.extra} # ── 태그 캐시 ── _tag_cache: set[str] | None = None _tag_cache_at: float = 0.0 _TAG_CACHE_TTL: float = 300.0 # 5분 def _load_tag_set(get_conn) -> set[str]: global _tag_cache, _tag_cache_at if _tag_cache is not None and (time.time() - _tag_cache_at) < _TAG_CACHE_TTL: return _tag_cache conn = get_conn() try: with conn.cursor() as cur: cur.execute("SELECT DISTINCT base_tag FROM tag_metadata WHERE base_tag IS NOT NULL") s = {r[0].lower() for r in cur.fetchall() if r[0]} cur.execute("SELECT DISTINCT tag_no FROM pid_equipment WHERE tag_no IS NOT NULL") s |= {r[0].lower() for r in cur.fetchall() if r[0]} finally: conn.close() _tag_cache, _tag_cache_at = s, time.time() return s # ── R1 ── def validate_tag(tag: str | None, get_conn) -> Optional[VerifierError]: if not tag: return None t = tag.lower() if not TAG_RE.match(t): return VerifierError("R1","invalid_tag_format", hint=f"태그 형식 비정상: '{tag}'. 예시: ficq-6113.pv, p-6102") base = t.split('.')[0] tags = _load_tag_set(get_conn) if base in tags: return None toks = [p for p in base.split('-') if len(p) > 2] suggested = sorted({x for x in tags if any(p in x for p in toks)})[:3] return VerifierError("R1","tag_not_found", hint=f"태그 '{tag}' 는 DB에 존재하지 않습니다. find_tags(query=..., sub_area=...) 로 먼저 검색하세요.", suggested=suggested) # ── R2 ── def validate_area(area: str | None, field: str = "area") -> Optional[VerifierError]: if not area: return None area = area.upper() if not AREA_RE.match(area): return VerifierError("R2","invalid_area_format", hint=f"{field}='{area}' 형식 오류. 'P6' 또는 'P6-1' 형식 사용.", valid_areas=sorted(VALID_AREAS)) base = area.split('-')[0] if base not in VALID_AREAS: return VerifierError("R2","unknown_area", hint=f"{field}='{area}' 미존재. valid: {sorted(VALID_AREAS)} (P7 없음)") return None # ── R4 ── def validate_direction(d: str | None) -> Optional[VerifierError]: if d and d not in VALID_DIRECTIONS: return VerifierError("R4","invalid_direction", hint=f"direction='{d}' 잘못. 'upstream' 또는 'downstream' 만 허용") return None def validate_max_depth(n: Any) -> Optional[VerifierError]: if n is None: return None try: v = int(n) except Exception: return VerifierError("R4","invalid_max_depth", hint=f"max_depth='{n}' 은 정수여야 함") if not (1 <= v <= 50): return VerifierError("R4","max_depth_out_of_range", hint=f"max_depth={v} 범위 외 (1~50)") return None # ── R3, R5 stub (Phase B.2) ── def validate_response_text(text: str) -> Optional[VerifierError]: return None # Phase B.2 구현 예정 # ── 로그 적재 (Phase C LoRA 입력) ── def log_rejection(tool: str, params: dict, err: VerifierError) -> None: _LOG_DIR.mkdir(parents=True, exist_ok=True) today = time.strftime("%Y-%m-%d") rec = {"ts": time.time(), "tool": tool, "params": params, "verifier_error": err.to_dict()} with (_LOG_DIR / f"{today}.jsonl").open("a", encoding="utf-8") as f: f.write(json.dumps(rec, ensure_ascii=False) + "\n")