#!/usr/bin/env python3
"""Parallel-execution performance test for the five extractors.

Starts every extractor listed in ``EXTRACTORS`` at the same time with
``subprocess.Popen`` and reports the overall wall-clock time together with
each extractor's individual run time.

Usage:
    python test_parallel_extractors.py [--dxf PATH]
"""

import argparse
import json
import os
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

try:
    import ezdxf
    HAS_EZDXF = True
except ImportError:
    HAS_EZDXF = False

WORKER_DIR = Path(__file__).parent / "mcp-server" / "worker"
DEFAULT_DXF = (
    Path(__file__).parent / "src" / "Web" / "uploads" / "pid" / "No-10_Plant_PID.dxf"
)

EXTRACTORS = [
    {"name": "sensor", "script": "pid_extract_sensor.py"},
    {"name": "valve", "script": "pid_extract_valve.py"},
    {"name": "system", "script": "pid_extract_system.py"},
    {"name": "gauge", "script": "pid_extract_gauge.py"},
    {"name": "pump", "script": "pid_extract_pump.py"},
]

# Keys every extractor result / every tag entry is expected to carry.
REQUIRED_TOP_KEYS = {"success", "count", "tags", "processing_time_sec"}
REQUIRED_TAG_KEYS = {"tagNo", "instrumentType", "confidence"}

# Substrings that mark a child stderr line as worth echoing.
_LOG_LEVEL_MARKERS = ("INFO", "WARN", "ERROR")


def extract_dxf_text(dxf_path: str) -> str:
    """Collect the text of TEXT/MTEXT entities from a DXF file's modelspace.

    Args:
        dxf_path: Path of the DXF file to read.

    Returns:
        The stripped, non-empty texts joined with newlines.

    Exits the process with status 1 when ``ezdxf`` is not installed.
    """
    if not HAS_EZDXF:
        print("[ERROR] ezdxf가 설치되어 없습니다.")
        sys.exit(1)

    print(f"[INFO] DXF 로드 중: {dxf_path}")
    doc = ezdxf.readfile(dxf_path)
    modelspace = doc.modelspace()

    texts = []
    for entity in modelspace:
        if entity.dxftype() not in ("TEXT", "MTEXT"):
            continue
        try:
            text = entity.dxf.text
            if text and text.strip():
                texts.append(text.strip())
        except Exception:
            # Deliberate best effort: an entity whose text cannot be read is
            # skipped instead of aborting the whole extraction.
            # NOTE(review): MTEXT may not expose ``dxf.text`` in ezdxf (its
            # content appears to live on ``entity.text``); if so, MTEXT is
            # silently dropped here -- confirm against the ezdxf docs.
            pass

    result = "\n".join(texts)
    print(f"[INFO] 텍스트 추출 완료: {len(texts)}개, {len(result)}자")
    return result


def validate_result(result: dict, name: str) -> list:
    """Check one extractor's JSON result against the expected schema.

    Args:
        result: Parsed JSON document written by the extractor.
        name: Extractor name, used as the prefix of every issue message.

    Returns:
        A list of issue strings; empty when the result looks valid.
    """
    issues = []
    if not isinstance(result, dict):
        issues.append(f"{name}: 결과가 dict 아님")
        return issues

    missing = REQUIRED_TOP_KEYS - set(result.keys())
    if missing:
        issues.append(f"{name}: 누락된 키: {missing}")

    if "tags" in result:
        tags = result["tags"]
        if isinstance(tags, list):
            for i, tag in enumerate(tags):
                if not isinstance(tag, dict):
                    issues.append(f"{name}: tags[{i}] dict 아님")
                    continue
                tag_missing = REQUIRED_TAG_KEYS - set(tag.keys())
                if tag_missing:
                    issues.append(f"{name}: tags[{i}] 누락: {tag_missing}")
            if "count" in result and result["count"] != len(tags):
                issues.append(f"{name}: count 불일치")
        else:
            # A present-but-malformed "tags" used to pass silently.
            issues.append(f"{name}: tags가 list 아님")

    return issues


