feat(report): P1c 온라인 KPI 누적기 (live_kpi) + 메트릭 소스 일반화

온라인 히스토리안 3계층 — 실시간 KPI를 history_1s에서 산출·캐싱.

- 메트릭 소스 일반화: history_table | history_1s | history_1min_src | fast_record.
  history_1min_src = 연속집계 드롭인 호환뷰(bucket→recorded_at).
- Hc900LiveKpiService: 매 15s 오늘(KST) 컬럼별 KPI(production/yield/energy/closure)를
  history_1s에서 재계산해 live_kpi upsert. 재계산=러닝상태의 stateless 동등판(causal 동치,
  크래시 복구 불필요). 컬럼 state(normal/idle/error) 산출.
- live_kpi 테이블 + GET /api/report/live 조회 엔드포인트.
- Report:LiveKpi config(Enabled/IntervalSeconds/Source).

검증: 라이브 live_kpi 28행(7컬럼×4KPI) 15s 갱신, /live 정상. 데모 sim은 totalizer가
평평해 값 0/idle(플러밍 정상, 실데이터면 실값).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
windpacer
2026-06-15 07:52:31 +09:00
parent 3506a67c28
commit b820e6c33a
7 changed files with 178 additions and 4 deletions

View File

@@ -33,3 +33,13 @@ WITH NO DATA;
SELECT add_continuous_aggregate_policy('hc900.history_1min',
start_offset => INTERVAL '3 hours', end_offset => INTERVAL '10 minutes',
schedule_interval => INTERVAL '5 minutes', if_not_exists => TRUE);
-- P1c: 메트릭엔진 드롭인 소스(bucket→recorded_at) + 온라인 KPI 누적 테이블
CREATE OR REPLACE VIEW hc900.history_1min_src AS
SELECT tagname, bucket AS recorded_at, value, controller_id FROM hc900.history_1min;
CREATE TABLE IF NOT EXISTS hc900.live_kpi (
column_id text NOT NULL, kpi text NOT NULL, window_start date NOT NULL,
value double precision, unit text, state text, excluded_min int, status text,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (column_id, kpi, window_start));

View File

@@ -1,7 +1,10 @@
using System.Data;
using Hc900Crawler.Core.Application.DTOs;
using Hc900Crawler.Core.Application.Interfaces;
using Hc900Crawler.Infrastructure.Database;
using Hc900Crawler.Infrastructure.Reporting;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
namespace Hc900Crawler.Web.Controllers;
@@ -13,14 +16,42 @@ public class ReportController : ControllerBase
private readonly ReportFillService _fill;
private readonly IReportTemplateStore _store;
private readonly ReportColumnMap _map;
private readonly Hc900DbContext _db;
// 웹 대시보드 기본 메트릭 세트
private static readonly string[] SUMMARY_METRICS =
{ "production_total", "yield_qv", "energy_intensity_qv", "mass_balance_closure", "control_residual" };
public ReportController(IReportMetricService metrics, ReportFillService fill,
IReportTemplateStore store, ReportColumnMap map)
{ _metrics = metrics; _fill = fill; _store = store; _map = map; }
IReportTemplateStore store, ReportColumnMap map, Hc900DbContext db)
{ _metrics = metrics; _fill = fill; _store = store; _map = map; _db = db; }
/// <summary>온라인 KPI(live_kpi) 직독 — 누적기가 history_1s에서 갱신한 당일 실시간 값.</summary>
[HttpGet("live")]
public async Task<IActionResult> Live(string? column = null, CancellationToken ct = default)
{
var conn = _db.Database.GetDbConnection();
if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);
await using var cmd = conn.CreateCommand();
cmd.CommandText = @"SELECT column_id, kpi, value, unit, state, excluded_min, status, window_start, updated_at
FROM hc900.live_kpi" + (column == null ? "" : " WHERE column_id=@col") +
" ORDER BY column_id, kpi";
if (column != null) { var p = cmd.CreateParameter(); p.ParameterName = "@col"; p.Value = column; cmd.Parameters.Add(p); }
var items = new List<object>();
await using var rd = await cmd.ExecuteReaderAsync(ct);
while (await rd.ReadAsync(ct))
items.Add(new {
Column = rd.GetString(0), Kpi = rd.GetString(1),
Value = rd.IsDBNull(2) ? (double?)null : rd.GetDouble(2),
Unit = rd.IsDBNull(3) ? null : rd.GetString(3),
State = rd.IsDBNull(4) ? null : rd.GetString(4),
ExcludedMin = rd.IsDBNull(5) ? (int?)null : rd.GetInt32(5),
Status = rd.IsDBNull(6) ? null : rd.GetString(6),
WindowStart = rd.GetFieldValue<DateTime>(7).ToString("yyyy-MM-dd"),
UpdatedAt = rd.GetFieldValue<DateTime>(8)
});
return Ok(new { Count = items.Count, Items = items });
}
/// <summary>설정된 컬럼 목록(웹 UI 셀렉트용).</summary>
[HttpGet("columns")]

