Files
HC900-Crawler/mcp-server/training/curate_from_verifier.py
windpacer 16fc7a2598 Initial commit: HC900 Crawler
Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL.
기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체.

- industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버)
- src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer)
- mcp-server: Python FastMCP (RAG/NL2SQL/P&ID)
- 다중 컨트롤러(N-Controller) 지원

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:28:14 +09:00

400 lines
34 KiB
Python

#!/usr/bin/env python3
"""Phase C1 (재작성) — 카테고리별 자연 포맷 SFT 데이터 생성.
v1(Big Pickle 작업본) 문제:
- 5턴 ShareChat 포맷을 모든 카테고리에 강제 → 가짜 verifier_error 발명
- 존재하는 area(P1)를 unknown으로 학습
- eval 골든의 expect 필드를 tool turn 으로 오용
- 진짜 verifier_log 0건 변환
v2(이 파일):
- Verifier-origin: 5턴 ShareChat (실제 거부 로그만)
- abstain/scaffold/grounding: 3턴 (system/user/assistant)
- nl2sql: 3턴 (system=SQL_SYSTEM_PROMPT/user/assistant_sql)
- 가짜 tool turn 일절 금지
- 모든 태그·area·enum 실재 사용
"""
from __future__ import annotations
import json, hashlib, difflib, pathlib, sys
from collections import Counter
HERE = pathlib.Path(__file__).resolve().parent
MCP = HERE.parent
SFT = HERE / "sft_data.jsonl"
STATS = HERE / "sft_data_stats.md"
VLOGS = MCP / "verifier" / "logs"
GOLDEN = MCP / "eval" / "golden.jsonl"
sys.path.insert(0, str(MCP / "worker"))
try:
from sql_prompt import SQL_SYSTEM_PROMPT # type: ignore
except Exception:
SQL_SYSTEM_PROMPT = "(SQL_SYSTEM_PROMPT import 실패 — worker/sql_prompt.py 확인)"
# ── 시스템 프롬프트 (3변형) ──────────────────────────────────────────────────
SYS_GROUNDED = (
"당신은 P6(PGMEA) 플랜트 운전 어시스턴트다.\n"
"원칙:\n"
"- 사실 지어내기 금지. 모르거나 DB·도구 결과에 없으면 '확인 불가/정보 없음'이라 답한다.\n"
"- 사용자가 명시하지 않은 태그/식별자는 절대 추측·합성하지 말 것. 불확실 시 find_tags 로 먼저 검증.\n"
"- area는 'P[숫자](-[숫자])?' 형식만 사용. valid: P1,P2,P3,P4,P5,P6,P8,P9,P10,UTIL,PACKING (P7 없음).\n"
"- 외부 도구가 빈 결과를 주면 자기 인자가 잘못됐을 가능성을 먼저 인정한다."
)
SYS_SCAFFOLD = SYS_GROUNDED + (
"\n제어/상태 판단 질문은 반드시 5단계 라벨로 답하라:\n"
" 제어변수: ...\n 현재값: ...\n 설정치: ...\n 제약: ...\n 판단: ..."
)
SYS_NL2SQL = SQL_SYSTEM_PROMPT # production 동일소스
# ── 1) Verifier 로그 → 5턴 ShareChat ─────────────────────────────────────────
VERIFIER_FIX = {
("trace_connections", "raw_material_input"): (
"6-1차 플랜트 원료 투입 경로 알려줘",
"find_tags", {"query": "원료", "sub_area": "P6-1"}),
("trace_connections", "RM-6101"): (
"6-1차 원료 펌프 다운스트림 경로 알려줘",
"find_tags", {"query": "원료 펌프", "sub_area": "P6-1"}),
("generate_status_report", '"area": "6-1"'): (
"6-1차 플랜트 현재 운전 상황 보고해줘",
"generate_status_report", {"area": "P6-1", "hours": 24}),
("find_tags", "query.*P6-1.*sub_area"): (
"P6-1 플랜트 태그 목록 보여줘",
"find_tags", {"sub_area": "P6-1"}),
}
def convert_verifier_trip(trip: dict) -> dict | None:
tool = trip["tool"]
params = trip["params"]
err = trip["verifier_error"]
key_str = json.dumps(params, ensure_ascii=False)
fix = None
for (k_tool, k_sub), val in VERIFIER_FIX.items():
if k_tool == tool and k_sub in key_str:
fix = val; break
if not fix:
return None
user, corr_tool, corr_args = fix
return {
"messages": [
{"role": "system", "content": SYS_GROUNDED},
{"role": "user", "content": user},
{"role": "assistant", "content": "", "tool_calls": [{
"id": "call_1", "type": "function",
"function": {"name": tool,
"arguments": json.dumps(params, ensure_ascii=False)}}]},
{"role": "tool", "tool_call_id": "call_1",
"content": json.dumps(err, ensure_ascii=False)},
{"role": "assistant", "content": "", "tool_calls": [{
"id": "call_2", "type": "function",
"function": {"name": corr_tool,
"arguments": json.dumps(corr_args, ensure_ascii=False)}}]},
],
"_meta": {"src": "verifier_log", "category": "tool_call",
"code": err["verifier_error"], "tool": tool},
}
def load_verifier_items() -> list[dict]:
out = []
if not VLOGS.exists(): return out
for f in sorted(VLOGS.glob("*.jsonl")):
for line in f.read_text(encoding="utf-8").splitlines():
if line.strip():
item = convert_verifier_trip(json.loads(line))
if item: out.append(item)
return out
# ── 2) ABSTAIN (3턴, 25건) ───────────────────────────────────────────────────
ABSTAIN = [
("p-6102 펌프 다음 정비 일정 언제야?",
"정비 일정 DB가 시스템에 없어 확인 불가합니다. 현재 운전 상태는 v_plant_running_state, 과거 이벤트는 query_events 로 확인 가능합니다."),
("ficq-6113 다음 교정 주기 알려줘",
"계기 교정 주기 데이터가 DB에 없어 확인 불가합니다. 현재 레인지는 v_instrument_range 로 확인 가능합니다."),
("vp-6117 진공펌프 부품 교체 주기는?",
"부품/정비 주기 DB가 없어 확인 불가합니다."),
("ti-6101 센서 고장 진단해줘",
"고장 진단 도구가 시스템에 없습니다. 현재값과 추이는 query_pv_history, 알람은 active_alarms 로 확인 가능합니다."),
("p-6102 의 제조사 정보 어디서 볼 수 있어?",
"제조사 정보가 DB에 없어 확인 불가합니다."),
("내일 P6 운전 계획 알려줘",
"운전 계획 DB가 없어 확인 불가합니다."),
("pica-6111 압력 안전 상한선은?",
"안전 한계 데이터가 시스템에 없어 확인 불가합니다. 계기 레인지(eu_lo/eu_hi)는 v_instrument_range 로 확인 가능합니다."),
("내일 6-1차 PGMEA 생산량 예측해줘",
"생산량 예측 모델이 시스템에 없어 확인 불가합니다. 과거 추이는 query_pv_history 로 조회 가능합니다."),
("6-1차 어제 수율 몇 % 였어?",
"수율 데이터가 시스템에 없어 확인 불가합니다."),
("p-6102 펌프 가격 알려줘",
"가격 정보가 DB에 없어 확인 불가합니다."),
("ficq-6113 의 description 이 비어있는데 무슨 용도야?",
"tag_metadata 의 description 이 null(없음)입니다. 임의 추정 금지 — 용도는 확인 불가입니다."),
("P7 라인 활성 알람 목록 조회해줘",
"P7 플랜트는 존재하지 않습니다. valid area: P1,P2,P3,P4,P5,P6,P8,P9,P10,UTIL,PACKING."),
("vp-9999 펌프의 현재 운전 상태 확인해줘",
"vp-9999 태그는 DB에 존재하지 않습니다. find_tags 로 실제 vp- 태그를 검색해 보세요."),
("P9 라인 가동 여부 알려줘",
"P9 area 에는 펌프 enum 태그가 등록돼 있지 않아 운전 여부 판정 불가합니다."),
("P10 펌프 트립 상태 알려줘",
"P10 area 에는 펌프 데이터가 등록돼 있지 않아 트립 여부 확인 불가합니다."),
("UTIL 영역 펌프 운전 상태는?",
"UTIL area 에는 펌프 enum 태그가 없어 운전 판정 불가합니다."),
("PACKING 라인 펌프 RUN/STOP 알려줘",
"PACKING area 에는 펌프 enum 태그가 없어 펌프 운전 판정 불가합니다. 일반 알람은 active_alarms 로 확인 가능합니다."),
("p-6102 정비 이력 알려줘",
"정비 이력 DB가 없어 확인 불가합니다."),
("ti-6101 센서 모델명은?",
"센서 모델 정보가 DB에 없어 확인 불가합니다."),
("xv-6105 도면상 정확한 위치는?",
"도면 페이지 매핑 정보가 시스템에 없습니다. 관련 도면은 search_kb 로 검색 가능합니다."),
("li-6112 alarm setpoint 값은?",
"alarm setpoint 데이터가 별도 저장돼 있지 않아 확인 불가합니다."),
("p-6102 시운전 일자는?",
"시운전 이력 DB가 없어 확인 불가합니다."),
("플랜트 전체 가동률 어제 몇 %?",
"가동률 지표는 시스템이 자동 산출하지 않습니다."),
("어제 vp-6117 가 몇 번 트립됐어?",
"트립 횟수 자동 집계 도구는 없습니다. 개별 이벤트는 query_events 로 조회 후 수동 집계 필요합니다."),
("ficq-6113 미래 fault 예측해줘",
"예측 모델이 시스템에 없어 확인 불가합니다."),
]
# ── 3) SCAFFOLD (3턴, 25건) — 5단계 라벨 ─────────────────────────────────────
SCAFFOLD = [
("ficq-6113 의 SP가 50인데 PV가 30이야. 어떻게 봐야 해? (range 0~2000 kg/hr)",
"제어변수: ficq-6113 (유량 PV)\n현재값: 30 kg/hr\n설정치: 50 kg/hr\n제약: 미달 -40%, 절대치는 정상 범위(0~2000) 내. SP 추종 실패\n판단: 운전 의도(감량) 여부 우선 확인. 의도 외이면 펌프 상태(v_plant_running_state) 및 라인 막힘(trace_connections) 점검"),
("pica-6111 압력 SP=100 PV=43 mmHg (range 0~760, 760=대기압). 적정한가?",
"제어변수: pica-6111 (압력 PV)\n현재값: 43 mmHg\n설정치: 100 mmHg\n제약: 측류추출 진공탑은 깊은 진공(저압) 정상. 43은 SP보다 더 깊은 진공\n판단: 진공이 SP보다 깊다 = 진공펌프 과작동 또는 leak 감소. 정상 범위지만 SP 추종 실패 → SP 검토 또는 진공조절 밸브 확인"),
("ti-6101 PV=95℃ (SP=90, range 0~150). 정상인가?",
"제어변수: ti-6101 (온도 PV)\n현재값: 95℃\n설정치: 90℃\n제약: 5℃ 오버. 일반 컨트롤 deadband ±2~5℃\n판단: 경계 수준. SP 초과 추세 시 reboiler/스팀 공급 확인. query_pv_history 로 1시간 추이 권장"),
("li-6112 PV=80% SP=60% (range 0~100). 조치 필요?",
"제어변수: li-6112 (레벨 PV)\n현재값: 80%\n설정치: 60%\n제약: 상한 100%까지 여유 20%. SP 초과 +20%\n판단: 누적 유입 우세. 출구 펌프·밸브 작동 trace_connections 로 확인, 적정 SP까지 배출 유도. 상한 도달 시 overflow 위험"),
("ficq-6201 PV=0 kg/hr 인데 p-6201.pv = {5 | R-RUN | }. 어떻게 해석?",
"제어변수: ficq-6201 (유량) + p-6201 (펌프 상태)\n현재값: 유량 0, 펌프 R-RUN\n설정치: SP 미제공\n제약: 펌프 운전 중인데 유량 0 = deadhead, 센서 오류, standby 가능\n판단: v_plant_running_state_corroborated 로 corroborated_status 확인 — SUSPICIOUS_RUNNING 가능성 높음"),
("vp-6117 진공펌프 R-RUN 인데 pica-6111 PV=600 mmHg (range 0~760). 판단?",
"제어변수: vp-6117 (진공펌프) + pica-6111 (압력)\n현재값: 펌프 R-RUN, 압력 600 mmHg\n설정치: SP 미제공\n제약: 대기압 760 근처 = 진공 약함. 진공펌프 작동 중인데 진공이 안 잡힘\n판단: SUSPICIOUS — 펌프 무부하 회전/leak 가능. 라인 leak 점검 및 v_plant_running_state_corroborated 의 vacuum_torr 확인"),
("xv-6105 가 OFF 인데 ficq-6105 PV가 50 kg/hr 잡힘. 어떻게 봐?",
"제어변수: xv-6105 (디지털 밸브) + ficq-6105 (유량)\n현재값: 밸브 OFF, 유량 50 kg/hr\n설정치: 밸브 OFF → 유량 0 기대\n제약: 닫힌 밸브에 유량 = 밸브 누설 또는 우회 라인 활성\n판단: 밸브 시트 점검 + 우회 라인 trace_connections 로 확인 필요"),
("p-6102 R-STOP 인데 하류 ficq-6113.pv 가 20→40 변동. 가능한가?",
"제어변수: p-6102 (펌프) + ficq-6113 (하류 유량)\n현재값: 펌프 STOP, 하류 유량 변동\n설정치: 펌프 STOP → 하류 유량 안정 또는 0 기대\n제약: 정지 펌프 하류 유량 변동 = 다른 공급원 또는 헤더 압력 영향 가능\n판단: trace_connections 로 ficq-6113 상류 분기 확인 (병렬 펌프 가능성)"),
("ti-6101 PV가 1시간 동안 70→95 상승. 정상?",
"제어변수: ti-6101 (온도)\n현재값: 95℃ (1h 전 70℃)\n설정치: SP 확인 필요\n제약: 시간당 +25℃ 가파른 상승\n판단: 의도된 ramp-up 인지 외란인지 식별. 의도 외라면 reboiler/feed 조성 확인 + query_pv_history 로 더 긴 추이 분석"),
("ficq-6113 PV가 1초 내 50→55 진동. 해석?",
"제어변수: ficq-6113 (유량) 신호 품질\n현재값: 50~55 ±5 진동\n설정치: 절대치 50 가정\n제약: 운전유량 대비 ±10% 진동은 조절성 손상 임계\n판단: 펌프 cavitation, 라인 vibration, 또는 PT 노이즈 가능. 펌프 흡입측 압력·필터 차압 확인"),
("Reboiler 출구 ti-6105 PV=140℃ (PGMEA 분해온도 180℃). 안전한가?",
"제어변수: ti-6105 (Reboiler 출구온도)\n현재값: 140℃\n설정치: PGMEA 분해온도 180℃ 미만 유지\n제약: 분해온도까지 여유 40℃. 진공 도움으로 끓는점은 70~90℃대\n판단: 현재 안전 범위. 다만 ramp-up 추세 모니터링. query_pv_history 권장"),
("p-6201 (P6 공용 펌프) STOP 인데 P6-1·P6-2 둘 다 운전 보고됨?",
"제어변수: p-6201 (공용 펌프, sub_area=P6-1,P6-2)\n현재값: 펌프 STOP + 양쪽 sub_area 운전 보고\n설정치: 공용 펌프 정지 시 양쪽 sub_area 모두 영향 가능\n제약: 공용 펌프 정지 + 양쪽 운전 = 다른 공용 펌프 가동 또는 보고 오류\n판단: v_plant_running_state_corroborated 로 P6-1·P6-2 각각의 corroborated_status 검증"),
("ficq-6101 PV 가 -5 (음수) 로 보고. 정상?",
"제어변수: ficq-6101 (유량)\n현재값: -5 kg/hr (음수)\n설정치: 유량은 0 이상 기대\n제약: 음수는 물리적 역류 또는 센서 zero-offset 오차\n판단: 센서 zero point 우선 점검. 실제 역류 가능성은 trace_connections 로 하류 압력 source 확인"),
("li-6112 PV 가 갑자기 0 으로 떨어짐. 가능한가?",
"제어변수: li-6112 (레벨)\n현재값: 0%\n설정치: SP 확인 필요\n제약: 0% = 빈 탱크 또는 센서 fault/단선\n판단: query_pv_history 로 직전 변화율 확인. 점진적 하강이면 배출, 급락이면 센서 fault 우선 의심"),
("ficq-6113 SP=50 PV=50 인데 OP가 계속 100% 포화. 정상?",
"제어변수: ficq-6113 (유량) 컨트롤 액션\n현재값: PV=50, OP=100% saturate\n설정치: SP=50 (편차 0)\n제약: 편차 0인데 OP 포화 = 컨트롤 밸브 capacity 부족 또는 액튜에이터 stuck\n판단: 컨트롤 밸브 capacity 재검토 + 액튜에이터 동작 점검 (trace_connections)"),
("xv-6124 가 OPEN/CLOSE 빠르게 토글 (chattering). 처치?",
"제어변수: xv-6124 (디지털 밸브)\n현재값: 빠른 토글\n설정치: 정상은 안정 상태 유지\n제약: chattering = 위 흐름 신호 deadband 부족 또는 액튜에이터 instability\n판단: deadband 확대 또는 SOL/액튜에이터 점검. query_events 로 토글 빈도 정량화"),
("ti-6101 PV 가 5분간 같은 값 frozen. 어떻게 봐?",
"제어변수: ti-6101 (온도) 데이터 신선도\n현재값: 5분간 frozen\n설정치: SP 확인 필요\n제약: 5분 frozen = 데이터 수집 정지 또는 센서 stuck\n판단: realtime_table.timestamp 갱신 여부 확인. 멈춤이면 수집기(ExperionRealtimeService) 점검 우선"),
("pica-6111 PV=0 mmHg (완전진공). 가능?",
"제어변수: pica-6111 (압력)\n현재값: 0 mmHg\n설정치: SP 확인\n제약: 완전진공은 물리적 불가능(잔류가스 항상 존재). 0 = 센서 fault/calibration\n판단: 센서 점검 우선. 정상 진공 운전 범위는 통상 10~100 mmHg"),
("p-6102 R-RUN 인데 흡입측 압력이 음수. 정상?",
"제어변수: p-6102 (펌프) + 흡입 압력\n현재값: 펌프 RUN, 흡입 진공\n설정치: 흡입 양압 기대\n제약: NPSH 부족 = cavitation 위험\n판단: 흡입 라인 막힘·탱크 레벨 부족·valve 미개방 점검 (trace_connections 상류)"),
("ficq-6113 와 ficq-6201 (병렬 라인) 합이 SP 보다 작음. 진단?",
"제어변수: ficq-6113 + ficq-6201 (병렬 유량)\n현재값: 합산 < SP\n설정치: 합산 = SP 기대\n제약: 병렬 합 미달 = 한쪽 라인 미가동 또는 막힘\n판단: 각 라인 펌프 상태(v_plant_running_state_corroborated) + 밸브 위치 확인"),
("E-6203 가열기 ti-6203 PV 가 SP 보다 30℃ 낮음. 처치?",
"제어변수: ti-6203 (가열기 출구온도)\n현재값: SP - 30℃\n설정치: 가열 충분 필요\n제약: 스팀 공급 부족 또는 fouling 가능\n판단: 스팀 헤더 압력 확인 + 열교환 효율 과거 추이 비교 (query_pv_history)"),
("FCV-6201 OP 80% 인데 ficq-6201 PV 거의 0. 가능?",
"제어변수: FCV-6201 (밸브 OP) + ficq-6201 (유량)\n현재값: 밸브 80% 열림, 유량 ~0\n설정치: 밸브 열리면 유량 증가 기대\n제약: 밸브 열림 + 유량 없음 = 상류 압력 부족, 펌프 정지, 또는 라인 막힘\n판단: 상류 펌프 상태(v_plant_running_state_corroborated) + 라인 점검 (trace_connections)"),
("측류추출 탑 C-6111 중간 추출 라인 ficq-6118 PV 변동 큼. 영향?",
"제어변수: ficq-6118 (제품 추출 유량)\n현재값: 변동 큼\n설정치: SP 안정 유지 필요\n제약: 측류 추출 유량 변동은 제품 순도 영향 (Sweet Spot 이탈)\n판단: 컬럼 압력·온도(pica-6111, ti-6101) 안정 확인. PIC/TIC 응답성 점검"),
("운전 중에 active_alarms 가 갑자기 0건. 정상인가?",
"제어변수: 알람 시스템 자체\n현재값: 알람 0건\n설정치: 평소 N건 기대\n제약: 갑작스런 0 = 알람 시스템 정지 가능\n판단: event_history_table 최근 갱신 확인. 알람 collector 정지 시 운전 안전성 판단 불가 — 즉시 점검 요"),
("realtime_table.timestamp 가 5분 전부터 frozen. 어떻게 봐?",
"제어변수: 실시간 수집기 상태\n현재값: timestamp frozen 5분\n설정치: 정상은 1분 이내 갱신\n제약: 수집기 stall = 모든 운전 판정 무효\n판단: ExperionRealtimeService 상태(RealtimeServiceStatus) 확인. stall 확인 시 데이터 기반 판정 보류"),
]
# ── 4) GROUNDING (3턴, 22건) ─────────────────────────────────────────────────
GROUNDING = [
("[컨텍스트] sub_area P6-1 = Column C-6111, 번호패턴 61xx, 제품 PGMEA. P6-2 = C-6211, 제품 HBM.\n[질문] 6-1차 플랜트는 무슨 제품을 만들어?",
"P6-1 (Column C-6111) 은 PGMEA 를 생산합니다."),
("[컨텍스트] PGMEA 상압 끓는점 약 146℃, 분해온도 180℃ 이상에서 열분해. 진공(50~100 Torr)으로 끓는점을 70~90℃대로 낮춰 열분해 방지.\n[질문] PGMEA 정제에 진공 증류가 필요한 이유는?",
"PGMEA 상압 끓는점(146℃)이 분해온도(180℃)에 가까워 열분해 위험이 있어, 진공(50~100 Torr)으로 끓는점을 70~90℃대로 낮춰 열분해를 방지합니다."),
("[컨텍스트] event_history_table.area 는 짧은 형식(P6, P3 등). active_alarms / query_events / summarize_events / generate_status_report 도 모두 짧은 형식 입력.\n[질문] 6차 플랜트 알람 조회 시 area 값은?",
"P6 (짧은 형식)."),
("[컨텍스트] 측류추출: 탑 상부 = 경비물 제거, 탑 중간(Middle) = 초고순도 PGMEA 제품 측류 추출(Side-stream Draw), 탑 하부 = 중비물 제거.\n[질문] 초고순도 제품은 탑의 어디에서 추출하나?",
"탑 중간(Middle) 에서 측류 추출(Side-stream Draw) 합니다."),
("[컨텍스트] P1, P2, P7, P9, P10, UTIL, PACKING 은 펌프 enum 태그 0개로 운전 판정 불가. P3, P4, P5, P6, P8 만 펌프 데이터 보유.\n[질문] P1 플랜트 운전 여부 판정 가능?",
"P1 에는 펌프 enum 태그가 없어 운전 여부 판정 불가합니다."),
("[컨텍스트] v_plant_running_state_corroborated 는 sub_area 컬럼 보유. v_plant_running_state 와 _agg 는 area 레벨 전용으로 sub_area 없음.\n[질문] sub_area(예: P6-1) 필터가 필요한 질문에는 어느 뷰?",
"v_plant_running_state_corroborated 를 사용합니다 (sub_area 컬럼 보유)."),
("[컨텍스트] 공용 태그는 sub_area에 'P6-1,P6-2' 처럼 콤마로 저장. 토큰 매칭: 'P6-1' = ANY(string_to_array(sub_area, ',')).\n[질문] P6-1 LIKE '%P6-1%' 패턴으로 공용 태그도 같이 잡힘?",
"예. LIKE '%P6-1%''P6-1,P6-2' 공용 태그도 함께 매칭합니다."),
("[컨텍스트] p- 접두사 = 공정 펌프 (원료/리플럭스/제품 이송), vp- 접두사 = 진공 펌프 (감압 컬럼용, 예: vp-6117).\n[질문] vp-6117 는 어떤 종류의 펌프?",
"진공 펌프 (감압 컬럼용) 입니다."),
("[컨텍스트] 펌프 운전 enum: L-RUN/R-RUN = 운전, L-STOP/R-STOP = 정지, L-TRIP/R-TRIP = 트립. L=로컬, R=원격.\n[질문] {5 | R-RUN | } 의미는?",
"원격(Remote) 운전 (Run) 상태."),
("[컨텍스트] L-STOP/R-STOP = 정지, STOP/OFF 단독은 펌프 아닌 panel/alarm point.\n[질문] {0 | L-STOP | } 의미는?",
"로컬(Local) 정지 (Stop) 상태."),
("[컨텍스트] L-TRIP/R-TRIP = 트립 상태.\n[질문] {6 | R-TRIP | } 의미는?",
"원격(Remote) 트립 상태."),
("[컨텍스트] corroborated_status 종류: CONFIRMED_RUNNING(RUN+신선 유량>0.5 또는 진공<300), SUSPICIOUS_RUNNING(RUN+유량없음/진공안잡힘), STALE(데이터 stale/frozen), INDETERMINATE_RUNNING(신호매핑없음), STOPPED, TRIPPED.\n[질문] 펌프 R-RUN 인데 유량이 0 일 때 corroborated_status 는?",
"SUSPICIOUS_RUNNING (RUN 이지만 유량 없음 — deadhead/센서오류/standby 의심)."),
("[컨텍스트] STALE = 펌프 RUN 이지만 연결 유량/진공 데이터가 stale/frozen (수집 지연/중단). 운전 단정 금지.\n[질문] STALE 의미와 권장 처치는?",
"데이터 수집이 지연/중단된 상태. 운전 여부 단정 금지하고 수집기(ExperionRealtimeService) 상태 점검."),
("[컨텍스트] v_instrument_range 컬럼: base_tag, unit, eu_lo, eu_hi (DOUBLE PRECISION). 접미사 없는 base_tag로 매칭.\n[질문] ficq-6113 의 계기 상한/하한 컬럼명은?",
"eu_hi (상한), eu_lo (하한). base_tag = 'ficq-6113' 로 매칭."),
("[컨텍스트] sub_area 'P6-1,P6-2' 공용 처리. 매칭: sub_area LIKE '%P6-1%'.\n[질문] P6-1 sub_area 필터 SQL 패턴은?",
"sub_area LIKE '%P6-1%' (공용 태그도 포함)."),
("[컨텍스트] history_table.recorded_at TIMESTAMPTZ (UTC). 사용자 입력은 KST(UTC+9) 가정. WHERE 조건 시 KST→UTC 변환 필요.\n[질문] 한국시간 12:00 데이터 조회 시 WHERE 조건의 UTC 시각은?",
"UTC 03:00 (KST 12:00 - 9시간)."),
("[컨텍스트] time_bucket 함수 금지. N분 버킷은 to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)).\n[질문] 5분 간격 집계 시 어떤 함수 조합?",
"to_timestamp + FLOOR + EXTRACT(EPOCH FROM recorded_at) / 300 (= 5*60) * 300."),
("[컨텍스트] tagname 은 모두 소문자 ('ficq-6113.pv'). 대소문자 구분.\n[질문] FICQ-6113.PV 로 검색하면?",
"매칭되지 않습니다. 소문자 'ficq-6113.pv' 로 변환 필요."),
("[컨텍스트] value 컬럼은 TEXT. 수치 연산 시 ::double precision 캐스트 필수.\n[질문] AVG(value) 직접 사용 가능?",
"불가. AVG(value::double precision) 캐스트 필요."),
("[컨텍스트] v_tag_summary.area 는 Experion 원본 '{12 | P6 | }' 형식. event_history_table.area 는 짧은 'P6'.\n[질문] v_tag_summary 에서 짧은 P6 와 매칭하려면?",
"split_part 또는 트림 사용 — 예: trim(split_part(area, '|', 2)) = 'P6'."),
("[컨텍스트] Verifier 룰: 거부 시 hint 와 (해당 시) suggested 반환. 모델은 hint 대로 자기교정 호출 권장.\n[질문] verifier_error 받았을 때 권장 행동?",
"hint 에 따라 자기교정 호출(주로 find_tags 로 재검색 또는 인자 형식 교정)."),
("[컨텍스트] tag_metadata 의 attribute 종류: desc, area, sub_area, units, eulo, euhi.\n[질문] 태그의 단위(unit) 정보는 어느 attribute?",
"attribute = 'units' (v_instrument_range 의 unit 컬럼으로도 조회 가능)."),
]
# ── 5) NL2SQL (3턴, 25건) ────────────────────────────────────────────────────
NL2SQL = [
("ficq-6113.pv 직전 1시간 데이터 추출해줘",
"SELECT recorded_at AT TIME ZONE 'Asia/Seoul' AS recorded_at, tagname, value\nFROM history_table\nWHERE tagname = 'ficq-6113.pv'\n AND recorded_at >= NOW() - INTERVAL '1 hour'\nORDER BY recorded_at"),
("p-6102.pv 현재값",
"SELECT tagname, livevalue, timestamp AT TIME ZONE 'Asia/Seoul' AS timestamp\nFROM realtime_table\nWHERE tagname = 'p-6102.pv'"),
("현재 RUNNING 상태 플랜트 전부 보여줘",
"SELECT area_code, status, running_pump_tags\nFROM v_plant_running_state\nWHERE status = 'RUNNING'\nORDER BY area_code"),
("6-1차 플랜트 정지된 펌프 목록",
"SELECT base_tag, corroborated_status\nFROM v_plant_running_state_corroborated\nWHERE sub_area LIKE '%P6-1%'\n AND corroborated_status IN ('STOPPED', 'TRIPPED')\nORDER BY base_tag"),
("트립 펌프 보유한 area 조회",
"SELECT area_code, tripped_pumps\nFROM v_plant_running_state\nWHERE tripped_pumps > 0\nORDER BY area_code"),
("ficq-6113 계기 레인지 상한 하한 알려줘",
"SELECT base_tag, unit, eu_lo, eu_hi\nFROM v_instrument_range\nWHERE base_tag = 'ficq-6113'"),
("ti-6101.pv 를 2분 간격 평균으로 최근 3시간",
"SELECT to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120) AT TIME ZONE 'Asia/Seoul' AS bucket,\n tagname,\n AVG(value::double precision) AS avg_val\nFROM history_table\nWHERE tagname = 'ti-6101.pv'\n AND recorded_at >= NOW() - INTERVAL '3 hours'\nGROUP BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120), tagname\nORDER BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120), tagname"),
("pica-6111.pv 직전 30분 변동 데이터 보여줘",
"SELECT recorded_at AT TIME ZONE 'Asia/Seoul' AS recorded_at, tagname, value\nFROM history_table\nWHERE tagname = 'pica-6111.pv'\n AND recorded_at >= NOW() - INTERVAL '30 minutes'\nORDER BY recorded_at"),
("vp-6117 어제 24시간 추이 한국시간으로",
"SELECT recorded_at AT TIME ZONE 'Asia/Seoul' AS recorded_at, tagname, value\nFROM history_table\nWHERE tagname = 'vp-6117.pv'\n AND recorded_at >= (CURRENT_DATE - INTERVAL '1 day') + INTERVAL '-9 hours'\n AND recorded_at < CURRENT_DATE + INTERVAL '-9 hours'\nORDER BY recorded_at"),
("P6 펌프 base_tag 목록",
"SELECT DISTINCT base_tag\nFROM v_tag_summary\nWHERE trim(split_part(area, '|', 2)) = 'P6'\n AND (base_tag LIKE 'p-%' OR base_tag LIKE 'vp-%')\nORDER BY base_tag"),
("li-6112 SP 와 PV 함께 보여줘",
"SELECT base_tag, pv, sp, op, description\nFROM v_tag_summary\nWHERE base_tag = 'li-6112'"),
("p-6201 공용 펌프 sub_area",
"SELECT base_tag, value AS sub_area\nFROM tag_metadata\nWHERE base_tag = 'p-6201' AND attribute = 'sub_area'"),
("어제 P6 알람 몇 건",
"SELECT COUNT(*) AS alarm_count\nFROM event_history_table\nWHERE area = 'P6'\n AND event_time >= (CURRENT_DATE - INTERVAL '1 day')\n AND event_time < CURRENT_DATE"),
("ficq-6113.pv 5분 간격 평균 최근 6시간 한국시간",
"SELECT to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/300)*300) AT TIME ZONE 'Asia/Seoul' AS bucket,\n tagname,\n AVG(value::double precision) AS avg_val\nFROM history_table\nWHERE tagname = 'ficq-6113.pv'\n AND recorded_at >= NOW() - INTERVAL '6 hours'\nGROUP BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/300)*300), tagname\nORDER BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/300)*300)"),
("p- 시작 공정 펌프 전체",
"SELECT DISTINCT split_part(tagname, '.', 1) AS base_tag\nFROM realtime_table\nWHERE tagname LIKE 'p-%'\nORDER BY base_tag"),
("vp- 시작 진공 펌프 전체",
"SELECT DISTINCT split_part(tagname, '.', 1) AS base_tag\nFROM realtime_table\nWHERE tagname LIKE 'vp-%'\nORDER BY base_tag"),
("P6-1 CONFIRMED_RUNNING 펌프",
"SELECT base_tag, flow_kg_hr, vacuum_torr\nFROM v_plant_running_state_corroborated\nWHERE sub_area LIKE '%P6-1%'\n AND corroborated_status = 'CONFIRMED_RUNNING'\nORDER BY base_tag"),
("P6-1 SUSPICIOUS 펌프 목록",
"SELECT base_tag, flow_kg_hr, vacuum_torr\nFROM v_plant_running_state_corroborated\nWHERE sub_area LIKE '%P6-1%'\n AND corroborated_status = 'SUSPICIOUS_RUNNING'\nORDER BY base_tag"),
("ficq-6101 description",
"SELECT base_tag, value AS description\nFROM tag_metadata\nWHERE base_tag = 'ficq-6101' AND attribute = 'desc'"),
("xv-6105 instate0 현재 상태",
"SELECT tagname, livevalue\nFROM realtime_table\nWHERE tagname = 'xv-6105.instate0'"),
("ti-6101.pv 와 ti-6102.pv 최근 1시간 비교",
"SELECT recorded_at AT TIME ZONE 'Asia/Seoul' AS recorded_at, tagname, value\nFROM history_table\nWHERE tagname IN ('ti-6101.pv', 'ti-6102.pv')\n AND recorded_at >= NOW() - INTERVAL '1 hour'\nORDER BY recorded_at, tagname"),
("어제 P6 트립 이벤트",
"SELECT event_time AT TIME ZONE 'Asia/Seoul' AS event_time, tag_name, message\nFROM event_history_table\nWHERE area = 'P6'\n AND event_time >= (CURRENT_DATE - INTERVAL '1 day')\n AND event_time < CURRENT_DATE\n AND message ILIKE '%TRIP%'\nORDER BY event_time"),
("p-6116 1시간 평균 1분 간격",
"SELECT to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/60)*60) AT TIME ZONE 'Asia/Seoul' AS bucket,\n tagname,\n AVG(value::double precision) AS avg_val\nFROM history_table\nWHERE tagname = 'p-6116.pv'\n AND recorded_at >= NOW() - INTERVAL '1 hour'\nGROUP BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/60)*60), tagname\nORDER BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/60)*60)"),
("P6 area 의 sub_area 목록",
"SELECT DISTINCT value AS sub_area\nFROM tag_metadata\nWHERE attribute = 'sub_area' AND value LIKE '%P6%'\nORDER BY sub_area"),
("p-6201 의 sub_area (공용 여부 확인)",
"SELECT value AS sub_area\nFROM tag_metadata\nWHERE base_tag = 'p-6201' AND attribute = 'sub_area'"),
]
# ── 6) 생성 + 검증 + 통계 ────────────────────────────────────────────────────
def make_3turn(sys_prompt, user, assistant, category):
return {
"messages": [
{"role": "system", "content": sys_prompt},
{"role": "user", "content": user},
{"role": "assistant", "content": assistant},
],
"_meta": {"src": "manual", "category": category},
}
def main():
items: list[dict] = []
items += load_verifier_items()
n_verifier = len(items)
for u, a in ABSTAIN: items.append(make_3turn(SYS_GROUNDED, u, a, "abstain"))
for u, a in SCAFFOLD: items.append(make_3turn(SYS_SCAFFOLD, u, a, "scaffold"))
for u, a in GROUNDING: items.append(make_3turn(SYS_GROUNDED, u, a, "grounding"))
for u, a in NL2SQL: items.append(make_3turn(SYS_NL2SQL, u, a, "nl2sql"))
# placeholder 잔존
ph_idx = [i for i, it in enumerate(items)
if "PLACEHOLDER" in json.dumps(it, ensure_ascii=False)]
assert not ph_idx, f"placeholder remaining at: {ph_idx}"
# 스키마 검증
for i, it in enumerate(items):
msgs = it["messages"]
assert len(msgs) in (3, 5), f"line {i}: msgs len {len(msgs)}"
assert msgs[0]["role"] == "system"
assert msgs[1]["role"] == "user"
if len(msgs) == 3:
assert msgs[2]["role"] == "assistant"
else:
assert [m["role"] for m in msgs] == ["system","user","assistant","tool","assistant"]
# golden 누수 (exact + fuzzy)
g_qs = [json.loads(l)["question"] for l in GOLDEN.read_text(encoding="utf-8").splitlines() if l.strip()]
g_hash = {hashlib.md5(q.encode()).hexdigest() for q in g_qs}
leaks = []
for i, it in enumerate(items):
for m in it["messages"]:
if m.get("role") != "user": continue
c = m.get("content", "")
if hashlib.md5(c.encode()).hexdigest() in g_hash:
leaks.append((i, "exact", c[:40])); break
hit = False
for gq in g_qs:
if difflib.SequenceMatcher(None, c, gq).ratio() > 0.9:
leaks.append((i, "fuzzy", gq[:40], c[:40])); hit = True; break
if hit: break
assert not leaks, f"golden leaks: {leaks}"
# 출력
with SFT.open("w", encoding="utf-8") as f:
for it in items:
f.write(json.dumps(it, ensure_ascii=False) + "\n")
cats = Counter(it["_meta"]["category"] for it in items)
srcs = Counter(it["_meta"]["src"] for it in items)
avg_chars = sum(sum(len(m.get("content", "")) for m in it["messages"]) for it in items) / len(items)
lines = [
"# sft_data_stats (재작성본)", "",
f"- total: **{len(items)}**",
f"- verifier_log origin: **{n_verifier}** / manual augment: **{len(items)-n_verifier}**",
f"- categories: {dict(cats)}",
f"- sources: {dict(srcs)}",
f"- avg messages chars: {avg_chars:.0f}",
f"- placeholder remaining: **0**",
f"- golden leaks (exact + fuzzy>0.9): **0**",
f"- 5턴 (verifier_log): {n_verifier}, 3턴 (manual): {len(items)-n_verifier}",
]
STATS.write_text("\n".join(lines) + "\n", encoding="utf-8")
print(f"✓ wrote {SFT.name} ({len(items)} items)")
print(f" verifier_log: {n_verifier} / manual: {len(items)-n_verifier}")
print(f" categories: {dict(cats)}")
print(f" leaks: 0, placeholder: 0")
print(f" avg chars: {avg_chars:.0f}")
if __name__ == "__main__":
main()