feat(report): P1a 1초 링버퍼 히스토리안 (history_1s)
온라인 히스토리안 1계층 — 디스크 상한 고정 1초 버퍼. - history_1s Timescale 하이퍼테이블(1h 청크) + 압축(6h) + 보존 14일(청크DROP, 디스크 상한 고정). - Hc900FastHistoryService: 매 1초 realtime_table 큐레이션 태그 → history_1s append. 기동 시 스키마/정책 멱등 생성. Hc900HistoryService(60s) 패턴. - ReportColumnMap.HistorianTags(): 메트릭/마스크가 읽는 태그를 기존 config에서 유도(~105개, 중복정의 없음) — 유량 .PV/.QV, 진공, 민감단 TC, 하부루프 .PV/.SP/.OP. - Report:Historian config(Enabled/IntervalSeconds/RetentionDays). 검증: 라이브 초당 ~38행 append, 샘플 간격 ~1.004s, 보존/압축 정책 활성. 주의(데모): realtime 폴링이 areas 61/62/81만 커버 → 1s 버퍼도 해당 컬럼만(배포 시 전 컨트롤러). 1s 버퍼 주 가치는 .PV/.OP 동특성; .QV 적산은 60s로도 정확. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
22
scripts/sql/p1_historian.sql
Normal file
22
scripts/sql/p1_historian.sql
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
-- P1a: 1초 링버퍼 히스토리안 (hc900.history_1s)
|
||||||
|
-- 적용: psql "host=localhost dbname=iiot_platform user=postgres" -f scripts/sql/p1_historian.sql
|
||||||
|
-- 서비스(Hc900FastHistoryService)가 기동 시 동일 DDL을 멱등 적용하므로 수동 실행은 선택.
|
||||||
|
SET search_path TO hc900;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS hc900.history_1s (
|
||||||
|
tagname text NOT NULL,
|
||||||
|
recorded_at timestamptz NOT NULL DEFAULT now(),
|
||||||
|
value text,
|
||||||
|
controller_id text
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 하이퍼테이블 (1시간 청크 — evict/압축 단위)
|
||||||
|
SELECT create_hypertable('hc900.history_1s', 'recorded_at',
|
||||||
|
chunk_time_interval => INTERVAL '1 hour', if_not_exists => TRUE);
|
||||||
|
|
||||||
|
-- 압축 (6시간 지난 청크) + 링버퍼 보존 (14일 — 청크 DROP, 비용≈0)
|
||||||
|
ALTER TABLE hc900.history_1s SET (timescaledb.compress, timescaledb.compress_segmentby = 'tagname');
|
||||||
|
SELECT add_compression_policy('hc900.history_1s', INTERVAL '6 hours', if_not_exists => TRUE);
|
||||||
|
SELECT add_retention_policy('hc900.history_1s', INTERVAL '14 days', if_not_exists => TRUE);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_h1s_tag ON hc900.history_1s (tagname, recorded_at DESC);
|
||||||
@@ -175,6 +175,8 @@ builder.WebHost.UseUrls("http://0.0.0.0:5000");
|
|||||||
|
|
||||||
// ── P0 셀프서비스 리포트 ──────────────────────────────────────────────────────
|
// ── P0 셀프서비스 리포트 ──────────────────────────────────────────────────────
|
||||||
builder.Services.AddSingleton<Hc900Crawler.Infrastructure.Reporting.ReportColumnMap>();
|
builder.Services.AddSingleton<Hc900Crawler.Infrastructure.Reporting.ReportColumnMap>();
|
||||||
|
// P1a: 1초 링버퍼 히스토리안 (history_1s, 보존정책으로 디스크 상한 고정)
|
||||||
|
builder.Services.AddHostedService<Hc900Crawler.Infrastructure.Hc900.Hc900FastHistoryService>();
|
||||||
builder.Services.AddScoped<Hc900Crawler.Core.Application.Interfaces.IReportMetricService,
|
builder.Services.AddScoped<Hc900Crawler.Core.Application.Interfaces.IReportMetricService,
|
||||||
Hc900Crawler.Infrastructure.Reporting.ReportMetricService>();
|
Hc900Crawler.Infrastructure.Reporting.ReportMetricService>();
|
||||||
builder.Services.AddScoped<Hc900Crawler.Infrastructure.Reporting.ReportFillService>();
|
builder.Services.AddScoped<Hc900Crawler.Infrastructure.Reporting.ReportFillService>();
|
||||||
|
|||||||
@@ -87,6 +87,11 @@
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"Report": {
|
"Report": {
|
||||||
|
"Historian": {
|
||||||
|
"Enabled": true,
|
||||||
|
"IntervalSeconds": 1,
|
||||||
|
"RetentionDays": 14
|
||||||
|
},
|
||||||
"Cleaning": {
|
"Cleaning": {
|
||||||
"VacMax": 300,
|
"VacMax": 300,
|
||||||
"ProductMin": 10,
|
"ProductMin": 10,
|
||||||
|
|||||||
101
src/Infrastructure/Hc900/Hc900FastHistoryService.cs
Normal file
101
src/Infrastructure/Hc900/Hc900FastHistoryService.cs
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
using System.Data;
|
||||||
|
using Hc900Crawler.Infrastructure.Database;
|
||||||
|
using Hc900Crawler.Infrastructure.Reporting;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace Hc900Crawler.Infrastructure.Hc900;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// P1a 1초 링버퍼 히스토리안. 매 N초(기본 1s) realtime_table의 큐레이션 태그를 history_1s에 append.
|
||||||
|
/// Timescale 보존정책이 윈도(기본 14일)로 디스크 상한 고정. 60초 Hc900HistoryService 패턴.
|
||||||
|
/// </summary>
|
||||||
|
public class Hc900FastHistoryService : BackgroundService
|
||||||
|
{
|
||||||
|
private readonly IServiceScopeFactory _scopeFactory;
|
||||||
|
private readonly ILogger<Hc900FastHistoryService> _logger;
|
||||||
|
private readonly Hc900RealtimeService _realtime;
|
||||||
|
private readonly ReportColumnMap _map;
|
||||||
|
private readonly bool _enabled;
|
||||||
|
private readonly int _intervalSec;
|
||||||
|
private readonly int _retentionDays;
|
||||||
|
private string[] _tags = Array.Empty<string>();
|
||||||
|
|
||||||
|
public Hc900FastHistoryService(IServiceScopeFactory scopeFactory, ILogger<Hc900FastHistoryService> logger,
|
||||||
|
Hc900RealtimeService realtime, ReportColumnMap map, IConfiguration config)
|
||||||
|
{
|
||||||
|
_scopeFactory = scopeFactory; _logger = logger; _realtime = realtime; _map = map;
|
||||||
|
_enabled = config.GetValue("Report:Historian:Enabled", true);
|
||||||
|
_intervalSec = Math.Max(1, config.GetValue("Report:Historian:IntervalSeconds", 1));
|
||||||
|
_retentionDays = Math.Max(1, config.GetValue("Report:Historian:RetentionDays", 14));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
if (!_enabled) { _logger.LogInformation("[FastHistory] 비활성(Report:Historian:Enabled=false)"); return; }
|
||||||
|
|
||||||
|
_tags = _map.HistorianTags().ToArray();
|
||||||
|
_logger.LogInformation("[FastHistory] 시작 — 간격 {Int}s, 보존 {Ret}일, 큐레이션 태그 {N}개",
|
||||||
|
_intervalSec, _retentionDays, _tags.Length);
|
||||||
|
|
||||||
|
try { await EnsureSchemaAsync(stoppingToken); }
|
||||||
|
catch (Exception ex) { _logger.LogError(ex, "[FastHistory] history_1s 스키마 준비 실패 — 서비스 중단"); return; }
|
||||||
|
|
||||||
|
if (_tags.Length == 0) { _logger.LogWarning("[FastHistory] 큐레이션 태그 0개 — 중단"); return; }
|
||||||
|
|
||||||
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await Task.Delay(TimeSpan.FromSeconds(_intervalSec), stoppingToken);
|
||||||
|
if (!_realtime.IsConnected) continue; // 미연결 시 스킵
|
||||||
|
|
||||||
|
using var scope = _scopeFactory.CreateScope();
|
||||||
|
var ctx = scope.ServiceProvider.GetRequiredService<Hc900DbContext>();
|
||||||
|
var conn = ctx.Database.GetDbConnection();
|
||||||
|
if (conn.State != ConnectionState.Open) await conn.OpenAsync(stoppingToken);
|
||||||
|
await using var cmd = conn.CreateCommand();
|
||||||
|
cmd.CommandText = @"
|
||||||
|
INSERT INTO hc900.history_1s (tagname, recorded_at, value, controller_id)
|
||||||
|
SELECT tagname, now(), livevalue, controller_id
|
||||||
|
FROM hc900.realtime_table
|
||||||
|
WHERE tagname = ANY(@tags)";
|
||||||
|
var p = cmd.CreateParameter(); p.ParameterName = "@tags"; p.Value = _tags; cmd.Parameters.Add(p);
|
||||||
|
var n = await cmd.ExecuteNonQueryAsync(stoppingToken);
|
||||||
|
_logger.LogTrace("[FastHistory] {Count}행 append", n);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { break; }
|
||||||
|
catch (Exception ex) { _logger.LogError(ex, "[FastHistory] append 실패"); }
|
||||||
|
}
|
||||||
|
_logger.LogInformation("[FastHistory] 종료");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>history_1s 하이퍼테이블 + 압축 + 보존(링버퍼) 멱등 생성.</summary>
|
||||||
|
private async Task EnsureSchemaAsync(CancellationToken ct)
|
||||||
|
{
|
||||||
|
using var scope = _scopeFactory.CreateScope();
|
||||||
|
var ctx = scope.ServiceProvider.GetRequiredService<Hc900DbContext>();
|
||||||
|
var conn = ctx.Database.GetDbConnection();
|
||||||
|
if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);
|
||||||
|
|
||||||
|
var stmts = new[]
|
||||||
|
{
|
||||||
|
@"CREATE TABLE IF NOT EXISTS hc900.history_1s (tagname text NOT NULL,
|
||||||
|
recorded_at timestamptz NOT NULL DEFAULT now(), value text, controller_id text)",
|
||||||
|
"SELECT create_hypertable('hc900.history_1s','recorded_at', chunk_time_interval => INTERVAL '1 hour', if_not_exists => TRUE)",
|
||||||
|
"ALTER TABLE hc900.history_1s SET (timescaledb.compress, timescaledb.compress_segmentby = 'tagname')",
|
||||||
|
"SELECT add_compression_policy('hc900.history_1s', INTERVAL '6 hours', if_not_exists => TRUE)",
|
||||||
|
$"SELECT add_retention_policy('hc900.history_1s', INTERVAL '{_retentionDays} days', if_not_exists => TRUE)",
|
||||||
|
"CREATE INDEX IF NOT EXISTS ix_h1s_tag ON hc900.history_1s (tagname, recorded_at DESC)",
|
||||||
|
};
|
||||||
|
foreach (var s in stmts)
|
||||||
|
{
|
||||||
|
try { await using var cmd = conn.CreateCommand(); cmd.CommandText = s; await cmd.ExecuteNonQueryAsync(ct); }
|
||||||
|
catch (Exception ex) { _logger.LogDebug("[FastHistory] DDL 스킵: {Msg}", ex.Message); } // 이미 적용된 ALTER 등
|
||||||
|
}
|
||||||
|
_logger.LogInformation("[FastHistory] history_1s 준비 완료(보존 {Ret}일)", _retentionDays);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -32,6 +32,37 @@ public sealed class ReportColumnMap
|
|||||||
public IReadOnlyList<string> Columns()
|
public IReadOnlyList<string> Columns()
|
||||||
=> _config.GetSection("SteamAdvisor:Columns").GetChildren().Select(c => c.Key).ToList();
|
=> _config.GetSection("SteamAdvisor:Columns").GetChildren().Select(c => c.Key).ToList();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 1초 히스토리안 큐레이션 태그셋 — 메트릭/마스크가 읽는 태그만 기존 config에서 유도(중복정의 없음).
|
||||||
|
/// 컬럼당: 유량(feed/product/steam/lights/heavies) .PV+.QV, 진공 .PV, 민감단 TC .PV, 하부루프 .PV/.SP/.OP.
|
||||||
|
/// </summary>
|
||||||
|
public IReadOnlyList<string> HistorianTags()
|
||||||
|
{
|
||||||
|
var set = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||||
|
void addPvQv(string? baseOrTag)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(baseOrTag)) return;
|
||||||
|
var b = StripAttr(baseOrTag!);
|
||||||
|
set.Add(b + ".PV"); set.Add(b + ".QV");
|
||||||
|
}
|
||||||
|
foreach (var col in Columns())
|
||||||
|
{
|
||||||
|
var sa = _config.GetSection($"SteamAdvisor:Columns:{col}");
|
||||||
|
addPvQv(sa["Feed"]); addPvQv(sa["Product"]); addPvQv(sa["SteamFlow"]);
|
||||||
|
if (Norm(sa["TC"]) is string tc) set.Add(tc); // 민감단 .PV
|
||||||
|
if (!string.IsNullOrWhiteSpace(sa["SteamOp"])) // 하부루프 TICA-*A
|
||||||
|
{ var lb = StripAttr(sa["SteamOp"]!); set.Add(lb + ".PV"); set.Add(lb + ".SP"); set.Add(lb + ".OP"); }
|
||||||
|
|
||||||
|
var vac = _config[$"Report:Cleaning:VacTag:{col}"]; // 진공
|
||||||
|
if (!string.IsNullOrWhiteSpace(vac)) set.Add(vac!.Contains('.') ? vac! : vac + ".PV");
|
||||||
|
|
||||||
|
var cl = _config.GetSection($"Report:Closure:{col}"); // 폐합 스트림
|
||||||
|
addPvQv(cl["Feed"]);
|
||||||
|
foreach (var o in cl.GetSection("Outputs").Get<string[]>() ?? Array.Empty<string>()) addPvQv(o);
|
||||||
|
}
|
||||||
|
return set.OrderBy(x => x).ToList();
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>해당 컬럼에 폐합(물질수지) 설정이 있는지.</summary>
|
/// <summary>해당 컬럼에 폐합(물질수지) 설정이 있는지.</summary>
|
||||||
public bool HasClosure(string column)
|
public bool HasClosure(string column)
|
||||||
=> _config.GetSection($"Report:Closure:{column}").Exists();
|
=> _config.GetSection($"Report:Closure:{column}").Exists();
|
||||||
|
|||||||
Reference in New Issue
Block a user