Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL. 기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체. - industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버) - src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer) - mcp-server: Python FastMCP (RAG/NL2SQL/P&ID) - 다중 컨트롤러(N-Controller) 지원 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
231 lines
7.9 KiB
Python
231 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""RAG 전용 워커 프로세스
|
|
|
|
Usage: python rag_worker.py <port>
|
|
|
|
담당 도구:
|
|
search_codebase, search_r530_docs, ask_iiot_llm, rag_query
|
|
|
|
특징:
|
|
- Ollama Embedding + Qdrant 검색 + vLLM LLM 조합
|
|
- 메모리: ~200MB (워커 자체, vLLM 외부 서비스 사용 시)
|
|
- 생명주기: 메인 서버 종료 시까지 유지
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import sys
|
|
import os
|
|
|
|
# mcp-server 디렉토리를 Python 경로에 추가 (pipeline 패키지 접근)
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import logging
|
|
import asyncio
|
|
from functools import lru_cache
|
|
|
|
from fastapi import FastAPI, Request
|
|
import uvicorn
|
|
import httpx
|
|
|
|
# ── 설정 ─────────────────────────────────────────────────────────────────────
|
|
|
|
# Endpoints of the external services; each can be overridden through an
# environment variable of the same name.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
VLLM_BASE_URL = os.getenv("VLLM_BASE_URL", "http://localhost:8000/v1")

# NOTE(review): `config` is presumably resolved through the sys.path entry
# inserted near the top of this file — confirm before moving this import.
from config import get_vllm_model

# Model identifiers: the chat model comes from the project config helper,
# the embedding model from the environment.
VLLM_MODEL = get_vllm_model()
EMBED_MODEL = os.getenv("EMBED_MODEL", "nomic-embed-text")

# Qdrant collection names for the two searchable corpora.
COL_CODEBASE = os.getenv("COL_CODEBASE", "ws-65f457145aee80b2")
COL_OPC_DOCS = os.getenv("COL_OPC_DOCS", "experion-opc-docs")

# Log to stderr with a fixed "[rag_worker]" tag in every record.
logging.basicConfig(
    format="%(asctime)s [rag_worker] %(levelname)s %(message)s",
    stream=sys.stderr,
    level=logging.INFO,
)

# ASGI application that the route decorators below attach to.
app = FastAPI()
|
|
|
|
# ── HTTP 클라이언트 싱글톤 ────────────────────────────────────────────────────
|
|
|
|
@lru_cache(maxsize=1)
def _get_http_client():
    """Return the shared ``httpx.AsyncClient`` used for Ollama and Qdrant calls.

    ``lru_cache(maxsize=1)`` on a zero-argument function turns this into a
    lazily created singleton: the client is built on first use and the same
    instance is returned afterwards.

    Callers must use the returned client directly and must NOT wrap it in
    ``async with``: leaving that context closes the client, and httpx does
    not allow a closed (or already opened) client to be entered or used
    again, which would break every later request.
    """
    return httpx.AsyncClient(timeout=30)


# ── Embedding (Ollama) ───────────────────────────────────────────────────────


async def _embed(text: str) -> list[float]:
    """Create an embedding vector for *text* through the Ollama HTTP API.

    Posts to ``{OLLAMA_URL}/api/embeddings`` with the model named by
    ``EMBED_MODEL``.

    Args:
        text: The text to embed.

    Returns:
        The value of the ``"embedding"`` field of the JSON response.
        (The original note says 768 dimensions for nomic-embed-text; the
        actual size depends on the configured model.)

    Raises:
        httpx.HTTPStatusError: If Ollama answers with a 4xx/5xx status.
        KeyError: If the response has no ``"embedding"`` field.
    """
    # Use the shared client as-is; it stays open for the process lifetime.
    client = _get_http_client()
    resp = await client.post(
        f"{OLLAMA_URL}/api/embeddings",
        json={"model": EMBED_MODEL, "prompt": text},
    )
    resp.raise_for_status()
    return resp.json()["embedding"]


# ── Qdrant search ────────────────────────────────────────────────────────────