def _as_number(value):
    """Return *value* if it is an int/float, else 0, so summary sums never fail."""
    return value if isinstance(value, (int, float)) else 0


def _launch_extractors(input_file, output_dir):
    """Start every extractor and return ``{name: (process, start_time)}``."""
    launched = {}
    for ext in EXTRACTORS:
        name = ext["name"]
        output_file = output_dir / f"{name}.json"
        # Drop output left over from an earlier run; otherwise a crashed
        # extractor would be validated against stale data and appear to pass.
        if output_file.exists():
            output_file.unlink()
        cmd = [
            sys.executable,
            str(WORKER_DIR / ext["script"]),
            "--input", str(input_file),
            "--output", str(output_file),
        ]
        started_at = time.perf_counter()
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Undecodable bytes in child logs must not abort the harness.
            errors="replace",
        )
        launched[name] = (proc, started_at)
        print(f" [{name}] PID={proc.pid} 시작")
    return launched


def _drain_process(name, proc, started_at):
    """Read one child's pipes until it exits; return its outcome tuple."""
    # communicate() keeps reading both pipes while the child runs, so a
    # chatty child cannot block on a full pipe buffer.
    stdout_text, stderr_text = proc.communicate()
    elapsed = time.perf_counter() - started_at
    return name, proc.returncode, elapsed, stdout_text, stderr_text


def _wait_for_all(launched):
    """Wait for every child, echo its output, return (return_codes, first_done_at)."""
    return_codes = {}
    first_done_at = None
    if not launched:
        return return_codes, first_done_at

    # One thread per child so every pipe is drained concurrently.
    with ThreadPoolExecutor(max_workers=len(launched)) as pool:
        futures = [
            pool.submit(_drain_process, name, proc, started_at)
            for name, (proc, started_at) in launched.items()
        ]
        for future in as_completed(futures):
            name, rc, elapsed, stdout_text, stderr_text = future.result()
            if first_done_at is None:
                first_done_at = time.perf_counter()
            return_codes[name] = rc
            print(f" [{name}] 종료 (rc={rc}, {elapsed:.1f}초)")

            stdout_text = (stdout_text or "").strip()
            if stdout_text:
                print(f" stdout: {stdout_text[:200]}")
            for line in (stderr_text or "").strip().split("\n"):
                if any(marker in line for marker in _LOG_LEVEL_MARKERS):
                    print(f" log: {line.strip()[:150]}")
    return return_codes, first_done_at


