feat: 운전판정 고도화 — realtime stall 수정 + 교차검증 + 단위/레인지

- ExperionRealtimeService를 단일 SuperviseAsync supervisor로 재설계:
  비블로킹 부팅, PublishingStopped/KeepAliveStopped 워치독으로 silent
  stall 감지, 30초 주기 무한 재연결, flush 루프 단일화
- RealtimeServiceStatus에 LastDataAgeSeconds/Stalled 추가, History는
  Stalled 시 스냅샷 skip
- v_plant_running_state에 진공펌프(vp-) 포함 + 교차검증 4객체
  (pump_corroboration_manual, v_pump_signal_map,
  v_plant_running_state_corroborated, v_plant_running_state_agg)
  + v_instrument_range 뷰 (boot DDL)
- MetadataLoaderService에 euhi/eulo/units 메타속성 추가
- generate_status_report에 agg 조회 연동 + sample/focus 버그 수정
- plant_context.md에 펌프 prefix(p-/vp-) + 교차검증 뷰 사용법

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
windpacer
2026-05-24 16:47:20 +09:00
parent 7dbeb36218
commit 2e844abf11
9 changed files with 1015 additions and 183 deletions

View File

@@ -7,6 +7,58 @@
## 완료된 작업
### 운전판정 고도화 — realtime writer stall 수정 + 교차검증(corroboration) + 단위/레인지 (2026-05-24)
#### 배경
"6-1차 펌프 4대 vs 5대" 질문에서 출발 → ① v_plant_running_state가 `p-%`만 집계해 진공펌프(`vp-`) 누락, ② 펌프 enum(RUN)만으론 허위 운전(deadhead·센서이상·frozen) 미검출, ③ 진단 중 realtime_table이 09:58 KST에 **silent stall**(수집 멈춤)된 운영 장애 발견.
#### 구현 내역
| # | 항목 | 핵심 |
|---|------|------|
| 1 | 진공펌프 포함 | `v_plant_running_state` 필터 `p-%``(p-% OR vp-%)`. VP-6117 등 진공펌프도 운전 집계 |
| 2 | **realtime writer stall 수정** | `ExperionRealtimeService`를 단일 `SuperviseAsync` supervisor로 통일: 부팅 비블로킹(`StartAsync(ct)` 즉시 반환), `GetLinkFault()``Subscription.PublishingStopped`/`Session.KeepAliveStopped`/`Connected`로 silent stall 감지 → 30초 주기 무한 재연결(3회-후-포기 제거), flush 루프 1회만 기동. `RealtimeServiceStatus``LastDataAgeSeconds`/`Stalled` 추가, History는 Stalled 시 스냅샷 skip |
| 3 | 교차검증 뷰 | `pump_corroboration_manual`(수동 매핑) + `v_pump_signal_map`(토폴로지 `FT.from_tag=펌프`→FICQ 1:N + 수동) + `v_plant_running_state_corroborated`(신선도 게이트 120s + STALE + 유량·진공 임계) + `v_plant_running_state_agg`(CONFIRMED 기준 RUNNING, suspicious/stale 부가 카운트) |
| 4 | 단위/레인지 메타데이터 | 별도 테이블 없이 `tag_metadata` 재사용 — `MetadataLoaderService.MetaAttributes``euhi/eulo/units` 추가(메타갱신 트리거 자동 편승). 타입 접근 `v_instrument_range` 뷰 |
| 5 | 유량 임계 보정 | FS 5%가 과대사이징 계기(FS 2000, 운전 ~11~57)엔 부적합(정상→SUSPICIOUS 오판) 발견 → `GREATEST(1.0, LEAST(eu_hi*0.05, 5.0))`로 [1~5 kg/hr] deadhead 밴드 캡 |
| 6 | MCP 연동 | `generate_status_report``v_plant_running_state_agg` 조회 추가(응답 `pump_corroboration` + LLM 프롬프트). 기존 `sample`/`focus` NameError 버그도 수정 |
#### 수정 파일
| 파일 | 변경 |
|------|------|
| `src/Infrastructure/OpcUa/ExperionRealtimeService.cs` | supervisor 재설계(비블로킹·워치독·무한재시도·flush 단일화) |
| `src/Infrastructure/OpcUa/ExperionHistoryService.cs` | Stalled 시 스냅샷 skip |
| `src/Infrastructure/OpcUa/MetadataLoaderService.cs` | MetaAttributes에 euhi/eulo/units |
| `src/Core/Application/Interfaces/IExperionServices.cs` | RealtimeServiceStatus에 LastDataAgeSeconds·Stalled |
| `src/Infrastructure/Database/ExperionDbContext.cs` | v_plant_running_state(vp- 포함) + 교차검증 4객체 + v_instrument_range (boot DDL) |
| `mcp-server/server.py` | generate_status_report agg 연동 + 버그수정 |
| `prompts/plant_context.md` | 펌프 prefix(p-/vp-) + 교차검증 뷰 사용법 |
| `plans/운전판정-고도화-플랜.md` | §0 감리 진단 결과(초안 정정·구현현황) |
#### 설계 결정
| 항목 | 결정 |
|------|------|
| stall 진단 신호 | realtime.timestamp frozen + history(realtime 복사본)는 신선·값 frozen → 수집기 정지. UTC/KST 무관 |
| 생존 판정 | `_session.Connected` 단독 → SDK `PublishingStopped`/`KeepAliveStopped` 추가(silent stall 감지) |
| 단위/레인지 저장 | 별도 테이블 X — `euhi/eulo/units`가 OPC 자식노드라 `tag_metadata` EAV에 적합. 트리거 배선 0 |
| 진공 의미 | pica-6111 = mmHg(≈torr), 0~760(760=대기압). 깊은진공=저압 → `< 300` CONFIRMED |
| 유량 임계 | %FS 부적합(계기 과대사이징) → deadhead 절대 밴드 캡 |
| MCP 재적용 | MCP 서버 재시작 필요(미반영) |
#### 검증 (라이브 :5000, Web 재기동 2회)
- `dotnet build` 경고 0/에러 0, `py_compile` OK
- 재기동 후 supervisor 자동재개(929 포인트), realtime stall 복구(lag 1:13→<2s, fresh 0→929)
- 메타갱신 2946건 적재(euhi/eulo/units), `v_instrument_range` 채워짐(ficq-6113 0~2000 kg/hr, pica-6111 0~760 mmHg)
- P6 운전 5대(진공 vp-6117 포함) 전부 **CONFIRMED_RUNNING**(유량 43 kg/hr, 진공 43 mmHg), agg confirmed=5/suspicious=0
#### 잔여
- MCP 서버 재시작해야 `generate_status_report` 변경 반영
- 신버전 Web 앱은 백그라운드 `dotnet run`으로 기동 중 — 영속화는 운전원 터미널/`deploy.sh`(systemd) 권장
- 진공 임계(300)·유량 deadhead 밴드는 운전 데이터로 추가 튜닝 여지
- `active_alarms`에 SUSPICIOUS 주입은 운전원 검증 후(보류)
### 문서 탐색기 (Tab 16) 구현 (2026-05-24)
#### 배경

View File

@@ -1708,6 +1708,25 @@ async def generate_status_report(area: str | None = None, hours: int = 24) -> st
for ev in events:
by_type[ev["event_type"]] = by_type.get(ev["event_type"], 0) + 1
# 2.5) 펌프 운전 교차검증 (유량/진공 기반) — v_plant_running_state_agg
pump_corr: list[dict] = []
try:
corr_raw = await _execute_sql_internal(
"SELECT area_code, status, total_pumps, confirmed_running, suspicious_running, "
"stale_running, indeterminate_running, tripped_pumps, "
"confirmed_tags, suspicious_tags, stale_tags "
"FROM v_plant_running_state_agg ORDER BY area_code"
)
corr_parsed = json.loads(corr_raw)
if corr_parsed.get("success"):
pump_corr = corr_parsed.get("data", [])
if area:
_f = [r for r in pump_corr if (r.get("area_code") or "").upper() == area.upper()]
if _f:
pump_corr = _f
except Exception:
pump_corr = []
# 3) LLM 보고서
alarm_lines = [
f"- [{a['event_type']}] {a['tag_name']} since {_kst_str(a['since'])} "
@@ -1718,9 +1737,16 @@ async def generate_status_report(area: str | None = None, hours: int = 24) -> st
f"- [{ev['event_type']}] {ev['tag_name']} @ {_kst_str(ev['event_time'])} "
f"({ev.get('prev_value')}{ev.get('curr_value')})"
f" (직전상태유지={ev.get('prev_state_duration_s', '?')}s)"
for ev in sample
for ev in events[:40]
]
focus_line = f"\n특히 다음 관점을 우선해 설명하세요: {focus}\n" if focus else ""
corr_lines = [
f"- {r.get('area_code')}: {r.get('status')} "
f"(확인 {r.get('confirmed_running', 0)}, 의심 {r.get('suspicious_running', 0)}, "
f"정체 {r.get('stale_running', 0)}, 트립 {r.get('tripped_pumps', 0)})"
+ (f" 의심펌프={r.get('suspicious_tags')}" if r.get('suspicious_running') else "")
+ (f" 정체펌프={r.get('stale_tags')}" if r.get('stale_running') else "")
for r in pump_corr
] or ["- 펌프 교차검증 데이터 없음"]
system = (
"당신은 IIoT/공장 운전 분석 전문가입니다. 디지털 포인트의 상태 변경 이벤트 로그를 보고 "
@@ -1731,13 +1757,17 @@ async def generate_status_report(area: str | None = None, hours: int = 24) -> st
"4) 다음 점검 권고 (있다면)\n"
"구체적인 태그명과 시각을 포함하되 추측은 자제합니다.\n\n"
"참고 - 모든 시각은 KST(UTC+9, Asia/Seoul)입니다. "
"`직전상태유지`는 이 이벤트 직전에 태그가 머물렀던 시간(초)입니다."
"`직전상태유지`는 이 이벤트 직전에 태그가 머물렀던 시간(초)입니다.\n"
"펌프 운전 교차검증: 펌프 RUN 상태를 유량(kg/hr)·진공압(torr)으로 확인한 결과입니다. "
"'확인'=실질 운전, '의심'=RUN인데 유량/진공 없음(deadhead·센서이상 가능), "
"'정체'=실시간 수집 지연으로 판정 보류. 의심/정체가 있으면 보고서에 우선 명시하세요."
)
user_msg = (
f"대상 area: {area or '전체'}\n"
f"분석 윈도우: 최근 {hours}시간\n"
f"이벤트 통계 (type별): {by_type}\n"
f"활성 알람 {len(alarms)}건:\n" + "\n".join(alarm_lines) + "\n\n"
f"펌프 운전 교차검증 (area별):\n" + "\n".join(corr_lines) + "\n\n"
f"최근 이벤트 표본 (최대 40건):\n" + "\n".join(recent_lines)
)
@@ -1766,6 +1796,7 @@ async def generate_status_report(area: str | None = None, hours: int = 24) -> st
"active_alarms_count": len(alarms),
"recent_events_count": len(events),
"by_type": by_type,
"pump_corroboration": pump_corr,
"window_hours": hours,
"area": area,
},

