fix: Phase 5 진단 핫픽스 + Phase 6 run_sql 안전 가드

진단 보고서(plans/...phase5-사용자체크리스트.md) 기반 7건 코드 이슈
수정 + Phase 6 잔여 항목 중 최우선인 run_sql 가드 구현.

핫픽스:
- nl2sql_worker.py: _list_drawings 파싱 버그(문자열 분리) HIGH
- nl2sql_worker.py: 5개 async 함수 blocking DB 연결 → to_thread MED
- ExperionDbContext.cs: KB DDL의 {} 문자가 String.Format placeholder로
  오인되어 부팅 실패 → 별도 NpgsqlCommand 사용 HIGH
- KbIngestWorker: 단일 청크 임베딩 실패 시 전체 abort → 부분 인덱싱 LOW
- KbAuthService: 초기 비번 로그 평문 → 마스킹 + 콘솔 분리 출력 LOW
- KbQdrantClient: new HttpClient → IHttpClientFactory LOW
- OllamaController: plant_context.md 매 요청 파일 읽기 → mtime 캐시 LOW

Phase 6 — run_sql 가드:
- _validate_sql 강화: \b 단어 경계로 updated_at 오탐 제거, WITH 허용,
  TRUNCATE/COPY 추가, 다중 세미콜론 차단
- _apply_sql_guards: LIMIT 미지정 시 SELECT * FROM (...) _capped LIMIT 1000
- _execute_sql_internal: 매 호출 SET statement_timeout = 30000
- SQL_MAX_ROWS / SQL_STATEMENT_TIMEOUT_MS 환경변수화
- 응답 JSON에 row_limit 필드 추가
- nl2sql_worker.py의 _run_sql / _query_with_nl에도 동일 적용

기타:
- .gitignore: storage/ 추가 (KB 업로드 원본 디렉토리)
- opencode.json: 모델 항목을 실제 서빙 모델(Qwen3.6-27B-FP8 / 256K)로 동기화

검증:
- dotnet build: 경고 0건, 에러 0건
- python3 -m py_compile: OK
- _apply_sql_guards / _validate_sql 스모크 테스트 통과

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
windpacer
2026-05-14 05:18:06 +09:00
parent 908bfe151f
commit 5a9d60e8a8
13 changed files with 473 additions and 136 deletions

View File

