Files
ExperionCrawler/.rooBackup/2026-05-02_pipeline_sync/Graph_Pipeline_Phase3.md

212 lines
11 KiB
Markdown

# 🧠 Graph Pipeline Phase 3: 지능형 매핑 및 검증 (Intelligent Mapping & Validation)
이 문서는 P&ID Graph Pipeline의 세 번째 단계인 **지능형 매핑 및 검증**의 상세 구현 계획을 다룹니다. 2단계에서 구축한 위상 그래프(Topology Graph)를 활용하여, 도면 상의 가상 노드들을 실제 Experion 시스템의 **실시간 태그(Real-time Tags)**와 정밀하게 연결하고 그 타당성을 검증하는 것이 목표입니다.
---
## 🚩 [Supervisor's Audit] 감독자 진단 결과 및 수정 사항
본 프로그램 설계에 대해 감독자 관점에서 정밀 진단을 수행하였으며, 다음과 같은 취약점과 개선 사항을 발견하여 반영하였습니다.
### 1. 진단 결과 (Audit Findings)
| 항목 | 진단 내용 | 심각도 | 수정 방향 |
|---|---|---|---|
| **에러 처리** | LLM 응답이 JSON 형식이 아니거나 `UNKNOWN`일 때의 예외 처리 로직 부족 | HIGH | 구조화된 출력(JSON) 강제 및 Fallback 전략 추가 |
| **성능/비용** | 모든 노드에 대해 개별 LLM 호출 시 API 비용 급증 및 속도 저하 | MED | 배치(Batch) 처리 및 1차 필터링 강화 |
| **검증 정밀도** | 단순 키워드 매칭 기반 검증은 오탐(False Positive) 가능성이 높음 | MED | 데이터 타입 및 엔지니어링 유닛(EU)의 엄격한 비교 로직 추가 |
| **데이터 정합성** | 매핑 결과의 이력 관리 및 사람이 수동으로 수정할 수 있는 피드백 루프 부재 | LOW | 매핑 결과 저장 스키마에 `confidence``manual_override` 필드 추가 |
### 2. 수정 이유 (Rationale)
- **안정성 확보:** LLM은 비결정론적 특성이 있으므로, 프로그램이 런타임에 중단되지 않도록 Pydantic을 이용한 엄격한 스키마 검증이 필수적입니다.
- **효율성 최적화:** 수천 개의 태그를 개별 호출하는 것은 비효율적입니다. 유사도 기반으로 후보군을 좁히고, 유사 그룹을 묶어 배치 처리함으로써 비용을 절감합니다.
- **신뢰도 향상:** 단순 텍스트 매칭을 넘어 실제 시스템의 메타데이터(Unit, Range 등)를 교차 검증해야 엔지니어링 관점에서 신뢰할 수 있는 결과가 됩니다.
---
## 📦 1. 필수 패키지 및 환경 설정
### 1.1 Python 패키지
| 패키지 | 용도 | 비고 |
|---|---|---|
| `openai` / `langchain` | LLM API 연동 및 프롬프트 체이닝 | 매핑 추론 및 검증 핵심 |
| `fuzzywuzzy` / `rapidfuzz` | 태그 이름 간의 문자열 유사도 계산 | 1차 후보군 추출용 |
| `networkx` | 그래프 기반 인접 노드(Context) 추출 | 2단계 그래프 활용 |
| `pydantic` | 매핑 결과의 구조화 및 유효성 검사 | **[강화]** 데이터 정규화 및 런타임 타입 체크 |
| `requests` | ExperionCrawler API (C#)와 통신 | 실제 태그 리스트 조회 |
### 1.2 설치 명령어
```bash
pip install openai langchain rapidfuzz networkx pydantic requests
```
---
## 📐 2. 상세 설계 구조
### 2.1 매핑 파이프라인 (Mapping Pipeline)
단순 이름 매칭의 한계를 극복하기 위해 **[후보 추출 $\rightarrow$ 맥락 분석 $\rightarrow$ LLM 확정 $\rightarrow$ 스키마 검증]**의 4단계 프로세스를 거칩니다.
1. **1차 후보 추출 (Candidate Generation):**
* 도면의 태그 텍스트와 Experion 시스템의 전체 태그 리스트를 `RapidFuzz`로 비교하여 유사도 상위 N개를 추출합니다.
2. **맥락 정보 수집 (Context Gathering):**
* 해당 노드의 그래프 상 인접 노드(1-hop, 2-hop) 정보를 수집합니다.
* 예: "현재 노드는 `PT-101`이며, 상류에 `P-101(Pump)`이 있고 하류에 `V-101(Valve)`이 있음."
3. **LLM 기반 최종 매핑 (LLM-based Resolution):**
* 후보 태그 리스트와 위상 맥락을 LLM에게 전달하여 가장 타당한 태그를 선택하게 합니다.
* **[개선]** JSON Mode를 사용하여 `{"tag": "...", "reason": "...", "confidence": 0.9}` 형태로 응답을 강제합니다.
4. **구조적 검증 (Structural Validation):**
* Pydantic 모델을 통해 LLM 응답의 형식을 검증하고, 실패 시 `UNKNOWN` 처리 및 로그를 남깁니다.
### 2.2 상호 검증 로직 (Cross-Validation)
매핑된 결과가 실제 공정 데이터와 일치하는지 검증합니다.
* **위상적 일관성:** 도면에서 `A $\rightarrow$ B` 순서라면, 실제 데이터에서도 `A`의 변화가 `B`에 영향을 주는지 상관관계 분석.
* **속성 일치성:** 도면의 심볼 타입(예: Pressure Transmitter)과 실제 태그의 속성(예: Engineering Unit = 'bar' 또는 'psi')이 일치하는지 확인. **[강화]** 단순 키워드가 아닌 Unit 매핑 테이블을 통한 엄격한 비교.
---
## 💻 3. 실제 구현 코딩 가이드 (Example)
### 3.1 맥락 기반 매핑 엔진
```python
import networkx as nx
import asyncio
import json
from typing import List, Optional
from pydantic import BaseModel, Field
from rapidfuzz import process, fuzz
from openai import AsyncOpenAI
# --- [추가] 응답 구조화를 위한 Pydantic 모델 ---
class MappingResult(BaseModel):
resolved_tag: str = Field(..., description="The final mapped system tag")
reason: str = Field(..., description="Reason for this mapping based on context")
confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score from 0 to 1")
client = AsyncOpenAI(api_key="your-api-key")
class IntelligentMapper:
def __init__(self, graph, system_tags):
self.graph = graph # Phase 2에서 생성된 NetworkX 그래프
self.system_tags = system_tags # Experion 시스템의 전체 태그 리스트
def get_node_context(self, node_id):
"""노드의 주변 위상 정보를 텍스트로 변환"""
neighbors = list(self.graph.neighbors(node_id))
context = []
for n in neighbors:
attr = self.graph.nodes[n]
context.append(f"Connected to {attr.get('value', n)} (Type: {attr.get('type')})")
return ", ".join(context)
async def _resolve_generic(self, node_id, category_prompt):
"""공통 매핑 로직 (비동기 + 구조화 응답)"""
tag_text = self.graph.nodes[node_id].get('value', '')
candidates = process.extract(tag_text, self.system_tags, scorer=fuzz.WRatio, limit=5)
context = self.get_node_context(node_id)
prompt = f"""
{category_prompt}
P&ID 도면의 태그 '{tag_text}'를 실제 시스템 태그와 매핑해야 합니다.
위상 맥락: {context}
후보 리스트: {candidates}
반드시 다음 JSON 형식으로만 응답하세요:
{{
"resolved_tag": "태그명 또는 UNKNOWN",
"reason": "매핑 이유",
"confidence": 0.0~1.0
}}
"""
try:
response = await client.chat.completions.create(
model="gpt-4-turbo",
messages=[{"role": "user", "content": prompt}],
response_format={ "type": "json_object" } # JSON 모드 강제
)
raw_content = response.choices[0].message.content
# Pydantic을 통한 유효성 검사
return MappingResult.model_validate_json(raw_content)
except Exception as e:
print(f"Error resolving node {node_id}: {e}")
return MappingResult(resolved_tag="UNKNOWN", reason=f"Error: {str(e)}", confidence=0.0)
# --- 전문화된 Worker 함수들 ---
async def extract_transmitters(self, node_ids):
prompt = "당신은 계측기 전문 엔지니어입니다. 특히 Pressure/Flow/Level Transmitter 매핑에 특화되어 있습니다."
return {nid: await self._resolve_generic(nid, prompt) for nid in node_ids}
async def extract_valves(self, node_ids):
prompt = "당신은 밸브 및 액추에이터 전문 엔지니어입니다. 밸브의 개폐 상태 및 제어 태그 매핑에 특화되어 있습니다."
return {nid: await self._resolve_generic(nid, prompt) for nid in node_ids}
async def extract_equipment(self, node_ids):
prompt = "당신은 공정 설비 전문 엔지니어입니다. 펌프, 탱크, 열교환기 등의 메인 설비 태그 매핑에 특화되어 있습니다."
return {nid: await self._resolve_generic(nid, prompt) for nid in node_ids}
# 사용 예시
async def main():
# 가상 데이터
graph = nx.Graph()
graph.add_node("node_1", value="PT-101", type="Pressure Transmitter")
graph.add_node("node_2", value="P-101", type="Pump")
graph.add_edge("node_1", "node_2")
mapper = IntelligentMapper(graph, ["PT-101.PV", "PT-102.PV", "P-101.STATUS"])
results = await asyncio.gather(
mapper.extract_transmitters(["node_1"]),
mapper.extract_equipment(["node_2"])
)
final_mapping = {**results[0], **results[1]}
print(f"Parallel Resolved Mapping: {final_mapping}")
asyncio.run(main())
```
### 3.2 검증 유틸리티: 속성 일치 확인 (강화 버전)
```python
def validate_mapping(resolved_tag, symbol_type, tag_metadata):
"""심볼 타입과 실제 태그 메타데이터의 엄격한 일치 여부 검증"""
# 단순 키워드가 아닌 허용 단위(Unit) 정의
unit_map = {
"Pressure Transmitter": ["bar", "psi", "kPa", "Pa"],
"Flow Meter": ["m3/h", "lpm", "kg/h"],
"Temperature Sensor": ["°C", "C", "K", "°F"]
}
actual_unit = tag_metadata.get('unit', '').strip()
allowed_units = unit_map.get(symbol_type, [])
# 1. 단위 일치 확인 (최우선)
if actual_unit and actual_unit in allowed_units:
return True, "Unit Match"
# 2. 단위가 없는 경우 설명(Description) 기반 2차 검증
actual_desc = tag_metadata.get('description', '').lower()
expected_keywords = {
"Pressure Transmitter": ["pressure", "press"],
"Flow Meter": ["flow", "flowrate"],
"Temperature Sensor": ["temp", "temperature"]
}
keywords = expected_keywords.get(symbol_type, [])
if any(kw in actual_desc for kw in keywords):
return True, "Description Match (Unit Missing)"
return False, "Mismatch: Symbol type and Tag metadata do not align"
```
---
## 🚀 4. Phase 3 완료 기준 (Definition of Done)
- [ ] 모든 도면 노드에 대해 **1차 후보군(Candidates)**이 자동으로 생성되는가?
- [ ] `NetworkX` 그래프를 통해 **인접 노드 맥락(Context)**이 정확히 추출되는가?
- [ ] LLM이 **JSON 형식**으로 최종 태그를 결정하고, 그 근거와 신뢰도를 제시하는가?
- [ ] **Pydantic**을 통해 LLM 응답의 구조적 유효성이 검증되는가?
- [ ] 매핑된 태그의 **엔지니어링 유닛(Unit)**과 도면 심볼 타입 간의 일치성이 엄격히 검증되는가?
- [ ] 최종 매핑 결과가 `(도면노드ID, 시스템태그, 신뢰도, 검증결과, 매핑근거)` 형태로 저장되는가?