루트 파일 정리: - DXF/P&ID 관련 → dxf-graph/ - fastTable 관련 → fastTable/ - plan/ → plans/ 통합 (최신 버전 유지) - 테스트 출력 파일, 구버전 프로젝트 삭제 - 불필요한 루트 문서 삭제
297 lines
9.9 KiB
Python
297 lines
9.9 KiB
Python
#!/usr/bin/env python3
|
|
"""개별 추출기 단독 테스트
|
|
|
|
5개 추출기 각각을 단독으로 실행하고, 출력 JSON schema를 검증합니다.
|
|
|
|
사용법:
|
|
python test_individual_extractors.py [--dxf PATH]
|
|
|
|
--dxf: 테스트용 DXF 파일 경로 (기본: src/Web/uploads/pid/No-10_Plant_PID.dxf)
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# DXF 텍스트 추출을 위해 ezdxf 사용
|
|
try:
|
|
import ezdxf
|
|
HAS_EZDXF = True
|
|
except ImportError:
|
|
HAS_EZDXF = False
|
|
|
|
WORKER_DIR = Path(__file__).parent / "mcp-server" / "worker"
|
|
DEFAULT_DXF = Path(__file__).parent / "src" / "Web" / "uploads" / "pid" / "No-10_Plant_PID.dxf"
|
|
|
|
# 추출기 목록
|
|
EXTRACTORS = [
|
|
{"name": "sensor", "script": "pid_extract_sensor.py"},
|
|
{"name": "valve", "script": "pid_extract_valve.py"},
|
|
{"name": "system", "script": "pid_extract_system.py"},
|
|
{"name": "gauge", "script": "pid_extract_gauge.py"},
|
|
{"name": "pump", "script": "pid_extract_pump.py"},
|
|
]
|
|
|
|
# JSON schema 검증 기준
|
|
REQUIRED_TOP_KEYS = {"success", "count", "tags", "processing_time_sec"}
|
|
REQUIRED_TAG_KEYS = {"tagNo", "instrumentType", "confidence"}
|
|
|
|
|
|
def extract_dxf_text(dxf_path: str) -> str:
|
|
"""DXF 파일에서 TEXT/MTEXT 추출."""
|
|
if not HAS_EZDXF:
|
|
print("[ERROR] ezdxf가 설치되어 없습니다. pip install ezdxf")
|
|
sys.exit(1)
|
|
|
|
print(f"[INFO] DXF 로드 중: {dxf_path}")
|
|
doc = ezdxf.readfile(dxf_path)
|
|
modelspace = doc.modelspace()
|
|
|
|
texts = []
|
|
for entity in modelspace:
|
|
if entity.dxftype() in ("TEXT", "MTEXT"):
|
|
try:
|
|
text = entity.dxf.text
|
|
if text and text.strip():
|
|
texts.append(text.strip())
|
|
except Exception:
|
|
pass
|
|
|
|
result = "\n".join(texts)
|
|
print(f"[INFO] 텍스트 추출 완료: {len(texts)}개 텍스트 엔티티, {len(result)}자")
|
|
return result
|
|
|
|
|
|
def validate_json_schema(result: dict, extractor_name: str) -> list:
|
|
"""JSON schema 검증. 문제 목록 반환."""
|
|
issues = []
|
|
|
|
# 최상위 키 검증
|
|
if not isinstance(result, dict):
|
|
issues.append(f"{extractor_name}: 결과가 dict가 아님: {type(result)}")
|
|
return issues
|
|
|
|
missing_keys = REQUIRED_TOP_KEYS - set(result.keys())
|
|
if missing_keys:
|
|
issues.append(f"{extractor_name}: 누락된 최상위 키: {missing_keys}")
|
|
|
|
# success 필드
|
|
if "success" in result and result["success"] is not True:
|
|
issues.append(f"{extractor_name}: success가 True가 아님: {result.get('success')}")
|
|
|
|
# count 필드
|
|
if "count" in result:
|
|
if not isinstance(result["count"], int):
|
|
issues.append(f"{extractor_name}: count가 int가 아님: {type(result['count'])}")
|
|
elif result["count"] != len(result.get("tags", [])):
|
|
issues.append(f"{extractor_name}: count({result['count']}) != tags.length({len(result.get('tags', []))})")
|
|
|
|
# tags 필드
|
|
if "tags" in result:
|
|
if not isinstance(result["tags"], list):
|
|
issues.append(f"{extractor_name}: tags가 list가 아님: {type(result['tags'])}")
|
|
else:
|
|
for i, tag in enumerate(result["tags"]):
|
|
if not isinstance(tag, dict):
|
|
issues.append(f"{extractor_name}: tags[{i}]가 dict가 아님")
|
|
continue
|
|
|
|
missing_tag_keys = REQUIRED_TAG_KEYS - set(tag.keys())
|
|
if missing_tag_keys:
|
|
issues.append(f"{extractor_name}: tags[{i}] 누락된 키: {missing_tag_keys}")
|
|
|
|
# confidence 범위
|
|
if "confidence" in tag:
|
|
conf = tag["confidence"]
|
|
if isinstance(conf, (int, float)):
|
|
if not (0 <= conf <= 1):
|
|
issues.append(f"{extractor_name}: tags[{i}].confidence 범위 초과: {conf}")
|
|
|
|
# processing_time_sec
|
|
if "processing_time_sec" in result:
|
|
if not isinstance(result["processing_time_sec"], (int, float)):
|
|
issues.append(f"{extractor_name}: processing_time_sec가 숫자가 아님")
|
|
|
|
return issues
|
|
|
|
|
|
def run_extractor(extractor: dict, input_file: str, output_file: str) -> dict:
|
|
"""단일 추출기 실행."""
|
|
script_path = WORKER_DIR / extractor["script"]
|
|
name = extractor["name"]
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"[TEST] {name} 추출기 실행")
|
|
print(f" 스크립트: {script_path}")
|
|
print(f" 입력: {input_file}")
|
|
print(f" 출력: {output_file}")
|
|
print(f"{'='*60}")
|
|
|
|
cmd = [
|
|
sys.executable, str(script_path),
|
|
"--input", input_file,
|
|
"--output", output_file,
|
|
]
|
|
|
|
t0 = time.time()
|
|
try:
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=300,
|
|
cwd=str(WORKER_DIR),
|
|
)
|
|
elapsed = time.time() - t0
|
|
|
|
print(f"[INFO] 종료 코드: {result.returncode}, 소요: {elapsed:.1f}초")
|
|
|
|
if result.stdout.strip():
|
|
print(f"[STDOUT] {result.stdout.strip()}")
|
|
if result.stderr.strip():
|
|
# 로깅 출력은 INFO 레벨만 표시
|
|
for line in result.stderr.strip().split("\n"):
|
|
if "INFO" in line or "WARN" in line:
|
|
print(f"[LOG] {line.strip()}")
|
|
|
|
return {
|
|
"name": name,
|
|
"success": result.returncode == 0,
|
|
"returncode": result.returncode,
|
|
"elapsed": round(elapsed, 1),
|
|
"stdout": result.stdout,
|
|
"stderr": result.stderr,
|
|
"output_file": output_file,
|
|
}
|
|
|
|
except subprocess.TimeoutExpired:
|
|
elapsed = time.time() - t0
|
|
print(f"[ERROR] 타임아웃 ({elapsed:.1f}초)")
|
|
return {
|
|
"name": name,
|
|
"success": False,
|
|
"returncode": -1,
|
|
"elapsed": round(elapsed, 1),
|
|
"stdout": "",
|
|
"stderr": "Timeout",
|
|
"output_file": output_file,
|
|
}
|
|
except Exception as e:
|
|
elapsed = time.time() - t0
|
|
print(f"[ERROR] 예외: {e}")
|
|
return {
|
|
"name": name,
|
|
"success": False,
|
|
"returncode": -1,
|
|
"elapsed": round(elapsed, 1),
|
|
"stdout": "",
|
|
"stderr": str(e),
|
|
"output_file": output_file,
|
|
}
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="개별 추출기 단독 테스트")
|
|
parser.add_argument("--dxf", default=str(DEFAULT_DXF), help="테스트용 DXF 파일 경로")
|
|
parser.add_argument("--dry-run", action="store_true", help="DXF 텍스트 추출만 하고 추출기 실행 생략")
|
|
args = parser.parse_args()
|
|
|
|
dxf_path = args.dxf
|
|
if not os.path.exists(dxf_path):
|
|
print(f"[ERROR] DXF 파일을 찾을 수 없습니다: {dxf_path}")
|
|
sys.exit(1)
|
|
|
|
# 임시 디렉토리 생성
|
|
temp_dir = Path(__file__).parent / "test_extractor_output"
|
|
temp_dir.mkdir(exist_ok=True)
|
|
|
|
# DXF 텍스트 추출
|
|
input_file = temp_dir / "full_text.txt"
|
|
print("[STEP 1] DXF에서 텍스트 추출")
|
|
text = extract_dxf_text(dxf_path)
|
|
with open(input_file, "w", encoding="utf-8") as f:
|
|
f.write(text)
|
|
print(f"[INFO] 입력 파일 저장: {input_file}")
|
|
|
|
if args.dry_run:
|
|
print("[INFO] dry-run 모드: 추출기 실행 생략")
|
|
return
|
|
|
|
# 추출기 실행
|
|
print("\n[STEP 2] 5개 추출기 개별 실행")
|
|
results = []
|
|
all_issues = []
|
|
|
|
for extractor in EXTRACTORS:
|
|
output_file = temp_dir / f"{extractor['name']}.json"
|
|
result = run_extractor(extractor, str(input_file), str(output_file))
|
|
results.append(result)
|
|
|
|
# JSON schema 검증
|
|
if result["success"] and os.path.exists(output_file):
|
|
print(f"\n[VALIDATE] {extractor['name']} JSON schema 검증")
|
|
with open(output_file, "r", encoding="utf-8") as f:
|
|
try:
|
|
data = json.load(f)
|
|
issues = validate_json_schema(data, extractor["name"])
|
|
all_issues.extend(issues)
|
|
|
|
if issues:
|
|
print(f" [FAIL] {len(issues)}개 문제 발견:")
|
|
for issue in issues:
|
|
print(f" - {issue}")
|
|
else:
|
|
tag_count = data.get("count", 0)
|
|
print(f" [PASS] schema 검증 통과 — {tag_count}개 태그 추출")
|
|
|
|
# 샘플 태그 출력
|
|
tags = data.get("tags", [])
|
|
if tags:
|
|
print(f" 샘플 태그 (최대 3개):")
|
|
for tag in tags[:3]:
|
|
print(f" - {tag}")
|
|
except json.JSONDecodeError as e:
|
|
issue = f"{extractor['name']}: JSON 파싱 실패: {e}"
|
|
all_issues.append(issue)
|
|
print(f" [FAIL] {issue}")
|
|
else:
|
|
issue = f"{extractor['name']}: 추출기 실행 실패 (rc={result['returncode']})"
|
|
all_issues.append(issue)
|
|
print(f" [FAIL] {issue}")
|
|
|
|
# 결과 요약
|
|
print(f"\n{'='*60}")
|
|
print("[RESULT] 테스트 결과 요약")
|
|
print(f"{'='*60}")
|
|
|
|
passed = sum(1 for r in results if r["success"])
|
|
failed = len(results) - passed
|
|
|
|
for r in results:
|
|
status = "PASS" if r["success"] else "FAIL"
|
|
print(f" [{status}] {r['name']}: {r['elapsed']}초")
|
|
|
|
print(f"\n 총: {len(results)}개 중 {passed}개 통과, {failed}개 실패")
|
|
|
|
if all_issues:
|
|
print(f"\n 발견된 문제 ({len(all_issues)}개):")
|
|
for issue in all_issues:
|
|
print(f" - {issue}")
|
|
else:
|
|
print("\n 모든 schema 검증 통과!")
|
|
|
|
print(f"\n 출력 파일: {temp_dir}/")
|
|
print(f" ('rm -rf {temp_dir}')로 정리 가능")
|
|
|
|
# 종료 코드
|
|
if all_issues:
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|