180 lines
6.3 KiB
Python
180 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Experion OPC UA 문서 인덱싱 스크립트
|
|
- HTM 파일 → 텍스트 추출 → 청킹 → Ollama 임베딩 → Qdrant 업서트
|
|
- 사용 모델: nomic-embed-text (768-dim, MCP 서버와 동일)
|
|
- 컬렉션: experion-opc-docs
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import uuid
|
|
import time
|
|
import textwrap
|
|
from html.parser import HTMLParser
|
|
from pathlib import Path
|
|
import httpx
|
|
|
|
# ── 설정 ──────────────────────────────────────────────────────────────────────
|
|
# Root directory that main() searches recursively for *.htm files.
DOCS_DIR = "/home/windpacer/projects/Experion_opcua_documents"
# Base URL of the Qdrant REST API.
QDRANT_URL = "http://localhost:6333"
# Base URL of the Ollama HTTP API used for embeddings.
OLLAMA_URL = "http://localhost:11434"
# Model name sent to Ollama with every embedding request.
EMBED_MODEL = "nomic-embed-text"
# Qdrant collection that receives the indexed chunks.
COLLECTION = "experion-opc-docs"
CHUNK_SIZE = 600  # number of characters per chunk
# Number of characters shared between consecutive chunks.
CHUNK_OVERLAP = 100
# Vector size declared when the collection is created; it must match the
# dimensionality of the vectors produced by EMBED_MODEL.
VECTOR_DIM = 768
|
|
|
|
# ── HTML → 텍스트 추출 ────────────────────────────────────────────────────────
|
|
|
|
class _TextExtractor(HTMLParser):
    """Collect the visible text of an HTML document as newline-separated lines.

    Content nested inside any of ``SKIP_TAGS`` is discarded.  The closing tag
    of a ``BLOCK_TAGS`` element and the opening of a ``BREAK_TAGS`` element
    both terminate the current output line.  Feed the markup with ``feed()``
    (and ``close()``), then read the result with ``get_text()``.
    """

    # Elements whose whole content is ignored.
    SKIP_TAGS = {"script", "style", "head", "nav", "footer"}
    # Elements whose end tag finishes a line of output.  h5/h6/th are included
    # so that low-level headings and table header cells are separated the same
    # way as h1-h4 and td.
    BLOCK_TAGS = frozenset(
        {"p", "h1", "h2", "h3", "h4", "h5", "h6", "li", "td", "th", "tr", "div"}
    )
    # Void elements that break the line at the point where they appear.
    BREAK_TAGS = frozenset({"br"})

    def __init__(self):
        super().__init__()
        # Nesting depth of currently open SKIP_TAGS elements (0 = visible).
        self._skip = 0
        # Text fragments and "\n" separators in document order.
        self._parts = []

    def handle_starttag(self, tag, attrs):
        """Track entry into skipped elements and honour explicit line breaks."""
        if tag in self.SKIP_TAGS:
            self._skip += 1
        elif tag in self.BREAK_TAGS and not self._skip:
            # <br> has no end tag, so the break must be emitted here.  A
            # self-closing <br/> is routed through handle_starttag as well.
            self._parts.append("\n")

    def handle_endtag(self, tag):
        """Track exit from skipped elements and end the line after block tags."""
        # The depth guard keeps a stray closing tag from driving it negative.
        if tag in self.SKIP_TAGS and self._skip:
            self._skip -= 1
        if tag in self.BLOCK_TAGS:
            self._parts.append("\n")

    def handle_data(self, data):
        """Record non-blank text that is outside every skipped element."""
        if not self._skip:
            stripped = data.strip()
            if stripped:
                # The trailing space keeps adjacent inline fragments apart.
                self._parts.append(stripped + " ")

    def get_text(self) -> str:
        """Return the collected text with blank lines and edge spaces removed."""
        raw = "".join(self._parts)
        stripped_lines = (line.strip() for line in raw.splitlines())
        return "\n".join(line for line in stripped_lines if line)
|
|
|
|
|
|
def extract_text(htm_path: str) -> str:
    """Read an HTM file and return its visible text.

    Args:
        htm_path: Path of the file to read.  It is decoded as UTF-8 and
            undecodable bytes are replaced instead of raising.

    Returns:
        The text produced by ``_TextExtractor.get_text()`` for the file.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(htm_path, encoding="utf-8", errors="replace") as f:
        html = f.read()
    p = _TextExtractor()
    p.feed(html)
    # feed() may hold back trailing input while it waits for more data;
    # close() forces the parser to process whatever is still buffered.
    p.close()
    return p.get_text()
|
|
|
|
|
|
# ── 청킹 ─────────────────────────────────────────────────────────────────────
|
|
|
|
def chunk_text(text: str, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split text into overlapping, whitespace-stripped character windows.

    Args:
        text: The text to split.
        size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive windows; must
            satisfy ``0 <= overlap < size``.

    Returns:
        The non-blank chunks in document order.  Text that fits in a single
        window is returned as one chunk; blank text yields an empty list.

    Raises:
        ValueError: If ``size`` or ``overlap`` is out of range.  Without this
            check ``overlap >= size`` would stop the window from advancing
            and the loop would never terminate.
    """
    if size <= 0:
        raise ValueError(f"size must be positive, got {size}")
    if not 0 <= overlap < size:
        raise ValueError(f"overlap must satisfy 0 <= overlap < size, got {overlap}")

    if len(text) <= size:
        # Strip here too so both code paths return chunks in the same form.
        stripped = text.strip()
        return [stripped] if stripped else []

    step = size - overlap
    chunks = []
    for start in range(0, len(text), step):
        end = start + size
        piece = text[start:end].strip()
        if piece:
            chunks.append(piece)
        if end >= len(text):
            # This window already reached the end of the text; any further
            # window would only repeat characters it contains.
            break
    return chunks