def _check_output(name, output_file, return_code):
    """Validate one extractor's output file; return (summary_entry, issues)."""
    issues = []
    if return_code not in (0, None):
        issues.append(f"{name}: 비정상 종료 (rc={return_code})")

    if not output_file.exists():
        issues.append(f"{name}: 출력 파일 없음")
        return {"name": name, "status": "FAIL", "reason": "파일 없음"}, issues

    try:
        with open(output_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        issues.append(f"{name}: JSON 파싱 실패: {e}")
        return {"name": name, "status": "FAIL", "reason": "JSON 오류"}, issues

    issues.extend(validate_result(data, name))
    if not isinstance(data, dict):
        # validate_result already recorded the issue; the .get() calls below
        # would raise on a non-dict document.
        return {"name": name, "status": "FAIL", "reason": "dict 아님"}, issues

    tag_count = data.get("count", 0)
    proc_time = data.get("processing_time_sec", 0)
    status = "PASS" if not issues else "WARN"
    entry = {
        "name": name,
        "status": status,
        "tags": _as_number(tag_count),
        "time_sec": _as_number(proc_time),
        "issues": len(issues),
    }

    # Show one sample tag and at most three issues per extractor.
    tags = data.get("tags", [])
    print(f"\n [{name}] {status} — {tag_count}개 태그, {proc_time}초")
    if isinstance(tags, list) and tags:
        print(f" 샘플: {tags[0]}")
    for issue in issues[:3]:
        print(f" ⚠ {issue}")
    return entry, issues


def _print_summary(total_elapsed, results_summary, all_issues, output_dir):
    """Print the final report and the parallel-vs-sequential comparison."""
    print(f"\n{'='*60}")
    print("[RESULT] 병렬 실행 결과 요약")
    print(f"{'='*60}")
    print(f" 전체 소요 시간: {total_elapsed:.1f}초")
    print(f" 추출기 수: {len(EXTRACTORS)}개")
    total_tags = sum(r.get("tags", 0) for r in results_summary)
    print(f" 총 추출 태그: {total_tags}개")
    passed = sum(1 for r in results_summary if r.get("status") == "PASS")
    print(f" 통과: {passed}/{len(results_summary)}")

    if all_issues:
        print(f"\n 문제 ({len(all_issues)}개):")
        for issue in all_issues[:10]:
            print(f" - {issue}")
    else:
        print("\n ✅ 모든 검증 통과!")
    print(f"\n 출력: {output_dir}/")

    # The sum of the self-reported processing times approximates what a
    # sequential run would have cost.
    estimated_sequential = sum(r.get("time_sec", 0) for r in results_summary)
    if estimated_sequential > 0:
        ratio = total_elapsed / estimated_sequential
        print(f"\n [비교] 순차 실행 예상: {estimated_sequential:.1f}초")
        print(f" [비교] 병렬 실행 실제: {total_elapsed:.1f}초")
        print(f" [비교] 속도비: {ratio:.2f}x (1.0 = 순차와 동일, <1.0 = 병렬이 빠름)")


def main():
    """Run all extractors in parallel, validate their output, print a report.

    Exits with status 1 when the DXF file is missing or any validation issue
    (including a non-zero extractor exit code) was found.
    """
    parser = argparse.ArgumentParser(description="5개 추출기 병렬 실행 성능 테스트")
    parser.add_argument("--dxf", default=str(DEFAULT_DXF), help="DXF 파일 경로")
    args = parser.parse_args()

    if not os.path.exists(args.dxf):
        print(f"[ERROR] DXF 없음: {args.dxf}")
        sys.exit(1)

    # Output directory
    output_dir = Path(__file__).parent / "test_parallel_output"
    output_dir.mkdir(exist_ok=True)

    # Extract the DXF text once; every extractor reads the same input file.
    input_file = output_dir / "full_text.txt"
    print("[STEP 1] DXF 텍스트 추출")
    text = extract_dxf_text(args.dxf)
    with open(input_file, "w", encoding="utf-8") as f:
        f.write(text)

    # Run the extractors in parallel
    print(f"\n[STEP 2] {len(EXTRACTORS)}개 추출기 병렬 실행 시작")
    overall_start = time.perf_counter()
    launched = _launch_extractors(input_file, output_dir)

    print(f"\n[INFO] {len(launched)}개 프로세스 실행 중... 대기")
    return_codes, first_done_at = _wait_for_all(launched)

    total_elapsed = time.perf_counter() - overall_start
    if first_done_at is not None:
        print(f"\n[INFO] 첫 번째 완료: {first_done_at - overall_start:.1f}초")
    print(f"[INFO] 전체 소요 시간: {total_elapsed:.1f}초")

    # Validate results
    print("\n[STEP 3] 결과 검증")
    all_issues = []
    results_summary = []
    for ext in EXTRACTORS:
        name = ext["name"]
        entry, issues = _check_output(
            name, output_dir / f"{name}.json", return_codes.get(name)
        )
        results_summary.append(entry)
        all_issues.extend(issues)

    _print_summary(total_elapsed, results_summary, all_issues, output_dir)

    if all_issues:
        sys.exit(1)


if __name__ == "__main__":
    main()