async def _qdrant_search(collection: str, query_vector: list[float], top_k: int = 6) -> list[dict]:
    """Run a vector similarity search against one Qdrant collection.

    Posts to ``{QDRANT_URL}/collections/{collection}/points/search`` and asks
    for payloads to be included with each hit.

    Args:
        collection: Name of the Qdrant collection to search.
        query_vector: Query embedding.
        top_k: Maximum number of hits to request.

    Returns:
        The ``"result"`` list of the JSON response, or an empty list when
        that field is absent.

    Raises:
        httpx.HTTPStatusError: If Qdrant answers with a 4xx/5xx status.
    """
    # Use the shared client as-is; closing it here would break later calls.
    client = _get_http_client()
    resp = await client.post(
        f"{QDRANT_URL}/collections/{collection}/points/search",
        json={
            "vector": query_vector,
            "limit": top_k,
            "with_payload": True,
        },
    )
    resp.raise_for_status()
    return resp.json().get("result", [])
|
|
|
|
# ── LLM (vLLM) ───────────────────────────────────────────────────────────────
|
|
|
|
@lru_cache(maxsize=1)
def _llm_client():
    """Build (once) and return the async OpenAI-compatible client for vLLM.

    The ``openai`` import is deferred to the first call, and the cache keeps
    the single instance that call creates.
    """
    from openai import AsyncOpenAI

    # The endpoint is addressed via VLLM_BASE_URL; a placeholder key is passed.
    vllm_client = AsyncOpenAI(base_url=VLLM_BASE_URL, api_key="dummy")
    return vllm_client
