fix(nl2sql): query_with_nl 프롬프트 단일화 + 극값 CTE(전체반환·앵커) + 결정론 태그 검증 게이트
- server.py: inline _DB_SCHEMA 제거, worker/sql_prompt.SQL_SYSTEM_PROMPT로 단일화. query_pv_history/query_with_nl description 보강(집계·극값은 query_with_nl 유도). - 결정론 태그 검증 게이트(_verify_sql_tags)+피드백 재시도(최대3회): 환각 태그는 실행 거부·교정, 끝내 미존재면 빈 결과 대신 명시적 에러(날조 차단). 대소문자 교정. - sql_prompt.py: 극값→관련태그 CTE를 'value=MAX(...) 모든 시각 반환 + max_occurrences + 앵커(극값기준 태그) 포함'으로 교체. 태그 대문자 규칙. - 피벗 시 max_occurrences 등 스칼라 컬럼 보존. - curate/golden: 극값 CTE(전체반환·앵커≠요청) 학습예제·eval 추가. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -38,3 +38,5 @@
|
||||
{"id":"ground-04","category":"grounding","question":"PGMEA 측류추출에서 초고순도 제품은 탑의 어디에서 뽑아?","context":"측류추출: 탑 상부=경비물 제거, 탑 하부=중비물 제거, 탑 중간(Middle)=초고순도 PGMEA 제품 측류 추출(Side-stream Draw).","expect":{"answer_contains":["중간"]}}
|
||||
{"id":"ground-05","category":"grounding","question":"PGMEA 정제에 진공 증류가 필요한 이유는?","context":"PGMEA 상압 끓는점 약 146도, 분해온도 180도 이상에서 열분해 시작. 끓는점이 분해온도에 가까워 진공(약 50~100 Torr)으로 끓는점을 70~90도대로 낮춰 열분해를 방지한다.","expect":{"answer_contains":["열분해"]}}
|
||||
{"id":"ground-06","category":"grounding","question":"1차 플랜트에 등록된 태그 수는 몇 개야?","context":"area별 태그 수(참고): P1=87, P2=142, P3=50, P6=121.","expect":{"answer_contains":["87"]}}
|
||||
{"id":"nl2sql-11","category":"nl2sql","question":"FICQ-6101.PV 최근 48시간 최대값이 발생한 시각(들)의 PV·SP·OP를 전부 보여주고 반복 횟수도 알려줘","expect":{"sql_contains":["with","max(","recorded_at in","max_occurrences"],"must_not":["time_bucket","limit 1"]}}
|
||||
{"id":"nl2sql-12","category":"nl2sql","question":"TI-6111C 최근 24시간 최대값이 발생한 시각들의 TICA-6111A PV·SP·OP와 FIQ-6115 값을 전부 보여줘","expect":{"sql_contains":["with","max(","ti-6111c.pv","tica-6111a","fiq-6115","max_occurrences"],"must_not":["time_bucket","limit 1"]}}
|
||||
|
||||
@@ -45,6 +45,8 @@ EMBED_MODEL = os.environ.get("EMBED_MODEL", "nomic-embed-text")
|
||||
VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8000/v1")
|
||||
from config import get_vllm_model
|
||||
VLLM_MODEL = get_vllm_model()
|
||||
# NL2SQL 프롬프트 단일 소스 — worker/sql_prompt.py (production + eval 공유)
|
||||
from worker.sql_prompt import SQL_SYSTEM_PROMPT
|
||||
|
||||
# Qdrant 컬렉션
|
||||
COL_CODEBASE = "ws-65f457145aee80b2" # ExperionCrawler 소스코드
|
||||
@@ -694,29 +696,8 @@ def _apply_sql_guards(sql: str, max_rows: int = SQL_MAX_ROWS) -> str:
|
||||
return f"SELECT * FROM ({s}) _capped LIMIT {max_rows}"
|
||||
|
||||
|
||||
# Compact DB schema for LLM SQL generation
|
||||
_DB_SCHEMA = """
|
||||
Tables:
|
||||
history_table(tagname TEXT, value TEXT, recorded_at TIMESTAMPTZ)
|
||||
realtime_table(tagname TEXT, livevalue TEXT, timestamp TIMESTAMPTZ)
|
||||
tag_metadata(base_tag TEXT, attribute TEXT, value TEXT)
|
||||
event_history_table(tagname TEXT, prev_value TEXT, curr_value TEXT, event_type TEXT, event_time TIMESTAMPTZ, duration_seconds INT)
|
||||
pid_equipment(tag_no TEXT, category TEXT, tag_dcs BOOL, tag_class TEXT, instrument_type TEXT, from_tag TEXT, to_tag TEXT)
|
||||
-- tag_dcs=TRUE: DCS 함수블록(FIC/TIC/PIC류), FALSE: 현장 물리 계기(FT/FCV류)
|
||||
-- tag_class: 'field'(현장) / 'system'(DCS) — tag_dcs 기반
|
||||
-- from_tag(상류) → tag_no → to_tag(하류) 연결 추적
|
||||
|
||||
Views:
|
||||
v_tag_summary(base_tag TEXT, pv TEXT, sp TEXT, op TEXT, description TEXT, area TEXT, sub_area TEXT)
|
||||
|
||||
Rules:
|
||||
- SELECT only. tagname UPPERCASE exact match (e.g. 'FICQ-6113.PV').
|
||||
- If user input is lowercase, convert to UPPERCASE before querying.
|
||||
- value is TEXT; cast ::double precision when aggregating.
|
||||
- time_bucket() banned. For N-min buckets: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60))
|
||||
- KST input = UTC-9 in DB.
|
||||
- sub_area는 "P6-1" 또는 공용 "P6-1,P6-2" 형식. 매칭은 항상 토큰 비교: 'P6-1' = ANY(string_to_array(sub_area, ','))
|
||||
"""
|
||||
# DB 스키마/SQL system 프롬프트는 worker/sql_prompt.py 로 단일화됨 (위에서 SQL_SYSTEM_PROMPT import).
|
||||
# 이전의 inline _DB_SCHEMA 사본은 제거 — query_with_nl 은 SQL_SYSTEM_PROMPT 를 직접 사용.
|
||||
|
||||
|
||||
def _area_or_subarea_filter(area: str | None, tagname_col: str, area_col: str) -> tuple[str, list]:
|
||||
@@ -1151,7 +1132,11 @@ async def upsert_pid_connection(
|
||||
|
||||
@mcp.tool()
|
||||
async def query_pv_history(tag_names: list[str], time_from: str, time_to: str, limit: int = 100) -> str:
|
||||
"""과거 값(PV) 히스토리 조회.
|
||||
"""과거 값(PV) **원시 시계열 행**만 그대로 반환 (집계/계산 없음).
|
||||
|
||||
⚠️ 최대/최소/평균 등 **집계**나 '극값이 발생한 **시각**의 관련 태그(SP/OP 등) 조회'가
|
||||
필요하면 이 도구가 아니라 **query_with_nl** 을 사용할 것 (CTE로 정확히 계산됨).
|
||||
이 도구로 원시 행을 받아 모델이 직접 max/시각매칭을 추정하면 오답이 발생함.
|
||||
|
||||
Args:
|
||||
tag_names: 태그 이름 목록 (예: ["ficq-6113.pv", "ti-6101.pv"])
|
||||
@@ -1311,10 +1296,78 @@ async def classify_intent(question: str) -> str:
|
||||
return json.dumps({"success": True, "question": question, "route": route}, ensure_ascii=False)
|
||||
|
||||
|
||||
# ── 결정론적 SQL 태그 검증 게이트 (LLM 추론을 DB로 검증 → 피드백 루프) ──────────────
|
||||
_REALTAG_CACHE: dict = {"tags": None, "at": 0.0}
|
||||
_REALTAG_TTL = 300.0 # 5분
|
||||
# 계기 태그 literal: 영문 prefix(1~6) + '-' + 숫자 시작 + 선택 .suffix.
|
||||
# ('P6-1' 같은 area 코드는 prefix에 숫자가 있어 매칭 제외 → 오검출 방지)
|
||||
_SQL_TAG_RE = re.compile(r"'([A-Za-z]{1,6}-\d[0-9A-Za-z]*(?:\.[A-Za-z0-9]+)?)'")
|
||||
|
||||
|
||||
def _load_real_tagnames() -> set[str]:
|
||||
"""realtime_table 의 실제 태그명 집합(원본 대소문자) — 5분 캐시. SQL 실행 대상의 진실 공급원."""
|
||||
import time
|
||||
c = _REALTAG_CACHE
|
||||
if c["tags"] is not None and (time.time() - c["at"]) < _REALTAG_TTL:
|
||||
return c["tags"]
|
||||
conn = None
|
||||
try:
|
||||
conn = _get_db_connection_sync()
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT DISTINCT tagname FROM realtime_table WHERE tagname IS NOT NULL")
|
||||
s = {r[0] for r in cur.fetchall() if r[0]}
|
||||
c["tags"], c["at"] = s, time.time()
|
||||
return s
|
||||
except Exception:
|
||||
return c["tags"] or set()
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
|
||||
def _verify_sql_tags(sql: str):
|
||||
"""SQL 내 태그 literal 을 실제 DB 태그집합과 대조 (결정론적).
|
||||
|
||||
반환: (fixed_sql, problems)
|
||||
- fixed_sql: 실재하는 태그는 DB 표기(대소문자)로 교정
|
||||
- problems: [{"tag": <SQL표기>, "suggested": [후보...]}, ...]. 비어 있으면 전부 유효.
|
||||
"""
|
||||
real = _load_real_tagnames()
|
||||
if not real:
|
||||
return sql, [] # 태그집합 로드 실패 시 게이트는 통과(가용성 우선, 실행 단계에서 오류로 드러남)
|
||||
upper_map = {t.upper(): t for t in real}
|
||||
fixed = sql
|
||||
problems = []
|
||||
seen = set()
|
||||
for m in _SQL_TAG_RE.finditer(sql):
|
||||
lit = m.group(1)
|
||||
if lit in seen:
|
||||
continue
|
||||
seen.add(lit)
|
||||
U = lit.upper()
|
||||
if U in upper_map:
|
||||
actual = upper_map[U]
|
||||
if lit != actual: # 대소문자만 교정
|
||||
fixed = re.sub(r"'" + re.escape(lit) + r"'", "'" + actual + "'", fixed)
|
||||
continue
|
||||
# 존재하지 않는 태그 → base 토큰 부분일치로 실제 후보 제안
|
||||
base = U.split('.')[0]
|
||||
toks = [p for p in re.split(r'[-_]', base) if len(p) >= 2]
|
||||
cand = sorted({t for t in real if any(p in t.upper() for p in toks)})[:5]
|
||||
problems.append({"tag": lit, "suggested": cand})
|
||||
return fixed, problems
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def query_with_nl(question: str) -> str:
|
||||
"""자연어 질문을 LLM이 SQL로 변환하고 시계열 DB를 조회합니다.
|
||||
|
||||
**집계·극값·다중태그 조인은 반드시 이 도구**(query_pv_history 아님). 예:
|
||||
- "TI-6111C 24시간 최대/평균/최소"
|
||||
- "FICQ-6101.PV 최대값이 발생한 **시각**의 SP·OP·관련태그 값" (극값 시각 → 그 순간의 다른 태그)
|
||||
- "어제 유량 합계", "N분 간격 평균"
|
||||
이런 질문은 SQL/CTE 로 DB에서 직접 계산해야 정확하다(원시 행을 끌어와 모델이 눈대중 집계하면 틀린다).
|
||||
|
||||
Args:
|
||||
question: 자연어 질문 (예: "FICQ-6113.PV 최근 1시간 값을 1분 단위로 표시")
|
||||
|
||||
@@ -1352,41 +1405,58 @@ async def query_with_nl(question: str) -> str:
|
||||
# 라우팅 실패 시 원래 SQL 경로로 fallback
|
||||
pass
|
||||
|
||||
system = (
|
||||
"You are a PostgreSQL SQL expert.\n"
|
||||
"Convert the user's question into a SELECT SQL.\n"
|
||||
"Return ONLY the SQL. No explanation, no markdown, NO <think> tags.\n"
|
||||
"Use PostgreSQL syntax. tagname UPPERCASE exact match (e.g. 'FICQ-6113.PV').\n"
|
||||
"If user input is lowercase, convert to UPPERCASE before querying.\n"
|
||||
"value is TEXT; cast ::double precision when aggregating.\n"
|
||||
"KST input = UTC-9. Example: KST 12:00 = UTC 03:00.\n"
|
||||
"For N-min buckets: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60)).\n"
|
||||
"No GROUP BY if no interval specified.\n\n"
|
||||
f"{_DB_SCHEMA}"
|
||||
)
|
||||
|
||||
try:
|
||||
def _call_llm():
|
||||
return _llm().chat.completions.create(
|
||||
model=VLLM_MODEL,
|
||||
messages=[
|
||||
{"role": "system", "content": system},
|
||||
{"role": "user", "content": question},
|
||||
],
|
||||
max_tokens=8192,
|
||||
temperature=0.1,
|
||||
)
|
||||
|
||||
resp = await asyncio.to_thread(_call_llm)
|
||||
sql = _strip_think(resp.choices[0].message.content or "").strip()
|
||||
# 마크다운 코드 블록 제거
|
||||
if sql.startswith("```"):
|
||||
lines = sql.splitlines()
|
||||
sql = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]).strip()
|
||||
# 프롬프트 단일 소스: worker/sql_prompt.py 의 SQL_SYSTEM_PROMPT (풍부한 스키마·뷰·버킷·KST·CTE 예시).
|
||||
# 'No GROUP BY if no interval specified' 같은 단편 지시는 SQL_SYSTEM_PROMPT 의 정교한 INTERVAL 규칙으로 대체됨.
|
||||
messages = [
|
||||
{"role": "system", "content": SQL_SYSTEM_PROMPT},
|
||||
{"role": "user", "content": question},
|
||||
]
|
||||
MAX_TRIES = 3 # 결정론 검증 실패 시 LLM 자기교정 재시도 횟수
|
||||
sql = ""
|
||||
last_problems: list = []
|
||||
for attempt in range(MAX_TRIES):
|
||||
try:
|
||||
def _call_llm(_msgs=list(messages)):
|
||||
return _llm().chat.completions.create(
|
||||
model=VLLM_MODEL, messages=_msgs, max_tokens=8192, temperature=0.1,
|
||||
)
|
||||
resp = await asyncio.to_thread(_call_llm)
|
||||
sql = _strip_think(resp.choices[0].message.content or "").strip()
|
||||
if sql.startswith("```"):
|
||||
lines = sql.splitlines()
|
||||
sql = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]).strip()
|
||||
except Exception as e:
|
||||
return json.dumps({"success": False, "sql": "", "error": f"LLM SQL 생성 실패: {e}"}, ensure_ascii=False)
|
||||
if not sql:
|
||||
return json.dumps({"success": False, "sql": "", "error": "LLM이 SQL을 생성하지 못했습니다."}, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
return json.dumps({"success": False, "sql": "", "error": f"LLM SQL 생성 실패: {e}"}, ensure_ascii=False)
|
||||
|
||||
# ── 결정론적 태그 검증 게이트: 실재하지 않는 태그면 실행 거부 + 자기교정 피드백 ──
|
||||
sql, last_problems = await asyncio.to_thread(_verify_sql_tags, sql)
|
||||
if not last_problems:
|
||||
break # 모든 태그 실재 확인(대소문자 교정 완료) → 실행 진행
|
||||
|
||||
if attempt < MAX_TRIES - 1:
|
||||
fb = []
|
||||
for p in last_problems:
|
||||
sug = ", ".join(p["suggested"]) if p["suggested"] else "(후보 없음 — find_tags 로 검색 필요)"
|
||||
fb.append(f"- '{p['tag']}' 는 DB에 존재하지 않음. 실제 후보: {sug}")
|
||||
feedback = (
|
||||
"직전 SQL에 존재하지 않는 태그가 있습니다:\n" + "\n".join(fb)
|
||||
+ "\n위 후보 중 정확한 실제 태그명만 사용해 SQL 전체를 다시 작성하세요. "
|
||||
"기간·CTE·집계 등 다른 로직은 그대로 두고 태그명만 교정. SQL만 반환."
|
||||
)
|
||||
messages.append({"role": "assistant", "content": sql})
|
||||
messages.append({"role": "user", "content": feedback})
|
||||
|
||||
# 재시도 소진 후에도 미존재 태그가 남으면 — 빈 결과 대신 명시적 에러 (날조 차단)
|
||||
if last_problems:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"sql": sql,
|
||||
"error": "존재하지 않는 태그가 SQL에 포함되어 실행을 거부했습니다 (결정론 검증 게이트). "
|
||||
"find_tags 로 정확한 태그명을 먼저 확인하세요.",
|
||||
"invalid_tags": last_problems,
|
||||
}, ensure_ascii=False, default=str)
|
||||
|
||||
# SQL 실행
|
||||
raw = await _execute_sql_internal(sql)
|
||||
@@ -1401,15 +1471,20 @@ async def query_with_nl(question: str) -> str:
|
||||
time_col = next((c for c in cols if c not in ("tagname", "value", "livevalue", "avg_val")), None)
|
||||
val_col = next((c for c in ("avg_val", "value") if c in cols), cols[-1])
|
||||
if time_col:
|
||||
# tagname/val_col/time_col 외 스칼라 컬럼(예: max_occurrences)은 피벗 후에도 보존
|
||||
extra_cols = [c for c in cols if c not in ("tagname", val_col, time_col)]
|
||||
tag_names_list = sorted(dict.fromkeys(row["tagname"] for row in data))
|
||||
pivoted: dict = {}
|
||||
for row in data:
|
||||
key = str(row[time_col])
|
||||
if key not in pivoted:
|
||||
pivoted[key] = {time_col: row[time_col]}
|
||||
base = {time_col: row[time_col]}
|
||||
for c in extra_cols:
|
||||
base[c] = row.get(c)
|
||||
pivoted[key] = base
|
||||
pivoted[key][row["tagname"]] = row.get(val_col)
|
||||
result["data"] = list(pivoted.values())
|
||||
result["columns"] = [time_col] + tag_names_list
|
||||
result["columns"] = [time_col] + extra_cols + tag_names_list
|
||||
result["count"] = len(result["data"])
|
||||
|
||||
return json.dumps(result, ensure_ascii=False, default=str)
|
||||
|
||||
@@ -311,6 +311,12 @@ NL2SQL = [
|
||||
"SELECT DISTINCT value AS sub_area\nFROM tag_metadata\nWHERE attribute = 'sub_area' AND value LIKE '%P6%'\nORDER BY sub_area"),
|
||||
("p-6201 의 sub_area (공용 여부 확인)",
|
||||
"SELECT value AS sub_area\nFROM tag_metadata\nWHERE base_tag = 'p-6201' AND attribute = 'sub_area'"),
|
||||
("ficq-6101.pv 최근 48시간 최대값이 발생한 시각의 pv sp op 함께 보여줘",
|
||||
"WITH peak AS (\n SELECT recorded_at\n FROM history_table\n WHERE tagname = 'ficq-6101.pv'\n AND recorded_at >= NOW() - INTERVAL '48 hours'\n AND value::double precision = (\n SELECT MAX(value::double precision)\n FROM history_table\n WHERE tagname = 'ficq-6101.pv'\n AND recorded_at >= NOW() - INTERVAL '48 hours'\n )\n)\nSELECT recorded_at AT TIME ZONE 'Asia/Seoul' AS recorded_at, tagname, value,\n (SELECT COUNT(*) FROM peak) AS max_occurrences\nFROM history_table\nWHERE tagname IN ('ficq-6101.pv', 'ficq-6101.sp', 'ficq-6101.op')\n AND recorded_at IN (SELECT recorded_at FROM peak)\nORDER BY recorded_at, tagname"),
|
||||
("ti-6101.pv 어제 최저값 시각의 값 알려줘 (반복되면 전부)",
|
||||
"WITH trough AS (\n SELECT recorded_at\n FROM history_table\n WHERE tagname = 'ti-6101.pv'\n AND recorded_at >= (CURRENT_DATE - INTERVAL '1 day') + INTERVAL '-9 hours'\n AND recorded_at < CURRENT_DATE + INTERVAL '-9 hours'\n AND value::double precision = (\n SELECT MIN(value::double precision)\n FROM history_table\n WHERE tagname = 'ti-6101.pv'\n AND recorded_at >= (CURRENT_DATE - INTERVAL '1 day') + INTERVAL '-9 hours'\n AND recorded_at < CURRENT_DATE + INTERVAL '-9 hours'\n )\n)\nSELECT recorded_at AT TIME ZONE 'Asia/Seoul' AS recorded_at, tagname, value,\n (SELECT COUNT(*) FROM trough) AS min_occurrences\nFROM history_table\nWHERE tagname = 'ti-6101.pv'\n AND recorded_at IN (SELECT recorded_at FROM trough)\nORDER BY recorded_at"),
|
||||
("ti-6111c 최근 24시간 최대값이 발생한 시각들의 tica-6111a pv sp op와 fiq-6115 값 (전부, 앵커 포함)",
|
||||
"WITH peak AS (\n SELECT recorded_at\n FROM history_table\n WHERE tagname = 'ti-6111c.pv'\n AND recorded_at >= NOW() - INTERVAL '24 hours'\n AND value::double precision = (\n SELECT MAX(value::double precision)\n FROM history_table\n WHERE tagname = 'ti-6111c.pv'\n AND recorded_at >= NOW() - INTERVAL '24 hours'\n )\n)\nSELECT recorded_at AT TIME ZONE 'Asia/Seoul' AS recorded_at, tagname, value,\n (SELECT COUNT(*) FROM peak) AS max_occurrences\nFROM history_table\nWHERE tagname IN ('ti-6111c.pv', 'tica-6111a.pv', 'tica-6111a.sp', 'tica-6111a.op', 'fiq-6115.pv')\n AND recorded_at IN (SELECT recorded_at FROM peak)\nORDER BY recorded_at, tagname"),
|
||||
]
|
||||
|
||||
# ── 6) 생성 + 검증 + 통계 ────────────────────────────────────────────────────
|
||||
|
||||
@@ -106,10 +106,41 @@ N분 간격 집계 공식 (time_bucket 금지, date_trunc 사용):
|
||||
GROUP BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120), tagname
|
||||
ORDER BY to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/120)*120), tagname
|
||||
|
||||
예시 (극값 시각 → 그 시각의 관련 태그값 — CTE; ★극값 기준 태그(앵커)≠표시요청 태그인 경우):
|
||||
-- "TI-6111C 최대값이 발생한 시각(들)의 TICA-6111A PV/SP/OP, FIQ-6115 값을 보여줘"
|
||||
-- ★ 극값(최대/최소)은 양자화 sim 특성상 여러 시각에 반복될 수 있다 →
|
||||
-- LIMIT 1 로 하나만 고르지 말 것. value = MAX(...) 인 **모든 시각을 반환**하고 반복 횟수(max_occurrences) 명시.
|
||||
-- ★ 극값 기준 태그(여기선 TI-6111C.PV = '앵커')는 사용자가 표시 요청하지 않아도 SELECT 에 **반드시 포함**.
|
||||
-- 모든 행에서 그 값이 극값(=MAX)으로 고정되어, 독자가 "이 행들이 진짜 최대값 시각"임을 검증할 수 있다.
|
||||
WITH peak AS (
|
||||
SELECT recorded_at
|
||||
FROM history_table
|
||||
WHERE tagname = 'TI-6111C.PV'
|
||||
AND recorded_at >= NOW() - INTERVAL '24 hours'
|
||||
AND value::double precision = (
|
||||
SELECT MAX(value::double precision)
|
||||
FROM history_table
|
||||
WHERE tagname = 'TI-6111C.PV'
|
||||
AND recorded_at >= NOW() - INTERVAL '24 hours'
|
||||
)
|
||||
)
|
||||
SELECT recorded_at AT TIME ZONE 'Asia/Seoul' AS recorded_at, tagname, value,
|
||||
(SELECT COUNT(*) FROM peak) AS max_occurrences
|
||||
FROM history_table
|
||||
WHERE tagname IN ('TI-6111C.PV', -- ★ 앵커(극값 기준) 반드시 포함
|
||||
'TICA-6111A.PV', 'TICA-6111A.SP', 'TICA-6111A.OP', 'FIQ-6115.PV')
|
||||
AND recorded_at IN (SELECT recorded_at FROM peak)
|
||||
ORDER BY recorded_at, tagname
|
||||
-- 최소값은 MAX(...) 를 MIN(...) 으로. max_occurrences = 극값이 반복된 시각 수.
|
||||
-- 결과 제시 시: 헤더에 "TI-6111C.PV 최대값 = <값> (N회 발생)" 을 명시하고, 앵커 열(전 행 고정값)을 표에 함께 보여줄 것.
|
||||
-- 봉우리마다 관련 태그값(예: 유량)이 다를 수 있으므로 전부 보여주는 것이 정직하다.
|
||||
|
||||
규칙:
|
||||
- SELECT만 허용 (INSERT/UPDATE/DELETE/DROP 등 불가)
|
||||
- tagname은 모두 소문자로 정확히 입력
|
||||
- value 컬럼은 TEXT이므로 집계 시 ::double precision 캐스트 필수
|
||||
- SELECT/WITH(CTE) 허용 (INSERT/UPDATE/DELETE/DROP 등 불가). 극값→관련태그 류는 CTE를 적극 사용.
|
||||
- 극값→관련태그: 극값 기준 태그(앵커)를 SELECT/IN 목록에 **반드시 포함**(사용자가 그 태그를 요청하지 않았어도).
|
||||
결과 설명에는 **극값 수치와 반복 횟수(max_occurrences)** 를 명시한다.
|
||||
- tagname은 모두 대문자로 정확히 입력 (예: 'FICQ-6113.PV'). 입력이 소문자면 대문자로 변환.
|
||||
- value 컬럼은 TEXT이므로 집계/정렬 시 ::double precision 캐스트 필수
|
||||
- time_bucket 함수 사용 금지 — 위의 to_timestamp/FLOOR/EPOCH 공식 사용
|
||||
"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user