View File

@@ -177,6 +177,8 @@ builder.WebHost.UseUrls("http://0.0.0.0:5000");
builder.Services.AddSingleton<Hc900Crawler.Infrastructure.Reporting.ReportColumnMap>();
// P1a: 1초 링버퍼 히스토리안 (history_1s, 보존정책으로 디스크 상한 고정)
builder.Services.AddHostedService<Hc900Crawler.Infrastructure.Hc900.Hc900FastHistoryService>();
// P1c: 온라인 KPI 누적기 (history_1s → live_kpi)
builder.Services.AddHostedService<Hc900Crawler.Infrastructure.Hc900.Hc900LiveKpiService>();
builder.Services.AddScoped<Hc900Crawler.Core.Application.Interfaces.IReportMetricService,
Hc900Crawler.Infrastructure.Reporting.ReportMetricService>();
builder.Services.AddScoped<Hc900Crawler.Infrastructure.Reporting.ReportFillService>();

View File

@@ -92,6 +92,11 @@
"IntervalSeconds": 1,
"RetentionDays": 14
},
"LiveKpi": {
"Enabled": true,
"IntervalSeconds": 15,
"Source": "history_1s"
},
"Cleaning": {
"VacMax": 300,
"ProductMin": 10,

View File

@@ -99,6 +99,9 @@ WHERE tagname = ANY(@tags)";
@"SELECT add_continuous_aggregate_policy('hc900.history_1min',
start_offset => INTERVAL '3 hours', end_offset => INTERVAL '10 minutes',
schedule_interval => INTERVAL '5 minutes', if_not_exists => TRUE)",
// 메트릭엔진 드롭인 소스(bucket→recorded_at)
@"CREATE OR REPLACE VIEW hc900.history_1min_src AS
SELECT tagname, bucket AS recorded_at, value, controller_id FROM hc900.history_1min",
};
foreach (var s in stmts)
{

View File

@@ -0,0 +1,117 @@
using System.Data;
using Hc900Crawler.Core.Application.DTOs;
using Hc900Crawler.Core.Application.Interfaces;
using Hc900Crawler.Infrastructure.Database;
using Hc900Crawler.Infrastructure.Reporting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Hc900Crawler.Infrastructure.Hc900;
/// <summary>
/// P1c 온라인 KPI 누적기. 매 N초 오늘(KST)의 컬럼별 KPI를 history_1s에서 재계산해 live_kpi에 upsert.
/// 재계산 방식 = 러닝 상태의 stateless 동등판: 결과(T) ≡ 배치(당일 시작~now), causal 보장(상태 복구 불필요).
/// </summary>
public class Hc900LiveKpiService : BackgroundService
{
private static readonly string[] KPIS = { "production_total", "yield_qv", "energy_intensity_qv", "mass_balance_closure" };
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<Hc900LiveKpiService> _logger;
private readonly Hc900RealtimeService _realtime;
private readonly ReportColumnMap _map;
private readonly bool _enabled;
private readonly int _intervalSec;
private readonly string _source;
public Hc900LiveKpiService(IServiceScopeFactory scopeFactory, ILogger<Hc900LiveKpiService> logger,
Hc900RealtimeService realtime, ReportColumnMap map, IConfiguration config)
{
_scopeFactory = scopeFactory; _logger = logger; _realtime = realtime; _map = map;
_enabled = config.GetValue("Report:LiveKpi:Enabled", true);
_intervalSec = Math.Max(5, config.GetValue("Report:LiveKpi:IntervalSeconds", 15));
_source = config.GetValue("Report:LiveKpi:Source", "history_1s")!; // 1초 버퍼 기본
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_enabled) { _logger.LogInformation("[LiveKpi] 비활성"); return; }
try { await EnsureSchemaAsync(stoppingToken); } catch (Exception ex) { _logger.LogError(ex, "[LiveKpi] live_kpi 준비 실패"); return; }
_logger.LogInformation("[LiveKpi] 시작 — 간격 {Int}s, 소스 {Src}", _intervalSec, _source);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(_intervalSec), stoppingToken);
if (!_realtime.IsConnected) continue;
var todayKst = DateTime.UtcNow.AddHours(9).Date;
using var scope = _scopeFactory.CreateScope();
var metrics = scope.ServiceProvider.GetRequiredService<IReportMetricService>();
var ctx = scope.ServiceProvider.GetRequiredService<Hc900DbContext>();
var conn = ctx.Database.GetDbConnection();
if (conn.State != ConnectionState.Open) await conn.OpenAsync(stoppingToken);
int written = 0;
foreach (var col in _map.Columns())
{
var results = new List<MetricResultDto>();
foreach (var kpi in KPIS)
results.Add(await metrics.ComputeAsync(new MetricRequestDto
{ Column = col, Metric = kpi, PeriodDateKst = todayKst, SourceTable = _source }, stoppingToken));
// 컬럼 상태: error=계산오류, 생산>0=normal, 그 외(0·no_data)=idle(미가동)
var prod = results.First(r => r.Metric == "production_total");
string state = prod.Status == "error" ? "error"
: prod.Value is > 0 ? "normal" : "idle";
foreach (var r in results)
{
int? excl = r.Extra.TryGetValue("excluded_min", out var e) && e is double ed ? (int)ed : null;
await UpsertAsync(conn, col, r.Metric, todayKst, r.Value, r.Unit, state, excl, r.Status, stoppingToken);
written++;
}
}
_logger.LogDebug("[LiveKpi] {N}개 KPI 갱신 @ {Day}", written, todayKst);
}
catch (OperationCanceledException) { break; }
catch (Exception ex) { _logger.LogError(ex, "[LiveKpi] 갱신 실패"); }
}
_logger.LogInformation("[LiveKpi] 종료");
}
private static async Task UpsertAsync(System.Data.Common.DbConnection conn, string col, string kpi,
DateTime ws, double? val, string? unit, string state, int? excl, string status, CancellationToken ct)
{
await using var cmd = conn.CreateCommand();
cmd.CommandText = @"
INSERT INTO hc900.live_kpi (column_id, kpi, window_start, value, unit, state, excluded_min, status, updated_at)
VALUES (@col,@kpi,@ws,@val,@unit,@state,@excl,@status, now())
ON CONFLICT (column_id, kpi, window_start) DO UPDATE
SET value=EXCLUDED.value, unit=EXCLUDED.unit, state=EXCLUDED.state,
excluded_min=EXCLUDED.excluded_min, status=EXCLUDED.status, updated_at=now()";
void P(string n, object? v) { var p = cmd.CreateParameter(); p.ParameterName = n; p.Value = v ?? DBNull.Value; cmd.Parameters.Add(p); }
P("@col", col); P("@kpi", kpi); P("@ws", ws.Date); P("@val", val);
P("@unit", unit); P("@state", state); P("@excl", excl); P("@status", status);
await cmd.ExecuteNonQueryAsync(ct);
}
private async Task EnsureSchemaAsync(CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var conn = scope.ServiceProvider.GetRequiredService<Hc900DbContext>().Database.GetDbConnection();
if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);
await using var cmd = conn.CreateCommand();
cmd.CommandText = @"
CREATE TABLE IF NOT EXISTS hc900.live_kpi (
column_id text NOT NULL, kpi text NOT NULL, window_start date NOT NULL,
value double precision, unit text, state text, excluded_min int, status text,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (column_id, kpi, window_start))";
await cmd.ExecuteNonQueryAsync(ct);
}
}