|
|
|
|
async def _ask_llm(question: str, context: str = "") -> str:
    """Ask the vLLM chat model a question, optionally grounded in context.

    Args:
        question: The user's question.
        context: Optional supporting text. When non-empty, it is embedded
            together with the question in a fixed prompt template;
            otherwise the question is sent verbatim.

    Returns:
        The message content of the first completion choice.
    """
    llm = _llm_client()

    # Default to the bare question; switch to the template only with context.
    user_prompt = question
    if context:
        user_prompt = f"""주어진 컨텍스트를 바탕으로 질문에 답변하세요.

컨텍스트:
{context}

질문:
{question}

답변:"""

    chat_messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": user_prompt},
    ]
    completion = await llm.chat.completions.create(
        model=VLLM_MODEL,
        messages=chat_messages,
        max_tokens=4096,
        temperature=0.1,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
|
|
|
|
# ── RAG 도구 구현 ─────────────────────────────────────────────────────────────
|
|
|
|
@app.get("/health")
async def health():
    """Health-check endpoint; always answers with a fixed OK status."""
    return dict(status="ok")
|
|
|
|
@app.post("/execute")
async def execute(request: Request):
    """Translate an HTTP request into a call to one of the RAG tools.

    The request body is expected to be a JSON object of the form
    ``{"tool": <name>, "params": {...}}``. ``params`` is passed to the tool
    as keyword arguments and may be omitted for tools whose arguments all
    have defaults.

    Returns:
        The tool's result on success, otherwise
        ``{"success": False, "error": <message>}``. Every failure — a
        malformed body, an unknown tool, or an exception inside the tool —
        is reported through that envelope rather than an unhandled error.
    """
    tool = None  # kept for the log message if parsing fails before it is set
    try:
        body = await request.json()
        if not isinstance(body, dict) or "tool" not in body:
            return {
                "success": False,
                "error": "Request body must be a JSON object with a 'tool' field",
            }

        tool = body["tool"]
        params = body.get("params") or {}
        if not isinstance(params, dict):
            return {"success": False, "error": "'params' must be a JSON object"}

        # Built at call time because the handlers are defined further down
        # in the module than this function.
        handlers = {
            "search_codebase": _search_codebase,
            "search_r530_docs": _search_r530_docs,
            "ask_iiot_llm": _ask_iiot_llm,
            "rag_query": _rag_query,
        }
        # Guard against non-string (possibly unhashable) tool values.
        handler = handlers.get(tool) if isinstance(tool, str) else None
        if handler is None:
            return {"success": False, "error": f"Unknown tool: {tool}"}

        return await handler(**params)
    except Exception as e:
        # Top-level boundary: log with traceback, answer with the envelope.
        logging.exception("Error executing %s", tool)
        return {"success": False, "error": str(e)}
|
|
|
|
async def _search_codebase(query: str, top_k: int = 6) -> dict:
    """Search the source-code collection for chunks similar to *query*.

    Embeds the query, searches the ``COL_CODEBASE`` collection and returns a
    trimmed view of each hit.

    Args:
        query: Free-text search query.
        top_k: Maximum number of hits to request.

    Returns:
        A dict with ``"success"`` (True), ``"count"`` and ``"items"``; each
        item holds the hit's ``"score"``, the payload's ``"file"`` and the
        first 500 characters of the payload's ``"content"``.
    """
    query_vector = await _embed(query)
    hits = await _qdrant_search(COL_CODEBASE, query_vector, top_k)

    items = []
    for hit in hits:
        # `or` also covers a payload/content that is present but null,
        # which a `.get(key, default)` fallback would let through as None.
        payload = hit.get("payload") or {}
        items.append({
            "score": hit.get("score", 0),
            "file": payload.get("file", "unknown"),
            "content": (payload.get("content") or "")[:500],
        })

    return {
        "success": True,
        "count": len(items),
        "items": items,
    }
|
|
|
|
async def _search_r530_docs(query: str, top_k: int = 5) -> dict:
    """Search the Experion HS R530 documentation collection.

    Embeds the query, searches the ``COL_OPC_DOCS`` collection and returns a
    trimmed view of each hit.

    Args:
        query: Free-text search query.
        top_k: Maximum number of hits to request.

    Returns:
        A dict with ``"success"`` (True), ``"count"`` and ``"items"``; each
        item holds the hit's ``"score"``, the payload's ``"title"`` and the
        first 500 characters of the payload's ``"content"``.
    """
    query_vector = await _embed(query)
    hits = await _qdrant_search(COL_OPC_DOCS, query_vector, top_k)

    items = []
    for hit in hits:
        # `or` also covers a payload/content that is present but null,
        # which a `.get(key, default)` fallback would let through as None.
        payload = hit.get("payload") or {}
        items.append({
            "score": hit.get("score", 0),
            "title": payload.get("title", "unknown"),
            "content": (payload.get("content") or "")[:500],
        })

    return {
        "success": True,
        "count": len(items),
        "items": items,
    }
|
|
|
|
async def _ask_iiot_llm(question: str, context: str = "") -> dict:
    """Answer an IIoT / OPC UA question with the LLM.

    Args:
        question: The question to ask.
        context: Optional supporting text forwarded to the LLM prompt.

    Returns:
        A dict with ``"success"`` (True), the original ``"question"`` and
        the model's ``"answer"``.
    """
    answer = await _ask_llm(question, context)
    return {
        "success": True,
        "question": question,
        "answer": answer,
    }
|
|
|
|
async def _rag_query(question: str, search_code: bool = False, search_docs: bool = True) -> dict:
    """Answer *question* with retrieval-augmented generation.

    Retrieves up to three hits from each selected collection, joins at most
    the first five retrieved contents into one context string and asks the
    LLM to answer using it.

    Args:
        question: The question to answer.
        search_code: Include the source-code collection (``COL_CODEBASE``).
        search_docs: Include the documentation collection (``COL_OPC_DOCS``).

    Returns:
        A dict with ``"success"`` (True), the ``"question"``,
        ``"context_count"`` (number of retrieved contents, which may exceed
        the five actually passed to the LLM) and the ``"answer"``.
    """
    # Code hits come first, matching the order the context is assembled in.
    collections = []
    if search_code:
        collections.append(COL_CODEBASE)
    if search_docs:
        collections.append(COL_OPC_DOCS)

    contexts = []
    if collections:
        # Embed once and reuse the vector for every collection; the query
        # text is the same, so a second embedding call is wasted work.
        query_vector = await _embed(question)
        for collection in collections:
            hits = await _qdrant_search(collection, query_vector, 3)
            # `or` guards against a payload/content that is present but null.
            contexts.extend(
                (hit.get("payload") or {}).get("content") or "" for hit in hits
            )

    context = "\n\n".join(contexts[:5])
    answer = await _ask_llm(question, context)

    return {
        "success": True,
        "question": question,
        "context_count": len(contexts),
        "answer": answer,
    }
|
|
|
|
# ── 메인 ─────────────────────────────────────────────────────────────────────
|
|
|
|
if __name__ == "__main__":
    # The optional first CLI argument selects the listen port; 5002 otherwise.
    cli_args = sys.argv[1:]
    port = int(cli_args[0]) if cli_args else 5002
    logging.info(f"Starting RAG worker on port {port}")
    # Serve the FastAPI app on all interfaces.
    uvicorn.run(app, host="0.0.0.0", port=port)
|