fix: fastSession 시작 시 엔티티 변경 오류 수정 - CreateFastSessionAsync에서 Status를 Pending에서 Running으로 변경

This commit is contained in:
windpacer
2026-04-29 18:43:02 +09:00
parent 86dd268dab
commit 6689612b3b
2 changed files with 552 additions and 0 deletions

View File

@@ -3,6 +3,8 @@ using ExperionCrawler.Core.Domain.Entities;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;
using System.Text.Json;
using System.Globalization;
namespace ExperionCrawler.Infrastructure.Database;
@@ -17,6 +19,8 @@ public class ExperionDbContext : DbContext
public DbSet<NodeMapMaster> NodeMapMasters => Set<NodeMapMaster>();
public DbSet<RealtimePoint> RealtimePoints => Set<RealtimePoint>();
public DbSet<HistoryRecord> HistoryRecords => Set<HistoryRecord>();
public DbSet<FastSession> FastSessions => Set<FastSession>();
public DbSet<FastRecord> FastRecords => Set<FastRecord>();
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
@@ -54,6 +58,20 @@ public class ExperionDbContext : DbContext
e.HasIndex(x => x.TagName);
e.HasIndex(x => x.RecordedAt);
});
modelBuilder.Entity<FastSession>(e =>
{
e.HasKey(x => x.Id);
e.HasIndex(x => x.Status);
e.HasIndex(x => x.StartedAt);
});
modelBuilder.Entity<FastRecord>(e =>
{
e.HasKey(x => x.Id);
e.HasIndex(x => x.SessionId);
e.HasIndex(x => new { x.SessionId, x.TagName, x.RecordedAt });
});
}
}
@@ -76,6 +94,42 @@ public class ExperionDbService : IExperionDbService
{
await _ctx.Database.EnsureCreatedAsync();
// ── fast_session / fast_record 테이블 생성 ────────────────────────────────
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS fast_session (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'Pending',
sampling_ms INTEGER NOT NULL,
duration_sec INTEGER NOT NULL,
tag_list JSONB NOT NULL DEFAULT '[]',
row_count INTEGER NOT NULL DEFAULT 0,
retention_days INTEGER,
pinned BOOLEAN NOT NULL DEFAULT FALSE
)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS fast_record (
id SERIAL PRIMARY KEY,
session_id INTEGER NOT NULL REFERENCES fast_session(id) ON DELETE CASCADE,
recorded_at TIMESTAMPTZ NOT NULL,
tagname TEXT NOT NULL,
value TEXT
)
""");
// TimescaleDB hypertable 생성 (recorded_at 기준, chunk_interval = 1 day)
await _ctx.Database.ExecuteSqlRawAsync("""
SELECT create_hypertable('fast_record', 'recorded_at', if_not_exists => TRUE)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
SELECT set_chunk_time_interval('fast_record', INTERVAL '1 day')
""");
// TimeScaleDB 확장 활성화
await _ctx.Database.ExecuteSqlRawAsync("CREATE EXTENSION IF NOT EXISTS timescaledb");
@@ -652,6 +706,163 @@ public class ExperionDbService : IExperionDbService
.ToDictionary(g => g.Key, g => g.First().DataType);
}
public async Task<IEnumerable<RealtimePoint>> GetRealtimeRecordsByTagNamesAsync(IEnumerable<string> tagNames)
{
try
{
var tags = tagNames.ToList();
if (tags.Count == 0) return Enumerable.Empty<RealtimePoint>();
var records = await _ctx.RealtimePoints
.Where(x => tags.Contains(x.TagName))
.ToListAsync();
_logger.LogInformation("[Realtime] 태그 {Count}개의 라이브 데이터 조회 완료", tags.Count);
return records;
}
catch (Exception ex)
{
_logger.LogError(ex, "[Realtime] 태그 라이브 데이터 조회 실패");
return Enumerable.Empty<RealtimePoint>();
}
}
// ── FastSession / FastRecord ─────────────────────────────────────────────────
public async Task<FastSession> CreateFastSessionAsync(FastSessionCreateRequest request)
{
var session = new FastSession
{
Name = request.Name,
SamplingMs = request.SamplingMs,
DurationSec = request.DurationSec,
TagList = JsonSerializer.Serialize(request.TagList), // string[] → JSONB
StartedAt = DateTime.UtcNow,
Status = "Running",
RowCount = 0,
RetentionDays = request.RetentionDays,
Pinned = false
};
_ctx.FastSessions.Add(session);
await _ctx.SaveChangesAsync();
return session;
}
public async Task UpdateFastSessionStatusAsync(int sessionId, string status)
{
var session = await _ctx.FastSessions.FindAsync(sessionId);
if (session == null) return;
session.Status = status;
if (status is "Completed" or "Cancelled" or "Failed" or "RowLimitReached")
session.EndedAt = DateTime.UtcNow;
await _ctx.SaveChangesAsync();
}
public async Task UpdateFastSessionRowCountAsync(int sessionId, int rowCount)
{
var session = await _ctx.FastSessions.FindAsync(sessionId);
if (session == null) return;
session.RowCount = rowCount;
await _ctx.SaveChangesAsync();
}
public async Task UpdateFastSessionPinnedAsync(int sessionId, bool pinned)
{
var session = await _ctx.FastSessions.FindAsync(sessionId);
if (session == null) return;
session.Pinned = pinned;
await _ctx.SaveChangesAsync();
}
public async Task<FastSession?> GetFastSessionAsync(int sessionId)
=> await _ctx.FastSessions.FindAsync(sessionId);
public async Task<IEnumerable<FastSession>> GetFastSessionsAsync()
=> await _ctx.FastSessions.OrderByDescending(x => x.StartedAt).ToListAsync();
public async Task DeleteFastSessionAsync(int sessionId)
{
var session = await _ctx.FastSessions.FindAsync(sessionId);
if (session == null) return;
_ctx.FastSessions.Remove(session);
await _ctx.SaveChangesAsync();
}
public async Task<IEnumerable<FastSession>> GetExpiredFastSessionsAsync()
{
var now = DateTime.UtcNow;
return await _ctx.FastSessions
.Where(x => x.EndedAt != null
&& !x.Pinned
&& x.RetentionDays.HasValue
&& x.EndedAt.Value.AddDays(x.RetentionDays.Value) < now)
.OrderBy(x => x.EndedAt)
.ToListAsync();
}
public async Task<FastQueryResult> GetFastRecordsAsync(int sessionId, DateTime? from, DateTime? to)
{
var query = _ctx.FastRecords.Where(x => x.SessionId == sessionId);
if (from.HasValue) query = query.Where(x => x.RecordedAt >= from.Value);
if (to.HasValue) query = query.Where(x => x.RecordedAt <= to.Value);
var records = await query.OrderBy(x => x.RecordedAt).ToListAsync();
var tagNames = records.Select(x => x.TagName).Distinct().ToArray();
var items = records.Select(r => new FastRecord
{
Id = r.Id,
SessionId = r.SessionId,
RecordedAt = r.RecordedAt,
TagName = r.TagName,
Value = r.Value
});
return new FastQueryResult(
SessionId: sessionId,
From: from ?? records.MinBy(x => x.RecordedAt)?.RecordedAt ?? DateTime.UtcNow,
To: to ?? records.MaxBy(x => x.RecordedAt)?.RecordedAt ?? DateTime.UtcNow,
TagNames: tagNames,
Items: items,
TotalCount: records.Count
);
}
public async Task BatchInsertFastRecordsAsync(IEnumerable<FastRecord> records)
{
await _ctx.FastRecords.AddRangeAsync(records);
await _ctx.SaveChangesAsync();
}
public async Task ExportFastRecordsToCsvAsync(int sessionId, Stream stream, DateTime? from, DateTime? to)
{
var query = _ctx.FastRecords.Where(x => x.SessionId == sessionId);
if (from.HasValue) query = query.Where(x => x.RecordedAt >= from.Value);
if (to.HasValue) query = query.Where(x => x.RecordedAt <= to.Value);
var records = await query.OrderBy(x => x.RecordedAt).ThenBy(x => x.TagName).ToListAsync();
var tagNames = records.Select(x => x.TagName).Distinct().OrderBy(x => x).ToArray();
using var writer = new StreamWriter(stream, leaveOpen: true);
await writer.WriteLineAsync("recorded_at," + string.Join(",", tagNames));
foreach (var g in records.GroupBy(x => x.RecordedAt).OrderBy(g => g.Key))
{
var values = g.ToDictionary(r => r.TagName, r => r.Value);
var row = g.Key.ToString("o") + "," +
string.Join(",", tagNames.Select(t => values.TryGetValue(t, out var v) ? $"\"{v}\"" : ""));
await writer.WriteLineAsync(row);
}
await writer.FlushAsync();
}
public async Task<string?> GetNodeIdByTagNameAsync(string tagName)
{
return await _ctx.RealtimePoints
.Where(x => x.TagName == tagName)
.Select(x => x.NodeId)
.FirstOrDefaultAsync();
}
/// <summary>
/// 하이퍼테이블 상태 조회합니다.
/// 하이퍼테이블인지 여부, 레코드 수, 보존 정책, 압축, 연속 집계 설정 등을 확인합니다.

View File

@@ -0,0 +1,341 @@
using System.Collections.Concurrent;
using System.Text.Json;
using ExperionCrawler.Core.Application.Interfaces;
using ExperionCrawler.Core.Domain.Entities;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace ExperionCrawler.Infrastructure.OpcUa;
/// <summary>
/// fastRecord 데이터 수집 서비스.
/// realtime_table에서 지정한 샘플링 간격마다 태그 값을 복사하여 fast_records 테이블에 저장.
/// OPC UA 직접 연결 없이 기존 실시간 구독 결과(realtime_table)를 재활용.
/// </summary>
public class ExperionFastService : IExperionFastService, IHostedService, IDisposable
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<ExperionFastService> _logger;
private readonly ConcurrentDictionary<int, FastSessionContext> _sessions = new();
private CancellationTokenSource? _cts;
private Task? _monitorTask;
private const int MaxConcurrentSessions = 3;
private const int MaxRowsPerSession = 5_000_000;
private const int MonitorIntervalMs = 1_000;
private static readonly int[] AllowedSamplingMs = [1000, 5000, 10000, 30000, 60000];
public ExperionFastService(
IServiceScopeFactory scopeFactory,
ILogger<ExperionFastService> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
// ── IHostedService ────────────────────────────────────────────────────────
public async Task StartAsync(CancellationToken cancellationToken)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
var sessions = await db.GetFastSessionsAsync();
foreach (var s in sessions.Where(s => s.Status == "Running"))
{
_logger.LogWarning("[Fast] 앱 시작 시 Running 세션 {Id} → Failed 마킹", s.Id);
await db.UpdateFastSessionStatusAsync(s.Id, "Failed");
}
_cts = new CancellationTokenSource();
_monitorTask = Task.Run(() => MonitorLoopAsync(_cts.Token), _cts.Token);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_cts?.Cancel();
if (_monitorTask != null)
await _monitorTask.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
}
public void Dispose() => _cts?.Dispose();
// ── IExperionFastService ──────────────────────────────────────────────────
public async Task<FastSessionInfo> StartSessionAsync(FastSessionStartRequest request)
{
if (request.TagList.Length == 0 || request.TagList.Length > 8)
throw new ArgumentException("태그는 1~8개까지 가능합니다.");
if (!AllowedSamplingMs.Contains(request.SamplingMs))
throw new ArgumentException(
$"샘플링 간격은 {string.Join('/', AllowedSamplingMs.Select(ms => ms / 1000 + "s"))} 중 하나여야 합니다.");
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
var runningCount = (await db.GetFastSessionsAsync()).Count(s => s.Status == "Running");
if (runningCount >= MaxConcurrentSessions)
throw new InvalidOperationException($"동시 실행 가능한 세션은 {MaxConcurrentSessions}개까지입니다.");
// 태그가 realtime_table에 존재하는지 검증
var realtimeRecords = (await db.GetRealtimeRecordsByTagNamesAsync(request.TagList)).ToList();
var found = realtimeRecords.Select(r => r.TagName).ToHashSet();
foreach (var tag in request.TagList)
{
if (!found.Contains(tag))
throw new ArgumentException($"태그 '{tag}'이 realtime_table에 없습니다. 포인트빌더에서 추가 후 구독을 시작하세요.");
}
var session = await db.CreateFastSessionAsync(new FastSessionCreateRequest(
Name: request.Name,
SamplingMs: request.SamplingMs,
DurationSec: request.DurationSec,
TagList: request.TagList,
RetentionDays: request.RetentionDays));
var ctx = new FastSessionContext
{
SessionId = session.Id,
TagList = request.TagList,
SamplingMs = request.SamplingMs,
DurationSec = request.DurationSec,
StartedAt = DateTime.UtcNow,
LastSampledAt = DateTime.MinValue
};
_sessions[session.Id] = ctx;
_logger.LogInformation("[Fast] 세션 {Id} 시작 — 태그 {Count}개, {Ms}ms, {Sec}s",
session.Id, request.TagList.Length, request.SamplingMs, request.DurationSec);
return MapToInfo(session);
}
public async Task StopSessionAsync(int sessionId)
{
if (!_sessions.TryGetValue(sessionId, out var ctx))
throw new InvalidOperationException($"세션 {sessionId}를 찾을 수 없습니다.");
ctx.Cancel = true;
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
await db.UpdateFastSessionStatusAsync(sessionId, "Completed");
await db.UpdateFastSessionRowCountAsync(sessionId, ctx.TotalRows);
_sessions.TryRemove(sessionId, out _);
_logger.LogInformation("[Fast] 세션 {Id} 중지 — 총 {Count}행", sessionId, ctx.TotalRows);
}
public async Task DeleteSessionAsync(int sessionId)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
await db.DeleteFastSessionAsync(sessionId);
_sessions.TryRemove(sessionId, out _);
}
public async Task PinSessionAsync(int sessionId, bool pinned)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
await db.UpdateFastSessionPinnedAsync(sessionId, pinned);
}
public async Task<FastSessionInfo?> GetSessionAsync(int sessionId)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
var session = await db.GetFastSessionAsync(sessionId);
return session == null ? null : MapToInfo(session);
}
public async Task<IEnumerable<FastSessionInfo>> GetSessionsAsync()
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
return (await db.GetFastSessionsAsync()).Select(MapToInfo);
}
public async Task<FastQueryResult> GetRecordsAsync(int sessionId, DateTime? from, DateTime? to, string format = "long")
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
return await db.GetFastRecordsAsync(sessionId, from, to);
}
public async Task ExportCsvAsync(int sessionId, Stream stream, DateTime? from = null, DateTime? to = null)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
await db.ExportFastRecordsToCsvAsync(sessionId, stream, from, to);
}
// ── Private ────────────────────────────────────────────────────────────────
private async Task MonitorLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
await Task.Delay(MonitorIntervalMs, ct);
foreach (var kvp in _sessions.ToList())
{
var ctx = kvp.Value;
if (ctx.Cancel) continue;
if ((DateTime.UtcNow - ctx.StartedAt).TotalSeconds >= ctx.DurationSec)
{
ctx.Cancel = true;
await CompleteSessionAsync(ctx.SessionId, ctx.TotalRows, "Completed");
continue;
}
if ((DateTime.UtcNow - ctx.LastSampledAt).TotalMilliseconds >= ctx.SamplingMs)
{
ctx.LastSampledAt = DateTime.UtcNow;
await SampleAsync(ctx);
}
}
}
catch (OperationCanceledException) { }
catch (Exception ex)
{
_logger.LogError(ex, "[Fast] 모니터링 루프 오류");
}
}
}
private async Task SampleAsync(FastSessionContext ctx)
{
try
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
var realtimeRecords = await db.GetRealtimeRecordsByTagNamesAsync(ctx.TagList);
var now = DateTime.UtcNow;
var records = realtimeRecords
.Select(r => new FastRecord
{
SessionId = ctx.SessionId,
RecordedAt = now,
TagName = r.TagName,
Value = r.LiveValue
})
.ToList();
if (records.Count == 0) return;
await db.BatchInsertFastRecordsAsync(records);
ctx.TotalRows += records.Count;
await db.UpdateFastSessionRowCountAsync(ctx.SessionId, ctx.TotalRows);
if (ctx.TotalRows >= MaxRowsPerSession)
{
ctx.Cancel = true;
await db.UpdateFastSessionStatusAsync(ctx.SessionId, "RowLimitReached");
_sessions.TryRemove(ctx.SessionId, out _);
_logger.LogWarning("[Fast] 세션 {Id} RowLimitReached ({Max}행)", ctx.SessionId, MaxRowsPerSession);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "[Fast] 세션 {Id} 샘플링 오류", ctx.SessionId);
}
}
private async Task CompleteSessionAsync(int sessionId, int totalRows, string status)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
await db.UpdateFastSessionStatusAsync(sessionId, status);
await db.UpdateFastSessionRowCountAsync(sessionId, totalRows);
_sessions.TryRemove(sessionId, out _);
_logger.LogInformation("[Fast] 세션 {Id} {Status} — 총 {Count}행", sessionId, status, totalRows);
}
private static FastSessionInfo MapToInfo(FastSession s) => new(
Id: s.Id,
Name: s.Name,
StartedAt: s.StartedAt,
EndedAt: s.EndedAt,
Status: s.Status,
SamplingMs: s.SamplingMs,
DurationSec: s.DurationSec,
TagList: JsonSerializer.Deserialize<string[]>(s.TagList) ?? [],
RowCount: s.RowCount,
RetentionDays: s.RetentionDays,
Pinned: s.Pinned);
private sealed class FastSessionContext
{
public int SessionId { get; set; }
public string[] TagList { get; set; } = [];
public int SamplingMs { get; set; }
public int DurationSec { get; set; }
public DateTime StartedAt { get; set; }
public DateTime LastSampledAt { get; set; }
public int TotalRows { get; set; }
public bool Cancel { get; set; }
}
}
/// <summary>
/// 만료된 FastSession을 정리하는 BackgroundService.
/// 매일 03:00 UTC에 실행. pinned = true 세션과 retention_days = null 세션은 제외.
/// </summary>
public class ExperionFastCleanupService : BackgroundService
{
private readonly IServiceProvider _sp;
private readonly ILogger<ExperionFastCleanupService> _logger;
public ExperionFastCleanupService(
IServiceProvider sp,
ILogger<ExperionFastCleanupService> logger)
{
_sp = sp;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var now = DateTime.UtcNow;
var next = now.Date.AddDays(1).AddHours(3);
var delay = next - now;
if (delay < TimeSpan.Zero) delay = TimeSpan.Zero;
try { await Task.Delay(delay, stoppingToken); }
catch (OperationCanceledException) { break; }
try
{
using var scope = _sp.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
var sessions = await db.GetFastSessionsAsync();
var cutoff = DateTime.UtcNow;
foreach (var s in sessions.Where(s =>
!s.Pinned &&
s.RetentionDays.HasValue &&
s.StartedAt.AddDays(s.RetentionDays.Value) < cutoff))
{
_logger.LogInformation("[FastCleanup] 세션 {Id} 삭제 (retention {Days}일 초과)", s.Id, s.RetentionDays);
await db.DeleteFastSessionAsync(s.Id);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "[FastCleanup] 정리 작업 오류");
}
}
}
}