View File

@@ -26,19 +26,25 @@ public sealed class ReportMetricService : IReportMetricService
private static readonly HashSet<string> QV_METRICS =
new() { "production_total", "yield_qv", "energy_intensity_qv", "mass_balance_closure" };
// recorded_at 윈도로 조회 가능한 시계열 소스(동일 long 포맷). history_1min_src=연속집계 호환뷰.
private static readonly HashSet<string> SERIES_SOURCES =
new() { "history_table", "history_1s", "history_1min_src" };
public async Task<MetricResultDto> ComputeAsync(MetricRequestDto req, CancellationToken ct = default)
{
bool isFast = req.SourceTable == "fast_record";
// history_table(60s) | history_1s(1s 버퍼) | history_1min_src(연속집계) | fast_record. 미지정/미허용→history_table.
string tbl = isFast ? "fast_record"
: SERIES_SOURCES.Contains(req.SourceTable) ? req.SourceTable : "history_table";
var res = new MetricResultDto
{
Metric = req.Metric, Column = req.Column,
Source = req.SourceTable, SamplingMs = isFast ? 0 : 60000
Source = tbl, SamplingMs = isFast ? 0 : (tbl == "history_1s" ? 1000 : 60000)
};
// KST 날짜 [00:00, +1d) → UTC (recorded_at은 UTC)
var fromUtc = DateTime.SpecifyKind(req.PeriodDateKst.Date, DateTimeKind.Unspecified).AddHours(-9);
var toUtc = fromUtc.AddDays(1);
string tbl = isFast ? "fast_record" : "history_table";
try
{