diff --git a/mcp-server/server.py b/mcp-server/server.py index f1f8aa3..783a003 100644 --- a/mcp-server/server.py +++ b/mcp-server/server.py @@ -61,6 +61,9 @@ KB_COLLECTIONS = { # PostgreSQL 연결 DB_CONNECTION_STRING = os.environ.get("DB_CONNECTION_STRING", "postgresql://postgres:postgres@localhost:5432/iiot_platform") +# 라이브 데이터는 hc900 스키마에 있음. search_path 미지정 시 기본 public(레거시/stale)로 해석되므로 강제 지정. +if "search_path" not in DB_CONNECTION_STRING and "options=" not in DB_CONNECTION_STRING: + DB_CONNECTION_STRING += ("&" if "?" in DB_CONNECTION_STRING else "?") + "options=-csearch_path%3Dhc900" DB_TIMEOUT = int(os.environ.get("DB_TIMEOUT", "10")) # C# McpClient(localhost:5001)와 통신: json_response+stateless로 단순 POST→JSON 방식 @@ -707,7 +710,8 @@ Views: v_tag_summary(base_tag TEXT, pv TEXT, sp TEXT, op TEXT, description TEXT, area TEXT, sub_area TEXT) Rules: - - SELECT only. tagname lowercase exact match. + - SELECT only. tagname UPPERCASE exact match (e.g. 'FICQ-6113.PV'). + - If user input is lowercase, convert to UPPERCASE before querying. - value is TEXT; cast ::double precision when aggregating. - time_bucket() banned. For N-min buckets: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)) - KST input = UTC-9 in DB. @@ -1352,7 +1356,8 @@ async def query_with_nl(question: str) -> str: "You are a PostgreSQL SQL expert.\n" "Convert the user's question into a SELECT SQL.\n" "Return ONLY the SQL. No explanation, no markdown, NO tags.\n" - "Use PostgreSQL syntax. tagname lowercase exact match.\n" + "Use PostgreSQL syntax. tagname UPPERCASE exact match (e.g. 'FICQ-6113.PV').\n" + "If user input is lowercase, convert to UPPERCASE before querying.\n" "value is TEXT; cast ::double precision when aggregating.\n" "KST input = UTC-9. Example: KST 12:00 = UTC 03:00.\n" "For N-min buckets: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)).\n" @@ -1622,7 +1627,7 @@ async def trace_connections(start_tag: str, direction: str = "downstream", max_d # 각 노드에 실시간 상태(pv) 부착 — 병렬 펌프 중 '실제 가동 중'인 것을 식별. # 예: F-6101A/B 상류에 P-6102(R-RUN)·P-6201(L-STOP)이 병렬이면 현재 공급원은 P-6102. if path: - pv_tags = [p["tag_no"].lower() + ".pv" for p in path] + pv_tags = [p["tag_no"].upper() + ".PV" for p in path] cur.execute( "SELECT tagname, livevalue FROM realtime_table WHERE tagname = ANY(%s)", (pv_tags,), @@ -1632,7 +1637,7 @@ async def trace_connections(start_tag: str, direction: str = "downstream", max_d m = re.match(r'\{\s*\d+\s*\|\s*([^|]+?)\s*\|', lv or '') pv_map[tn[:-3]] = (m.group(1).strip() if m else (lv or None)) for p in path: - p["live_state"] = pv_map.get(p["tag_no"].lower()) + p["live_state"] = pv_map.get(p["tag_no"]) return json.dumps({ "success": True, diff --git a/mcp-server/worker/nl2sql_worker.py b/mcp-server/worker/nl2sql_worker.py index 955313d..29b54fc 100644 --- a/mcp-server/worker/nl2sql_worker.py +++ b/mcp-server/worker/nl2sql_worker.py @@ -32,6 +32,9 @@ import httpx # ── 설정 ───────────────────────────────────────────────────────────────────── DB_CONNECTION_STRING = os.environ.get("DB_CONNECTION_STRING", "postgresql://postgres:postgres@localhost:5432/iiot_platform") +# 라이브 데이터는 hc900 스키마에 있음. search_path 미지정 시 기본 public(레거시/stale)로 해석되므로 강제 지정. +if "search_path" not in DB_CONNECTION_STRING and "options=" not in DB_CONNECTION_STRING: + DB_CONNECTION_STRING += ("&" if "?" in DB_CONNECTION_STRING else "?") + "options=-csearch_path%3Dhc900" DB_TIMEOUT = int(os.environ.get("DB_TIMEOUT", "10")) VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8000/v1") @@ -101,6 +104,61 @@ def _llm_client(): # DB 스키마 + SQL system 프롬프트 — worker/sql_prompt.py 로 단일화(production+eval 공유) from sql_prompt import DB_SCHEMA, SQL_SYSTEM_PROMPT # noqa: E402,F401 +# ── 태그명 대소문자 정규화 ──────────────────────────────────────────────────── + +# DB에 실제로 저장된 태그명 목록 (대문자) +_tag_cache: dict | None = None +_tag_cache_lock: object | None = None # placeholder — sync path uses _get_db_connection + + +def _load_tag_names() -> set[str]: + """realtime_table에서 현재 태그명 목록을 로드 (대문자).""" + global _tag_cache + try: + conn = _get_db_connection() + with conn.cursor() as cur: + cur.execute("SELECT tagname FROM hc900.realtime_table WHERE tagname IS NOT NULL") + _tag_cache = {r[0] for r in cur.fetchall()} + conn.close() + return _tag_cache + except Exception: + if _tag_cache is None: + _tag_cache = set() + return _tag_cache + + +def _normalize_tag_case(sql: str) -> str: + """SQL 내의 태그명을 실제 DB 태그명으로 정규화 (대소문자 불일치 해결). + + LLM이 소문자('ficq-6118.pv')로 생성한 태그명을 + 실제 DB에 저장된 대문자('FICQ-6118.PV')로 변환. + """ + if not _tag_cache: + _load_tag_names() + + if not _tag_cache: + return sql + + # 태그명 패턴: 알파벳+숫자+하이픈+접미사(.pv/.sp/.op 등) + # 예: ficq-6118.pv, FICQ-6118.PV, xv-6105.pv + tag_pattern = re.compile(r"\b([a-zA-Z][a-zA-Z0-9]*-?[a-zA-Z0-9]*)(\.[a-zA-Z0-9_]+)?\b") + + def _replace_tag(m: re.Match) -> str: + base = m.group(1) + suffix = m.group(2) or "" + candidate = base.upper() + suffix + if candidate in _tag_cache: + return candidate + # 접미사만 소문자일 수 있음 (예: FICQ-6118.pv → FICQ-6118.PV) + candidate2 = base.upper() + suffix.upper() + if candidate2 in _tag_cache: + return candidate2 + return m.group(0) + + normalized = tag_pattern.sub(_replace_tag, sql) + return normalized + + async def _generate_sql(natural_language: str) -> str: """자연어를 SQL로 변환.""" client = _llm_client() @@ -121,6 +179,8 @@ async def _generate_sql(natural_language: str) -> str: if sql.startswith("```"): lines = sql.splitlines() sql = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]).strip() + # 태그명 대소문자 정규화 + sql = _normalize_tag_case(sql) return sql # ── NL2SQL 도구 구현 ───────────────────────────────────────────────────────── diff --git a/mcp-server/worker/pid_worker.py b/mcp-server/worker/pid_worker.py index 5a40952..a7bed91 100644 --- a/mcp-server/worker/pid_worker.py +++ b/mcp-server/worker/pid_worker.py @@ -33,6 +33,9 @@ VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8000/v1 from config import get_vllm_model VLLM_MODEL = get_vllm_model() DB_CONNECTION_STRING = os.environ.get("DB_CONNECTION_STRING", "postgresql://postgres:postgres@localhost:5432/iiot_platform") +# 라이브 데이터는 hc900 스키마에 있음. search_path 미지정 시 기본 public(레거시/stale)로 해석되므로 강제 지정. +if "search_path" not in DB_CONNECTION_STRING and "options=" not in DB_CONNECTION_STRING: + DB_CONNECTION_STRING += ("&" if "?" in DB_CONNECTION_STRING else "?") + "options=-csearch_path%3Dhc900" DB_TIMEOUT = int(os.environ.get("DB_TIMEOUT", "10")) _SERVER_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) diff --git a/mcp-server/worker/sql_prompt.py b/mcp-server/worker/sql_prompt.py index 0942744..93a88fe 100644 --- a/mcp-server/worker/sql_prompt.py +++ b/mcp-server/worker/sql_prompt.py @@ -20,13 +20,13 @@ PostgreSQL 시계열 데이터베이스 스키마 ※ DCS 태그: SELECT WHERE tag_dcs=TRUE, 현장 계기: WHERE tag_dcs=FALSE AND category='instrument' 테이블: history_table (시계열 이력) - tagname TEXT - 태그명 (모두 소문자, 예: 'ficq-6113.pv') — 대소문자 구분 + tagname TEXT - 태그명 (대문자, 예: 'FICQ-6113.PV') — 대소문자 구분 node_id TEXT - OPC UA 노드 ID value TEXT - 측정값, 수치 연산 시 ::double precision 캐스트 필요 recorded_at TIMESTAMPTZ - 기록 시각(UTC), 스냅샷 주기 약 60초 테이블: realtime_table (실시간 최신값) - tagname TEXT - 태그명 (모두 소문자) + tagname TEXT - 태그명 (대문자) node_id TEXT - OPC UA 노드 ID livevalue TEXT - 현재값 timestamp TIMESTAMPTZ - 최종 갱신 시각 @@ -134,7 +134,7 @@ SQL_SYSTEM_PROMPT = ( " SELECT to_timestamp(...) AT TIME ZONE 'Asia/Seoul' AS bucket, AVG(...) AS avg_val\n" " FROM ... GROUP BY to_timestamp(...), tagname ORDER BY to_timestamp(...), tagname\n" "- value column is TEXT; cast with ::double precision only when aggregating.\n" - "- All tagnames are lowercase (e.g. 'ficq-6113.pv'). Match exactly.\n" + "- All tagnames are UPPERCASE (e.g. 'FICQ-6113.PV'). Match exactly.\n" "- PostgreSQL LIKE: dot has no special meaning, no escaping needed.\n" "- Return ONLY the SQL statement. No explanation, no markdown.\n\n" f"{DB_SCHEMA}"