#!/usr/bin/env python3
"""
Experion OPC UA document indexing script.

- HTM files -> text extraction -> chunking -> Ollama embedding -> Qdrant upsert
- Embedding model: nomic-embed-text (768-dim, same as the MCP server)
- Collection: experion-opc-docs
"""

import os
import sys
import uuid
import time
import textwrap
from html.parser import HTMLParser
from pathlib import Path
from typing import Optional

import httpx

# ── Settings ──────────────────────────────────────────────────────────────────
DOCS_DIR = "/home/windpacer/projects/Experion_opcua_documents"
QDRANT_URL = "http://localhost:6333"
OLLAMA_URL = "http://localhost:11434"
EMBED_MODEL = "nomic-embed-text"
COLLECTION = "experion-opc-docs"
CHUNK_SIZE = 600  # number of characters per chunk
CHUNK_OVERLAP = 100  # characters shared between consecutive chunks
VECTOR_DIM = 768


# ── HTML → text extraction ────────────────────────────────────────────────────
class _TextExtractor(HTMLParser):
    """Collect the visible text of an HTML document.

    Text inside any of ``SKIP_TAGS`` is ignored; the end of a block-level
    element inserts a line break so paragraphs, list items and table cells
    do not run together.
    """

    SKIP_TAGS = {"script", "style", "head", "nav", "footer"}

    def __init__(self):
        super().__init__()
        self._skip = 0  # nesting depth of currently open SKIP_TAGS elements
        self._parts = []  # text fragments and "\n" separators, in order

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP_TAGS:
            self._skip += 1

    def handle_endtag(self, tag):
        # Guard on the counter so a stray closing tag cannot push it negative.
        if tag in self.SKIP_TAGS and self._skip:
            self._skip -= 1
        if tag in ("p", "h1", "h2", "h3", "h4", "li", "td", "tr", "div"):
            self._parts.append("\n")

    def handle_data(self, data):
        if not self._skip:
            stripped = data.strip()
            if stripped:
                self._parts.append(stripped + " ")

    def get_text(self) -> str:
        """Return the collected text, one non-empty stripped line per row."""
        raw = "".join(self._parts)
        lines = [line.strip() for line in raw.splitlines()]
        return "\n".join(line for line in lines if line)


def extract_text(htm_path: str) -> str:
    """Read an HTM file and return its visible text.

    The file is decoded as UTF-8; undecodable bytes are replaced rather than
    raising.
    """
    with open(htm_path, encoding="utf-8", errors="replace") as f:
        html = f.read()
    parser = _TextExtractor()
    parser.feed(html)
    # close() forces the parser to process any input it is still buffering;
    # without it, trailing data at the end of the file can be lost.
    parser.close()
    return parser.get_text()


