diff --git a/scripts/sql/p1_historian.sql b/scripts/sql/p1_historian.sql new file mode 100644 index 0000000..8a9a89b --- /dev/null +++ b/scripts/sql/p1_historian.sql @@ -0,0 +1,22 @@ +-- P1a: 1초 링버퍼 히스토리안 (hc900.history_1s) +-- 적용: psql "host=localhost dbname=iiot_platform user=postgres" -f scripts/sql/p1_historian.sql +-- 서비스(Hc900FastHistoryService)가 기동 시 동일 DDL을 멱등 적용하므로 수동 실행은 선택. +SET search_path TO hc900; + +CREATE TABLE IF NOT EXISTS hc900.history_1s ( + tagname text NOT NULL, + recorded_at timestamptz NOT NULL DEFAULT now(), + value text, + controller_id text +); + +-- 하이퍼테이블 (1시간 청크 — evict/압축 단위) +SELECT create_hypertable('hc900.history_1s', 'recorded_at', + chunk_time_interval => INTERVAL '1 hour', if_not_exists => TRUE); + +-- 압축 (6시간 지난 청크) + 링버퍼 보존 (14일 — 청크 DROP, 비용≈0) +ALTER TABLE hc900.history_1s SET (timescaledb.compress, timescaledb.compress_segmentby = 'tagname'); +SELECT add_compression_policy('hc900.history_1s', INTERVAL '6 hours', if_not_exists => TRUE); +SELECT add_retention_policy('hc900.history_1s', INTERVAL '14 days', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS ix_h1s_tag ON hc900.history_1s (tagname, recorded_at DESC); diff --git a/src/Hc900Crawler/Program.cs b/src/Hc900Crawler/Program.cs index 1c63fbc..ddaec0e 100644 --- a/src/Hc900Crawler/Program.cs +++ b/src/Hc900Crawler/Program.cs @@ -175,6 +175,8 @@ builder.WebHost.UseUrls("http://0.0.0.0:5000"); // ── P0 셀프서비스 리포트 ────────────────────────────────────────────────────── builder.Services.AddSingleton(); +// P1a: 1초 링버퍼 히스토리안 (history_1s, 보존정책으로 디스크 상한 고정) +builder.Services.AddHostedService(); builder.Services.AddScoped(); builder.Services.AddScoped(); diff --git a/src/Hc900Crawler/appsettings.json b/src/Hc900Crawler/appsettings.json index 027ffe2..2156048 100644 --- a/src/Hc900Crawler/appsettings.json +++ b/src/Hc900Crawler/appsettings.json @@ -87,6 +87,11 @@ } }, "Report": { + "Historian": { + "Enabled": true, + "IntervalSeconds": 1, + "RetentionDays": 14 + }, "Cleaning": { "VacMax": 300, "ProductMin": 10, diff --git a/src/Infrastructure/Hc900/Hc900FastHistoryService.cs b/src/Infrastructure/Hc900/Hc900FastHistoryService.cs new file mode 100644 index 0000000..d931560 --- /dev/null +++ b/src/Infrastructure/Hc900/Hc900FastHistoryService.cs @@ -0,0 +1,101 @@ +using System.Data; +using Hc900Crawler.Infrastructure.Database; +using Hc900Crawler.Infrastructure.Reporting; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Hc900Crawler.Infrastructure.Hc900; + +/// +/// P1a 1초 링버퍼 히스토리안. 매 N초(기본 1s) realtime_table의 큐레이션 태그를 history_1s에 append. +/// Timescale 보존정책이 윈도(기본 14일)로 디스크 상한 고정. 60초 Hc900HistoryService 패턴. +/// +public class Hc900FastHistoryService : BackgroundService +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly ILogger _logger; + private readonly Hc900RealtimeService _realtime; + private readonly ReportColumnMap _map; + private readonly bool _enabled; + private readonly int _intervalSec; + private readonly int _retentionDays; + private string[] _tags = Array.Empty(); + + public Hc900FastHistoryService(IServiceScopeFactory scopeFactory, ILogger logger, + Hc900RealtimeService realtime, ReportColumnMap map, IConfiguration config) + { + _scopeFactory = scopeFactory; _logger = logger; _realtime = realtime; _map = map; + _enabled = config.GetValue("Report:Historian:Enabled", true); + _intervalSec = Math.Max(1, config.GetValue("Report:Historian:IntervalSeconds", 1)); + _retentionDays = Math.Max(1, config.GetValue("Report:Historian:RetentionDays", 14)); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + if (!_enabled) { _logger.LogInformation("[FastHistory] 비활성(Report:Historian:Enabled=false)"); return; } + + _tags = _map.HistorianTags().ToArray(); + _logger.LogInformation("[FastHistory] 시작 — 간격 {Int}s, 보존 {Ret}일, 큐레이션 태그 {N}개", + _intervalSec, _retentionDays, _tags.Length); + + try { await EnsureSchemaAsync(stoppingToken); } + catch (Exception ex) { _logger.LogError(ex, "[FastHistory] history_1s 스키마 준비 실패 — 서비스 중단"); return; } + + if (_tags.Length == 0) { _logger.LogWarning("[FastHistory] 큐레이션 태그 0개 — 중단"); return; } + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await Task.Delay(TimeSpan.FromSeconds(_intervalSec), stoppingToken); + if (!_realtime.IsConnected) continue; // 미연결 시 스킵 + + using var scope = _scopeFactory.CreateScope(); + var ctx = scope.ServiceProvider.GetRequiredService(); + var conn = ctx.Database.GetDbConnection(); + if (conn.State != ConnectionState.Open) await conn.OpenAsync(stoppingToken); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = @" +INSERT INTO hc900.history_1s (tagname, recorded_at, value, controller_id) +SELECT tagname, now(), livevalue, controller_id +FROM hc900.realtime_table +WHERE tagname = ANY(@tags)"; + var p = cmd.CreateParameter(); p.ParameterName = "@tags"; p.Value = _tags; cmd.Parameters.Add(p); + var n = await cmd.ExecuteNonQueryAsync(stoppingToken); + _logger.LogTrace("[FastHistory] {Count}행 append", n); + } + catch (OperationCanceledException) { break; } + catch (Exception ex) { _logger.LogError(ex, "[FastHistory] append 실패"); } + } + _logger.LogInformation("[FastHistory] 종료"); + } + + /// history_1s 하이퍼테이블 + 압축 + 보존(링버퍼) 멱등 생성. + private async Task EnsureSchemaAsync(CancellationToken ct) + { + using var scope = _scopeFactory.CreateScope(); + var ctx = scope.ServiceProvider.GetRequiredService(); + var conn = ctx.Database.GetDbConnection(); + if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct); + + var stmts = new[] + { + @"CREATE TABLE IF NOT EXISTS hc900.history_1s (tagname text NOT NULL, + recorded_at timestamptz NOT NULL DEFAULT now(), value text, controller_id text)", + "SELECT create_hypertable('hc900.history_1s','recorded_at', chunk_time_interval => INTERVAL '1 hour', if_not_exists => TRUE)", + "ALTER TABLE hc900.history_1s SET (timescaledb.compress, timescaledb.compress_segmentby = 'tagname')", + "SELECT add_compression_policy('hc900.history_1s', INTERVAL '6 hours', if_not_exists => TRUE)", + $"SELECT add_retention_policy('hc900.history_1s', INTERVAL '{_retentionDays} days', if_not_exists => TRUE)", + "CREATE INDEX IF NOT EXISTS ix_h1s_tag ON hc900.history_1s (tagname, recorded_at DESC)", + }; + foreach (var s in stmts) + { + try { await using var cmd = conn.CreateCommand(); cmd.CommandText = s; await cmd.ExecuteNonQueryAsync(ct); } + catch (Exception ex) { _logger.LogDebug("[FastHistory] DDL 스킵: {Msg}", ex.Message); } // 이미 적용된 ALTER 등 + } + _logger.LogInformation("[FastHistory] history_1s 준비 완료(보존 {Ret}일)", _retentionDays); + } +} diff --git a/src/Infrastructure/Reporting/ReportColumnMap.cs b/src/Infrastructure/Reporting/ReportColumnMap.cs index 42ada7f..7d04b00 100644 --- a/src/Infrastructure/Reporting/ReportColumnMap.cs +++ b/src/Infrastructure/Reporting/ReportColumnMap.cs @@ -32,6 +32,37 @@ public sealed class ReportColumnMap public IReadOnlyList Columns() => _config.GetSection("SteamAdvisor:Columns").GetChildren().Select(c => c.Key).ToList(); + /// + /// 1초 히스토리안 큐레이션 태그셋 — 메트릭/마스크가 읽는 태그만 기존 config에서 유도(중복정의 없음). + /// 컬럼당: 유량(feed/product/steam/lights/heavies) .PV+.QV, 진공 .PV, 민감단 TC .PV, 하부루프 .PV/.SP/.OP. + /// + public IReadOnlyList HistorianTags() + { + var set = new HashSet(StringComparer.OrdinalIgnoreCase); + void addPvQv(string? baseOrTag) + { + if (string.IsNullOrWhiteSpace(baseOrTag)) return; + var b = StripAttr(baseOrTag!); + set.Add(b + ".PV"); set.Add(b + ".QV"); + } + foreach (var col in Columns()) + { + var sa = _config.GetSection($"SteamAdvisor:Columns:{col}"); + addPvQv(sa["Feed"]); addPvQv(sa["Product"]); addPvQv(sa["SteamFlow"]); + if (Norm(sa["TC"]) is string tc) set.Add(tc); // 민감단 .PV + if (!string.IsNullOrWhiteSpace(sa["SteamOp"])) // 하부루프 TICA-*A + { var lb = StripAttr(sa["SteamOp"]!); set.Add(lb + ".PV"); set.Add(lb + ".SP"); set.Add(lb + ".OP"); } + + var vac = _config[$"Report:Cleaning:VacTag:{col}"]; // 진공 + if (!string.IsNullOrWhiteSpace(vac)) set.Add(vac!.Contains('.') ? vac! : vac + ".PV"); + + var cl = _config.GetSection($"Report:Closure:{col}"); // 폐합 스트림 + addPvQv(cl["Feed"]); + foreach (var o in cl.GetSection("Outputs").Get() ?? Array.Empty()) addPvQv(o); + } + return set.OrderBy(x => x).ToList(); + } + /// 해당 컬럼에 폐합(물질수지) 설정이 있는지. public bool HasClosure(string column) => _config.GetSection($"Report:Closure:{column}").Exists();