chore: .gitignore에 Python 캐시 및 가상환경 무시 규칙 추가

This commit is contained in:
windpacer
2026-05-03 03:58:23 +09:00
parent f71ec310e4
commit a0404b1fee
254 changed files with 33 additions and 5083599 deletions

View File

@@ -1,179 +0,0 @@
#!/usr/bin/env python3
"""
Experion OPC UA 문서 인덱싱 스크립트
- HTM 파일 → 텍스트 추출 → 청킹 → Ollama 임베딩 → Qdrant 업서트
- 사용 모델: nomic-embed-text (768-dim, MCP 서버와 동일)
- 컬렉션: experion-opc-docs
"""
import os
import sys
import uuid
import time
import textwrap
from html.parser import HTMLParser
from pathlib import Path
import httpx
# ── 설정 ──────────────────────────────────────────────────────────────────────
DOCS_DIR = "/home/windpacer/projects/Experion_opcua_documents"
QDRANT_URL = "http://localhost:6333"
OLLAMA_URL = "http://localhost:11434"
EMBED_MODEL = "nomic-embed-text"
COLLECTION = "experion-opc-docs"
CHUNK_SIZE = 600 # 문자 수
CHUNK_OVERLAP = 100
VECTOR_DIM = 768
# ── HTML → 텍스트 추출 ────────────────────────────────────────────────────────
class _TextExtractor(HTMLParser):
SKIP_TAGS = {"script", "style", "head", "nav", "footer"}
def __init__(self):
super().__init__()
self._skip = 0
self._parts = []
def handle_starttag(self, tag, attrs):
if tag in self.SKIP_TAGS:
self._skip += 1
def handle_endtag(self, tag):
if tag in self.SKIP_TAGS and self._skip:
self._skip -= 1
if tag in ("p", "h1", "h2", "h3", "h4", "li", "td", "tr", "div"):
self._parts.append("\n")
def handle_data(self, data):
if not self._skip:
stripped = data.strip()
if stripped:
self._parts.append(stripped + " ")
def get_text(self) -> str:
raw = "".join(self._parts)
lines = [l.strip() for l in raw.splitlines()]
lines = [l for l in lines if l]
return "\n".join(lines)
def extract_text(htm_path: str) -> str:
with open(htm_path, encoding="utf-8", errors="replace") as f:
html = f.read()
p = _TextExtractor()
p.feed(html)
return p.get_text()
# ── 청킹 ─────────────────────────────────────────────────────────────────────
def chunk_text(text: str, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
if len(text) <= size:
return [text] if text.strip() else []
chunks = []
start = 0
while start < len(text):
end = start + size
chunk = text[start:end]
if chunk.strip():
chunks.append(chunk.strip())
start += size - overlap
return chunks
# ── Ollama 임베딩 ─────────────────────────────────────────────────────────────
def embed(text: str) -> list[float]:
with httpx.Client(timeout=30) as client:
resp = client.post(
f"{OLLAMA_URL}/api/embeddings",
json={"model": EMBED_MODEL, "prompt": text},
)
resp.raise_for_status()
return resp.json()["embedding"]
# ── Qdrant 컬렉션 생성 ────────────────────────────────────────────────────────
def ensure_collection():
with httpx.Client(timeout=15) as client:
resp = client.get(f"{QDRANT_URL}/collections/{COLLECTION}")
if resp.status_code == 200:
info = resp.json()["result"]
count = info.get("points_count", 0)
print(f"컬렉션 '{COLLECTION}' 이미 존재 (points: {count})")
answer = input("기존 컬렉션을 삭제하고 재인덱싱? [y/N]: ").strip().lower()
if answer != "y":
print("취소")
sys.exit(0)
client.delete(f"{QDRANT_URL}/collections/{COLLECTION}")
print("기존 컬렉션 삭제 완료")
create_resp = client.put(
f"{QDRANT_URL}/collections/{COLLECTION}",
json={"vectors": {"size": VECTOR_DIM, "distance": "Cosine"}},
)
create_resp.raise_for_status()
print(f"컬렉션 '{COLLECTION}' 생성 완료")
# ── Qdrant 업서트 ─────────────────────────────────────────────────────────────
def upsert_batch(points: list[dict]):
with httpx.Client(timeout=30) as client:
resp = client.put(
f"{QDRANT_URL}/collections/{COLLECTION}/points",
json={"points": points},
)
resp.raise_for_status()
# ── 메인 ─────────────────────────────────────────────────────────────────────
def main():
htm_files = sorted(Path(DOCS_DIR).rglob("*.htm"))
if not htm_files:
print(f"HTM 파일 없음: {DOCS_DIR}")
sys.exit(1)
print(f"HTM 파일 수: {len(htm_files)}")
ensure_collection()
total_chunks = 0
batch: list[dict] = []
BATCH_SIZE = 20
for i, path in enumerate(htm_files, 1):
rel = str(path.relative_to(Path(DOCS_DIR).parent))
text = extract_text(str(path))
chunks = chunk_text(text)
for j, chunk in enumerate(chunks):
vec = embed(chunk)
batch.append({
"id": str(uuid.uuid5(uuid.NAMESPACE_URL, f"{path}#{j}")),
"vector": vec,
"payload": {
"filePath": rel,
"content": chunk,
"chunkIndex": j,
},
})
if len(batch) >= BATCH_SIZE:
upsert_batch(batch)
total_chunks += len(batch)
batch = []
print(f"[{i:2d}/{len(htm_files)}] {path.name} ({len(chunks)} chunks)", flush=True)
if batch:
upsert_batch(batch)
total_chunks += len(batch)
print(f"\n완료: {total_chunks}개 청크 → 컬렉션 '{COLLECTION}'")
if __name__ == "__main__":
main()

View File

@@ -1,78 +0,0 @@
import networkx as nx
from typing import Dict, List, Optional
import json
import os
class PidAnalysisEngine:
def __init__(self, topology_file: str, mapping_file: str):
self.topology_file = topology_file
self.mapping_file = mapping_file
self.graph = nx.DiGraph()
self.tag_mapping = {}
self.load_data()
def load_data(self):
"""그래프 및 매핑 데이터 로드"""
try:
if os.path.exists(self.topology_file):
with open(self.topology_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# NetworkX 그래프 생성 (node_link_data 형식 가정)
for node in data.get('nodes', []):
self.graph.add_node(node['id'], **node)
for edge in data.get('links', []): # node_link_data는 'links' 사용
self.graph.add_edge(edge['source'], edge['target'], **edge)
if os.path.exists(self.mapping_file):
with open(self.mapping_file, 'r', encoding='utf-8') as f:
self.tag_mapping = json.load(f)
except Exception as e:
print(f"Error loading analysis data: {e}")
def get_propagation_path_with_flow(self, start_node: str):
"""
엣지의 방향성(flow_direction)과 상태(valve_status)를 고려한 실제 영향 전파 경로 추출
"""
if start_node not in self.graph:
return {}
# 1. 유효한 엣지만 필터링 (방향이 forward이고 밸브가 open인 경로)
valid_edges = [
(u, v) for u, v, d in self.graph.edges(data=True)
if d.get('flow_direction', 'forward') == 'forward'
and d.get('valve_status', 'open') == 'open'
]
filtered_graph = nx.DiGraph()
filtered_graph.add_edges_from(valid_edges)
# 2. 전파 단계별 노드 추출 (BFS)
try:
propagation_levels = nx.single_source_shortest_path_length(filtered_graph, start_node)
return propagation_levels
except Exception:
return {}
def analyze_impact(self, node_id: str):
"""특정 노드 장애 시 영향도 분석 결과 반환"""
if node_id not in self.graph:
return {"success": False, "error": f"Node {node_id} not found in topology"}
impact_map = self.get_propagation_path_with_flow(node_id)
# 경로 추출 (시각화를 위해 모든 영향 노드로의 최단 경로 포함)
paths = []
for target in impact_map.keys():
if target != node_id:
try:
path = nx.shortest_path(self.graph, source=node_id, target=target)
paths.append(path)
except nx.NetworkXNoPath:
continue
return {
"success": True,
"startNode": node_id,
"impactedNodes": impact_map,
"paths": paths
}

View File

@@ -1,173 +0,0 @@
import ezdxf
import re
import json
from typing import List, Optional, Tuple, Union
from pydantic import BaseModel, Field
from shapely.geometry import box, Point
# --- Data Models ---
class BoundingBox(BaseModel):
min_x: float
min_y: float
max_x: float
max_y: float
center: Tuple[float, float]
class GeometricEntity(BaseModel):
entity_id: str
entity_type: str # TEXT, MTEXT, LINE, LWPOLYLINE, CIRCLE, ARC
layer: str
bbox: BoundingBox
raw_value: Optional[str] = None
clean_value: Optional[str] = None
coordinates: List[Union[Tuple[float, float], List[float]]] = Field(default_factory=list)
properties: dict = Field(default_factory=dict)
# --- Extractor Implementation ---
class PidGeometricExtractor:
def __init__(self, file_path: str):
try:
self.doc = ezdxf.readfile(file_path)
self.msp = self.doc.modelspace()
except Exception as e:
raise IOError(f"Failed to load DXF file: {e}")
def clean_text(self, text: str) -> str:
"""
DXF 특수 제어 문자 및 MTEXT 포맷팅을 제거하여 정제된 텍스트 반환.
"""
if not text:
return ""
# 1. MTEXT 포맷팅 및 제어 문자 제거 (\P, \W, \L, \A, \C, \H, \S, \T 등)
text = re.sub(r'\\([P|W|L|A|C|H|S|T])\d*;?', ' ', text)
# 2. 중괄호 { } 제거
text = re.sub(r'[\{\}]', ' ', text)
# 3. DXF 특수 제어 문자 제거 (%%U: Underline, %%O: Overline, %%S: Strikethrough, %%R: Registered)
text = re.sub(r'%%[U|O|S|R]', ' ', text)
# 4. 불필요한 특수 기호 및 반복되는 공백 정제
text = re.sub(r'\s+', ' ', text).strip()
return text
def get_bbox(self, entity) -> Optional[BoundingBox]:
"""
엔티티 타입별로 동적인 Bounding Box를 계산하여 반환.
"""
try:
if entity.dxftype() == 'TEXT':
p = entity.dxf.insert
h = entity.dxf.height
# 텍스트 길이에 따른 대략적인 너비 계산 (글자수 * 높이 * 0.6)
width = len(entity.dxf.text) * h * 0.6
return self._create_bbox(p.x, p.y, p.x + width, p.y + h)
elif entity.dxftype() == 'MTEXT':
p = entity.dxf.insert
h = entity.dxf.char_height if hasattr(entity.dxf, 'char_height') else 2.5
w = entity.dxf.width if entity.dxf.width > 0 else len(entity.text) * h * 0.6
return self._create_bbox(p.x, p.y, p.x + w, p.y + h)
elif entity.dxftype() == 'LINE':
start = entity.dxf.start
end = entity.dxf.end
return self._create_bbox(
min(start.x, end.x), min(start.y, end.y),
max(start.x, end.x), max(start.y, end.y)
)
elif entity.dxftype() == 'LWPOLYLINE':
points = entity.get_points()
if not points: return None
xs = [p[0] for p in points]
ys = [p[1] for p in points]
return self._create_bbox(min(xs), min(ys), max(xs), max(ys))
elif entity.dxftype() in ('CIRCLE', 'ARC'):
center = entity.dxf.center
radius = entity.dxf.radius
return self._create_bbox(
center.x - radius, center.y - radius,
center.x + radius, center.y + radius
)
except Exception as e:
print(f"Error calculating bbox for {entity.dxftype()} ({entity.dxf.handle}): {e}")
return None
def _create_bbox(self, min_x, min_y, max_x, max_y) -> BoundingBox:
return BoundingBox(
min_x=min_x,
min_y=min_y,
max_x=max_x,
max_y=max_y,
center=((min_x + max_x) / 2, (min_y + max_y) / 2)
)
def extract_and_save(self, output_path: str):
"""
기하학적 데이터를 추출하여 JSON 파일로 저장.
"""
results = []
for entity in self.msp:
bbox_obj = self.get_bbox(entity)
if not bbox_obj:
continue
raw_text = ""
if entity.dxftype() == 'TEXT':
raw_text = entity.dxf.text
elif entity.dxftype() == 'MTEXT':
raw_text = entity.text
# 좌표 추출 (3D 좌표를 2D로 변환)
coords = []
if hasattr(entity, 'get_points'):
# ezdxf의 get_points()는 (x, y, z) 튜플 리스트를 반환함
coords = [(p[0], p[1]) for p in entity.get_points()]
elif entity.dxftype() == 'LINE':
coords = [(entity.dxf.start.x, entity.dxf.start.y), (entity.dxf.end.x, entity.dxf.end.y)]
elif entity.dxftype() in ('CIRCLE', 'ARC'):
coords = [(entity.dxf.center.x, entity.dxf.center.y)]
entity_data = GeometricEntity(
entity_id=entity.dxf.handle,
entity_type=entity.dxftype(),
layer=entity.dxf.layer,
bbox=bbox_obj,
raw_value=raw_text if raw_text else None,
clean_value=self.clean_text(raw_text) if raw_text else None,
coordinates=coords,
properties={
"color": entity.dxf.color,
"lineweight": entity.dxf.lineweight if hasattr(entity.dxf, 'lineweight') else None,
}
)
results.append(entity_data.model_dump())
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=4)
return output_path
# --- Proximity Utilities ---
def is_near(bbox_a: BoundingBox, bbox_b: BoundingBox, threshold=5.0) -> bool:
"""
두 Bounding Box 간의 최단 거리가 임계값 이내인지 확인.
shapely box를 사용하여 거리 계산.
"""
box_a = box(bbox_a.min_x, bbox_a.min_y, bbox_a.max_x, bbox_a.max_y)
box_b = box(bbox_b.min_x, bbox_b.min_y, bbox_b.max_x, bbox_b.max_y)
return box_a.distance(box_b) <= threshold
def is_inside(point: Tuple[float, float], bbox: BoundingBox) -> bool:
"""
특정 점이 Bounding Box 내부에 있는지 확인.
"""
return (bbox.min_x <= point[0] <= bbox.max_x) and (bbox.min_y <= point[1] <= bbox.max_y)

View File

@@ -1,122 +0,0 @@
import networkx as nx
import asyncio
import json
from typing import List, Optional, Dict, Any, Tuple
from pydantic import BaseModel, Field
from rapidfuzz import process, fuzz
from openai import AsyncOpenAI
# --- 응답 구조화를 위한 Pydantic 모델 ---
class MappingResult(BaseModel):
resolved_tag: str = Field(..., description="The final mapped system tag")
reason: str = Field(..., description="Reason for this mapping based on context")
confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score from 0 to 1")
class IntelligentMapper:
def __init__(self, graph: nx.Graph, system_tags: List[str], api_client: Optional[AsyncOpenAI] = None):
self.graph = graph # Phase 2에서 생성된 NetworkX 그래프
self.system_tags = system_tags # Experion 시스템의 전체 태그 리스트
self.client = api_client
def get_node_context(self, node_id: str) -> str:
"""노드의 주변 위상 정보를 텍스트로 변환"""
if not self.graph.has_node(node_id):
return "Node not found in graph"
neighbors = list(self.graph.neighbors(node_id))
context = []
for n in neighbors:
attr = self.graph.nodes[n]
val = attr.get('value', n)
typ = attr.get('type', 'Unknown')
context.append(f"Connected to {val} (Type: {typ})")
return ", ".join(context) if context else "No connected neighbors"
async def _resolve_generic(self, node_id: str, category_prompt: str) -> MappingResult:
"""공통 매핑 로직 (비동기 + 구조화 응답)"""
if not self.client:
return MappingResult(resolved_tag="UNKNOWN", reason="API Client not provided", confidence=0.0)
# Phase 2에서 'value'에 clean_value가 저장됨
node_data = self.graph.nodes.get(node_id, {})
tag_text = node_data.get('value', '')
# 1차 후보 추출 (RapidFuzz)
candidates = process.extract(tag_text, self.system_tags, scorer=fuzz.WRatio, limit=5)
context = self.get_node_context(node_id)
prompt = f"""
{category_prompt}
P&ID 도면의 태그 '{tag_text}'를 실제 시스템 태그와 매핑해야 합니다.
위상 맥락: {context}
후보 리스트: {candidates}
반드시 다음 JSON 형식으로만 응답하세요:
{{
"resolved_tag": "태그명 또는 UNKNOWN",
"reason": "매핑 이유",
"confidence": 0.0~1.0
}}
"""
try:
response = await self.client.chat.completions.create(
model="Qwen/Qwen3-Coder-Next-FP8", # MCP 서버 설정 모델 사용
messages=[{"role": "user", "content": prompt}],
response_format={ "type": "json_object" }
)
raw_content = response.choices[0].message.content
return MappingResult.model_validate_json(raw_content)
except Exception as e:
print(f"Error resolving node {node_id}: {e}")
return MappingResult(resolved_tag="UNKNOWN", reason=f"Error: {str(e)}", confidence=0.0)
# --- 전문화된 Worker 함수들 ---
async def extract_transmitters(self, node_ids: List[str]) -> Dict[str, MappingResult]:
prompt = "당신은 계측기 전문 엔지니어입니다. 특히 Pressure/Flow/Level Transmitter 매핑에 특화되어 있습니다."
tasks = [self._resolve_generic(nid, prompt) for nid in node_ids]
results = await asyncio.gather(*tasks)
return dict(zip(node_ids, results))
async def extract_valves(self, node_ids: List[str]) -> Dict[str, MappingResult]:
prompt = "당신은 밸브 및 액추에이터 전문 엔지니어입니다. 밸브의 개폐 상태 및 제어 태그 매핑에 특화되어 있습니다."
tasks = [self._resolve_generic(nid, prompt) for nid in node_ids]
results = await asyncio.gather(*tasks)
return dict(zip(node_ids, results))
async def extract_equipment(self, node_ids: List[str]) -> Dict[str, MappingResult]:
prompt = "당신은 공정 설비 전문 엔지니어입니다. 펌프, 탱크, 열교환기 등의 메인 설비 태그 매핑에 특화되어 있습니다."
tasks = [self._resolve_generic(nid, prompt) for nid in node_ids]
results = await asyncio.gather(*tasks)
return dict(zip(node_ids, results))
def validate_mapping(resolved_tag: str, symbol_type: str, tag_metadata: Dict[str, Any]) -> Tuple[bool, str]:
"""심볼 타입과 실제 태그 메타데이터의 엄격한 일치 여부 검증"""
if resolved_tag == "UNKNOWN":
return False, "Tag not resolved"
unit_map = {
"Pressure Transmitter": ["bar", "psi", "kPa", "Pa"],
"Flow Meter": ["m3/h", "lpm", "kg/h"],
"Temperature Sensor": ["°C", "C", "K", "°F"]
}
actual_unit = tag_metadata.get('unit', '').strip()
allowed_units = unit_map.get(symbol_type, [])
if actual_unit and actual_unit in allowed_units:
return True, "Unit Match"
actual_desc = tag_metadata.get('description', '').lower()
expected_keywords = {
"Pressure Transmitter": ["pressure", "press"],
"Flow Meter": ["flow", "flowrate"],
"Temperature Sensor": ["temp", "temperature"]
}
keywords = expected_keywords.get(symbol_type, [])
if any(kw in actual_desc for kw in keywords):
return True, "Description Match (Unit Missing)"
return False, "Mismatch: Symbol type and Tag metadata do not align"

View File

@@ -1,123 +0,0 @@
import networkx as nx
from shapely.geometry import box, Point, LineString
import json
from typing import List, Dict, Any, Optional, Tuple
class PidTopologyBuilder:
def __init__(self, geometric_data: List[Dict[str, Any]], all_extracted_tags: Optional[List[Dict[str, Any]]] = None, config: Optional[Dict[str, float]] = None):
"""
- geometric_data: Phase 1에서 추출된 기하학적 데이터 (List of dicts)
- all_extracted_tags: 통합된 태그 리스트
- config: {'dist_threshold': 50.0, 'tag_threshold': 100.0} 등 설정값
"""
self.data = geometric_data
self.all_tags = all_extracted_tags if all_extracted_tags else []
self.config = config if config else {'dist_threshold': 50.0, 'tag_threshold': 100.0}
self.G = nx.DiGraph() # 방향성 그래프 생성
def build_graph(self):
# 1. 모든 객체를 노드로 추가
for item in self.data:
bbox_vals = item['bbox']
# BoundingBox 모델의 필드명에 맞춰 추출 (min_x, min_y, max_x, max_y)
bbox_geom = box(bbox_vals['min_x'], bbox_vals['min_y'], bbox_vals['max_x'], bbox_vals['max_y'])
self.G.add_node(item['entity_id'],
type=item['entity_type'],
bbox=bbox_geom,
value=item.get('clean_value'),
layer=item.get('layer'))
# 2. 분산 추출된 태그 통합 및 노드 추가
for tag in self.all_tags:
bbox_vals = tag['bbox']
bbox_geom = box(bbox_vals['min_x'], bbox_vals['min_y'], bbox_vals['max_x'], bbox_vals['max_y'])
self.G.add_node(tag['entity_id'],
type='TEXT',
bbox=bbox_geom,
value=tag.get('clean_value') or tag.get('tagName'))
# 3. 태그-설비 논리적 연결 (Association)
tags = [n for n, d in self.G.nodes(data=True) if d['type'] == 'TEXT']
equipments = [n for n, d in self.G.nodes(data=True) if d['type'] not in ['TEXT', 'LINE', 'LWPOLYLINE']]
for tag in tags:
best_match = self._find_nearest_equipment(tag, equipments)
if best_match:
self.G.add_edge(tag, best_match, relation='associated_with')
# 4. 배관 기반 물리적 연결 (Pipe) [개선됨: End-point 기반]
lines = [n for n, d in self.G.nodes(data=True) if d['type'] in ['LINE', 'LWPOLYLINE']]
for line_id in lines:
original_item = next((item for item in self.data if item['entity_id'] == line_id), None)
if not original_item or not original_item.get('coordinates'):
continue
coords = original_item['coordinates']
line_geom = LineString(coords)
endpoints = [line_geom.coords[0], line_geom.coords[-1]]
connected_nodes = []
for pt in endpoints:
p = Point(pt)
for eq_id in equipments:
if self.G.nodes[eq_id]['bbox'].distance(p) < self.config['dist_threshold']:
connected_nodes.append(eq_id)
# 중복 제거
connected_nodes = list(set(connected_nodes))
if len(connected_nodes) >= 2:
# 방향성 추론 로직 (단순화: 첫 번째 발견된 설비 -> 두 번째 발견된 설비)
self.G.add_edge(connected_nodes[0], connected_nodes[1], relation='pipe')
elif len(connected_nodes) == 1:
pass
def _find_nearest_equipment(self, tag_id, equipment_ids):
tag_bbox = self.G.nodes[tag_id]['bbox']
min_dist = float('inf')
nearest = None
for eq_id in equipment_ids:
eq_bbox = self.G.nodes[eq_id]['bbox']
dist = tag_bbox.distance(eq_bbox)
if dist < min_dist:
min_dist = dist
nearest = eq_id
return nearest if min_dist < self.config['tag_threshold'] else None
def validate_topology(self):
"""위상 무결성 검증"""
isolated = list(nx.isolates(self.G))
return {
"isolated_nodes": isolated,
"node_count": self.G.number_of_nodes(),
"edge_count": self.G.number_of_edges()
}
def save_graph(self, output_path: str):
"""그래프 구조를 JSON 형태로 저장"""
from networkx.readwrite import json_graph
data = json_graph.node_link_data(self.G)
# shapely geometry 객체는 JSON 직렬화가 안 되므로 변환
for node in data['nodes']:
if 'bbox' in node:
bbox = node['bbox']
node['bbox'] = {
'min_x': bbox.bounds[0],
'min_y': bbox.bounds[1],
'max_x': bbox.bounds[2],
'max_y': bbox.bounds[3]
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return output_path
def analyze_impact(graph, start_node):
"""특정 설비 장애 시 하류(Downstream)에 영향을 받는 모든 노드 추출"""
if start_node not in graph:
return []
# BFS를 통해 도달 가능한 모든 노드 탐색
impacted_nodes = nx.descendants(graph, start_node)
return list(impacted_nodes)

View File

@@ -1,35 +0,0 @@
[project]
name = "iiot-rag-mcp"
version = "0.1.0"
description = "ExperionCrawler Unified MCP Server — RAG + NL2SQL"
requires-python = ">=3.10"
dependencies = [
"mcp[cli]>=1.0.0",
"qdrant-client>=1.9.0",
"sentence-transformers>=3.0.0",
"openai>=1.0.0",
"httpx>=0.27.0",
"psycopg[binary]>=3.1.0",
# P&ID 파싱
"ezdxf>=1.3.0",
# ARM64 환경 지원: paddlepaddle 3.x는 ARM64 wheel 미지원
# 2.6.0~2.9.x는 소스 빌드 가능
"paddlepaddle>=2.6.0,<3.0.0",
# paddleocr 2.7.0+는 paddlepaddle 3.3.1을 요구하며 ARM64 wheel 미지원
# 2.6.0은 paddlepaddle 2.x를 지원하여 ARM64 설치 가능
"paddleocr>=2.6.0,<2.7.0",
"pymupdf>=1.24.0",
"scikit-learn>=1.3.0",
"numpy>=1.24.0",
"Pillow>=10.0.0",
]
[project.scripts]
iiot-rag-mcp = "server:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
only-include = ["server.py", "index_opc_docs.py"]

File diff suppressed because it is too large Load Diff

4558
mcp-server/uv.lock generated

File diff suppressed because it is too large Load Diff