@@ -11,6 +11,7 @@ ExperionCrawler Unified MCP Server
from __future__ import annotations
import sys
import os
import re
import json
import logging
import httpx
@@ -325,21 +326,39 @@ async def _get_db_connection():
def _validate_sql(sql: str) -> tuple[bool, str]:
"""SQL 안전 검증 — SELECT만 허용, 위험 키워드 차단."""
"""SQL 안전 검증 — SELECT/WITH만 허용, 위험 키워드 차단."""
if len(sql) > 2000:
return False, "쿼리 길이 2000자를 초과했습니다."
dangerous = ['EXEC', 'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'GRANT', 'REVOKE']
dangerous = ['EXEC', 'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'GRANT', 'REVOKE', 'TRUNCATE', 'COPY']
sql_upper = sql.upper()
for kw in dangerous:
if kw in sql_upper:
if re.search(rf"\b{kw}\b", sql_upper):
return False, f"허용되지 않은 키워드 '{kw}'를 사용했습니다."
if not sql_upper.strip().startswith('SELECT'):
return False, "단순 SELECT 쿼리만 허용됩니다."
head = sql_upper.lstrip().lstrip('(').lstrip()
if not (head.startswith('SELECT') or head.startswith('WITH')):
return False, "SELECT 또는 WITH 쿼리만 허용됩니다."
if '..' in sql or '~' in sql:
return False, "파일 경로 표현은 허용되지 않습니다."
if ';' in sql.rstrip().rstrip(';'):
return False, "다중 문장(세미콜론)은 허용되지 않습니다."
return True, ""
# SQL 가드 — auto-LIMIT + statement_timeout
SQL_MAX_ROWS = int(os.environ.get("SQL_MAX_ROWS", "1000"))
SQL_STATEMENT_TIMEOUT_MS = int(os.environ.get("SQL_STATEMENT_TIMEOUT_MS", "30000"))
_RE_LIMIT_TAIL = re.compile(r"\bLIMIT\b\s+\d+(\s+OFFSET\s+\d+)?\s*$", re.IGNORECASE)
def _apply_sql_guards(sql: str, max_rows: int = SQL_MAX_ROWS) -> str:
"""LIMIT가 없으면 서브쿼리로 감싸 강제 부착. 이미 끝부분에 LIMIT가 있으면 그대로."""
s = sql.strip().rstrip(';').strip()
if _RE_LIMIT_TAIL.search(s):
return s
return f"SELECT * FROM ({s}) _capped LIMIT {max_rows}"
# DB 스키마 — LLM SQL 생성 시 컨텍스트로 사용
_DB_SCHEMA = """
PostgreSQL 시계열 데이터베이스 스키마
@@ -627,16 +646,21 @@ async def search_kb(
# ── NL2SQL 도구 ───────────────────────────────────────────────────────────────
async def _execute_sql_internal(sql: str) -> str:
"""SQL 실행 내부 함수 (run_sql과 query_with_nl에서 공유)."""
"""SQL 실행 내부 함수 (run_sql과 query_with_nl에서 공유).
가드: LIMIT 미지정 시 자동 LIMIT 부착(SQL_MAX_ROWS), statement_timeout 적용.
"""
valid, err = _validate_sql(sql)
if not valid:
return json.dumps({"success": False, "error": f"SQL 검증 실패: {err}"}, ensure_ascii=False)
capped_sql = _apply_sql_guards(sql)
conn = None
try:
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(sql)
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(capped_sql)
rows = cur.fetchall()
columns = [desc[0] for desc in cur.description]
result_data = [dict(zip(columns, row)) for row in rows]
@@ -644,6 +668,7 @@ async def _execute_sql_internal(sql: str) -> str:
"success": True,
"columns": columns,
"count": len(result_data),
"row_limit": SQL_MAX_ROWS,
"data": result_data
}, ensure_ascii=False, default=str)
except Exception as e:
@@ -654,13 +679,19 @@ async def _execute_sql_internal(sql: str) -> str:
@mcp.tool()
async def run_sql(sql: str) -> str:
"""SQL 쿼리 실행 (SELECT만 허용).
"""SQL 쿼리 실행 (SELECT/WITH만 허용).
안전 가드:
- 위험 키워드(INSERT/UPDATE/DELETE/DROP/ALTER/CREATE/GRANT/REVOKE/TRUNCATE/COPY/EXEC) 차단
- 다중 문장(세미콜론) 차단
- LIMIT 미지정 시 SQL_MAX_ROWS(기본 1000)로 자동 제한
- statement_timeout = SQL_STATEMENT_TIMEOUT_MS(기본 30s)
Args:
sql: 실행할 SELECT SQL 문자열
sql: 실행할 SELECT/WITH SQL 문자열 (최대 2000자)
Returns:
JSON: { success, columns, count, data } 또는 { success, error }
JSON: { success, columns, count, row_limit, data } 또는 { success, error }
"""
return await _execute_sql_internal(sql)

View File

@@ -16,6 +16,7 @@ Usage: python nl2sql_worker.py <port>
from __future__ import annotations
import sys
import os
import re
# mcp-server 디렉토리를 Python 경로에 추가
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -51,6 +52,45 @@ def _get_db_connection():
import psycopg
return psycopg.connect(DB_CONNECTION_STRING, connect_timeout=DB_TIMEOUT)
async def _aget_db_connection():
"""비동기 환경에서 안전하게 DB 연결 획득 (blocking connect를 to_thread로 격리)."""
import asyncio
return await asyncio.to_thread(_get_db_connection)
# ── SQL 가드 ─────────────────────────────────────────────────────────────────
SQL_MAX_ROWS = int(os.environ.get("SQL_MAX_ROWS", "1000"))
SQL_STATEMENT_TIMEOUT_MS = int(os.environ.get("SQL_STATEMENT_TIMEOUT_MS", "30000"))
_RE_LIMIT_TAIL = re.compile(r"\bLIMIT\b\s+\d+(\s+OFFSET\s+\d+)?\s*$", re.IGNORECASE)
_DANGEROUS_KW = ('EXEC', 'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'GRANT', 'REVOKE', 'TRUNCATE', 'COPY')
def _validate_sql(sql: str) -> tuple[bool, str]:
"""SELECT/WITH만 허용, 위험 키워드/다중 문장 차단."""
if not sql or len(sql) > 2000:
return False, "쿼리가 비어있거나 2000자를 초과했습니다."
upper = sql.upper()
for kw in _DANGEROUS_KW:
if re.search(rf"\b{kw}\b", upper):
return False, f"허용되지 않은 키워드 '{kw}'"
head = upper.lstrip().lstrip('(').lstrip()
if not (head.startswith('SELECT') or head.startswith('WITH')):
return False, "SELECT 또는 WITH 쿼리만 허용됩니다."
if ';' in sql.rstrip().rstrip(';'):
return False, "다중 문장(세미콜론)은 허용되지 않습니다."
return True, ""
def _apply_sql_guards(sql: str, max_rows: int = SQL_MAX_ROWS) -> str:
s = sql.strip().rstrip(';').strip()
if _RE_LIMIT_TAIL.search(s):
return s
return f"SELECT * FROM ({s}) _capped LIMIT {max_rows}"
# ── LLM 클라이언트 ───────────────────────────────────────────────────────────
@lru_cache(maxsize=1)
@@ -206,11 +246,17 @@ async def execute(request: Request):
return {"success": False, "error": str(e)}
async def _run_sql(sql: str) -> str:
"""SQL 실행."""
conn = _get_db_connection()
"""SQL 실행 (가드: SELECT/WITH만, auto-LIMIT, statement_timeout)."""
valid, err = _validate_sql(sql)
if not valid:
return {"success": False, "error": f"SQL 검증 실패: {err}"}
capped_sql = _apply_sql_guards(sql)
conn = await _aget_db_connection()
try:
with conn.cursor() as cur:
cur.execute(sql)
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(capped_sql)
if cur.description:
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
@@ -219,6 +265,7 @@ async def _run_sql(sql: str) -> str:
"success": True,
"columns": columns,
"count": len(data),
"row_limit": SQL_MAX_ROWS,
"data": data,
}
else:
@@ -227,6 +274,8 @@ async def _run_sql(sql: str) -> str:
"success": True,
"message": f"Query executed successfully. {cur.rowcount} rows affected.",
}
except Exception as e:
return {"success": False, "error": f"SQL 실행 실패: {e}"}
finally:
conn.close()
@@ -235,7 +284,7 @@ async def _query_pv_history(tag_names: list[str], time_from: str, time_to: str,
if not tag_names:
return {"success": False, "error": "tag_names is required"}
conn = _get_db_connection()
conn = await _aget_db_connection()
try:
with conn.cursor() as cur:
cur.execute(
@@ -266,7 +315,7 @@ async def _query_pv_history(tag_names: list[str], time_from: str, time_to: str,
async def _get_tag_metadata(query: str, limit: int = 10) -> str:
"""태그 메타데이터 검색."""
conn = _get_db_connection()
conn = await _aget_db_connection()
try:
with conn.cursor() as cur:
cur.execute(
@@ -301,7 +350,7 @@ async def _get_tag_metadata(query: str, limit: int = 10) -> str:
async def _list_drawings(unit_no: str = None) -> str:
"""단위별 도면 목록 조회."""
conn = _get_db_connection()
conn = await _aget_db_connection()
try:
with conn.cursor() as cur:
if unit_no:
@@ -322,14 +371,13 @@ async def _list_drawings(unit_no: str = None) -> str:
ORDER BY name
"""
)
columns = ["name"]
rows = cur.fetchall()
data = [dict(zip(columns, row[0])) for row in rows]
names = [row[0] for row in rows]
return {
"success": True,
"unit_no": unit_no,
"count": len(data),
"names": [d["name"] for d in data],
"count": len(names),
"names": names,
}
finally:
conn.close()
@@ -343,10 +391,17 @@ async def _query_with_nl(question: str) -> str:
if not sql:
return json.dumps({"success": False, "sql": "", "error": "LLM이 SQL을 생성하지 못했습니다."}, ensure_ascii=False)
conn = _get_db_connection()
# LLM 생성 SQL도 동일 가드 적용
valid, err = _validate_sql(sql)
if not valid:
return {"success": False, "sql": sql, "error": f"SQL 검증 실패: {err}"}
capped_sql = _apply_sql_guards(sql)
conn = await _aget_db_connection()
try:
with conn.cursor() as cur:
cur.execute(sql)
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(capped_sql)
if cur.description:
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
@@ -356,6 +411,7 @@ async def _query_with_nl(question: str) -> str:
"sql": sql,
"columns": columns,
"count": len(data),
"row_limit": SQL_MAX_ROWS,
"data": data,
}
else: