feat: Sub-Area(세부 Area) 분류 기능 + 포인트 삭제 시 메타데이터/이력 정리

하나의 area(P6)를 Column 단위 sub_area(P6-1/P6-2)로 분류. tag_metadata
attribute='sub_area'(EAV)에 저장, 공용 설비는 "P6-1,P6-2" 형식 + 토큰 매칭.

- 백엔드: GetSubAreaListByAreaAsync/UpdateSubAreaAsync/SeedSubAreaAsync,
  SubAreaController(GET/PUT/POST seed), SubAreaDtos
- 포인트 삭제 개선: DeleteRealtimePointAsync(purgeHistory) — 잔여 0이면
  고아 메타데이터(desc/area/sub_area) 정리, opt-in 시 history_table 영구 삭제
- MCP: find_tags(sub_area=...) + _area_or_subarea_filter('-' 포함 시 자동 토큰 매칭)
- 문서: prompts/plant_context.md, AGENTS.md, SubArea-추가플랜.md
- UI: 포인트빌더 Sub-Area 관리 카드(조회/수정/seed) + 행별 이력 삭제 체크박스

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
windpacer
2026-05-24 06:31:51 +09:00
parent 302183c97e
commit f81044c451
10 changed files with 2159 additions and 53 deletions

View File

@@ -662,15 +662,37 @@ Tables:
event_history_table(tagname TEXT, prev_value TEXT, curr_value TEXT, event_type TEXT, event_time TIMESTAMPTZ, duration_seconds INT)
Views:
v_tag_summary(base_tag TEXT, pv TEXT, sp TEXT, op TEXT, description TEXT, area TEXT)
v_tag_summary(base_tag TEXT, pv TEXT, sp TEXT, op TEXT, description TEXT, area TEXT, sub_area TEXT)
Rules:
- SELECT only. tagname lowercase exact match.
- value is TEXT; cast ::double precision when aggregating.
- time_bucket() banned. For N-min buckets: to_timestamp(FLOOR(EXTRACT(EPOCH FROM recorded_at)/(N*60))*(N*60))
- KST input = UTC-9 in DB.
- sub_area는 "P6-1" 또는 공용 "P6-1,P6-2" 형식. 매칭은 항상 토큰 비교: 'P6-1' = ANY(string_to_array(sub_area, ','))
"""
def _area_or_subarea_filter(area: str | None, tagname_col: str, area_col: str) -> tuple[str, list]:
"""area 필터 SQL 조건 + 파라미터 반환.
- area가 "P6-1"처럼 '-'를 포함하면 sub_area 코드로 간주 →
tag_metadata(attribute='sub_area')를 EXISTS 조인하여 토큰 매칭 (공용 "P6-1,P6-2" 포함).
이벤트 테이블의 tagname은 'p-6116.pv' 형식이므로 split_part로 base_tag 추출.
- 그 외(area 코드, 예 "P6")는 기존 컬럼 정확 매칭.
- area가 None이면 항상 참(TRUE).
"""
if not area:
return "TRUE", []
if "-" in area: # sub_area 코드
return (
f"EXISTS (SELECT 1 FROM tag_metadata tm "
f"WHERE tm.base_tag = split_part({tagname_col}, '.', 1) "
f"AND tm.attribute = 'sub_area' AND %s = ANY(string_to_array(tm.value, ',')))",
[area],
)
return f"{area_col} = %s", [area]
# ── RAG 도구 ─────────────────────────────────────────────────────────────────
@mcp.tool()
@@ -1206,57 +1228,62 @@ _VALID_EVENT_TYPES = ("ALARM", "TRIP", "NORMAL", "RUN", "CHANGE")
@mcp.tool()
async def find_tags(query: str, area: str | None = None, top_k: int = 20) -> str:
"""태그 검색 — base_tag/설명(desc)/area 통합 검색 (v_tag_summary 뷰 기반).
async def find_tags(query: str, area: str | None = None, sub_area: str | None = None, top_k: int = 20) -> str:
"""태그 검색 — base_tag/설명(desc)/area/sub_area 통합 검색 (v_tag_summary 뷰 기반).
사용 시점: 사용자가 "온도", "Tower 1 압력", "운전 중인 펌프" 같은 자연어로
태그를 지칭할 때 실제 base_tag(예: 'ti-6101', 'p-6102')를 역으로 찾기 위해.
get_tag_metadata와 차이: 단순 tagname LIKE만 보지 않고 description/area에도
매칭하며, 현재 PV/SP/OP/description/area를 함께 반환.
매칭하며, 현재 PV/SP/OP/description/area/sub_area를 함께 반환.
Args:
query: 검색어 (base_tag 또는 description 부분 일치, 대소문자 무시)
area: (선택) area 필터 (예: 'tower-1', 'utility'). NULL이면 전체
area: (선택) area 필터 (예: 'P6'). "P6-1"처럼 '-'가 있으면 sub_area로 자동 처리.
sub_area: (선택) sub_area 필터 (예: 'P6-1'). 공용 태그("P6-1,P6-2")도 매칭됨.
top_k: 반환 태그 수 (기본 20, 최대 100)
Returns:
JSON: { success, query, count, tags: [{base_tag, pv, sp, op, description, area}] }
JSON: { success, query, count, tags: [{base_tag, pv, sp, op, description, area, sub_area}] }
"""
conn = None
try:
top_k = max(1, min(top_k, 100))
q = f"%{query.strip()}%"
# sub_area 명시 파라미터 우선, 없으면 area가 "P6-1" 형식이면 sub_area로 간주
effective_sub = sub_area or (area if (area and "-" in area) else None)
effective_area = None if (area and "-" in area) else area
conds = ["(base_tag ILIKE %s OR description ILIKE %s)"]
params: list = [q, q]
if effective_sub:
conds.append("%s = ANY(string_to_array(sub_area, ','))")
params.append(effective_sub)
if effective_area:
# v_tag_summary.area는 '{12 | P6 | }' 원본 형식이므로 코드만 추출해 비교
conds.append("trim(split_part(area, '|', 2)) = %s")
params.append(effective_area)
sql = (
"SELECT base_tag, pv, sp, op, description, area, sub_area "
"FROM v_tag_summary WHERE " + " AND ".join(conds) +
" ORDER BY base_tag LIMIT %s"
)
params.append(top_k)
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
if area:
cur.execute(
"""SELECT base_tag, pv, sp, op, description, area
FROM v_tag_summary
WHERE (base_tag ILIKE %s OR description ILIKE %s)
AND area = %s
ORDER BY base_tag
LIMIT %s""",
(q, q, area, top_k)
)
else:
cur.execute(
"""SELECT base_tag, pv, sp, op, description, area
FROM v_tag_summary
WHERE base_tag ILIKE %s OR description ILIKE %s
ORDER BY base_tag
LIMIT %s""",
(q, q, top_k)
)
cur.execute(sql, params)
rows = cur.fetchall()
tags = [
{"base_tag": r[0], "pv": r[1], "sp": r[2], "op": r[3],
"description": r[4], "area": r[5]}
"description": r[4], "area": r[5], "sub_area": r[6]}
for r in rows
]
return json.dumps({
"success": True, "query": query, "area": area,
"success": True, "query": query, "area": area, "sub_area": effective_sub,
"count": len(tags), "tags": tags
}, ensure_ascii=False, indent=2)
except Exception as e:
@@ -1279,7 +1306,9 @@ async def trace_connections(start_tag: str, direction: str = "downstream", max_d
max_depth: 최대 추적 깊이 (기본 20)
Returns:
JSON: { success, start_tag, direction, path: [{step, from_tag, to_tag, role, tag_no}] }
JSON: { success, start_tag, direction, path: [{step, tag_no, from_tag, to_tag, role, live_state}] }
- from_tag/to_tag에 쉼표가 여러 개면 병렬 펌프·라인 → 모두 경로에 포함됨(누락 없음).
- live_state: 해당 태그의 실시간 상태(예: R-RUN/L-STOP). 병렬 펌프 중 R-RUN/L-RUN이 현재 공급원.
"""
conn = None
try:
@@ -1385,7 +1414,22 @@ async def trace_connections(start_tag: str, direction: str = "downstream", max_d
visited.add(start_tag)
start_tags = _split_tags(start_tag)
_trace_upstream(start_tags, visited, 0)
# 각 노드에 실시간 상태(pv) 부착 — 병렬 펌프 중 '실제 가동 중'인 것을 식별.
# 예: F-6101A/B 상류에 P-6102(R-RUN)·P-6201(L-STOP)이 병렬이면 현재 공급원은 P-6102.
if path:
pv_tags = [p["tag_no"].lower() + ".pv" for p in path]
cur.execute(
"SELECT tagname, livevalue FROM realtime_table WHERE tagname = ANY(%s)",
(pv_tags,),
)
pv_map = {}
for tn, lv in cur.fetchall():
m = re.match(r'\{\s*\d+\s*\|\s*([^|]+?)\s*\|', lv or '')
pv_map[tn[:-3]] = (m.group(1).strip() if m else (lv or None))
for p in path:
p["live_state"] = pv_map.get(p["tag_no"].lower())
return json.dumps({
"success": True,
"start_tag": start_tag,
@@ -1413,7 +1457,7 @@ async def query_events(
Args:
tag_name: (선택) 태그명 LIKE 패턴 (예: 'p-6102', 'xv-%')
event_type: (선택) ALARM / TRIP / NORMAL / RUN / CHANGE 중 하나
area: (선택) area 정확 매칭
area: (선택) area 코드("P6") 정확 매칭. "P6-1"처럼 '-'가 있으면 sub_area로 자동 처리(공용 포함)
since: (선택) 시작 시간 ISO 8601. 기본 24시간 전
until: (선택) 종료 시간 ISO 8601. 기본 현재
limit: 반환 행 수 (기본 100, 최대 1000)
@@ -1440,8 +1484,10 @@ async def query_events(
where.append("event_type = %s")
params.append(event_type.upper())
if area:
where.append("area = %s")
params.append(area)
# area 코드 정확매칭 / "P6-1" sub_area 코드면 tag_metadata 토큰 매칭
cond, cparams = _area_or_subarea_filter(area, "tagname", "area")
where.append(cond)
params.extend(cparams)
sql = f"""
SELECT id, tagname, prev_value, curr_value, event_type, event_time,
@@ -1486,7 +1532,7 @@ async def active_alarms(area: str | None = None, limit: int = 100) -> str:
event_type이 ALARM 또는 TRIP인 것만 필터. 이후 NORMAL이 들어왔다면 해제된 것이므로 제외.
Args:
area: (선택) area 필터
area: (선택) area 코드("P6") 필터. "P6-1"처럼 '-'가 있으면 sub_area로 자동 처리(공용 포함)
limit: 최대 반환 수 (기본 100)
Returns:
@@ -1495,7 +1541,9 @@ async def active_alarms(area: str | None = None, limit: int = 100) -> str:
conn = None
try:
limit = max(1, min(limit, 500))
sql = """
# area 코드 정확매칭 / "P6-1" sub_area 코드면 tag_metadata 토큰 매칭 (공용 포함)
area_cond, area_params = _area_or_subarea_filter(area, "tagname", "area")
sql = f"""
WITH latest AS (
SELECT DISTINCT ON (tagname)
id, tagname, curr_value, event_type, event_time,
@@ -1509,14 +1557,14 @@ async def active_alarms(area: str | None = None, limit: int = 100) -> str:
EXTRACT(EPOCH FROM (NOW() - event_time))::bigint AS age_seconds
FROM latest
WHERE event_type IN ('ALARM', 'TRIP')
AND (%s::text IS NULL OR area = %s)
AND {area_cond}
ORDER BY event_time DESC
LIMIT %s
"""
conn = await _get_db_connection()
with conn.cursor() as cur:
cur.execute(f"SET statement_timeout = {SQL_STATEMENT_TIMEOUT_MS}")
cur.execute(sql, (area, area, limit))
cur.execute(sql, (*area_params, limit))
rows = cur.fetchall()
alarms = [
{"id": r[0], "tag_name": r[1], "curr_value": r[2], "event_type": r[3],