2026년 4월 30일 Stable State

This commit is contained in:
windpacer
2026-04-30 08:16:21 +09:00
parent c0f32177bf
commit fb11359b4c
41 changed files with 7977 additions and 88 deletions

View File

@@ -1,14 +1,16 @@
#!/usr/bin/env python3
"""
ExperionCrawler RAG MCP Server
- 임베딩: Ollama nomic-embed-text (768-dim) — Roo Code 인덱스와 동일 모델
- 벡터 DB: Qdrant localhost:6333
- LLM: vLLM GLM-4.7-Flash localhost:8000/v1
- 사용처: Claude Code MCP / Roo Code MCP (동일 서버)
ExperionCrawler Unified MCP Server
- RAG: Qdrant + Ollama nomic-embed-text + vLLM Qwen3-Coder-Next-FP8
- NL2SQL: 자연어 → LLM SQL 생성 → PostgreSQL 실행
- 사용처:
stdio 모드 (기본): Claude Code MCP / Roo Code MCP
HTTP 모드 (--http): C# McpClient (localhost:5001)
"""
from __future__ import annotations
import sys
import json
import logging
import httpx
from functools import lru_cache
@@ -21,13 +23,23 @@ QDRANT_URL = "http://localhost:6333"
OLLAMA_URL = "http://localhost:11434"
EMBED_MODEL = "nomic-embed-text" # 768-dim, Roo Code 인덱스와 동일
VLLM_BASE_URL = "http://localhost:8000/v1"
VLLM_MODEL = "glm-4.7-flash"
VLLM_MODEL = "Qwen/Qwen3-Coder-Next-FP8"
# Qdrant 컬렉션
COL_CODEBASE = "ws-65f457145aee80b2" # ExperionCrawler 소스코드
COL_OPC_DOCS = "experion-opc-docs" # Experion HS R530 OPC UA 공식 문서 (266 chunks)
mcp = FastMCP("iiot-rag")
# PostgreSQL 연결
DB_CONNECTION_STRING = "postgresql://postgres:postgres@localhost:5432/iiot_platform"
DB_TIMEOUT = 10 # 초
# C# McpClient(localhost:5001)와 통신: json_response+stateless로 단순 POST→JSON 방식
mcp = FastMCP(
"iiot-rag",
port=5001,
json_response=True,
stateless_http=True,
)
# ── 임베딩 (Ollama) ───────────────────────────────────────────────────────────
@@ -41,7 +53,7 @@ def _embed(text: str) -> list[float]:
resp.raise_for_status()
return resp.json()["embedding"]
# ── LLM (vLLM / GLM-4.7-Flash) ───────────────────────────────────────────────
# ── LLM (vLLM / Qwen3-Coder-Next-FP8) ───────────────────────────────────────
@lru_cache(maxsize=1)
def _llm():
@@ -79,7 +91,69 @@ def _search(collection: str, query: str, top_k: int, threshold: float = 0.25) ->
return "\n\n---\n\n".join(parts)
# ── MCP 도구 ─────────────────────────────────────────────────────────────────
# ── DB 헬퍼 ──────────────────────────────────────────────────────────────────
def _get_db_connection():
"""PostgreSQL DB 연결 획득."""
import psycopg
return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
def _validate_sql(sql: str) -> tuple[bool, str]:
"""SQL 안전 검증 — SELECT만 허용, 위험 키워드 차단."""
if len(sql) > 2000:
return False, "쿼리 길이 2000자를 초과했습니다."
dangerous = ['EXEC', 'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'GRANT', 'REVOKE']
sql_upper = sql.upper()
for kw in dangerous:
if kw in sql_upper:
return False, f"허용되지 않은 키워드 '{kw}'를 사용했습니다."
if not sql_upper.strip().startswith('SELECT'):
return False, "단순 SELECT 쿼리만 허용됩니다."
if '..' in sql or '~' in sql:
return False, "파일 경로 표현은 허용되지 않습니다."
return True, ""
# DB 스키마 — LLM SQL 생성 시 컨텍스트로 사용
_DB_SCHEMA = """
PostgreSQL 시계열 데이터베이스 스키마
테이블: history_table (시계열 이력)
tagname TEXT - 태그명 (모두 소문자, 예: 'ficq-6113.pv') — 대소문자 구분
node_id TEXT - OPC UA 노드 ID
value TEXT - 측정값, 수치 연산 시 ::double precision 캐스트 필요
recorded_at TIMESTAMPTZ - 기록 시각(UTC), 스냅샷 주기 약 60초
테이블: realtime_table (실시간 최신값)
tagname TEXT - 태그명 (모두 소문자)
node_id TEXT - OPC UA 노드 ID
livevalue TEXT - 현재값
timestamp TIMESTAMPTZ - 최종 갱신 시각
N분 간격 집계 공식 (time_bucket 금지, date_trunc 사용):
1분 버킷: date_trunc('minute', recorded_at) AS bucket
2분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120) AS bucket
5분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/300)*300) AS bucket
10분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/600)*600) AS bucket
N분 버킷: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)) AS bucket
예시 (2분 간격, 여러 태그):
SELECT to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120) AS bucket,
tagname, AVG(value::double precision) AS avg_val
FROM history_table
WHERE tagname IN ('tag1', 'tag2')
AND recorded_at >= NOW() - INTERVAL '3 hours'
GROUP BY bucket, tagname ORDER BY bucket, tagname
규칙:
- SELECT만 허용 (INSERT/UPDATE/DELETE/DROP 등 불가)
- tagname은 모두 소문자로 정확히 입력
- value 컬럼은 TEXT이므로 집계 시 ::double precision 캐스트 필수
- time_bucket 함수 사용 금지 — 위의 to_timestamp/FLOOR/EPOCH 공식 사용
"""
# ── RAG 도구 ─────────────────────────────────────────────────────────────────
@mcp.tool()
def search_codebase(query: str, top_k: int = 6) -> str:
@@ -103,10 +177,9 @@ def search_r530_docs(query: str, top_k: int = 5) -> str:
사용 시점: Experion HS R530의 OPC UA 설정, 인증서, 보안 정책, 포인트 주소 형식,
채널/컨트롤러 속성, 문제해결 등 제품 스펙과 동작을 알고 싶을 때.
⚠️ ExperionCrawler 구현 코드를 찾으려면 search_codebase 사용.
Args:
query: 검색어 (예: "certificate configuration", "endpoint security policy", "point address syntax")
query: 검색어 (예: "certificate configuration", "endpoint security policy")
top_k: 반환 결과 수 (기본 5)
"""
return _search(COL_OPC_DOCS, query, top_k)
@@ -114,7 +187,7 @@ def search_r530_docs(query: str, top_k: int = 5) -> str:
@mcp.tool()
def ask_iiot_llm(question: str, context: str = "") -> str:
"""GLM-4.7-Flash에게 IIoT/OPC UA 질문 (컨텍스트 없이 LLM 직접 질문).
"""Qwen3-Coder-Next에게 IIoT/OPC UA 질문 (컨텍스트 없이 LLM 직접 질문).
사용 시점: search_codebase 또는 search_r530_docs 결과를 context로 넘겨
종합 분석·답변이 필요할 때. 또는 일반 IIoT/OPC UA 개념 질문.
@@ -143,14 +216,11 @@ def ask_iiot_llm(question: str, context: str = "") -> str:
@mcp.tool()
def rag_query(question: str, search_code: bool = False, search_docs: bool = True) -> str:
"""검색 → GLM-4.7-Flash 답변 생성 (통합 RAG).
"""검색 → Qwen3-Coder-Next 답변 생성 (통합 RAG).
기본값: Experion HS R530 공식 문서만 검색 (search_docs=True, search_code=False).
ExperionCrawler 코드도 함께 보려면 search_code=True 추가.
사용 시점: Experion HS R530 제품 질문이나 ExperionCrawler 코드 질문에
검색+LLM 답변을 한 번에 얻고 싶을 때.
Args:
question: 질문
search_docs: Experion HS R530 공식 문서 검색 여부 (기본 True)
@@ -161,9 +231,228 @@ def rag_query(question: str, search_code: bool = False, search_docs: bool = True
context_parts.append(f"=== Experion HS R530 공식 문서 ===\n{_search(COL_OPC_DOCS, question, 4)}")
if search_code:
context_parts.append(f"=== ExperionCrawler 구현 코드 ===\n{_search(COL_CODEBASE, question, 3)}")
return ask_iiot_llm(question, "\n\n".join(context_parts))
# ── NL2SQL 도구 ───────────────────────────────────────────────────────────────
@mcp.tool()
def run_sql(sql: str) -> str:
"""SQL 쿼리 실행 (SELECT만 허용).
Args:
sql: 실행할 SELECT SQL 문자열
Returns:
JSON: { success, columns, count, data } 또는 { success, error }
"""
valid, err = _validate_sql(sql)
if not valid:
return json.dumps({"success": False, "error": f"SQL 검증 실패: {err}"}, ensure_ascii=False)
try:
conn = _get_db_connection()
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
columns = [desc[0] for desc in cur.description]
result_data = [dict(zip(columns, row)) for row in rows]
return json.dumps({
"success": True,
"columns": columns,
"count": len(result_data),
"data": result_data
}, ensure_ascii=False, default=str)
except Exception as e:
return json.dumps({"success": False, "error": f"SQL 실행 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
def query_pv_history(tag_names: list[str], time_from: str, time_to: str, limit: int = 100) -> str:
"""과거 값(PV) 히스토리 조회.
Args:
tag_names: 태그 이름 목록 (예: ["ficq-6113.pv", "ti-6101.pv"])
time_from: 시작 시간 (ISO 8601, 예: "2026-04-01T00:00:00")
time_to: 종료 시간 (ISO 8601, 예: "2026-04-02T00:00:00")
limit: 반환 행 수 제한 (기본 100, 최대 5000)
Returns:
JSON: { success, tag_names, time_range, limit, data }
"""
try:
limit = min(limit, 5000)
conn = _get_db_connection()
with conn.cursor() as cur:
cur.execute(
"""SELECT tagname, recorded_at, value
FROM history_table
WHERE tagname = ANY(%s)
AND recorded_at >= %s AND recorded_at <= %s
ORDER BY recorded_at, tagname
LIMIT %s""",
(tag_names, time_from, time_to, limit)
)
rows = cur.fetchall()
data = [{"tag_name": r[0], "timestamp": r[1].isoformat(), "value": r[2]} for r in rows]
return json.dumps({
"success": True,
"tag_names": tag_names,
"time_range": f"{time_from} ~ {time_to}",
"count": len(data),
"data": data
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"히스토리 쿼리 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
def get_tag_metadata(query: str, limit: int = 10) -> str:
"""태그 메타데이터 검색 (realtime_table 기반).
Args:
query: 태그명 검색어 (패턴 매칭)
limit: 반환 태그 수 제한 (기본 10)
Returns:
JSON: { success, query, count, tags }
"""
try:
conn = _get_db_connection()
with conn.cursor() as cur:
cur.execute(
"""SELECT tagname, livevalue, timestamp, node_id
FROM realtime_table
WHERE tagname ILIKE %s
ORDER BY tagname LIMIT %s""",
(f"%{query}%", limit)
)
rows = cur.fetchall()
tags = [{"tag_name": r[0], "current_value": r[1],
"last_updated": r[2].isoformat() if r[2] else None,
"node_id": r[3]} for r in rows]
return json.dumps({"success": True, "query": query, "count": len(tags), "tags": tags},
ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"태그 메타데이터 검색 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
def list_drawings(unit_no: str | None = None) -> str:
"""단위별 도면 목록 조회 (node_map_master.name 기반).
Args:
unit_no: 단위 번호 접두사 (예: "A", "B"). None이면 전체 목록
Returns:
JSON: { success, unit_no, count, names }
"""
try:
conn = _get_db_connection()
with conn.cursor() as cur:
if unit_no:
cur.execute(
"SELECT DISTINCT name FROM node_map_master WHERE name ILIKE %s ORDER BY name LIMIT 100",
(f"{unit_no}%",)
)
else:
cur.execute("SELECT DISTINCT name FROM node_map_master ORDER BY name LIMIT 100")
rows = cur.fetchall()
return json.dumps({"success": True, "unit_no": unit_no,
"count": len(rows), "names": [r[0] for r in rows]},
ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"success": False, "error": f"도면 목록 조회 실패: {e}"}, ensure_ascii=False)
@mcp.tool()
def query_with_nl(question: str) -> str:
"""자연어 질문을 LLM이 SQL로 변환하고 시계열 DB를 조회합니다.
Args:
question: 자연어 질문 (예: "FICQ-6113.PV 최근 1시간 값을 1분 단위로 표시")
Returns:
JSON: { sql, success, columns, count, data } 또는 { sql, success, error }
"""
system = (
"You are a PostgreSQL SQL expert.\n"
"Convert the user's question into a SELECT SQL using the schema below.\n"
"IMPORTANT rules:\n"
"- Use ONLY PostgreSQL syntax. No DATE_FORMAT, no INTERVAL N DAY.\n"
"- Time column is 'recorded_at' (TIMESTAMPTZ). Do NOT use 'timestamp'.\n"
"- NEVER use time_bucket(). For N-minute buckets use to_timestamp/FLOOR/EPOCH formula.\n"
"- INTERVAL rule:\n"
" * If the question specifies an interval (e.g. '2분 간격', '5-minute interval'):\n"
" use: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)) AS bucket\n"
" with GROUP BY bucket, tagname and AVG(value::double precision) AS avg_val\n"
" * If NO interval is specified: SELECT recorded_at, tagname, value — NO GROUP BY.\n"
"- Current year is 2026. '4월 27일' means 2026-04-27.\n"
"- All times in DB are UTC. Korean input is KST (UTC+9). Convert: KST 12:00 = UTC 03:00.\n"
"- value column is TEXT; cast with ::double precision only when aggregating.\n"
"- All tagnames are lowercase (e.g. 'ficq-6113.pv'). Match exactly.\n"
"- PostgreSQL LIKE: dot has no special meaning, no escaping needed.\n"
"- Return ONLY the SQL statement. No explanation, no markdown.\n\n"
f"{_DB_SCHEMA}"
)
try:
resp = _llm().chat.completions.create(
model=VLLM_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": question},
],
max_tokens=8192,
temperature=0.1,
)
sql = (resp.choices[0].message.content or "").strip()
# 마크다운 코드 블록 제거
if sql.startswith("```"):
lines = sql.splitlines()
sql = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]).strip()
if not sql:
return json.dumps({"success": False, "sql": "", "error": "LLM이 SQL을 생성하지 못했습니다."}, ensure_ascii=False)
except Exception as e:
return json.dumps({"success": False, "sql": "", "error": f"LLM SQL 생성 실패: {e}"}, ensure_ascii=False)
# SQL 실행
raw = run_sql(sql)
result = json.loads(raw)
result["sql"] = sql
# long format → pivot 변환 (tagname 컬럼이 있으면 자동 PIVOT)
if result.get("success") and "data" in result:
cols = result.get("columns", [])
data = result["data"]
if "tagname" in cols and data:
time_col = next((c for c in cols if c not in ("tagname", "value", "livevalue", "avg_val")), None)
val_col = next((c for c in ("avg_val", "value") if c in cols), cols[-1])
if time_col:
tag_names_list = sorted(dict.fromkeys(row["tagname"] for row in data))
pivoted: dict = {}
for row in data:
key = str(row[time_col])
if key not in pivoted:
pivoted[key] = {time_col: row[time_col]}
pivoted[key][row["tagname"]] = row.get(val_col)
result["data"] = list(pivoted.values())
result["columns"] = [time_col] + tag_names_list
result["count"] = len(result["data"])
return json.dumps(result, ensure_ascii=False, default=str)
# ── 엔트리포인트 ──────────────────────────────────────────────────────────────
def main():
"""HTTP 모드로 실행 — C# McpClient (localhost:5001) 용."""
mcp.run(transport="streamable-http")
if __name__ == "__main__":
mcp.run(transport="stdio")
# --http 플래그: HTTP 모드 (C# McpClient 용)
# 플래그 없음: stdio 모드 (Claude Code / Roo Code MCP 용)
if "--http" in sys.argv:
mcp.run(transport="streamable-http")
else:
mcp.run(transport="stdio")