feat: Knowledge Base RAG 시스템 + 채팅 LLM 개선 (Phase 0~5 완료)

- KB RAG 전체 파이프라인: 업로드, 파싱(xlsx/pdf/docx/text), 임베딩, Qdrant 인덱싱
- KB 관리 UI(14번 탭): 로그인, 문서 목록, 업로드, 삭제, 재인덱스
- OllamaController: 한글 시스템 프롬프트, plant_context.md 외부 파일화, SSE tool_start/tool_result 이벤트
- 프론트: 툴 실행 카드, KB 인용 링크, 표 자동 렌더, 추천 질문 칩
- nl2sql_worker: history_table.recorded_at 사용, tag_metadata 응답 개선
- DB: KB 테이블 5개 DDL + 시드, pgcrypto 확장
This commit is contained in:
windpacer
2026-05-13 20:22:27 +09:00
parent 35136ba91e
commit 908bfe151f
32 changed files with 3202 additions and 91 deletions

View File

@@ -31,6 +31,15 @@ VLLM_MODEL = get_vllm_model()
COL_CODEBASE = "ws-65f457145aee80b2" # ExperionCrawler 소스코드
COL_OPC_DOCS = "experion-opc-docs" # Experion HS R530 OPC UA 공식 문서 (266 chunks)
# 사용자 KB 컬렉션 (kb_collections 시드 5종과 일치)
KB_COLLECTIONS = {
"system_instrument": "kb_system_instrument",
"plant_operation": "kb_plant_operation",
"procedure": "kb_procedure",
"report": "kb_report",
"vendor_doc": "kb_vendor_doc",
}
# PostgreSQL 연결
DB_CONNECTION_STRING = os.environ.get("DB_CONNECTION_STRING", "postgresql://postgres:postgres@localhost:5432/iiot_platform")
DB_TIMEOUT = int(os.environ.get("DB_TIMEOUT", "10"))
@@ -248,6 +257,60 @@ async def _search(collection: str, query: str, top_k: int, threshold: float = 0.
return "\n\n---\n\n".join(parts)
async def _search_kb_collection(
qdrant_name: str,
vec: list[float],
top_k: int,
tags: list[str] | None = None,
) -> list[dict]:
"""KB 컬렉션 1개에 대해 의미 검색. 결과를 정규화된 dict 리스트로 반환."""
must = []
if tags:
must.append({"key": "tags", "match": {"any": tags}})
body: dict = {
"vector": vec,
"limit": top_k,
"with_payload": True,
"score_threshold": 0.20,
}
if must:
body["filter"] = {"must": must}
def _call():
with httpx.Client(timeout=20) as client:
resp = client.post(f"{QDRANT_URL}/collections/{qdrant_name}/points/search", json=body)
if resp.status_code == 404:
return []
resp.raise_for_status()
return resp.json().get("result", [])
try:
return await asyncio.to_thread(_call)
except Exception as e:
logging.warning(f"[search_kb] {qdrant_name} 검색 실패: {e}")
return []
def _recency_factor(uploaded_at_iso: str | None) -> float:
"""uploaded_at 기준 최신 가중치. 최근 7일 +10%, 30일 +5%, 90일 +2%, 그 외 1.0."""
if not uploaded_at_iso:
return 1.0
try:
from datetime import datetime, timezone
ts = datetime.fromisoformat(uploaded_at_iso.replace("Z", "+00:00"))
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - ts).total_seconds() / 86400.0
if age < 7: return 1.10
if age < 30: return 1.05
if age < 90: return 1.02
return 1.0
except Exception:
return 1.0
# ── DB 헬퍼 ──────────────────────────────────────────────────────────────────
async def _get_db_connection():
@@ -406,25 +469,161 @@ def ask_iiot_llm(question: str, context: str = "") -> str:
@mcp.tool()
async def rag_query(question: str, search_code: bool = False, search_docs: bool = True) -> str:
async def rag_query(
question: str,
search_code: bool = False,
search_docs: bool = True,
search_kb: bool = False,
kb_collections: list[str] | None = None,
) -> str:
"""검색 → LLM 답변 생성 (통합 RAG).
기본값: Experion HS R530 공식 문서만 검색 (search_docs=True, search_code=False).
ExperionCrawler 코드도 함께 보려면 search_code=True 추가.
기본값: Experion HS R530 공식 문서만 검색.
사용자 KB 검색을 포함하려면 search_kb=True. 코드 검색은 search_code=True.
Args:
question: 질문
search_docs: Experion HS R530 공식 문서 검색 여부 (기본 True)
search_code: ExperionCrawler 소스코드 검색 여부 (기본 False)
question: 질문
search_docs: Experion HS R530 공식 문서 검색 여부 (기본 True)
search_code: ExperionCrawler 소스코드 검색 여부 (기본 False)
search_kb: 사용자 KB 검색 여부 (기본 False)
kb_collections: 검색 대상 KB 컬렉션 키 목록. None이면 전체.
예: ["plant_operation", "procedure"]
"""
context_parts: list[str] = []
if search_docs:
context_parts.append(f"=== Experion HS R530 공식 문서 ===\n{await _search(COL_OPC_DOCS, question, 4)}")
if search_code:
context_parts.append(f"=== ExperionCrawler 구현 코드 ===\n{await _search(COL_CODEBASE, question, 3)}")
if search_kb:
kb_text = await _format_kb_results(question, kb_collections, top_k=6)
context_parts.append(f"=== 사용자 지식 베이스 ===\n{kb_text}")
return ask_iiot_llm(question, "\n\n".join(context_parts))
async def _format_kb_results(
query: str,
collection_keys: list[str] | None,
top_k: int,
tags: list[str] | None = None,
since: str | None = None,
boost_recent: bool = True,
) -> str:
"""search_kb 내부 헬퍼: 다중 컬렉션 의미검색 후 인용 텍스트로 직렬화."""
hits = await _search_kb_raw(query, collection_keys, top_k, tags, since, boost_recent)
if not hits:
return "관련 KB 결과 없음."
parts = []
for h in hits:
title = h.get("title") or "(제목없음)"
loc = h.get("locator") or ""
score = h.get("score", 0.0)
text = (h.get("text") or "").strip()
# 인용 헤더: "[score=0.812] 정비이력_2026Q1.xlsx > 시트:Pump-A > 행 12"
loc_str = f" > {loc}" if loc else ""
parts.append(f"[score={score:.3f}] {title}{loc_str}\n{text[:700]}")
return "\n\n---\n\n".join(parts)
async def _search_kb_raw(
query: str,
collection_keys: list[str] | None,
top_k: int,
tags: list[str] | None,
since: str | None,
boost_recent: bool,
) -> list[dict]:
"""KB 검색 핵심 로직 — 다중 컬렉션 의미검색 + 최신 가중치 + 후필터."""
targets = collection_keys or list(KB_COLLECTIONS.keys())
qdrant_names = [KB_COLLECTIONS[k] for k in targets if k in KB_COLLECTIONS]
if not qdrant_names:
return []
vec = await _embed(query)
per_coll_k = max(top_k, 8)
results: list[dict] = []
for qname in qdrant_names:
hits = await _search_kb_collection(qname, vec, per_coll_k, tags=tags)
for h in hits:
p = h.get("payload", {})
uploaded_at = p.get("uploaded_at")
if since and uploaded_at:
try:
if uploaded_at < since:
continue
except Exception:
pass
base_score = h.get("score", 0.0)
recency = _recency_factor(uploaded_at) if boost_recent else 1.0
results.append({
"score": base_score * recency,
"raw_score": base_score,
"doc_id": p.get("doc_id"),
"collection_key": p.get("collection_key"),
"title": p.get("title"),
"text": p.get("text", ""),
"chunk_kind": p.get("chunk_kind"),
"locator": p.get("locator"),
"uploaded_at": uploaded_at,
"tags": p.get("tags") or [],
})
# 점수 내림차순 정렬, 동일 doc_id 중복 dedup(최고점만)
results.sort(key=lambda r: r["score"], reverse=True)
seen: set[str] = set()
unique: list[dict] = []
for r in results:
key = f'{r.get("doc_id")}::{r.get("locator")}'
if key in seen:
continue
seen.add(key)
unique.append(r)
if len(unique) >= top_k:
break
return unique
@mcp.tool()
async def search_kb(
query: str,
collection_keys: list[str] | None = None,
top_k: int = 8,
tags: list[str] | None = None,
since: str | None = None,
boost_recent: bool = True,
) -> str:
"""사용자 지식 베이스(KB) 다중 컬렉션 의미 검색.
관리탭에서 업로드/인덱싱한 문서에서 질의와 의미적으로 가까운 청크를 찾는다.
Args:
query: 검색어 또는 자연어 질문
collection_keys: 대상 컬렉션 키 목록. None이면 전체.
가능한 값: system_instrument, plant_operation,
procedure, report, vendor_doc
top_k: 반환 결과 수 (기본 8)
tags: 태그 필터 (any 매칭). 예: ["unit-a", "P-6201"]
since: 이 ISO 시각 이후 업로드된 문서만. 예: "2026-04-01T00:00:00Z"
boost_recent: True이면 uploaded_at 기준 최신 가중치 적용 (기본 True)
Returns:
JSON 문자열: { success, count, hits: [{ doc_id, collection_key, title,
text, chunk_kind, locator, score, uploaded_at, tags }, ...] }
"""
try:
hits = await _search_kb_raw(query, collection_keys, top_k, tags, since, boost_recent)
return json.dumps(
{"success": True, "count": len(hits), "hits": hits},
ensure_ascii=False,
default=str,
)
except Exception as e:
return json.dumps({"success": False, "error": f"search_kb 실패: {e}"}, ensure_ascii=False)
# ── NL2SQL 도구 ───────────────────────────────────────────────────────────────
async def _execute_sql_internal(sql: str) -> str:
@@ -1224,6 +1423,63 @@ async def parse_pid_drawing(filepath: str) -> str:
# ── KB ingest 파서 ────────────────────────────────────────────────────────────
@mcp.tool()
async def parse_document(
doc_id: str,
title: str,
file_path: str,
mime_type: str = "",
collection_key: str = "",
chunking_policy: str = "",
) -> str:
"""KB ingest 파서. 파일 확장자에 따라 적절한 청킹을 수행한다.
Args:
doc_id: 문서 ID (UUID 문자열)
title: 제목 (오류 메시지에만 사용)
file_path: 절대 경로
mime_type: 정보용 (옵션)
collection_key: 정보용 (옵션)
chunking_policy: JSON 문자열, 향후 정책 분기에 사용
Returns:
JSON 문자열: {"success": true, "chunks": [{"text", "chunk_kind", "locator"}, ...]}
or {"success": false, "error": "..."}
"""
import os
if not os.path.isfile(file_path):
return json.dumps({"success": False, "error": f"file not found: {file_path}"}, ensure_ascii=False)
ext = os.path.splitext(file_path)[1].lower()
try:
if ext in (".xlsx", ".xlsm"):
from parsers import xlsx_parser
chunks = await asyncio.to_thread(xlsx_parser.parse, file_path)
elif ext == ".pdf":
from parsers import pdf_parser
chunks = await asyncio.to_thread(pdf_parser.parse, file_path)
elif ext == ".docx":
from parsers import docx_parser
chunks = await asyncio.to_thread(docx_parser.parse, file_path)
elif ext in (".md", ".txt", ".markdown"):
from parsers import text_parser
chunks = await asyncio.to_thread(text_parser.parse, file_path)
else:
return json.dumps(
{"success": False, "error": f"unsupported extension: {ext}"},
ensure_ascii=False
)
return json.dumps(
{"success": True, "doc_id": doc_id, "chunks": chunks, "count": len(chunks)},
ensure_ascii=False
)
except Exception as e:
return json.dumps({"success": False, "error": f"parse failed: {e}"}, ensure_ascii=False)
# ── 엔트리포인트 ──────────────────────────────────────────────────────────────
def main():