Files
ExperionCrawler/test_individual_extractors.py
2026-05-08 17:22:10 +09:00

297 lines
9.9 KiB
Python

#!/usr/bin/env python3
"""개별 추출기 단독 테스트
5개 추출기 각각을 단독으로 실행하고, 출력 JSON schema를 검증합니다.
사용법:
python test_individual_extractors.py [--dxf PATH]
--dxf: 테스트용 DXF 파일 경로 (기본: src/Web/uploads/pid/No-10_Plant_PID.dxf)
"""
import argparse
import json
import os
import subprocess
import sys
import time
from pathlib import Path
# DXF 텍스트 추출을 위해 ezdxf 사용
try:
import ezdxf
HAS_EZDXF = True
except ImportError:
HAS_EZDXF = False
WORKER_DIR = Path(__file__).parent / "mcp-server" / "worker"
DEFAULT_DXF = Path(__file__).parent / "src" / "Web" / "uploads" / "pid" / "No-10_Plant_PID.dxf"
# 추출기 목록
EXTRACTORS = [
{"name": "sensor", "script": "pid_extract_sensor.py"},
{"name": "valve", "script": "pid_extract_valve.py"},
{"name": "system", "script": "pid_extract_system.py"},
{"name": "gauge", "script": "pid_extract_gauge.py"},
{"name": "pump", "script": "pid_extract_pump.py"},
]
# JSON schema 검증 기준
REQUIRED_TOP_KEYS = {"success", "count", "tags", "processing_time_sec"}
REQUIRED_TAG_KEYS = {"tagNo", "instrumentType", "confidence"}
def extract_dxf_text(dxf_path: str) -> str:
"""DXF 파일에서 TEXT/MTEXT 추출."""
if not HAS_EZDXF:
print("[ERROR] ezdxf가 설치되어 없습니다. pip install ezdxf")
sys.exit(1)
print(f"[INFO] DXF 로드 중: {dxf_path}")
doc = ezdxf.readfile(dxf_path)
modelspace = doc.modelspace()
texts = []
for entity in modelspace:
if entity.dxftype() in ("TEXT", "MTEXT"):
try:
text = entity.dxf.text
if text and text.strip():
texts.append(text.strip())
except Exception:
pass
result = "\n".join(texts)
print(f"[INFO] 텍스트 추출 완료: {len(texts)}개 텍스트 엔티티, {len(result)}")
return result
def validate_json_schema(result: dict, extractor_name: str) -> list:
"""JSON schema 검증. 문제 목록 반환."""
issues = []
# 최상위 키 검증
if not isinstance(result, dict):
issues.append(f"{extractor_name}: 결과가 dict가 아님: {type(result)}")
return issues
missing_keys = REQUIRED_TOP_KEYS - set(result.keys())
if missing_keys:
issues.append(f"{extractor_name}: 누락된 최상위 키: {missing_keys}")
# success 필드
if "success" in result and result["success"] is not True:
issues.append(f"{extractor_name}: success가 True가 아님: {result.get('success')}")
# count 필드
if "count" in result:
if not isinstance(result["count"], int):
issues.append(f"{extractor_name}: count가 int가 아님: {type(result['count'])}")
elif result["count"] != len(result.get("tags", [])):
issues.append(f"{extractor_name}: count({result['count']}) != tags.length({len(result.get('tags', []))})")
# tags 필드
if "tags" in result:
if not isinstance(result["tags"], list):
issues.append(f"{extractor_name}: tags가 list가 아님: {type(result['tags'])}")
else:
for i, tag in enumerate(result["tags"]):
if not isinstance(tag, dict):
issues.append(f"{extractor_name}: tags[{i}]가 dict가 아님")
continue
missing_tag_keys = REQUIRED_TAG_KEYS - set(tag.keys())
if missing_tag_keys:
issues.append(f"{extractor_name}: tags[{i}] 누락된 키: {missing_tag_keys}")
# confidence 범위
if "confidence" in tag:
conf = tag["confidence"]
if isinstance(conf, (int, float)):
if not (0 <= conf <= 1):
issues.append(f"{extractor_name}: tags[{i}].confidence 범위 초과: {conf}")
# processing_time_sec
if "processing_time_sec" in result:
if not isinstance(result["processing_time_sec"], (int, float)):
issues.append(f"{extractor_name}: processing_time_sec가 숫자가 아님")
return issues
def run_extractor(extractor: dict, input_file: str, output_file: str) -> dict:
"""단일 추출기 실행."""
script_path = WORKER_DIR / extractor["script"]
name = extractor["name"]
print(f"\n{'='*60}")
print(f"[TEST] {name} 추출기 실행")
print(f" 스크립트: {script_path}")
print(f" 입력: {input_file}")
print(f" 출력: {output_file}")
print(f"{'='*60}")
cmd = [
sys.executable, str(script_path),
"--input", input_file,
"--output", output_file,
]
t0 = time.time()
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300,
cwd=str(WORKER_DIR),
)
elapsed = time.time() - t0
print(f"[INFO] 종료 코드: {result.returncode}, 소요: {elapsed:.1f}")
if result.stdout.strip():
print(f"[STDOUT] {result.stdout.strip()}")
if result.stderr.strip():
# 로깅 출력은 INFO 레벨만 표시
for line in result.stderr.strip().split("\n"):
if "INFO" in line or "WARN" in line:
print(f"[LOG] {line.strip()}")
return {
"name": name,
"success": result.returncode == 0,
"returncode": result.returncode,
"elapsed": round(elapsed, 1),
"stdout": result.stdout,
"stderr": result.stderr,
"output_file": output_file,
}
except subprocess.TimeoutExpired:
elapsed = time.time() - t0
print(f"[ERROR] 타임아웃 ({elapsed:.1f}초)")
return {
"name": name,
"success": False,
"returncode": -1,
"elapsed": round(elapsed, 1),
"stdout": "",
"stderr": "Timeout",
"output_file": output_file,
}
except Exception as e:
elapsed = time.time() - t0
print(f"[ERROR] 예외: {e}")
return {
"name": name,
"success": False,
"returncode": -1,
"elapsed": round(elapsed, 1),
"stdout": "",
"stderr": str(e),
"output_file": output_file,
}
def main():
parser = argparse.ArgumentParser(description="개별 추출기 단독 테스트")
parser.add_argument("--dxf", default=str(DEFAULT_DXF), help="테스트용 DXF 파일 경로")
parser.add_argument("--dry-run", action="store_true", help="DXF 텍스트 추출만 하고 추출기 실행 생략")
args = parser.parse_args()
dxf_path = args.dxf
if not os.path.exists(dxf_path):
print(f"[ERROR] DXF 파일을 찾을 수 없습니다: {dxf_path}")
sys.exit(1)
# 임시 디렉토리 생성
temp_dir = Path(__file__).parent / "test_extractor_output"
temp_dir.mkdir(exist_ok=True)
# DXF 텍스트 추출
input_file = temp_dir / "full_text.txt"
print("[STEP 1] DXF에서 텍스트 추출")
text = extract_dxf_text(dxf_path)
with open(input_file, "w", encoding="utf-8") as f:
f.write(text)
print(f"[INFO] 입력 파일 저장: {input_file}")
if args.dry_run:
print("[INFO] dry-run 모드: 추출기 실행 생략")
return
# 추출기 실행
print("\n[STEP 2] 5개 추출기 개별 실행")
results = []
all_issues = []
for extractor in EXTRACTORS:
output_file = temp_dir / f"{extractor['name']}.json"
result = run_extractor(extractor, str(input_file), str(output_file))
results.append(result)
# JSON schema 검증
if result["success"] and os.path.exists(output_file):
print(f"\n[VALIDATE] {extractor['name']} JSON schema 검증")
with open(output_file, "r", encoding="utf-8") as f:
try:
data = json.load(f)
issues = validate_json_schema(data, extractor["name"])
all_issues.extend(issues)
if issues:
print(f" [FAIL] {len(issues)}개 문제 발견:")
for issue in issues:
print(f" - {issue}")
else:
tag_count = data.get("count", 0)
print(f" [PASS] schema 검증 통과 — {tag_count}개 태그 추출")
# 샘플 태그 출력
tags = data.get("tags", [])
if tags:
print(f" 샘플 태그 (최대 3개):")
for tag in tags[:3]:
print(f" - {tag}")
except json.JSONDecodeError as e:
issue = f"{extractor['name']}: JSON 파싱 실패: {e}"
all_issues.append(issue)
print(f" [FAIL] {issue}")
else:
issue = f"{extractor['name']}: 추출기 실행 실패 (rc={result['returncode']})"
all_issues.append(issue)
print(f" [FAIL] {issue}")
# 결과 요약
print(f"\n{'='*60}")
print("[RESULT] 테스트 결과 요약")
print(f"{'='*60}")
passed = sum(1 for r in results if r["success"])
failed = len(results) - passed
for r in results:
status = "PASS" if r["success"] else "FAIL"
print(f" [{status}] {r['name']}: {r['elapsed']}")
print(f"\n 총: {len(results)}개 중 {passed}개 통과, {failed}개 실패")
if all_issues:
print(f"\n 발견된 문제 ({len(all_issues)}개):")
for issue in all_issues:
print(f" - {issue}")
else:
print("\n 모든 schema 검증 통과!")
print(f"\n 출력 파일: {temp_dir}/")
print(f" ('rm -rf {temp_dir}')로 정리 가능")
# 종료 코드
if all_issues:
sys.exit(1)
if __name__ == "__main__":
main()