250 lines
8.1 KiB
Python
250 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
|
"""5개 추출기 병렬 실행 성능 테스트
|
|
|
|
5개 추출기를 subprocess.Popen으로 동시에 실행하고,
|
|
전체 소요 시간과 개별 소요 시간을 측정합니다.
|
|
|
|
사용법:
|
|
python test_parallel_extractors.py [--dxf PATH]
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import ezdxf
|
|
HAS_EZDXF = True
|
|
except ImportError:
|
|
HAS_EZDXF = False
|
|
|
|
WORKER_DIR = Path(__file__).parent / "mcp-server" / "worker"
|
|
DEFAULT_DXF = Path(__file__).parent / "src" / "Web" / "uploads" / "pid" / "No-10_Plant_PID.dxf"
|
|
|
|
EXTRACTORS = [
|
|
{"name": "sensor", "script": "pid_extract_sensor.py"},
|
|
{"name": "valve", "script": "pid_extract_valve.py"},
|
|
{"name": "system", "script": "pid_extract_system.py"},
|
|
{"name": "gauge", "script": "pid_extract_gauge.py"},
|
|
{"name": "pump", "script": "pid_extract_pump.py"},
|
|
]
|
|
|
|
REQUIRED_TOP_KEYS = {"success", "count", "tags", "processing_time_sec"}
|
|
REQUIRED_TAG_KEYS = {"tagNo", "instrumentType", "confidence"}
|
|
|
|
|
|
def extract_dxf_text(dxf_path: str) -> str:
|
|
if not HAS_EZDXF:
|
|
print("[ERROR] ezdxf가 설치되어 없습니다.")
|
|
sys.exit(1)
|
|
|
|
print(f"[INFO] DXF 로드 중: {dxf_path}")
|
|
doc = ezdxf.readfile(dxf_path)
|
|
modelspace = doc.modelspace()
|
|
|
|
texts = []
|
|
for entity in modelspace:
|
|
if entity.dxftype() in ("TEXT", "MTEXT"):
|
|
try:
|
|
text = entity.dxf.text
|
|
if text and text.strip():
|
|
texts.append(text.strip())
|
|
except Exception:
|
|
pass
|
|
|
|
result = "\n".join(texts)
|
|
print(f"[INFO] 텍스트 추출 완료: {len(texts)}개, {len(result)}자")
|
|
return result
|
|
|
|
|
|
def validate_result(result: dict, name: str) -> list:
|
|
issues = []
|
|
if not isinstance(result, dict):
|
|
issues.append(f"{name}: 결과가 dict 아님")
|
|
return issues
|
|
|
|
missing = REQUIRED_TOP_KEYS - set(result.keys())
|
|
if missing:
|
|
issues.append(f"{name}: 누락된 키: {missing}")
|
|
|
|
if "tags" in result and isinstance(result["tags"], list):
|
|
for i, tag in enumerate(result["tags"]):
|
|
if not isinstance(tag, dict):
|
|
issues.append(f"{name}: tags[{i}] dict 아님")
|
|
continue
|
|
tag_missing = REQUIRED_TAG_KEYS - set(tag.keys())
|
|
if tag_missing:
|
|
issues.append(f"{name}: tags[{i}] 누락: {tag_missing}")
|
|
|
|
if "count" in result and isinstance(result.get("tags"), list):
|
|
if result["count"] != len(result["tags"]):
|
|
issues.append(f"{name}: count 불일치")
|
|
|
|
return issues
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="5개 추출기 병렬 실행 성능 테스트")
|
|
parser.add_argument("--dxf", default=str(DEFAULT_DXF), help="DXF 파일 경로")
|
|
args = parser.parse_args()
|
|
|
|
if not os.path.exists(args.dxf):
|
|
print(f"[ERROR] DXF 없음: {args.dxf}")
|
|
sys.exit(1)
|
|
|
|
# 출력 디렉토리
|
|
output_dir = Path(__file__).parent / "test_parallel_output"
|
|
output_dir.mkdir(exist_ok=True)
|
|
|
|
# DXF 텍스트 추출
|
|
input_file = output_dir / "full_text.txt"
|
|
print("[STEP 1] DXF 텍스트 추출")
|
|
text = extract_dxf_text(args.dxf)
|
|
with open(input_file, "w", encoding="utf-8") as f:
|
|
f.write(text)
|
|
|
|
# 5개 추출기 병렬 실행
|
|
print(f"\n[STEP 2] 5개 추출기 병렬 실행 시작")
|
|
overall_start = time.time()
|
|
|
|
processes = {}
|
|
start_times = {}
|
|
|
|
for ext in EXTRACTORS:
|
|
output_file = output_dir / f"{ext['name']}.json"
|
|
cmd = [
|
|
sys.executable, str(WORKER_DIR / ext["script"]),
|
|
"--input", str(input_file),
|
|
"--output", str(output_file),
|
|
]
|
|
t0 = time.time()
|
|
proc = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
processes[ext["name"]] = proc
|
|
start_times[ext["name"]] = t0
|
|
print(f" [{ext['name']}] PID={proc.pid} 시작")
|
|
|
|
# 모든 프로세스 대기
|
|
print(f"\n[INFO] 5개 프로세스 실행 중... 대기")
|
|
overall_end_wait = None
|
|
|
|
while processes:
|
|
time.sleep(2)
|
|
for name, proc in list(processes.items()):
|
|
ret = proc.poll()
|
|
if ret is not None:
|
|
elapsed = time.time() - start_times[name]
|
|
if overall_end_wait is None:
|
|
overall_end_wait = time.time()
|
|
print(f" [{name}] 종료 (rc={ret}, {elapsed:.1f}초)")
|
|
|
|
# stdout 확인
|
|
proc.wait()
|
|
if proc.stdout:
|
|
stdout_text = proc.stdout.read().strip()
|
|
if stdout_text:
|
|
print(f" stdout: {stdout_text[:200]}")
|
|
if proc.stderr:
|
|
stderr_text = proc.stderr.read().strip()
|
|
for line in stderr_text.split("\n"):
|
|
if "INFO" in line or "WARN" in line or "ERROR" in line:
|
|
print(f" log: {line.strip()[:150]}")
|
|
|
|
del processes[name]
|
|
|
|
total_elapsed = time.time() - overall_start
|
|
if overall_end_wait:
|
|
last_to_first = overall_end_wait - overall_start
|
|
print(f"\n[INFO] 첫 번째 완료: {last_to_first:.1f}초")
|
|
print(f"[INFO] 전체 소요 시간: {total_elapsed:.1f}초")
|
|
|
|
# 결과 검증
|
|
print(f"\n[STEP 3] 결과 검증")
|
|
all_issues = []
|
|
results_summary = []
|
|
|
|
for ext in EXTRACTORS:
|
|
output_file = output_dir / f"{ext['name']}.json"
|
|
name = ext["name"]
|
|
|
|
if not output_file.exists():
|
|
all_issues.append(f"{name}: 출력 파일 없음")
|
|
results_summary.append({"name": name, "status": "FAIL", "reason": "파일 없음"})
|
|
continue
|
|
|
|
try:
|
|
with open(output_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
except json.JSONDecodeError as e:
|
|
all_issues.append(f"{name}: JSON 파싱 실패: {e}")
|
|
results_summary.append({"name": name, "status": "FAIL", "reason": "JSON 오류"})
|
|
continue
|
|
|
|
issues = validate_result(data, name)
|
|
all_issues.extend(issues)
|
|
|
|
tag_count = data.get("count", 0)
|
|
proc_time = data.get("processing_time_sec", 0)
|
|
status = "PASS" if not issues else "WARN"
|
|
results_summary.append({
|
|
"name": name,
|
|
"status": status,
|
|
"tags": tag_count,
|
|
"time_sec": proc_time,
|
|
"issues": len(issues),
|
|
})
|
|
|
|
# 샘플 태그
|
|
tags = data.get("tags", [])
|
|
print(f"\n [{name}] {status} — {tag_count}개 태그, {proc_time}초")
|
|
if tags:
|
|
print(f" 샘플: {tags[0]}")
|
|
if issues:
|
|
for issue in issues[:3]:
|
|
print(f" ⚠ {issue}")
|
|
|
|
# 요약
|
|
print(f"\n{'='*60}")
|
|
print("[RESULT] 병렬 실행 결과 요약")
|
|
print(f"{'='*60}")
|
|
print(f" 전체 소요 시간: {total_elapsed:.1f}초")
|
|
print(f" 추출기 수: {len(EXTRACTORS)}개")
|
|
|
|
total_tags = sum(r.get("tags", 0) for r in results_summary if isinstance(r, dict))
|
|
print(f" 총 추출 태그: {total_tags}개")
|
|
|
|
passed = sum(1 for r in results_summary if r.get("status") == "PASS")
|
|
print(f" 통과: {passed}/{len(results_summary)}")
|
|
|
|
if all_issues:
|
|
print(f"\n 문제 ({len(all_issues)}개):")
|
|
for issue in all_issues[:10]:
|
|
print(f" - {issue}")
|
|
else:
|
|
print("\n ✅ 모든 검증 통과!")
|
|
|
|
print(f"\n 출력: {output_dir}/")
|
|
|
|
# 순차 실행과 비교
|
|
estimated_sequential = sum(r.get("time_sec", 0) for r in results_summary if isinstance(r, dict))
|
|
if estimated_sequential > 0:
|
|
ratio = total_elapsed / estimated_sequential
|
|
print(f"\n [비교] 순차 실행 예상: {estimated_sequential:.1f}초")
|
|
print(f" [비교] 병렬 실행 실제: {total_elapsed:.1f}초")
|
|
print(f" [비교] 속도비: {ratio:.2f}x (1.0 = 순차와 동일, <1.0 = 병렬이 빠름)")
|
|
|
|
if all_issues:
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|