|
|
|
|
|
|
# ── Ollama 임베딩 ─────────────────────────────────────────────────────────────
|
|
|
|
def embed(text: str) -> list[float]:
    """Request an embedding for ``text`` from the Ollama server.

    Args:
        text: The text sent as the ``prompt`` of the request.

    Returns:
        The value of the ``embedding`` field of the JSON response.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
        KeyError: If the response has no ``embedding`` field.
    """
    endpoint = f"{OLLAMA_URL}/api/embeddings"
    request_body = {"model": EMBED_MODEL, "prompt": text}
    with httpx.Client(timeout=30) as session:
        response = session.post(endpoint, json=request_body)
        response.raise_for_status()
        result = response.json()
        return result["embedding"]
|
|
|
|
|
|
# ── Qdrant 컬렉션 생성 ────────────────────────────────────────────────────────
|
|
|
|
def ensure_collection():
    """Create the Qdrant collection, replacing an existing one on confirmation.

    When the collection already exists its point count is printed and the
    user is asked whether to delete it.  Any answer other than ``y`` exits
    the process with status 0.  Otherwise the old collection is deleted and a
    new one is created with ``VECTOR_DIM``-sized cosine-distance vectors.

    Raises:
        httpx.HTTPStatusError: If the lookup fails with a status other than
            200 or 404, or if deleting or creating the collection fails.
        SystemExit: If the user declines to re-index.
    """
    collection_url = f"{QDRANT_URL}/collections/{COLLECTION}"
    with httpx.Client(timeout=15) as client:
        resp = client.get(collection_url)
        # 404 is taken to mean "collection does not exist yet" (NOTE(review):
        # confirm against the Qdrant version in use).  Any other failure is a
        # real error and must not be mistaken for a missing collection.
        if resp.status_code not in (200, 404):
            resp.raise_for_status()

        if resp.status_code == 200:
            info = resp.json()["result"]
            count = info.get("points_count", 0)
            print(f"컬렉션 '{COLLECTION}' 이미 존재 (points: {count})")
            answer = input("기존 컬렉션을 삭제하고 재인덱싱? [y/N]: ").strip().lower()
            if answer != "y":
                print("취소")
                sys.exit(0)
            delete_resp = client.delete(collection_url)
            # Fail here instead of reporting a deletion that did not happen.
            delete_resp.raise_for_status()
            print("기존 컬렉션 삭제 완료")

        create_resp = client.put(
            collection_url,
            json={"vectors": {"size": VECTOR_DIM, "distance": "Cosine"}},
        )
        create_resp.raise_for_status()
        print(f"컬렉션 '{COLLECTION}' 생성 완료")
|
|
|
|
|
|
# ── Qdrant 업서트 ─────────────────────────────────────────────────────────────
|
|
|
|
def upsert_batch(points: list[dict]):
    """Send one batch of points to the Qdrant collection.

    Args:
        points: Point dictionaries, sent unchanged as the ``points`` field of
            the request body.

    Raises:
        httpx.HTTPStatusError: If Qdrant answers with an error status.
    """
    points_url = f"{QDRANT_URL}/collections/{COLLECTION}/points"
    with httpx.Client(timeout=30) as session:
        response = session.put(points_url, json={"points": points})
        response.raise_for_status()
|
|
|
|
|
|
# ── 메인 ─────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Index every HTM file under DOCS_DIR into the Qdrant collection.

    Steps: find the ``*.htm`` files recursively, prepare the collection,
    then extract, chunk and embed each file, uploading the resulting points
    in batches of 20.  Exits with status 1 when no HTM file is found.
    """
    docs_root = Path(DOCS_DIR)
    htm_files = sorted(docs_root.rglob("*.htm"))
    if not htm_files:
        print(f"HTM 파일 없음: {DOCS_DIR}")
        sys.exit(1)

    file_count = len(htm_files)
    print(f"HTM 파일 수: {file_count}")
    ensure_collection()

    batch_limit = 20
    pending: list[dict] = []
    uploaded = 0
    # Stored paths are relative to the parent of DOCS_DIR, so they keep the
    # name of the documents directory itself.
    base_dir = docs_root.parent

    for file_no, htm_path in enumerate(htm_files, start=1):
        rel_path = str(htm_path.relative_to(base_dir))
        pieces = chunk_text(extract_text(str(htm_path)))

        for idx, piece in enumerate(pieces):
            vector = embed(piece)
            # uuid5 derives the id from the file path and chunk index alone,
            # so the same path and index always yield the same point id.
            point_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{htm_path}#{idx}"))
            pending.append({
                "id": point_id,
                "vector": vector,
                "payload": {
                    "filePath": rel_path,
                    "content": piece,
                    "chunkIndex": idx,
                },
            })

            if len(pending) < batch_limit:
                continue
            upsert_batch(pending)
            uploaded += len(pending)
            pending = []

        print(f"[{file_no:2d}/{file_count}] {htm_path.name} ({len(pieces)} chunks)", flush=True)

    # Upload whatever is left over from the last, partially filled batch.
    if pending:
        upsert_batch(pending)
        uploaded += len(pending)

    print(f"\n완료: {uploaded}개 청크 → 컬렉션 '{COLLECTION}'")
|
|
|
|
|
|
# Run the indexing pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|