View File

@@ -0,0 +1,604 @@
# 운전판정 고도화 플랜 — 유량계·진공압 교차검증(Corroboration) 도입
> **문서 상태**: 초안 + 감리 진단 반영 (2026-05-24) — **§0이 §1~§8 초안보다 우선**
> **작성일**: 2026-05-24
> **관련 시스템**: ExperionCrawler v_plant_running_state, MCP server, pid_equipment
---
## 0. 감리 진단 결과 (2026-05-24, 실 DB·pid_equipment·코드 검증)
> 초안(§1~§8)을 실제 데이터로 검증한 결과다. 본문과 충돌 시 **이 절의 정정/보완이 우선**한다.
### 0.1 검증된 사실 (초안 대비 정정)
| 항목 | 초안 주장 | 실측 결과 | 판정 |
|---|---|---|---|
| 유량 데이터 성격 | "dummy 20.27 고정" (risk #1) | `ficq-6113`이 하루 동안 **38~55로 실측 변동**(history_table). 단 **수집기 stall 시 전 태그 frozen**(2026-05-24 09:58 KST 실제 발생, 별도 수정 완료) | 라이브 ✅ / **신선도 게이트 필수** |
| 펌프번호=유량번호 | "일관됨" (obs #1) | P-6102→ficq-**6101**, P-6114→ficq-6113+**6114**(1:N) — 번호 불일치·1:N | ❌ 오류 |
| `pica-6111.pv` 값 | "20.8" (§2.1) | 쿼리마다 변동(5.87/20.8…) — 고정값 아님 | ❌ 값 오류 |
| VP↔진공압 매핑 | "pid_equipment 기반" (§4.1.1 Step3) | VP-6117: from=D-6113, to=SC-6128 → **압력계와 토폴로지 연결 없음**. Step3는 사실상 하드코딩 manual | ⚠️ 명칭 정정 |
| 진공압 태그 의미 | "PICA.pv<50=진공유지" | `pi-6111`="VACUUM PRESSURE", `pica-6111`은 PT-6111→PCV-6111 **압력제어 루프**(다른 점일 수). 센서 sense(절대압/진공계)·단위 미확정 | ⚠️ 임계 근거 부족 |
| 실시간 유량 태그 | FT 기준 매핑 (§4.1.1) | realtime에 `ft-6113` **없음**(count=0). 값은 **FICQ 컨트롤러**(`ficq-6113.pv`)에 존재 | ❌ 매핑 대상 오류 |
### 0.2 치명적 문제 — Phase 1 SQL 그대로면 동작하지 않음
1. **[높음] 하이픈 제거 버그**: `LOWER(REPLACE(ft.tag_no,'-',''))``'ft6113'`. realtime base_tag는 하이픈 유지(`ficq-6113`) → **매칭 0건** → 전부 INDETERMINATE.
2. **[높음] FT vs FICQ 대상 오류**: 유량값은 `ft-*`가 아니라 `ficq-*.pv`. FT 태그로는 realtime 조인 불가.
3. **[높음] heuristic 오매핑**: `'ficq-'||SUBSTRING(base_tag FROM 3)` → P-6102는 `ficq-6102`(없음, 실제 6101), vp-6117은 `ficq--6117`(깨짐). 번호 가정 자체가 틀림.
4. **[중간] 1:N 손실**: flow_tag 단일 컬럼 → P-6114(6113+6114) 한쪽만 저장.
5. **[중간] forward JOIN 실패**: P-6102.to_tag=필터, P-6116.to_tag=C-6111/FCV → to_tag에 FT 없음. 역방향(FT.from_tag=pump)만 일부 성립.
### 0.3 설계 공백
6. **[최우선] 데이터 신선도 게이트 부재** — realtime 값이 stale/frozen이어도 임계 비교 → frozen `20.2667`을 "유량 정상=CONFIRMED"로 **오판**. (이번 수집기 stall이 정확히 이 시나리오.) 판정 전 `NOW()-timestamp` 확인, stale면 판정 보류.
7. **[중간] 임계 0.5 절대값** — 계기별 Full Scale 무시(작은 레인지 과대·큰 레인지 과소).
8. **[중간] 집계가 1대 의심에 전체 오염** — suspicious 1대로 area가 RUNNING_WITH_SUSPICIOUS. 정상 standby/kickback(sp=0) 펌프 상시 의심 위험. corroborated_pct 분모에 INDETERMINATE 혼입.
9. **[중간] active_alarms 주입 위험** — 미검증 휴리스틱(+데이터 품질 이슈)을 운전원 알람화 → 알람 피로. 검증 전 advisory만.
10. **[낮음] VP 신호 선택** — `pica-6111`(압력제어)보다 `pi-6111b`("VACUUM PRESSURE", C-6111 직결)가 진공 판정에 적합해 보임. sense/단위 확인 후 결정.
### 0.4 권장 보완
**(a) 매핑 — 토폴로지 규칙(번호 heuristic 폐기)**
- 1차: `FT.from_tag`가 펌프를 참조하는 행 수집(역방향, 1:N 자연 지원). 값 태그 = 같은 번호 **FICQ 컨트롤러** `ficq-<FT번호>.pv`. 하이픈 **유지**(`lower(tag_no)`만, REPLACE 금지).
- 2차: 펌프-FT 사이 중간설비(P-6102→F-6101A/B→FT-6101)는 토폴로지 2-hop 또는 **manual** 항목.
- map 테이블은 1:N 허용(펌프당 flow_tag 복수 행) + `mapping_source ∈ {topology, manual}`.
**(b) 신선도 게이트 (신규·최우선)**
```sql
-- 값 신뢰 조건: 수집 후 N초 이내(예: 120s). realtime_table.timestamp 기준.
(NOW() - rt.timestamp) < interval '120 seconds'
```
신선하지 않으면 `STALE`(운전 여부 판정 보류). 수집기 측은 supervisor가 stall을 30초 내 자동 복구하고 `RealtimeServiceStatus.Stalled`로도 노출(2026-05-24 적용).
**(c) 판정 상태에 STALE 추가**
| 판정 | 조건 |
|---|---|
| CONFIRMED_RUNNING | RUN + 유량 **신선** + PV>임계 |
| SUSPICIOUS_RUNNING | RUN + 유량 **신선** + PV≤임계 |
| **STALE** (신규) | RUN + 유량 **stale/frozen** (수집 지연·stall) |
| INDETERMINATE_RUNNING | RUN + 유량 매핑/데이터 없음 |
| STOPPED / TRIPPED | enum 기준 |
**(d) 임계** — Phase 1은 절대 0.5 단독 대신 "신선 AND PV<계기군 기본임계"로 무유량/frozen만 포착. Full Scale 5%는 Phase 2 `instrument_range`로.
**(e) 집계·알람** — overall_status는 CONFIRMED 기준으로 RUNNING 유지하고 suspicious/stale는 **부가 카운트**로 노출(전체 상태 오염 금지). active_alarms 주입은 **운전원 검증(§6.4) 통과 후**로 보류.
### 0.5 갱신된 의사결정 체크리스트
- [x] 데이터 성격: **라이브**(dummy 아님), 단 stale 가능 → 신선도 게이트 필수
- [ ] 매핑: FT.from_tag 역방향 + FICQ 값태그 + manual 보강 (번호 heuristic 폐기) ← **재결정**
- [ ] 임계: 계기군 기본값 + 신선도 동반 (절대 0.5 단독 폐기) ← **재결정**
- [ ] VP 신호: `pi-6111b` vs `pica-6111` + sense/단위 확인 ← **미결**
- [ ] STALE 상태 도입 ← **신규 결정 필요**
- [ ] active_alarms 주입: 검증 전 보류 ← **재결정**
### 0.6 구현 현황 (2026-05-24)
-**DB 뷰 계층 구현·검증** (`ExperionDbContext` boot DDL):
- `pump_corroboration_manual` (수동 매핑 테이블, P6 예외 시드: p-6102→ficq-6101, vp-6117→pica-6111, vp-6217→pica-6211)
- `v_pump_signal_map` (토폴로지 `FT.from_tag=펌프`→FICQ 1:N + 수동 UNION)
- `v_plant_running_state_corroborated` (신선도 게이트 120s + STALE + 유량 > 0.5 kg/hr · 진공 < 300 torr)
- `v_plant_running_state_agg` (overall은 CONFIRMED 기준 RUNNING, suspicious/stale는 부가 카운트)
- 빌드 0/0. 라이브 검증: 현재 frozen 데이터가 전부 `STALE`로 분류됨 확인(게이트 정상 작동).
-**plant_context.md** 교차검증 사용법 추가.
-**단위/레인지 메타데이터화** (별도 테이블 X — 복잡도 최소화 결정): `MetadataLoaderService.MetaAttributes``euhi/eulo/units` 추가 → `tag_metadata`(EAV) 재사용. node_map_master에 점 레벨 `euhi`(Double FS-Hi)·`eulo`·`units`(String) 노드 존재 확인. **PointBuilder 작성·수동 메타갱신 트리거에 자동 편승**(스코프=구독 아날로그 ⓐ). 타입 접근은 `v_instrument_range` 뷰(피벗+캐스트). 값은 단위 torr / 유량 kg/hr.
-**유량 임계 FS 5%**: corroborated가 `flow > COALESCE(eu_hi*0.05, 0.5 kg/hr)` — 레인지 적재되면 자동 FS 5% 승급, 없으면 절대 fallback. 진공은 300 torr 절대(실 레인지 확인 후 보정).
-**OPC 복구 후 실값 적재**: 현재 수집 stall이라 `euhi/eulo/units` 값 미적재 → 복구 후 메타갱신 1회 시 채워지며 FS 5% 자동 적용.
-**MCP 연동 보류**: `generate_status_report``v_plant_running_state_agg` 노출은 후속. `active_alarms` 주입은 운전원 검증 후(§0.4e).
---
## 1. 배경 및 문제 정의
### 1.1 현재 상황
현재 공장 운전 판정(`v_plant_running_state` 뷰)은 **펌프의 상태 워드(enum 값)만**으로 이루어짐:
```sql
-- 현재 로직 (의사코드)
CASE
WHEN pv ~ '[LR]-RUN' THEN 'RUNNING'
WHEN pv ~ '[LR]-TRIP' THEN 'TRIPPED'
ELSE 'STOPPED'
END AS status
```
예: P-6102의 PV = `{5 | R-RUN | }`**RUNNING**
### 1.2 문제점 — 허위 운전 미검출
펌프 상태 워드가 `R-RUN`이어도 **실질적 운전**이 아닌 경우가 있음:
| 상황 | 펌프 상태 | 유량계 | 실질 운전? |
|------|----------|--------|-----------|
| 정상 운전 | R-RUN | > 0 | ✅ |
| 밸브 닫힘/Deadhead | R-RUN | ≈ 0 | ❌ (기계 손상 위험) |
| 커플링 파손 | R-RUN | ≈ 0 | ❌ (무부하 운전) |
| 센서 오류 | R-RUN | ≈ 0 | ❌ (신호 끊김) |
| Kickback 순환 | R-RUN | 0 (메인) | ⚠️ (의도된 운전, main line은 닫힘) |
**현재는 이 4가지 케이스를 모두 동일하게 `RUNNING`으로 판정 → 허위 정보 제공**
### 1.3 진공 펌프의 특수성
진공 펌프(VP)는 유량계가 없고 **진공압(PI/PICA)** 으로 운전 상태를 검증:
| 상황 | VP 상태 | 진공압 | 실질 운전? |
|------|---------|--------|-----------|
| 정상 진공 유지 | R-RUN | 목표압 도달 | ✅ |
| 펌프 고장/RUN 신호 오류 | R-RUN | 대기압 (≈0) | ❌ |
| 계통 누설 | R-RUN | 대기압 (≈0) | ❌ |
---
## 2. 조사 결과 — P6 데이터 기반 분석
### 2.1 Pump↔Flow Meter 매핑 (pid_equipment 기반)
**P6-1 (C-6111 증류탑):**
| Pump | 상태 | P&ID 연결 | Experion 유량계 | SETPOINT |
|------|------|----------|----------------|----------|
| P-6101 | L-STOP | (미매핑, 번호 일치) | ficq-6101.pv=20.3 | sp=36.0 |
| P-6102 | **R-RUN** | →F-6101A/B→FT-6101→FCV-6101 | ficq-6101.pv=20.3 | sp=36.0 |
| P-6114 | **R-RUN** | →FT-6113( reflux) + FT-6114(light ends) | ficq-6113.pv=20.3, ficq-6114.pv=20.3 | sp=36.4 / 0 |
| P-6116 | **R-RUN** | →FT-6116→FCV-6116 | ficq-6116.pv=20.3 | sp=0 |
| P-6118 | **R-RUN** | →FT-6118→FCV-6118 | ficq-6118.pv=20.3 | sp=0 |
| VP-6117 | **R-RUN** | C-6111 진공 유지 | pi-6111.pv=0, pica-6111.pv=20.8 | — |
| P-6120 | OFF | (미매핑) | fiq-6120.pv=0 | — |
| P-6123 | L-STOP | (미매핑) | — | — |
| P-6128a/b | L-STOP | (미매핑) | — | — |
**P6-2 (C-6211 증류탑):**
| Pump | 상태 | Experion 유량계 | 비고 |
|------|------|----------------|------|
| P-6201 | L-STOP | ficq-6201.pv=20.8 | P6-1/P6-2 공용 |
| P-6202~6223 | 전부 STOP | ficq-62XX.pv=20.3 | — |
| VP-6217 | L-STOP | pi-6211.pv=0, pica-6211.pv=20.8 | — |
### 2.2 key observations
1. ⚠️ **(정정 §0.1) 펌프 번호 ≠ 유량계 번호**: P-6102→ficq-6101, P-6114→ficq-6113+6114(1:N). 유량 번호는 stream/line 번호 → pid_equipment 토폴로지(`FT.from_tag=pump`)로 매핑해야 함
2. **pid_equipment.from_tag/to_tag 토폴로지**로 1:N 매핑 추적 가능 (예: P-6114→FT-6113 + FT-6114)
3. **Setpoint(SP) 데이터 존재**: ficq-XXXX.sp 사용 가능 — SP=0은 밸브 닫힘(킥백) 신호
4. **VP는 유량계 없음**: 대신 pica-6111/sp, pi-6111로 진공압 교차검증 필요
5. **FCV-XXXX.op(밸브 위치) 데이터 없음**: 현재 realtime_table에 미등록
---
## 3. 설계 결정
### 3.1 매핑 전략: pid_equipment 기반 + 번호 heuristic fallback
> ⚠️ **§0.4(a) 우선** — 번호 heuristic은 오매핑(P-6102→6101 어긋남, vp- 깨짐)이라 폐기. `FT.from_tag` 역방향(1:N) + FICQ 값태그 + manual 보강으로 대체.
```
[1차] pid_equipment.from_tag/to_tag 정방향/역방향 조회
└─ P-6114의 to_tag = FT-6113, FT-6114 → ficq-6113, ficq-6114 매핑
[2차] 번호 heuristic fallback (pid_equipment 커버 안 되는 경우)
└─ P-6101 (pid_equipment 미존재) → ficq-6101 (번호 일치)
```
**pump_corroboration_map 테이블 (신규):**
```sql
CREATE TABLE pump_corroboration_map (
pump_base_tag TEXT PRIMARY KEY,
flow_tag TEXT, -- ficq-XXXX 또는 fiq-XXXX
flow_sp_tag TEXT, -- ficq-XXXX.sp (threshold 계산용)
vacuum_tag TEXT, -- VP용: pica-XXXX.pv 또는 pi-XXXX.pv
vacuum_sp_tag TEXT, -- VP용: pica-XXXX.sp
mapping_source TEXT DEFAULT 'auto', -- 'pid_equipment' | 'heuristic' | 'manual'
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
> **MetadataLoaderService가 건드리지 않음** — sub_area와 동일한 원칙 (자동 덮어쓰기 금지)
### 3.2 Threshold 기준
| 구분 | Phase 1 기준 | Phase 2 기준 |
|------|-------------|-------------|
| **유량계 (FICQ/FIQ)** | PV > **0.5** (절대값) | PV > **5% of Full Scale** (instrument_range 테이블 도입 시) |
| **진공압 (PICA)** | PV < **50** (mmHg 절대압 가정) | PV < **5% of Full Scale** |
| **진공압 (PI)** | PV ≈ **0** (≈대기압이면 의심) | 동일 |
**Phase 1에서 SP(Setpoint)를 threshold 기준으로 사용하지 않는 이유:**
운전원은 잦은 수동 밸브 조작으로 SP를 변경하지 않고 Control Valve만 조작함.
예: ficq-6101.sp=36으로 설정되어 있지만, PV가 1.2로 수동 조작 중 → SP*5%=1.8 > PV=1.2 → false SUSPICIOUS.
절대값 0.5를 기준으로 하면 PV=1.2는 정상 판정 → **false positive 방지**.
### 3.3 3단계 판정 로직
> ⚠️ **§0.4(c): `STALE` 상태 추가로 4단계+STALE. 신선도 미확인 시 frozen 데이터 오판(CONFIRMED) 차단.**
킥백 라인 상황(펌프 RUN + 메인 밸브 닫힘 + kickback만 순환) 고려:
| 판정 | 조건 | 의미 |
|------|------|------|
| **CONFIRMED_RUNNING** | pump RUN + flow PV > threshold | 유량 있음, 실질 운전 중 |
| **SUSPICIOUS_RUNNING** | pump RUN + flow PV ≤ threshold | RUN인데 유량 없음 (deadhead / 센서오류 / 커플링파손) |
| **INDETERMINATE_RUNNING** | pump RUN + flow 데이터 없음 | kickback 가능성, 추가 정보 필요 |
| **STOPPED** | pump STOP/TRIP/OFF | 정지 또는 트립 |
킥백 상황(P-6114 RUN + ficq-6114.sp=0 → valve closed)은 유량 PV=0이어도 PV=0이라 SUSPICIOUS 대상이지만, **이건 의도된 운전이므로 사용자가 해석 시 고려**. Phase 2에서 FCV-XXXX.op 데이터 추가 시 자동 구분 가능.
### 3.4 진공 펌프(VP) 교차검증
VP는 유량계가 없으므로 **진공압(PICA/PI)** 으로 검증:
- **VP R-RUN** + PICA.pv < threshold (진공 유지 중) → CONFIRMED
- **VP R-RUN** + PI.pv ≈ 0 (대기압, 진공 안 잡힘) → SUSPICIOUS
- **VP STOP/TRIP** → STOPPED
---
## 4. 구현 계획 — 3 Phase
### Phase 1: SQL View 확장 (즉시)
#### 4.1.1 pump_corroboration_map 생성 및 시딩
> ⚠️ **아래 시딩 SQL은 현 상태로 동작하지 않음** (§0.2: ① 하이픈 제거 `REPLACE(...,'-','')` → 매칭 0건, ② FT/FICQ 대상 혼동, ③ 번호 heuristic 오류, ④ 1:N 손실). **§0.4(a)의 토폴로지 규칙으로 대체할 것.**
```sql
-- Step 1: pid_equipment 기반 pump→FT 매핑 (from_tag/to_tag 정/역방향)
INSERT INTO pump_corroboration_map (pump_base_tag, flow_tag, flow_sp_tag, mapping_source)
SELECT DISTINCT
LOWER(REPLACE(p.tag_no, '-', '')) AS pump_base_tag,
LOWER(REPLACE(ft.tag_no, '-', '')) || '.pv' AS flow_tag,
LOWER(REPLACE(ft.tag_no, '-', '')) || '.sp' AS flow_sp_tag,
'pid_equipment'
FROM pid_equipment p
JOIN pid_equipment ft ON (
-- 정방향: pump.to_tag = FT
p.to_tag LIKE '%' || ft.tag_no || '%'
-- 역방향: FT.from_tag = pump
OR ft.from_tag LIKE '%' || p.tag_no || '%'
)
WHERE p.category = '펌프'
AND ft.category IN ('계기', '제어')
AND ft.tag_no ~ '^FT-|^FIC-';
-- Step 2: 번호 heuristic fallback (pid_equipment에 없는 pump)
INSERT INTO pump_corroboration_map (pump_base_tag, flow_tag, flow_sp_tag, mapping_source)
SELECT DISTINCT
v.base_tag,
'ficq-' || SUBSTRING(v.base_tag FROM 3) || '.pv',
'ficq-' || SUBSTRING(v.base_tag FROM 3) || '.sp',
'heuristic'
FROM v_tag_summary v
WHERE (v.base_tag LIKE 'p-6%' OR v.base_tag LIKE 'vp-6%')
AND v.pv ~ '[LR]-RUN|L-STOP|R-STOP|OFF'
AND NOT EXISTS (
SELECT 1 FROM pump_corroboration_map m WHERE m.pump_base_tag = v.base_tag
);
-- Step 3: VP 전용 vacuum 태그 매핑
INSERT INTO pump_corroboration_map (pump_base_tag, vacuum_tag, vacuum_sp_tag, mapping_source)
SELECT
LOWER(REPLACE(vp.tag_no, '-', '')),
LOWER(REPLACE(pi.tag_no, '-', '')) || '.pv',
LOWER(REPLACE(pi.tag_no, '-', '')) || '.sp',
'pid_equipment'
FROM pid_equipment vp
CROSS JOIN pid_equipment pi
WHERE vp.tag_no LIKE 'VP-%'
AND pi.tag_no IN ('PICA-6111', 'PICA-6211', 'PI-6111B', 'PI-6211B');
```
#### 4.1.2 v_plant_running_state_corroborated 뷰
> ⚠️ **신선도 게이트(§0.4b)·STALE 분기(§0.4c) 미반영.** 아래 뷰에 `(NOW() - rt.timestamp) < interval '120 seconds'` 조건과 `STALE` 분기를 추가하고, flow 값은 `ft-*`가 아니라 `ficq-*.pv`(컨트롤러)에서 조인할 것.
```sql
CREATE OR REPLACE VIEW v_plant_running_state_corroborated AS
WITH pump_base AS (
-- 기존 pump_state 로직 + corroboration 매핑 LEFT JOIN
SELECT
trim(split_part(v.area, '|', 2)) AS area_code,
v.area AS area_raw,
v.base_tag,
v.pv,
v.description,
v.sub_area,
m.flow_tag,
m.flow_sp_tag,
m.vacuum_tag,
m.vacuum_sp_tag,
m.mapping_source,
-- 유량계 PV/SP 값 조회 (realtime_table에서)
flow_rt.livevalue AS flow_pv,
flow_sp_rt.livevalue AS flow_sp,
vac_rt.livevalue AS vacuum_pv,
vac_sp_rt.livevalue AS vacuum_sp
FROM v_tag_summary v
LEFT JOIN pump_corroboration_map m ON m.pump_base_tag = v.base_tag
LEFT JOIN realtime_table flow_rt ON flow_rt.tagname = m.flow_tag
LEFT JOIN realtime_table flow_sp_rt ON flow_sp_rt.tagname = m.flow_sp_tag
LEFT JOIN realtime_table vac_rt ON vac_rt.tagname = m.vacuum_tag
LEFT JOIN realtime_table vac_sp_rt ON vac_sp_rt.tagname = m.vacuum_sp_tag
WHERE v.area IS NOT NULL
AND (v.base_tag LIKE 'p-%' OR v.base_tag LIKE 'vp-%')
AND v.pv ~ '\|\s*(L-RUN|R-RUN|L-STOP|R-STOP|L-TRIP|R-TRIP)\s*\|'
),
pump_with_corroboration AS (
SELECT *,
CASE
-- pump STOP/TRIP/OFF
WHEN pv ~ '\|\s*[LR]-TRIP\s*\|' THEN 'TRIPPED'
WHEN pv ~ '\|\s*(L-STOP|R-STOP|OFF|STOP)\s*\|' THEN 'STOPPED'
-- pump RUN - vacuum pump (VP)
WHEN base_tag LIKE 'vp-%' THEN
CASE
WHEN vacuum_pv IS NOT NULL
AND vacuum_pv ~ '^\d+\.?\d*$'
AND CAST(vacuum_pv AS DOUBLE PRECISION) < 50
THEN 'CONFIRMED_RUNNING'
WHEN vacuum_pv IS NOT NULL
AND (vacuum_pv ~ '^\{' OR CAST(vacuum_pv AS DOUBLE PRECISION) >= 50)
THEN 'SUSPICIOUS_RUNNING'
ELSE 'INDETERMINATE_RUNNING'
END
-- pump RUN - 유량계 있음
WHEN flow_pv IS NOT NULL AND flow_pv ~ '^\d+\.?\d*$' THEN
CASE
WHEN CAST(flow_pv AS DOUBLE PRECISION) > 0.5 THEN 'CONFIRMED_RUNNING'
ELSE 'SUSPICIOUS_RUNNING'
END
-- pump RUN - 유량계 없음
ELSE 'INDETERMINATE_RUNNING'
END AS corroborated_status
FROM pump_base
)
SELECT
area_code,
area_raw,
base_tag,
pv AS raw_pv,
description,
sub_area,
flow_tag,
flow_pv,
flow_sp,
vacuum_tag,
vacuum_pv,
vacuum_sp,
mapping_source,
corroborated_status,
CASE
WHEN corroborated_status = 'CONFIRMED_RUNNING' THEN TRUE
ELSE FALSE
END AS is_corroborated_running,
CASE
WHEN corroborated_status = 'SUSPICIOUS_RUNNING' THEN TRUE
ELSE FALSE
END AS is_suspicious_running,
CASE
WHEN corroborated_status = 'INDETERMINATE_RUNNING' THEN TRUE
ELSE FALSE
END AS is_indeterminate_running
FROM pump_with_corroboration
WHERE area_code IS NOT NULL AND area_code <> '';
```
#### 4.1.3 v_plant_running_state_agg 뷰 (area별 집계)
```sql
CREATE OR REPLACE VIEW v_plant_running_state_agg AS
SELECT
area_code,
MAX(area_raw) AS area_raw,
COUNT(*) AS total_pumps,
COUNT(*) FILTER (WHERE corroborated_status = 'CONFIRMED_RUNNING') AS confirmed_running,
COUNT(*) FILTER (WHERE corroborated_status = 'SUSPICIOUS_RUNNING') AS suspicious_running,
COUNT(*) FILTER (WHERE corroborated_status = 'INDETERMINATE_RUNNING') AS indeterminate_running,
COUNT(*) FILTER (WHERE corroborated_status = 'TRIPPED') AS tripped_pumps,
COUNT(*) FILTER (WHERE corroborated_status = 'STOPPED') AS stopped_pumps,
CASE
WHEN COUNT(*) FILTER (WHERE corroborated_status IN ('CONFIRMED_RUNNING', 'SUSPICIOUS_RUNNING', 'INDETERMINATE_RUNNING')) > 0
AND COUNT(*) FILTER (WHERE corroborated_status = 'SUSPICIOUS_RUNNING') = 0
THEN 'RUNNING'
WHEN COUNT(*) FILTER (WHERE corroborated_status = 'SUSPICIOUS_RUNNING') > 0
THEN 'RUNNING_WITH_SUSPICIOUS'
WHEN COUNT(*) FILTER (WHERE corroborated_status = 'TRIPPED') > 0
THEN 'TRIPPED'
ELSE 'STOPPED'
END AS overall_status,
-- corroborated_rate: 전체 RUN 펌프 중 확인된 비율
CASE
WHEN COUNT(*) FILTER (WHERE corroborated_status IN ('CONFIRMED_RUNNING', 'SUSPICIOUS_RUNNING', 'INDETERMINATE_RUNNING')) > 0
THEN ROUND(
COUNT(*) FILTER (WHERE corroborated_status = 'CONFIRMED_RUNNING')::NUMERIC
/ COUNT(*) FILTER (WHERE corroborated_status IN ('CONFIRMED_RUNNING', 'SUSPICIOUS_RUNNING', 'INDETERMINATE_RUNNING'))
* 100, 1
)
ELSE NULL
END AS corroborated_pct,
array_agg(base_tag) FILTER (WHERE corroborated_status = 'SUSPICIOUS_RUNNING') AS suspicious_pump_tags,
array_agg(base_tag) FILTER (WHERE corroborated_status = 'CONFIRMED_RUNNING') AS confirmed_running_tags
FROM v_plant_running_state_corroborated
WHERE area_code IS NOT NULL AND area_code <> ''
GROUP BY area_code
ORDER BY area_code;
```
### Phase 1: MCP Server 통합
#### 4.1.4 server.py — 새 뷰 조회 추가
`generate_status_report` 함수 내에서 `v_plant_running_state_agg` 조회:
```python
# server.py — generate_status_report 내부
cur.execute("""
SELECT area_code, overall_status, total_pumps, confirmed_running,
suspicious_running, suspicious_pump_tags, corroborated_pct
FROM v_plant_running_state_agg
WHERE (%s IS NULL OR area_code = %s)
ORDER BY area_code
""", (area, area))
```
응답 JSON에 추가:
```python
{
"active_alarms": [...],
"recent_events": [...],
"by_type": {...},
"pump_corroboration": {
"by_area": [
{
"area": "P6",
"status": "RUNNING_WITH_SUSPICIOUS",
"total_pumps": 22,
"confirmed_running": 4,
"suspicious_running": 1,
"corroborated_pct": 80.0,
"suspicious_pumps": ["p-6114"]
}
]
}
}
```
#### 4.1.5 active_alarms — SUSPICIOUS_RUNNING 의심 알람 추가
```sql
-- active_alarms에 suspicious pump 추가
SELECT base_tag AS tag_name, 'SUSPICIOUS_RUNNING' AS event_type,
'펌프 RUN 상태이나 유량 없음' AS description,
area_code
FROM v_plant_running_state_corroborated
WHERE is_suspicious_running = TRUE;
```
#### 4.1.6 trace_connections — flow_pv/run_status 노출
```python
# 각 path 노드에 flow_pv, corroborated_status 추가
# pid_equipment tag_no → base_tag 변환 후 v_plant_running_state_corroborated 조회
```
### Phase 1: plant_context.md 업데이트
프롬프트에 교차검증 관련 컨텍스트 추가:
```
## 운전 판정 교차검증 (Corroboration)
펌프의 상태 워드(R-RUN/L-RUN)만으로 운전을 판정하지 않고,
연결된 유량계(FICQ/FIQ)의 PV 값을 교차검증하여 3단계 판정:
| 판정 | 의미 |
|------|------|
| CONFIRMED_RUNNING | 펌프 RUN + 유량계 PV > 0.5 (실질 운전) |
| SUSPICIOUS_RUNNING | 펌프 RUN + 유량계 PV ≤ 0.5 (의심: deadhead, 센서오류) |
| INDETERMINATE_RUNNING | 펌프 RUN + 유량계 데이터 없음 (킥백 가능성) |
진공펌프(VP)는 유량계 대신 진공압(PICA.pv < 50)으로 판정.
- `v_plant_running_state_corroborated`: 태그별 상세 판정
- `v_plant_running_state_agg`: area별 집계 (corroborated_pct 포함)
```
---
### Phase 2: 정밀화 (Phase 1 검증 후)
| 항목 | 내용 | 우선순위 |
|------|------|---------|
| **instrument_range 테이블** | 각 유량계의 Full Scale / Unit 저장 → 5% threshold 계산 | 높음 |
| **Plant Load Rate 기반 검증** | 원료투입량(FICQ-6101.pv) 대비 각 유량계 비율 계산 → 수율(Throughput) 추정 | 중간 |
| **FCV-XXXX.op 태그 추가** | Control Valve position 실시간 감시 → kickback 자동 인식 | 중간 |
| **AI/통계 threshold** | 정상 운전 기간의 평균±3σ로 이상 감지 | 낮음 |
| **PumpCorroborationService** | C# BackgroundService로 주기적 검증 → pump_corroboration_history 테이블에 이벤트 기록 | 낮음 |
#### instrument_range 테이블 설계 (Phase 2)
```sql
CREATE TABLE instrument_range (
base_tag TEXT PRIMARY KEY,
full_scale DOUBLE PRECISION NOT NULL,
unit TEXT,
source TEXT DEFAULT 'manual', -- 'pid_equipment' | 'opc_ua' | 'manual'
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Phase 2 threshold: PV > 0.05 * full_scale (5% of Full Scale)
```
Phase 2에서 기존 view의 threshold만 변경:
```sql
-- Phase 2 수정안
WHEN CAST(flow_pv AS DOUBLE PRECISION) > 0.05 * ir.full_scale THEN 'CONFIRMED_RUNNING'
```
---
### Phase 3: 프론트엔드 대시보드
- Area Overview에 `corroborated_pct` 게이지 표시
- SUSPICIOUS_RUNNING 펌프 빨간색 하이라이트
- 클릭 시 flow_pv / vacuum_pv 상세 표시
---
## 5. 리스크 분석
| 리스크 | 영향 | 대응 |
|--------|------|------|
| ~~Flow PV가 dummy 값~~**(정정 §0.1) 라이브 데이터** (ficq-6113 38~55 변동 확인). 단 **수집기 stall 시 frozen** | frozen을 정상으로 오판(CONFIRMED) | **신선도 게이트(§0.4b)** + 수집기 supervisor 수정(2026-05-24 적용) |
| **pid_equipment 미완성** | 일부 pump 교차검증 불가 | 번호 heuristic fallback으로 커버. 이후 수동 보강 |
| **킥백 상황 오판** | SUSPICIOUS_RUNNING 과다 | 0.5 threshold로 PV=0 케이스만 포착. Phase 2에서 FCV-XXXX.op 추가 |
| **진공압 범위/단위 불명** | threshold 값 부정확 | 현재 PICA.pv=20.8(임의값). 실제 단위 확인 필요 (mmHg / kPa / bar) |
| **성능: realtime_table LEFT JOIN** | 뷰 조회 속도 저하 | pump_corroboration_map에 인덱스. 실운영 모니터링 |
## 6. 검증 계획
1. **유닛 테스트**: 각 판정 CASE별 샘플 데이터 생성 → 예상 결과와 일치 확인
2. **DB 뷰 검증**: production replica에서 `v_plant_running_state_corroborated` 조회, 수동 확인
3. **MCP 응답 체크**: `generate_status_report`에 suspicious 필드 정상 포함 확인
4. **운전원 피드백**: SUSPICIOUS_RUNNING 케이스 실제 상황과 일치하는지 확인
---
## 7. 일정 (예상)
| Phase | 작업 | 예상 기간 | 비고 |
|-------|------|----------|------|
| **Phase 1** | pump_corroboration_map 시딩 스크립트 | 1일 | |
| | v_plant_running_state_corroborated 뷰 | 0.5일 | |
| | v_plant_running_state_agg 뷰 | 0.5일 | |
| | MCP server.py 통합 | 1일 | |
| | plant_context.md 업데이트 | 0.5일 | |
| | **소계** | **3.5일** | |
| **Phase 2** | instrument_range 테이블 + 시딩 | 1일 | Phase 1 검증 후 |
| | Plant Load Rate 계산 로직 | 2일 | |
| | FCV-XXXX.op 추가 | 0.5일 | |
| | 통계 threshold | 2일 | |
| | **소계** | **5.5일** | |
| **Phase 3** | 프론트엔드 대시보드 | 2일 | |
| | **총계** | **11일** | |
---
## 8. 결론
현재 펌프 상태 워드 단일 판정을 유량계·진공압 교차검증으로 고도화하여 **허위 운전 정보 제공을 방지**하고, **실질 운전 여부를 정확히 판정**할 수 있음.
Phase 1은 SQL view 확장만으로 즉시 적용 가능하며, OPC UA 실제 데이터 연결 후 검증 즉시 가동 가능.
**핵심 의사결정 사항:**
- [ ] Pump↔유량계 매핑 방식 → pid_equipment 기반 + 번호 heuristic fallback (✅ 결정)
- [ ] Threshold 기준 → 절대값 0.5 (Phase 1), Full Scale 5% (Phase 2) (✅ 결정)
- [ ] 킥백 처리 → Phase 1에서 별도 미처리, Phase 2에서 valve position 추가 (✅ 결정)
- [ ] VP 교차검증 → PICA.pv < 50 기준 (단위 확인 필요)

View File

@@ -128,10 +128,15 @@ WHERE livevalue ~ '\|\s*[LR]-TRIP\s*\|'
WHERE livevalue ~ '\|\s*(L-STOP|R-STOP|STOP|OFF)\s*\|'
```
### ⚠️ p-prefix 주의사항
### ⚠️ 펌프 prefix 주의사항 (p- / vp-)
`p-NNNN` prefix는 펌프 전용이 아닙니다. panel point, alarm point 등 다른 디지털 포인트도 같은 prefix를 공유합니다.
- 운전 판정 시 enum이 위 6종(L-STOP/L-RUN/L-TRIP/R-STOP/R-RUN/R-TRIP) 중 하나인 태그만 펌프로 취급.
펌프 prefix는 **두 종류**입니다:
- `p-NNNN` = 공정 펌프 (원료/리플럭스/제품 이송 등)
- `vp-NNNN` = **진공 펌프** (Vacuum Pump, 감압 컬럼용). 예: `vp-6117`(C-6111 진공), `vp-3204`, `vp-5117`, `vp-6217`, `vp-8117`
`p-NNNN` prefix는 펌프 전용이 아닙니다. panel point, alarm point 등 다른 디지털 포인트(`p-201_hs`, `p-2202_run` 등)도 같은 prefix를 공유합니다.
- 운전 판정 시 enum이 위 6종(L-STOP/L-RUN/L-TRIP/R-STOP/R-RUN/R-TRIP) 중 하나인 태그만 펌프로 취급. 패널/피드백 포인트는 단순 `STOP`/`OFF` enum이라 자동 제외됨.
- `v_plant_running_state` 뷰는 `p-``vp-`**모두** 펌프로 집계하므로 진공 펌프도 `running_pumps`/`running_pump_tags`에 포함됨.
- 또는 `node_map_master.description` / `v_tag_summary.description`으로 펌프 여부 추가 확인.
### "운전 중인 플랜트" 판정 — `v_plant_running_state` 뷰 사용 (1순위)
@@ -173,6 +178,24 @@ ORDER BY area_code;
- "P6 펌프 어떤 게 돌아가?" → `SELECT running_pump_tags FROM v_plant_running_state WHERE area_code='P6'`
- "트립 펌프 있어?" → `SELECT area_code, tripped_pumps FROM v_plant_running_state WHERE tripped_pumps > 0`
### 실질 운전 판정 — 교차검증 뷰 (정밀, 선택)
펌프 상태 워드(RUN)만으로는 deadhead·센서오류·수집 stall(frozen 데이터) 등 **허위 운전**을 못 거른다. 연결된 유량계(kg/hr)·진공압(torr)을 **신선도 게이트(120초)** 와 함께 교차검증한 뷰:
- `v_plant_running_state_corroborated` — 펌프별 상세: `corroborated_status`, `flow_kg_hr`, `vacuum_torr`
- `v_plant_running_state_agg` — area별 집계: `confirmed_running`/`suspicious_running`/`stale_running`/`indeterminate_running` + `status`
| corroborated_status | 의미 |
|---|---|
| `CONFIRMED_RUNNING` | RUN + 신선한 유량 > 0.5 kg/hr (진공펌프: 진공압 < 300 torr) — **실질 운전** |
| `SUSPICIOUS_RUNNING` | RUN + 신선한데 유량 없음 / 진공 안 잡힘 — deadhead·센서오류·standby 의심 |
| `STALE` | RUN + 유량/진공 값이 **stale·frozen**(수집 지연/중단) — **판정 보류, 운전 단정 금지** |
| `INDETERMINATE_RUNNING` | RUN + 신호 매핑/데이터 없음 |
| `STOPPED` / `TRIPPED` | enum 기준 |
- "지금 6차 진짜 돌아가?" → `SELECT base_tag, corroborated_status, flow_kg_hr FROM v_plant_running_state_corroborated WHERE area_code='P6'`
- `STALE`가 많으면 "실시간 수집이 지연/중단된 상태"로 안내(운전 여부 단정 금지). 펌프-신호 매핑 보강은 `pump_corroboration_manual`(수동)·`v_pump_signal_map`(토폴로지).
## 시간대 및 날짜 처리
- DB 저장은 **UTC** 입니다 (`recorded_at`, `event_time` 모두 TIMESTAMPTZ).

View File

@@ -180,7 +180,11 @@ public interface IExperionRealtimeService
Task<(bool Success, string Message)> AddMonitoredItemAsync(string nodeId);
}
public record RealtimeServiceStatus(bool Running, int SubscribedCount, string Message);
/// <param name="LastDataAgeSeconds">마지막 데이터 수신 후 경과초 (-1=수신 이력 없음). 신선도 진단용.</param>
/// <param name="Stalled">링크 이상(연결끊김/KeepAlive 중단/구독 publish 중단) 감지 후 재연결 대기 상태.</param>
public record RealtimeServiceStatus(
bool Running, int SubscribedCount, string Message,
int LastDataAgeSeconds = -1, bool Stalled = false);
// ── OPC UA Server ─────────────────────────────────────────────────────────────

View File

@@ -439,7 +439,8 @@ public class ExperionDbService : IExperionDbService
// v_plant_running_state 뷰 — area별 펌프 RUN/STOP/TRIP 집계.
// "데이터 갱신 = 운전 중"이라는 잘못된 휴리스틱 차단을 위해 펌프 enum 기반으로 판정.
// - 운전 중 판정: pv 값이 '{? | L-RUN | }' 또는 '{? | R-RUN | }'
// - 펌프 식별: base_tag가 'p-'로 시작하면서 pv가 펌프 enum 6종 중 하나 (panel/alarm 포인트 제외)
// - 펌프 식별: base_tag가 'p-'(공정펌프) 또는 'vp-'(진공펌프)로 시작하면서 pv가 펌프 enum 6종 중 하나
// (panel/alarm 포인트는 단순 STOP/OFF enum이라 L/R enum 정규식에서 자동 제외됨)
// - area_code: v_tag_summary.area의 '{12 | P6 | }' 형식에서 'P6' 추출
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_plant_running_state");
await _ctx.Database.ExecuteSqlRawAsync("""
@@ -452,7 +453,7 @@ public class ExperionDbService : IExperionDbService
pv
FROM v_tag_summary
WHERE area IS NOT NULL
AND base_tag LIKE 'p-%'
AND (base_tag LIKE 'p-%' OR base_tag LIKE 'vp-%')
AND pv ~ '\|\s*(L-RUN|R-RUN|L-STOP|R-STOP|L-TRIP|R-TRIP)\s*\|'
)
SELECT
@@ -820,6 +821,172 @@ public class ExperionDbService : IExperionDbService
""");
}
// ── 운전판정 교차검증(Corroboration) — 펌프 RUN을 유량(kg/hr)·진공압(torr)으로 검증 ──
// 펌프 enum(RUN)만으론 deadhead·센서오류·frozen 데이터 등 '허위 운전'을 못 거른다.
// 신선도 게이트: realtime_table.timestamp가 NOW()-120s 이내일 때만 값 신뢰(아니면 STALE=판정보류)
// 매핑: 토폴로지(FT.from_tag=펌프 → 같은 번호 FICQ 컨트롤러, 1:N) + 수동 예외 테이블
// (pid_equipment·from_tag/category 컬럼이 모두 보장된 이 지점 이후에 생성)
// 수동 매핑 — 토폴로지로 못 잡는 예외 (펌프-FT 사이 중간설비, VP↔진공압). 운전원 편집 가능.
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS pump_corroboration_manual (
id BIGSERIAL PRIMARY KEY,
pump_base_tag TEXT NOT NULL,
signal_tag TEXT NOT NULL,
signal_kind TEXT NOT NULL CHECK (signal_kind IN ('flow','vacuum')),
unit TEXT,
note TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (pump_base_tag, signal_tag)
)
""");
// 수동 매핑 시드 (P6 알려진 예외). ON CONFLICT DO NOTHING → 부팅 시 운전원 수정 보존.
await _ctx.Database.ExecuteSqlRawAsync("""
INSERT INTO pump_corroboration_manual (pump_base_tag, signal_tag, signal_kind, unit, note) VALUES
('p-6102', 'ficq-6101.pv', 'flow', 'kg/hr', ' (P-6102FT-6101), 2-hop'),
('vp-6117', 'pica-6111.pv', 'vacuum', 'torr', 'C-6111 (PT-6111 PV)'),
('vp-6217', 'pica-6211.pv', 'vacuum', 'torr', 'C-6211 ')
ON CONFLICT (pump_base_tag, signal_tag) DO NOTHING
""");
// 펌프→신호 매핑 뷰 (토폴로지 유량 1:N + 수동)
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_pump_signal_map");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_pump_signal_map AS
SELECT lower(ft.from_tag) AS pump_base_tag,
'ficq-' || split_part(ft.tag_no, '-', 2) || '.pv' AS signal_tag,
'flow'::text AS signal_kind,
'kg/hr'::text AS unit,
'topology'::text AS mapping_source
FROM pid_equipment ft
WHERE ft.category = ''
AND ft.tag_no LIKE 'FT-%'
AND lower(ft.from_tag) LIKE 'p-%'
AND ft.from_tag NOT LIKE '%,%'
UNION ALL
SELECT pump_base_tag, signal_tag, signal_kind, unit, 'manual'
FROM pump_corroboration_manual
""");
// 계기 단위/레인지 — tag_metadata(EAV) 피벗·타입캐스트 (별도 테이블 없이 재사용).
// corroboration FS 5% 임계 + 향후 SP/제어 계산용. 단위/레인지는 메타갱신 시 자동 적재됨.
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_instrument_range");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_instrument_range AS
SELECT base_tag,
max(value) FILTER (WHERE attribute='units') AS unit,
max(CASE WHEN attribute='eulo' AND value ~ '^-?[0-9]+(\.[0-9]+)?$'
THEN value::double precision END) AS eu_lo,
max(CASE WHEN attribute='euhi' AND value ~ '^-?[0-9]+(\.[0-9]+)?$'
THEN value::double precision END) AS eu_hi
FROM tag_metadata
WHERE attribute IN ('units','eulo','euhi')
GROUP BY base_tag
""");
// 펌프별 교차검증 상세 — 신선도 게이트(120s) + STALE + 유량(FS 5%, fallback 0.5 kg/hr)·진공(300 torr)
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_plant_running_state_corroborated");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_plant_running_state_corroborated AS
WITH pump_base AS (
SELECT trim(split_part(area, '|', 2)) AS area_code,
area AS area_raw, base_tag, pv, description, sub_area
FROM v_tag_summary
WHERE area IS NOT NULL
AND (base_tag LIKE 'p-%' OR base_tag LIKE 'vp-%')
AND pv ~ '\|\s*(L-RUN|R-RUN|L-STOP|R-STOP|L-TRIP|R-TRIP)\s*\|'
),
sig_eval AS (
SELECT m.pump_base_tag, m.signal_kind,
(rt.tagname IS NOT NULL) AS has_row,
(rt.tagname IS NOT NULL
AND (NOW() - rt.timestamp) < interval '120 seconds') AS fresh,
CASE WHEN rt.livevalue ~ '^-?[0-9]+(\.[0-9]+)?$'
THEN rt.livevalue::double precision END AS val,
-- '': + deadhead floor . = FS 5% [1~5 kg/hr]
-- ( : FS 2000 ~11 raw 5%FS=100 SUSPICIOUS로 )
(m.signal_kind='flow'
AND rt.tagname IS NOT NULL
AND (NOW() - rt.timestamp) < interval '120 seconds'
AND rt.livevalue ~ '^-?[0-9]+(\.[0-9]+)?$'
AND rt.livevalue::double precision > GREATEST(1.0, LEAST(COALESCE(ir.eu_hi, 100) * 0.05, 5.0))) AS flow_ok,
-- '': + (torr; , 300 fallback)
(m.signal_kind='vacuum'
AND rt.tagname IS NOT NULL
AND (NOW() - rt.timestamp) < interval '120 seconds'
AND rt.livevalue ~ '^-?[0-9]+(\.[0-9]+)?$'
AND rt.livevalue::double precision < 300) AS vac_ok
FROM v_pump_signal_map m
LEFT JOIN realtime_table rt ON rt.tagname = m.signal_tag
LEFT JOIN v_instrument_range ir ON ir.base_tag = split_part(m.signal_tag, '.', 1)
),
pump_sig AS (
SELECT pump_base_tag,
count(*) FILTER (WHERE signal_kind='flow') AS flow_mapped,
count(*) FILTER (WHERE signal_kind='flow' AND has_row) AS flow_have,
count(*) FILTER (WHERE signal_kind='flow' AND fresh) AS flow_fresh,
bool_or(flow_ok) AS any_flow_ok,
max(val) FILTER (WHERE signal_kind='flow' AND fresh) AS flow_max,
count(*) FILTER (WHERE signal_kind='vacuum') AS vac_mapped,
count(*) FILTER (WHERE signal_kind='vacuum' AND has_row) AS vac_have,
count(*) FILTER (WHERE signal_kind='vacuum' AND fresh) AS vac_fresh,
bool_or(vac_ok) AS any_vac_ok,
min(val) FILTER (WHERE signal_kind='vacuum' AND fresh) AS vac_min
FROM sig_eval GROUP BY pump_base_tag
)
SELECT
b.area_code, b.area_raw, b.base_tag, b.pv AS raw_pv, b.description, b.sub_area,
ps.flow_max AS flow_kg_hr, ps.vac_min AS vacuum_torr,
CASE
WHEN b.pv ~ '\|\s*[LR]-TRIP\s*\|' THEN 'TRIPPED'
WHEN b.pv ~ '\|\s*(L-STOP|R-STOP)\s*\|' THEN 'STOPPED'
WHEN b.base_tag LIKE 'vp-%' THEN
CASE
WHEN COALESCE(ps.vac_mapped,0)=0 OR COALESCE(ps.vac_have,0)=0 THEN 'INDETERMINATE_RUNNING'
WHEN ps.vac_fresh=0 THEN 'STALE'
WHEN ps.any_vac_ok THEN 'CONFIRMED_RUNNING'
ELSE 'SUSPICIOUS_RUNNING'
END
ELSE
CASE
WHEN COALESCE(ps.flow_mapped,0)=0 OR COALESCE(ps.flow_have,0)=0 THEN 'INDETERMINATE_RUNNING'
WHEN ps.flow_fresh=0 THEN 'STALE'
WHEN ps.any_flow_ok THEN 'CONFIRMED_RUNNING'
ELSE 'SUSPICIOUS_RUNNING'
END
END AS corroborated_status
FROM pump_base b
LEFT JOIN pump_sig ps ON ps.pump_base_tag = b.base_tag
WHERE b.area_code IS NOT NULL AND b.area_code <> ''
""");
// area별 집계 — overall은 CONFIRMED 기준 RUNNING, suspicious/stale는 부가 카운트(전체 오염 금지)
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_plant_running_state_agg");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_plant_running_state_agg AS
SELECT
area_code, MAX(area_raw) AS area_raw, COUNT(*) AS total_pumps,
COUNT(*) FILTER (WHERE corroborated_status='CONFIRMED_RUNNING') AS confirmed_running,
COUNT(*) FILTER (WHERE corroborated_status='SUSPICIOUS_RUNNING') AS suspicious_running,
COUNT(*) FILTER (WHERE corroborated_status='STALE') AS stale_running,
COUNT(*) FILTER (WHERE corroborated_status='INDETERMINATE_RUNNING') AS indeterminate_running,
COUNT(*) FILTER (WHERE corroborated_status='TRIPPED') AS tripped_pumps,
COUNT(*) FILTER (WHERE corroborated_status='STOPPED') AS stopped_pumps,
CASE
WHEN COUNT(*) FILTER (WHERE corroborated_status IN
('CONFIRMED_RUNNING','SUSPICIOUS_RUNNING','STALE','INDETERMINATE_RUNNING')) > 0 THEN 'RUNNING'
WHEN COUNT(*) FILTER (WHERE corroborated_status='TRIPPED') > 0 THEN 'TRIPPED'
ELSE 'STOPPED'
END AS status,
array_agg(base_tag) FILTER (WHERE corroborated_status='CONFIRMED_RUNNING') AS confirmed_tags,
array_agg(base_tag) FILTER (WHERE corroborated_status='SUSPICIOUS_RUNNING') AS suspicious_tags,
array_agg(base_tag) FILTER (WHERE corroborated_status='STALE') AS stale_tags
FROM v_plant_running_state_corroborated
WHERE area_code IS NOT NULL AND area_code <> ''
GROUP BY area_code ORDER BY area_code
""");
_logger.LogInformation("[ExperionDb] 데이터베이스 초기화 완료 (TimeScaleDB 활성화)");
return true;
}

View File

@@ -39,10 +39,12 @@ public class ExperionHistoryService : BackgroundService
{
await Task.Delay(TimeSpan.FromSeconds(_intervalSeconds), stoppingToken);
// 실시간 구독이 OFF 상태이면 스냅샷 건너뜀
if (!_realtimeService.GetStatus().Running)
// 실시간 구독이 OFF이거나 링크 이상(stall)이면 스냅샷 건너뜀
// — stall 시 frozen 값을 정상 이력처럼 적재하던 문제 방지 (데이터 품질)
var rtStatus = _realtimeService.GetStatus();
if (!rtStatus.Running || rtStatus.Stalled)
{
_logger.LogDebug("[HistoryService] 구독 중지 상태 — 스냅샷 건너뜀");
_logger.LogDebug("[HistoryService] 구독 중지/링크 이상(Stalled={Stalled}) — 스냅샷 건너뜀", rtStatus.Stalled);
continue;
}

View File

@@ -45,7 +45,9 @@ public class ExperionRealtimeService : IExperionRealtimeService, IHostedService,
private int _subscribedCount;
private string _statusMsg = "중지됨";
private ExperionServerConfig? _currentCfg;
private volatile bool _restarting = false; // 재진입 방지 플래그
private volatile bool _stalled = false; // 링크 비정상(silent stall) 감지 여부
private long _lastNotificationTicks; // 마지막 데이터 수신 시각(UTC ticks) — 신선도 워치독
private volatile string _linkFault = ""; // 최근 감지된 링크 이상 사유
// 자동 재시작 플래그 파일 경로
private static readonly string FlagPath =
@@ -65,212 +67,134 @@ public class ExperionRealtimeService : IExperionRealtimeService, IHostedService,
// ── IHostedService ────────────────────────────────────────────────────────
public async Task StartAsync(CancellationToken cancellationToken)
public Task StartAsync(CancellationToken cancellationToken)
{
// 앱 기동 시 플래그 파일이 있으면 자동 구독 시작 (최대 3회 재시도, 5초 간격)
if (!File.Exists(FlagPath)) return;
const int maxRetries = 3;
const int retryDelayMs = 5_000;
bool connected = false;
// 부팅 비블로킹: 자동시작 플래그가 있으면 백그라운드 supervisor만 띄우고 즉시 반환.
// 연결/재구독/재시도는 supervisor가 앱 부팅과 무관하게 전담한다 (부팅 지연 0).
if (!File.Exists(FlagPath)) return Task.CompletedTask;
try
{
var json = await File.ReadAllTextAsync(FlagPath, cancellationToken);
var json = File.ReadAllText(FlagPath);
var cfg = JsonSerializer.Deserialize<ExperionServerConfig>(json);
if (cfg == null) return;
if (cfg == null) return Task.CompletedTask;
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
_logger.LogInformation("[Realtime] 자동 재시도 {Attempt}/{Max} — {Url}", attempt, maxRetries, cfg.EndpointUrl);
try
{
await TryConnectAsync(cfg, cancellationToken);
connected = true;
_logger.LogInformation("[Realtime] 자동 연결 성공 (시도 {Attempt}/{Max})", attempt, maxRetries);
break;
StartSupervisor(cfg);
_logger.LogInformation("[Realtime] 자동시작 — 백그라운드 supervisor 기동 (부팅 비블로킹): {Url}", cfg.EndpointUrl);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[Realtime] 자동 연결 실패 (시도 {Attempt}/{Max})", attempt, maxRetries);
if (attempt < maxRetries)
{
try { await Task.Delay(retryDelayMs, cancellationToken); } catch { return; }
}
}
}
if (!connected)
{
_logger.LogWarning("[Realtime] {Max}회 시도 후 연결 실패 — 자동 재시작 플래그 삭제", maxRetries);
if (File.Exists(FlagPath)) File.Delete(FlagPath);
_statusMsg = $"연결 실패 ({maxRetries}회 시도) — 수동 시작 필요";
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[Realtime] 자동 재시작 플래그 읽기 실패 — 무시");
}
}
/// <summary>OPC UA 연결 및 구독 설정 시도 (예외 발생 시 정리 후 재던지기)</summary>
private async Task TryConnectAsync(ExperionServerConfig cfg, CancellationToken ct)
{
try
{
_currentCfg = cfg;
_cts = new CancellationTokenSource();
_monitorTask = Task.Run(() => RunLoopOnceAsync(_cts.Token), _cts.Token);
// 플래그 파일 저장 (앱 재기동 시 자동 재시작용)
try
{
var json = JsonSerializer.Serialize(cfg);
await File.WriteAllTextAsync(FlagPath, json);
}
catch (Exception ex) { _logger.LogWarning(ex, "[Realtime] 플래그 파일 저장 실패 (무시)"); }
// 연결 성공 확인을 위해 잠시 대기
await Task.Delay(2_000, ct);
if (!_running) throw new InvalidOperationException("연결이 설정되지 않았습니다.");
}
catch
{
// 연결 실패 시 정리
_cts?.Cancel();
_cts = null;
_currentCfg = null;
throw;
}
}
/// <summary>단일 연결-구독-유지 루프 (재연결 없이 한 번만 시도)</summary>
private async Task RunLoopOnceAsync(CancellationToken ct)
{
try
{
await ConnectAndSubscribeAsync(ct);
// 세션이 살아있는 동안 KeepAlive 대기
while (!ct.IsCancellationRequested &&
_session != null && _session.Connected)
{
await Task.Delay(5_000, ct);
}
}
catch (OperationCanceledException) { /* 정상 종료 */ }
catch (Exception ex)
{
_logger.LogWarning(ex, "[Realtime] 연결 오류 — 재연결 안 함 (자동 시작 모드)");
await CleanupSessionAsync();
throw; // 호출자에게 실패 전달
}
finally
{
_running = false;
if (_statusMsg != "중지됨") _statusMsg = "중지됨";
_logger.LogWarning(ex, "[Realtime] 자동시작 플래그 읽기 실패 — 무시");
}
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
// 앱 종료(Ctrl+C 등) 시: 플래그 파일은 유지 → 재기동 시 자동 재시작
_cts?.Cancel();
var tasks = new[] { _monitorTask, _flushTask }
.Where(t => t != null).Select(t => t!).ToArray();
if (tasks.Length > 0)
{
try
{
// 종료 시 대기 시간을 2초로 단축하여 빠른 셧다운 유도
await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(2), cancellationToken).ConfigureAwait(false);
// 앱 종료(Ctrl+C 등): supervisor만 정지하고 플래그는 유지 → 재기동 시 자동 재개.
await StopInternalAsync(deleteFlag: false, ct: cancellationToken);
_logger.LogInformation("[Realtime] 구독 중지 완료 (앱 종료 — 자동시작 플래그 유지)");
}
catch (Exception ex) { _logger.LogDebug(ex, "[Realtime] StopAsync 대기 중 타임아웃 또는 취소 발생"); }
}
_running = false;
_logger.LogInformation("[Realtime] 구독 중지 완료 (앱 종료 — 자동 재시작 플래그 유지)");
/// <summary>의도=ON: 백그라운드 supervisor 루프를 새로 기동한다 (호출 전 기존 것을 정지해 둘 것).</summary>
private void StartSupervisor(ExperionServerConfig cfg)
{
_currentCfg = cfg;
_cts = new CancellationTokenSource();
_monitorTask = Task.Run(() => SuperviseAsync(_cts.Token));
}
// ── IExperionRealtimeService ──────────────────────────────────────────────
public async Task StartAsync(ExperionServerConfig cfg)
{
if (_running || _restarting)
// 이미 supervisor가 돌고 있으면 정지 후 교체 (플래그는 아래서 새 cfg로 덮어씀)
if (_monitorTask is { IsCompleted: false })
{
_logger.LogWarning("[Realtime] 이미 실행 중 또는 재시작 중. 무시합니다.");
return;
_logger.LogInformation("[Realtime] 이미 실행 중 — 정지 후 재시작합니다.");
await StopInternalAsync(deleteFlag: false);
}
_restarting = true;
try
{
if (_running)
{
_logger.LogWarning("[Realtime] 이미 실행 중. 재시작합니다.");
await StopAsync();
}
}
finally
{
_restarting = false;
}
// 플래그 파일 저장 (앱 재기동 시 자동 재시작용)
// 플래그 저장 (의도=ON, 부팅 시 자동 재개용)
try
{
var json = JsonSerializer.Serialize(cfg);
await File.WriteAllTextAsync(FlagPath, json);
_logger.LogInformation("[Realtime] 자동시작 플래그 저장: {Path}", FlagPath);
_logger.LogInformation("[Realtime] 자동시작 플래그 저장: {Path}", FlagPath);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[Realtime] 플래그 파일 저장 실패 (무시)");
}
_currentCfg = cfg;
_cts = new CancellationTokenSource();
_monitorTask = Task.Run(() => RunLoopAsync(_cts.Token));
_logger.LogInformation("[Realtime] 구독 시작 요청: {Url}", cfg.EndpointUrl);
StartSupervisor(cfg);
_logger.LogInformation("[Realtime] 구독 supervisor 시작 요청: {Url}", cfg.EndpointUrl);
}
public async Task StopAsync()
{
if (_restarting)
{
_logger.LogWarning("[Realtime] 재시작 중이므로 StopAsync 무시 (restarting 플래그 취소)");
return;
// 운전원 '구독중지': 의도=OFF → 플래그 삭제 + supervisor 정지 (자동 재개 안 함)
await StopInternalAsync(deleteFlag: true);
_logger.LogInformation("[Realtime] 구독 중지 완료 (운전원 정지 — 자동시작 비활성화)");
}
// 플래그 파일 삭제 (자동 재시작 비활성화)
/// <summary>supervisor·flush 정지 및 세션 정리. deleteFlag=true면 자동시작 플래그도 삭제(의도=OFF).</summary>
private async Task StopInternalAsync(bool deleteFlag, CancellationToken ct = default)
{
if (deleteFlag)
{
try
{
if (File.Exists(FlagPath)) File.Delete(FlagPath);
_logger.LogInformation("[Realtime] 자동시작 플래그 삭제");
_logger.LogInformation("[Realtime] 자동시작 플래그 삭제");
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[Realtime] 플래그 파일 삭제 실패 (무시)");
catch (Exception ex) { _logger.LogWarning(ex, "[Realtime] 플래그 파일 삭제 실패 (무시)"); }
}
_cts?.Cancel();
var tasks = new List<Task>();
if (_monitorTask != null) tasks.Add(_monitorTask);
if (_flushTask != null) tasks.Add(_flushTask);
if (tasks.Count > 0)
await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
var tasks = new[] { _monitorTask, _flushTask }
.Where(t => t != null).Select(t => t!).ToArray();
if (tasks.Length > 0)
{
try
{
await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(10), ct).ConfigureAwait(false);
}
catch (Exception ex) { _logger.LogDebug(ex, "[Realtime] 정지 대기 중 타임아웃/취소"); }
}
await CleanupSessionAsync();
_pendingUpdates.Clear();
_cts?.Dispose();
_cts = null;
_monitorTask = null;
_flushTask = null;
_running = false;
_stalled = false;
_subscribedCount = 0;
_statusMsg = "중지됨";
_logger.LogInformation("[Realtime] 구독 중지 완료");
}
/// <summary>링크 건강 점검 — 정상이면 null, 이상이면 사유. silent stall(publish 중단)까지 감지.</summary>
private string? GetLinkFault()
{
var s = _session;
if (s == null || !s.Connected) return "세션 끊김";
if (s.KeepAliveStopped) return "KeepAlive 중단";
var sub = _subscription;
if (sub != null && sub.PublishingStopped) return "구독 publish 중단";
return null;
}
public RealtimeServiceStatus GetStatus()
=> new(_running, _subscribedCount, _statusMsg);
{
var ticks = Interlocked.Read(ref _lastNotificationTicks);
int ageSec = ticks == 0
? -1
: (int)Math.Max(0, (DateTime.UtcNow.Ticks - ticks) / TimeSpan.TicksPerSecond);
return new(_running, _subscribedCount, _statusMsg, ageSec, _stalled);
}
public async Task<(bool Success, string Message)> AddMonitoredItemAsync(string nodeId)
{
@@ -327,31 +251,50 @@ public class ExperionRealtimeService : IExperionRealtimeService, IHostedService,
// ── 내부 루프 ─────────────────────────────────────────────────────────────
private async Task RunLoopAsync(CancellationToken ct)
// supervisor 루프 — 부팅과 무관하게 의도=ON 동안 연결·재구독·재시도를 전담.
// 성공 → 건강 감시(연결끊김/KeepAlive 중단/구독 publish 중단=silent stall) → 이상 시 재연결
// 실패/이상 → RetryDelay 후 무한 재시도 (3회-후-포기 없음, 주기적 자동 재개)
private async Task SuperviseAsync(CancellationToken ct)
{
const int RetryDelayMs = 30_000; // 연결 실패/이상 후 재시도 주기
const int HealthPollMs = 5_000; // 건강 점검 주기
// flush 루프는 supervisor 수명 동안 단 1회만 기동 (재연결마다 누적되던 버그 차단)
_flushTask = Task.Run(() => FlushLoopAsync(ct), ct);
while (!ct.IsCancellationRequested)
{
try
{
await ConnectAndSubscribeAsync(ct);
// 세션이 살아있는 동안 KeepAlive 대기
while (!ct.IsCancellationRequested &&
_session != null && _session.Connected)
{
await Task.Delay(5_000, ct);
}
// 연결 유지 동안 건강 감시
string? fault = null;
while (!ct.IsCancellationRequested && (fault = GetLinkFault()) == null)
await Task.Delay(HealthPollMs, ct);
if (ct.IsCancellationRequested) break;
_running = false;
_stalled = true;
_linkFault = fault ?? "알 수 없음";
_statusMsg = $"재연결 대기 중: {_linkFault}";
_logger.LogWarning("[Realtime] 링크 이상 감지 ({Fault}) — {Sec}초 후 재연결", _linkFault, RetryDelayMs / 1000);
await CleanupSessionAsync();
}
catch (OperationCanceledException) { break; }
catch (Exception ex)
{
_running = false;
_stalled = true;
_linkFault = ex.Message;
_statusMsg = $"재연결 대기 중: {ex.Message}";
_logger.LogWarning(ex, "[Realtime] 연결 오류, 30초 후 재시도");
_logger.LogWarning(ex, "[Realtime] 연결 오류 — {Sec}초 후 재시도", RetryDelayMs / 1000);
await CleanupSessionAsync();
try { await Task.Delay(30_000, ct); }
catch (OperationCanceledException) { break; }
}
try { await Task.Delay(RetryDelayMs, ct); }
catch (OperationCanceledException) { break; }
}
_running = false;
@@ -423,16 +366,19 @@ public class ExperionRealtimeService : IExperionRealtimeService, IHostedService,
_subscribedCount = points.Count;
_running = true;
_stalled = false;
_linkFault = "";
Interlocked.Exchange(ref _lastNotificationTicks, DateTime.UtcNow.Ticks);
_statusMsg = $"구독 중 ({_subscribedCount}개 포인트)";
_logger.LogInformation("[Realtime] 구독 완료: {Count}개 포인트", _subscribedCount);
// 배치 flush 태스크 시작 (콜백 → dictionary → 500ms 단위 배치 DB 업데이트)
_flushTask = Task.Run(() => FlushLoopAsync(ct), ct);
// flush 루프는 SuperviseAsync에서 1회만 기동한다 (여기서 시작하지 않음 — 재연결 누적 방지)
}
// 콜백: Task.Run 없이 dictionary에만 기록 (최신 값 덮어쓰기)
private void OnNotification(MonitoredItem item, MonitoredItemNotificationEventArgs e)
{
// 데이터 수신 시각 기록 — 신선도/상태 노출용 (GetLinkFault는 SDK의 PublishingStopped를 사용)
Interlocked.Exchange(ref _lastNotificationTicks, DateTime.UtcNow.Ticks);
foreach (var val in item.DequeueValues())
{
var nodeId = item.DisplayName;

View File

@@ -8,7 +8,9 @@ using Npgsql;
namespace ExperionCrawler.Infrastructure.OpcUa;
/// <summary>
/// 메타데이터(desc, area)를 OPC UA에서 읽어 tag_metadata 테이블에 저장/갱신
/// 메타데이터(desc, area, 단위/레인지 euhi·eulo·units)를 OPC UA에서 읽어 tag_metadata에 저장/갱신.
/// 단위·레인지는 corroboration 임계(FS 5%)·향후 SP/제어 계산용. 별도 테이블 없이 EAV(tag_metadata) 재사용 →
/// PointBuilder 작성·수동 메타갱신 트리거에 자동 편승. 타입 접근은 v_instrument_range 뷰.
/// </summary>
public class MetadataLoaderService : IMetadataLoaderService
{
@@ -17,9 +19,10 @@ public class MetadataLoaderService : IMetadataLoaderService
private readonly ILogger<MetadataLoaderService> _logger;
// 로드할 메타데이터 속성 목록 (state0~7descriptor 제거 — pv 값에서 파싱)
// euhi/eulo/units = 점 레벨 EU 레인지/단위 (node_map_master의 자식 노드명과 일치). 'qv.euhi' 등 서브속성은 정확매칭으로 제외.
private static readonly string[] MetaAttributes =
{
"desc", "area"
"desc", "area", "euhi", "eulo", "units"
};
public MetadataLoaderService(