# ── Chunking ──────────────────────────────────────────────────────────────────
def chunk_text(text: str, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split ``text`` into stripped, overlapping character windows.

    Args:
        text: Text to split.
        size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive windows; must
            satisfy ``0 <= overlap < size``.

    Returns:
        The non-blank chunks in order. Text no longer than ``size`` yields a
        single chunk, and blank text yields an empty list.

    Raises:
        ValueError: If ``size`` or ``overlap`` is out of range. An overlap
            that is not smaller than ``size`` would never advance the window.
    """
    if size <= 0:
        raise ValueError(f"size must be positive, got {size}")
    if not 0 <= overlap < size:
        raise ValueError(f"overlap must satisfy 0 <= overlap < size, got {overlap}")

    if len(text) <= size:
        whole = text.strip()
        return [whole] if whole else []

    chunks = []
    step = size - overlap
    for start in range(0, len(text), step):
        end = start + size
        piece = text[start:end].strip()
        if piece:
            chunks.append(piece)
        # This window already reached the end of the text; any further window
        # would only repeat characters the current chunk contains.
        if end >= len(text):
            break
    return chunks


# ── Ollama embedding ──────────────────────────────────────────────────────────
def embed(text: str, client: Optional[httpx.Client] = None) -> list[float]:
    """Return the embedding of ``text`` from the Ollama embeddings endpoint.

    Args:
        text: Text to embed.
        client: Optional HTTP client to reuse. When omitted, a temporary
            client with a 30-second timeout is created for this call.

    Raises:
        httpx.HTTPStatusError: If Ollama answers with an error status.
    """
    if client is None:
        with httpx.Client(timeout=30) as own_client:
            return embed(text, own_client)
    resp = client.post(
        f"{OLLAMA_URL}/api/embeddings",
        json={"model": EMBED_MODEL, "prompt": text},
    )
    resp.raise_for_status()
    return resp.json()["embedding"]


# ── Qdrant collection creation ────────────────────────────────────────────────
def ensure_collection():
    """Create the Qdrant collection, replacing an existing one on request.

    If the collection already exists the user is asked whether to delete and
    re-index it; any answer other than ``y`` (including end-of-input on a
    non-interactive stdin) exits the process with status 0.

    Raises:
        httpx.HTTPStatusError: If Qdrant answers the lookup, delete or create
            request with an unexpected error status.
    """
    collection_url = f"{QDRANT_URL}/collections/{COLLECTION}"
    with httpx.Client(timeout=15) as client:
        resp = client.get(collection_url)
        if resp.status_code == 200:
            info = resp.json()["result"]
            count = info.get("points_count", 0)
            print(f"컬렉션 '{COLLECTION}' 이미 존재 (points: {count})")
            try:
                answer = input("기존 컬렉션을 삭제하고 재인덱싱? [y/N]: ").strip().lower()
            except EOFError:
                # No interactive stdin: fall back to the default answer "N".
                answer = ""
            if answer != "y":
                print("취소")
                sys.exit(0)
            delete_resp = client.delete(collection_url)
            # Do not report success (or try to re-create) if the delete failed.
            delete_resp.raise_for_status()
            print("기존 컬렉션 삭제 완료")
        elif resp.status_code != 404:
            # Only 404 means "collection does not exist"; any other failure
            # (e.g. a server error) must not be mistaken for a missing one.
            resp.raise_for_status()

        create_resp = client.put(
            collection_url,
            json={"vectors": {"size": VECTOR_DIM, "distance": "Cosine"}},
        )
        create_resp.raise_for_status()
        print(f"컬렉션 '{COLLECTION}' 생성 완료")


# ── Qdrant upsert ─────────────────────────────────────────────────────────────
def upsert_batch(points: list[dict], client: Optional[httpx.Client] = None):
    """Upsert ``points`` into the Qdrant collection.

    Args:
        points: Point dicts with ``id``, ``vector`` and ``payload`` keys.
            An empty list is a no-op.
        client: Optional HTTP client to reuse. When omitted, a temporary
            client with a 30-second timeout is created for this call.

    Raises:
        httpx.HTTPStatusError: If Qdrant answers with an error status.
    """
    if not points:
        return
    if client is None:
        with httpx.Client(timeout=30) as own_client:
            upsert_batch(points, own_client)
        return
    resp = client.put(
        f"{QDRANT_URL}/collections/{COLLECTION}/points",
        json={"points": points},
    )
    resp.raise_for_status()


# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Index every ``*.htm`` file under ``DOCS_DIR`` into the collection."""
    docs_root = Path(DOCS_DIR)
    htm_files = sorted(docs_root.rglob("*.htm"))
    if not htm_files:
        print(f"HTM 파일 없음: {DOCS_DIR}")
        sys.exit(1)

    print(f"HTM 파일 수: {len(htm_files)}")
    ensure_collection()

    total_chunks = 0
    batch: list[dict] = []
    BATCH_SIZE = 20
    # Stored file paths are relative to the parent of the docs directory, so
    # they keep the docs directory name as their first component.
    rel_base = docs_root.parent

    # One client for the whole run, so connections to Ollama and Qdrant are
    # reused instead of being re-opened for every chunk and batch.
    with httpx.Client(timeout=30) as client:
        for i, path in enumerate(htm_files, 1):
            rel = str(path.relative_to(rel_base))
            text = extract_text(str(path))
            chunks = chunk_text(text)

            for j, chunk in enumerate(chunks):
                vec = embed(chunk, client)
                batch.append({
                    # Deterministic id derived from the file path and chunk index.
                    "id": str(uuid.uuid5(uuid.NAMESPACE_URL, f"{path}#{j}")),
                    "vector": vec,
                    "payload": {
                        "filePath": rel,
                        "content": chunk,
                        "chunkIndex": j,
                    },
                })

                if len(batch) >= BATCH_SIZE:
                    upsert_batch(batch, client)
                    total_chunks += len(batch)
                    batch = []

            print(f"[{i:2d}/{len(htm_files)}] {path.name} ({len(chunks)} chunks)", flush=True)

        # Flush whatever is left over from the last partial batch.
        if batch:
            upsert_batch(batch, client)
            total_chunks += len(batch)

    print(f"\n완료: {total_chunks}개 청크 → 컬렉션 '{COLLECTION}'")


if __name__ == "__main__":
    main()