// File: HC900-Crawler/src/Infrastructure/Database/Hc900DbContext.cs
// Snapshot: 2026-06-10 08:12:01 +09:00 (3160 lines, 150 KiB, C#)
// Note: the repository viewer flagged this file as containing invisible Unicode characters and
// characters that might be confused with other characters. Reveal and review them if that is
// not intentional.
using Hc900Crawler.Core.Application.DTOs;
using Hc900Crawler.Core.Application.Interfaces;
using Hc900Crawler.Core.Domain.Entities;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;
using System.Text.Json;
using System.Globalization;
namespace Hc900Crawler.Infrastructure.Database;
// ── DbContext ────────────────────────────────────────────────────────────────
/// <summary>
/// EF Core context for the HC900 crawler database. Besides the entity sets and their model
/// configuration, it hosts the "point builder" operations that toggle
/// <c>hc900_map_master.is_active</c> and keep the realtime point table in step with the
/// active map entries.
/// </summary>
public class Hc900DbContext : DbContext
{
    public Hc900DbContext(DbContextOptions<Hc900DbContext> options) : base(options) { }

    public DbSet<RealtimePoint> RealtimePoints => Set<RealtimePoint>();
    public DbSet<HistoryRecord> HistoryRecords => Set<HistoryRecord>();
    public DbSet<FastSession> FastSessions => Set<FastSession>();
    public DbSet<FastRecord> FastRecords => Set<FastRecord>();
    public DbSet<TagMetadata> TagMetadata => Set<TagMetadata>();
    public DbSet<Hc900MapEntry> Hc900MapEntries => Set<Hc900MapEntry>();

    // DbSets for the P&ID database
    public DbSet<PidEquipment> PidEquipment => Set<PidEquipment>();
    public DbSet<PidPrefixRule> PidPrefixRules => Set<PidPrefixRule>();
    public DbSet<PidAuditLog> PidAuditLog => Set<PidAuditLog>();
    public DbSet<FfOperatorAction> FfOperatorActions => Set<FfOperatorAction>();
    public DbSet<PidGraphStatus> PidGraphStatuses => Set<PidGraphStatus>();
    public DbSet<EventHistoryRecord> EventHistoryRecords => Set<EventHistoryRecord>();

    // ── Knowledge Base ────────────────────────────────────────────────────────
    public DbSet<KbCollection> KbCollections => Set<KbCollection>();
    public DbSet<KbDocument> KbDocuments => Set<KbDocument>();
    public DbSet<KbIngestJob> KbIngestJobs => Set<KbIngestJob>();
    public DbSet<KbAdminCredential> KbAdminCredentials => Set<KbAdminCredential>();
    public DbSet<KbAdminSession> KbAdminSessions => Set<KbAdminSession>();

    /// <summary>
    /// Configures keys, indexes, column types, defaults and relationships for every entity.
    /// All entities are placed in the <c>hc900</c> default schema unless mapped otherwise.
    /// </summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.HasDefaultSchema("hc900");

        modelBuilder.Entity<Hc900MapEntry>(e =>
        {
            e.HasKey(x => x.Id);
            // A tag name is unique per controller, not globally.
            e.HasIndex(x => new { x.ControllerId, x.TagName }).IsUnique();
            e.HasIndex(x => x.Hc900Tag);
        });

        modelBuilder.Entity<RealtimePoint>(e =>
        {
            e.HasKey(x => x.Id);
            e.HasIndex(x => x.NodeId).IsUnique();
            e.HasIndex(x => x.TagName);
        });

        modelBuilder.Entity<HistoryRecord>(e =>
        {
            e.HasKey(x => x.Id);
            e.HasIndex(x => x.TagName);
            e.HasIndex(x => x.RecordedAt);
        });

        modelBuilder.Entity<FastSession>(e =>
        {
            e.HasKey(x => x.Id);
            e.HasIndex(x => x.Status);
            e.HasIndex(x => x.StartedAt);
            e.Property(x => x.TagList).HasColumnType("jsonb");
        });

        modelBuilder.Entity<FastRecord>(e =>
        {
            e.HasKey(x => new { x.SessionId, x.RecordedAt, x.TagName });
            e.HasIndex(x => x.SessionId);
        });

        modelBuilder.Entity<TagMetadata>(e =>
        {
            e.HasKey(x => x.Id);
            e.HasIndex(x => new { x.BaseTag, x.Attribute }).IsUnique();
            e.HasIndex(x => x.BaseTag);
        });

        // P&ID entity configuration
        modelBuilder.Entity<PidEquipment>(entity =>
        {
            entity.ToTable("pid_equipment");
            entity.HasKey(e => e.Id);
            entity.Property(e => e.TagNo)
                .IsRequired()
                .HasMaxLength(50);
            entity.Property(e => e.EquipmentName)
                .HasMaxLength(200);
            entity.Property(e => e.InstrumentType)
                .HasMaxLength(10);
            entity.Property(e => e.LineNumber)
                .HasMaxLength(100);
            entity.Property(e => e.PidDrawingNo)
                .HasMaxLength(50);
            entity.Property(e => e.Confidence)
                .HasPrecision(4, 3);
            entity.Property(e => e.IsActive)
                .HasDefaultValue(true);
            entity.Property(e => e.ConnectionLocked)
                .HasDefaultValue(false);
            entity.Property(e => e.ExtractedAt)
                .HasDefaultValueSql("NOW()");
            entity.Property(e => e.UpdatedAt)
                .ValueGeneratedOnAdd()
                .HasDefaultValueSql("NOW()");
            entity.HasIndex(e => e.TagNo);
            entity.HasIndex(e => e.InstrumentType);
            entity.HasIndex(e => e.ExtractedAt);
            entity.HasIndex(e => e.Category);
            entity.Property(e => e.Category)
                .HasMaxLength(30);
            entity.Property(e => e.Role)
                .HasMaxLength(100);
            entity.Property(e => e.FromTag)
                .HasMaxLength(50);
            entity.Property(e => e.ToTag)
                .HasMaxLength(50);
            // Deleting the linked tag row clears the reference instead of deleting the equipment.
            entity.HasOne(e => e.ExperionTag)
                .WithMany()
                .HasForeignKey(e => e.ExperionTagId)
                .OnDelete(DeleteBehavior.SetNull);
        });

        modelBuilder.Entity<PidAuditLog>(entity =>
        {
            entity.ToTable("pid_audit_log");
            entity.HasKey(e => e.Id);
            entity.Property(e => e.Source)
                .HasMaxLength(50)
                .HasDefaultValue("WebUI");
            entity.Property(e => e.Action)
                .HasMaxLength(50);
            entity.Property(e => e.TargetTagNo)
                .HasMaxLength(50);
            entity.Property(e => e.LoggedAt)
                .HasDefaultValueSql("NOW()");
            entity.HasIndex(e => e.LoggedAt);
        });

        modelBuilder.Entity<FfOperatorAction>(entity =>
        {
            entity.ToTable("ff_operator_action");
            entity.HasKey(e => e.Id);
            entity.Property(e => e.StreamKey).HasMaxLength(50);
            entity.Property(e => e.ActionType).HasMaxLength(50);
            entity.Property(e => e.NodeId).HasMaxLength(255);
            entity.Property(e => e.Result).HasMaxLength(50);
            entity.Property(e => e.OperatorName).HasMaxLength(100);
            entity.Property(e => e.CreatedAt).HasDefaultValueSql("NOW()");
            entity.HasIndex(e => e.ColumnId);
            entity.HasIndex(e => e.CreatedAt);
        });

        modelBuilder.Entity<PidPrefixRule>(entity =>
        {
            entity.ToTable("pid_prefix_rules");
            entity.HasKey(e => e.Id);
            entity.Property(e => e.Prefix)
                .IsRequired()
                .HasMaxLength(30);
            entity.HasIndex(e => e.Prefix).IsUnique();
            entity.Property(e => e.Category)
                .IsRequired()
                .HasMaxLength(30);
            entity.Property(e => e.Description)
                .HasMaxLength(200);
            entity.Property(e => e.SortOrder)
                .HasDefaultValue(0);
            entity.Property(e => e.CreatedAt)
                .HasDefaultValueSql("NOW()");
            entity.Property(e => e.UpdatedAt)
                .HasDefaultValueSql("NOW()");
            entity.HasIndex(e => e.Category);
        });

        modelBuilder.Entity<PidGraphStatus>(entity =>
        {
            entity.ToTable("pid_graph_status");
            entity.HasKey(e => e.TaskId);
            entity.Property(e => e.Status)
                .HasMaxLength(20);
            entity.Property(e => e.Message)
                .HasMaxLength(500);
            entity.HasIndex(e => e.UpdatedAt);
        });

        modelBuilder.Entity<EventHistoryRecord>(entity =>
        {
            entity.ToTable("event_history_table");
            entity.HasKey(e => e.Id);
            entity.Property(e => e.Metadata)
                .HasColumnType("jsonb");
            entity.HasIndex(e => new { e.TagName, e.EventTime });
            entity.HasIndex(e => new { e.Area, e.EventTime });
            entity.HasIndex(e => new { e.EventType, e.EventTime });
        });

        // ── Knowledge Base ───────────────────────────────────────────────────
        modelBuilder.Entity<KbCollection>(e =>
        {
            e.HasKey(x => x.CollectionKey);
        });

        modelBuilder.Entity<KbDocument>(e =>
        {
            e.HasKey(x => x.Id);
            e.Property(x => x.Tags).HasColumnType("text[]");
            e.HasIndex(x => new { x.CollectionKey, x.Status, x.UploadedAt });
            e.HasIndex(x => x.Title);
        });

        modelBuilder.Entity<KbIngestJob>(e =>
        {
            e.HasKey(x => x.Id);
            e.HasIndex(x => new { x.Stage, x.FinishedAt });
        });

        modelBuilder.Entity<KbAdminCredential>(e =>
        {
            e.HasKey(x => x.Id);
        });

        modelBuilder.Entity<KbAdminSession>(e =>
        {
            e.HasKey(x => x.Token);
            e.HasIndex(x => x.ExpiresAt);
        });
    }

    // ── HC900 Point Builder ───────────────────────────────────────────────────

    /// <summary>
    /// Rebuilds the active point set from scratch: deactivates every map entry, re-activates
    /// the entries matched by each group filter, then synchronizes the realtime point table.
    /// Everything runs in one transaction that is rolled back on any failure.
    /// </summary>
    /// <param name="groups">Group key / filter pairs; only the filter is used here.</param>
    /// <returns>
    /// Sum of the rows matched by each group. An entry matched by several groups is counted
    /// once per group.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="groups"/> is null.</exception>
    public async Task<int> Hc900BuildRealtimeTableAsync(
        IEnumerable<(string GroupKey, Hc900PointBuilderGroupDto Group)> groups)
    {
        ArgumentNullException.ThrowIfNull(groups);

        await using var tx = await Database.BeginTransactionAsync();
        try
        {
            await Database.ExecuteSqlRawAsync(
                "UPDATE hc900_map_master SET is_active = false");

            int total = 0;
            foreach (var (_, group) in groups)
            {
                // Update in the database rather than through tracked entities: the raw UPDATE
                // above bypasses the change tracker, so an entity that was already tracked with
                // IsActive == true would be returned stale, "entry.IsActive = true" would be a
                // no-op, and the row would silently stay inactive.
                total += await BuildGroupQuery(group)
                    .ExecuteUpdateAsync(s => s.SetProperty(x => x.IsActive, true));
            }

            await Hc900SyncRealtimeTableAsync();
            await tx.CommitAsync();
            return total;
        }
        catch
        {
            await tx.RollbackAsync();
            throw;
        }
    }

    /// <summary>
    /// Read-only counterpart of <see cref="Hc900BuildRealtimeTableAsync"/>: returns the map
    /// entries each group filter would match, labelled with the group key, without modifying
    /// anything.
    /// </summary>
    /// <param name="groups">Group key / filter pairs to evaluate.</param>
    /// <returns>The matched items and their count (entries matched by several groups repeat).</returns>
    /// <exception cref="ArgumentNullException"><paramref name="groups"/> is null.</exception>
    public async Task<Hc900PointBuilderPreviewResult> Hc900PreviewRealtimeBuildAsync(
        IEnumerable<(string GroupKey, Hc900PointBuilderGroupDto Group)> groups)
    {
        ArgumentNullException.ThrowIfNull(groups);

        var items = new List<Hc900PointBuilderPreviewItem>();
        foreach (var (groupKey, group) in groups)
        {
            var matched = await BuildGroupQuery(group)
                .Select(e => new Hc900PointBuilderPreviewItem
                {
                    TagName = e.TagName,
                    Hc900Tag = e.Hc900Tag,
                    ModbusAddr = e.ModbusAddr,
                    ParamType = e.ParamType ?? "",
                    DataType = e.DataType,
                    LoopNo = e.LoopNo,
                    Access = e.Access,
                    ControllerId = e.ControllerId,
                    Group = groupKey,
                    IsActive = e.IsActive
                })
                .ToListAsync();
            items.AddRange(matched);
        }

        return new Hc900PointBuilderPreviewResult { Count = items.Count, Items = items };
    }

    /// <summary>
    /// Replaces the active point set with exactly the given tag names: deactivates every map
    /// entry, activates the entries whose tag name is listed, then synchronizes the realtime
    /// point table. Runs in one transaction.
    /// </summary>
    /// <param name="selectedTagNames">Tag names to activate; null/empty names are ignored.</param>
    /// <returns>Number of map entries activated; 0 (and no changes) when no usable name is given.</returns>
    public async Task<int> Hc900ApplySelectedPointsAsync(IEnumerable<string> selectedTagNames)
    {
        var tagNames = selectedTagNames.Where(n => !string.IsNullOrEmpty(n)).ToList();
        if (tagNames.Count == 0)
        {
            return 0;
        }

        await using var tx = await Database.BeginTransactionAsync();
        try
        {
            await Database.ExecuteSqlRawAsync(
                "UPDATE hc900_map_master SET is_active = false");
            var count = await Hc900MapEntries
                .Where(x => tagNames.Contains(x.TagName))
                .ExecuteUpdateAsync(s => s.SetProperty(x => x.IsActive, true));
            await Hc900SyncRealtimeTableAsync();
            await tx.CommitAsync();
            return count;
        }
        catch
        {
            await tx.RollbackAsync();
            throw;
        }
    }

    /// <summary>
    /// Activates the listed tag names in addition to whatever is already active, then
    /// synchronizes the realtime point table. Runs in one transaction.
    /// </summary>
    /// <param name="tagNames">Tag names to activate; null/empty names are ignored.</param>
    /// <returns>Number of previously inactive map entries that were activated.</returns>
    public async Task<int> Hc900AppendPointsAsync(IEnumerable<string> tagNames)
    {
        var tagNamesList = tagNames.Where(n => !string.IsNullOrEmpty(n)).ToList();
        if (tagNamesList.Count == 0)
        {
            return 0;
        }

        await using var tx = await Database.BeginTransactionAsync();
        try
        {
            var count = await Hc900MapEntries
                .Where(x => tagNamesList.Contains(x.TagName) && !x.IsActive)
                .ExecuteUpdateAsync(s => s.SetProperty(x => x.IsActive, true));
            await Hc900SyncRealtimeTableAsync();
            await tx.CommitAsync();
            return count;
        }
        catch
        {
            await tx.RollbackAsync();
            throw;
        }
    }

    /// <summary>
    /// Ensures an active map entry exists for (<paramref name="tagName"/>,
    /// <paramref name="controllerId"/>) — creating it from the supplied values when missing,
    /// otherwise just re-activating it — and returns the matching realtime point, creating
    /// that as well if needed.
    /// </summary>
    /// <returns>The existing or newly created realtime point for the tag on that controller.</returns>
    public async Task<RealtimePoint> Hc900AddRealtimePointAsync(
        string tagName, string hc900Tag, int modbusAddr,
        string dataType, string? paramType, string access, string controllerId)
    {
        var existing = await Hc900MapEntries
            .FirstOrDefaultAsync(x => x.TagName == tagName && x.ControllerId == controllerId);

        if (existing is null)
        {
            existing = new Hc900MapEntry
            {
                TagName = tagName,
                // Falls back to the tag name when no HC900 tag is supplied.
                Hc900Tag = hc900Tag ?? tagName,
                ModbusAddr = modbusAddr,
                DataType = dataType,
                ParamType = paramType,
                Access = access,
                ControllerId = controllerId,
                IsActive = true
            };
            Hc900MapEntries.Add(existing);
        }
        else
        {
            // An existing entry keeps its stored address/type values; only the flag changes.
            existing.IsActive = true;
        }

        await SaveChangesAsync();
        return await Hc900SyncSinglePointAsync(tagName, controllerId);
    }

    // ── Point Builder Helpers ─────────────────────────────────────────────────

    /// <summary>
    /// Translates a group filter into a query over the map entries. Every criterion that is
    /// set narrows the result (AND); unset criteria are ignored, so an empty filter matches
    /// all entries.
    /// </summary>
    private IQueryable<Hc900MapEntry> BuildGroupQuery(Hc900PointBuilderGroupDto group)
    {
        var q = Hc900MapEntries.AsQueryable();

        if (group.TagPatterns?.Any() == true)
        {
            var patterns = group.TagPatterns.Where(p => !string.IsNullOrEmpty(p)).ToList();
            if (patterns.Count > 0)
            {
                // Case-insensitive "contains" match against any of the patterns. Patterns are
                // not escaped, so '%' and '_' inside a pattern act as ILIKE wildcards.
                q = q.Where(x => patterns.Any(p => EF.Functions.ILike(x.TagName, $"%{p}%")));
            }
        }

        if (group.ParamTypes?.Any() == true)
        {
            var validTypes = group.ParamTypes.Where(p => !string.IsNullOrEmpty(p)).ToList();
            if (validTypes.Count > 0)
            {
                q = q.Where(x => validTypes.Contains(x.ParamType));
            }
        }

        if (!string.IsNullOrEmpty(group.DataType))
        {
            q = q.Where(x => x.DataType == group.DataType);
        }

        if (group.LoopNo.HasValue)
        {
            q = q.Where(x => x.LoopNo == group.LoopNo.Value);
        }

        return q;
    }

    /// <summary>
    /// Makes the realtime point table mirror the active map entries: removes realtime points
    /// that no longer have an active entry and adds a point (with no live value yet) for each
    /// active entry that lacks one. Entries and points are matched on the
    /// (ControllerId, TagName) pair, because the same tag name may exist on several
    /// controllers; matching on the tag name alone would let a point on one controller mask a
    /// missing or stale point on another.
    /// </summary>
    private async Task Hc900SyncRealtimeTableAsync()
    {
        var activeEntries = await Hc900MapEntries
            .Where(x => x.IsActive)
            .Select(x => new { x.TagName, x.Hc900Tag, x.ControllerId })
            .ToListAsync();

        var existingPoints = await RealtimePoints
            .Select(r => new { r.Id, r.TagName, r.ControllerId })
            .ToListAsync();

        var activeKeys = activeEntries
            .Select(a => (a.ControllerId, a.TagName))
            .ToHashSet();
        var existingKeys = existingPoints
            .Select(p => (p.ControllerId, p.TagName))
            .ToHashSet();

        // Stale points are removed by primary key so that only the exact
        // (controller, tag) rows are deleted.
        var staleIds = existingPoints
            .Where(p => !activeKeys.Contains((p.ControllerId, p.TagName)))
            .Select(p => p.Id)
            .ToList();
        if (staleIds.Count > 0)
        {
            var stalePoints = await RealtimePoints
                .Where(r => staleIds.Contains(r.Id))
                .ToListAsync();
            RealtimePoints.RemoveRange(stalePoints);
        }

        // NOTE(review): the model declares a unique index on RealtimePoint.NodeId; if two
        // controllers can share an Hc900Tag this insert would violate it — confirm.
        var missingPoints = activeEntries
            .Where(a => !existingKeys.Contains((a.ControllerId, a.TagName)))
            .Select(a => new RealtimePoint
            {
                TagName = a.TagName,
                NodeId = a.Hc900Tag,
                LiveValue = null,
                Timestamp = DateTime.UtcNow,
                ControllerId = a.ControllerId
            })
            .ToList();
        if (missingPoints.Count > 0)
        {
            RealtimePoints.AddRange(missingPoints);
        }

        await SaveChangesAsync();
    }

    /// <summary>
    /// Returns the realtime point for (<paramref name="tagName"/>,
    /// <paramref name="controllerId"/>): an existing point only has its timestamp refreshed;
    /// otherwise a new point with no live value is inserted.
    /// </summary>
    private async Task<RealtimePoint> Hc900SyncSinglePointAsync(string tagName, string controllerId)
    {
        var existing = await RealtimePoints
            .FirstOrDefaultAsync(r => r.TagName == tagName && r.ControllerId == controllerId);
        if (existing != null)
        {
            existing.Timestamp = DateTime.UtcNow;
            await SaveChangesAsync();
            return existing;
        }

        var point = new RealtimePoint
        {
            TagName = tagName,
            // NOTE(review): the bulk sync uses the map entry's Hc900Tag as NodeId, while this
            // path uses the tag name — confirm which one is intended.
            NodeId = tagName,
            LiveValue = null,
            Timestamp = DateTime.UtcNow,
            ControllerId = controllerId
        };
        RealtimePoints.Add(point);
        await SaveChangesAsync();
        return point;
    }
}
// ── Service ──────────────────────────────────────────────────────────────────
public class Hc900DbService : IExperionDbService
{
// Database context and logger supplied through the constructor.
private readonly Hc900DbContext _ctx;
private readonly ILogger<Hc900DbService> _logger;
// NOTE(review): presumably the time-to-live, in seconds, of the digital tag cache below —
// its use is not visible in this part of the file; confirm.
private const int DigitalTagCacheTtlSeconds = 300;
// Cache of (controller_id, tagname) pairs for which archive_enabled = TRUE.
// Because of peer communication the same tagname can exist on each controller with a different
// address/flags, so matching is done per controller.
private static HashSet<(string ControllerId, string TagName)> _archiveTagPairCache = new();
private static DateTime _archiveTagCacheTime = DateTime.MinValue;
private static Task? _archiveCacheRefreshTask = null;
private static readonly object _archiveCacheLock = new();
// Cache of digital point (controller_id, name) pairs — matched per controller for the same
// reason as the archive cache.
private static HashSet<(string ControllerId, string Name)> _digitalTagPairCache = new();
private static DateTime _digitalTagPairCacheTime = DateTime.MinValue;
private static Task? _digitalPairCacheRefreshTask = null;
private static readonly object _digitalPairCacheLock = new();
/// <summary>
/// Creates the service around the given database context and logger.
/// </summary>
/// <param name="ctx">Context used for all database access performed by this service.</param>
/// <param name="logger">Logger for this service.</param>
public Hc900DbService(Hc900DbContext ctx, ILogger<Hc900DbService> logger)
    => (_ctx, _logger) = (ctx, logger);
public async Task<bool> InitializeAsync()
{
try
{
await _ctx.Database.EnsureCreatedAsync();
// ── fast_session / fast_record 테이블 생성 ────────────────────────────────
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS fast_session (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'Pending',
sampling_ms INTEGER NOT NULL,
duration_sec INTEGER NOT NULL,
tag_list JSONB NOT NULL DEFAULT '[]',
row_count INTEGER NOT NULL DEFAULT 0,
retention_days INTEGER,
pinned BOOLEAN NOT NULL DEFAULT FALSE
)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS fast_record (
session_id INTEGER NOT NULL REFERENCES fast_session(id) ON DELETE CASCADE,
recorded_at TIMESTAMPTZ NOT NULL,
tagname TEXT NOT NULL,
value TEXT,
PRIMARY KEY (session_id, recorded_at, tagname)
)
""");
// PK 마이그레이션: 기존 테이블 PK에 recorded_at 없으면 수정 (TimescaleDB hypertable 요건)
await _ctx.Database.ExecuteSqlRawAsync("""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = 'fast_record' AND schemaname = 'public')
AND NOT EXISTS (
SELECT 1 FROM pg_constraint c
JOIN pg_class t ON t.oid = c.conrelid
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(c.conkey)
WHERE t.relname = 'fast_record' AND c.contype = 'p' AND a.attname = 'recorded_at'
)
THEN
ALTER TABLE fast_record DROP CONSTRAINT IF EXISTS fast_record_pkey;
ALTER TABLE fast_record ADD PRIMARY KEY (session_id, recorded_at, tagname);
END IF;
END $$;
""");
// TimescaleDB hypertable 생성 — EF Core를 거치지 않아야 fail 로그 없음
try
{
var connStr = _ctx.Database.GetConnectionString()!;
await using var conn = new NpgsqlConnection(connStr);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT create_hypertable('fast_record'::regclass, 'recorded_at'::name, if_not_exists => TRUE, migrate_data => TRUE)";
await cmd.ExecuteNonQueryAsync();
cmd.CommandText = "SELECT set_chunk_time_interval('fast_record'::regclass, INTERVAL '1 day')";
await cmd.ExecuteNonQueryAsync();
}
catch
{
// TimescaleDB 미설치 또는 이미 하이퍼테이블 — 무시하고 계속
}
// EnsureCreatedAsync는 기존 DB에 새 테이블을 추가하지 않으므로
// raw_node_map / node_map_master 는 DDL로 직접 보장
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS raw_node_map (
id SERIAL PRIMARY KEY,
level INTEGER NOT NULL,
class TEXT NOT NULL,
name TEXT NOT NULL,
node_id TEXT NOT NULL,
data_type TEXT NOT NULL
)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS node_map_master (
id SERIAL PRIMARY KEY,
level INTEGER NOT NULL,
class TEXT NOT NULL,
name TEXT NOT NULL,
node_id TEXT NOT NULL,
data_type TEXT NOT NULL
)
""");
// realtime_table 생성 (실시간 모니터링 포인트)
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS realtime_table (
id SERIAL PRIMARY KEY,
tagname TEXT NOT NULL,
node_id TEXT NOT NULL,
livevalue TEXT,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
controller_id TEXT NOT NULL DEFAULT 'HC1'
)
""");
// realtime_table은 실시간 값 저장용이므로 하이퍼테이블로 변환하지 않음
// tag_metadata 테이블 생성 (메타데이터 - 변경 드묾)
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS tag_metadata (
id SERIAL PRIMARY KEY,
base_tag TEXT NOT NULL,
attribute TEXT NOT NULL,
value TEXT,
node_id TEXT,
loaded_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(base_tag, attribute)
)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE INDEX IF NOT EXISTS idx_tag_metadata_base ON tag_metadata(base_tag)
""");
// pid_equipment.connection_locked — EnsureCreated 는 기존 테이블에 컬럼을
// 추가하지 않으므로 멱등 ALTER 로 보장 (엑셀 import 수동교정 보호 플래그)
await _ctx.Database.ExecuteSqlRawAsync("""
ALTER TABLE pid_equipment
ADD COLUMN IF NOT EXISTS connection_locked BOOLEAN NOT NULL DEFAULT FALSE
""");
// event_history_table 생성 (디지털 포인트 상태 변경 이벤트)
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS event_history_table (
id BIGSERIAL PRIMARY KEY,
tagname TEXT NOT NULL,
node_id TEXT NOT NULL,
prev_value TEXT,
curr_value TEXT NOT NULL DEFAULT '',
event_type TEXT NOT NULL,
event_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
area TEXT,
sub_area TEXT,
duration_seconds INT,
metadata JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
DROP INDEX IF EXISTS idx_event_history_section_time;
CREATE INDEX IF NOT EXISTS idx_event_history_tagname_time
ON event_history_table(tagname, event_time DESC);
CREATE INDEX IF NOT EXISTS idx_event_history_area_time
ON event_history_table(area, event_time DESC);
CREATE INDEX IF NOT EXISTS idx_event_history_sub_area_time
ON event_history_table(sub_area, event_time DESC);
CREATE INDEX IF NOT EXISTS idx_event_history_event_type
ON event_history_table(event_type, event_time DESC);
CREATE INDEX IF NOT EXISTS idx_event_history_tagname_event_type
ON event_history_table(tagname, event_type, event_time DESC);
""");
// ── Multi-Controller Migration (controller_id) ─────────────────────
// 기존 테이블에 controller_id 컬럼 추가 (멱등)
await _ctx.Database.ExecuteSqlRawAsync(
"ALTER TABLE realtime_table ADD COLUMN IF NOT EXISTS controller_id TEXT NOT NULL DEFAULT 'HC1'");
await _ctx.Database.ExecuteSqlRawAsync(
"ALTER TABLE history_table ADD COLUMN IF NOT EXISTS controller_id TEXT");
await _ctx.Database.ExecuteSqlRawAsync(
"ALTER TABLE event_history_table ADD COLUMN IF NOT EXISTS controller_id TEXT NOT NULL DEFAULT 'HC1'");
await _ctx.Database.ExecuteSqlRawAsync(
"ALTER TABLE tag_metadata ADD COLUMN IF NOT EXISTS controller_id TEXT NOT NULL DEFAULT 'HC1'");
await _ctx.Database.ExecuteSqlRawAsync(
"ALTER TABLE hc900_map_master ADD COLUMN IF NOT EXISTS controller_id TEXT NOT NULL DEFAULT 'HC1'");
// hc900_map_master: realtime_enabled / archive_enabled columns (register-map generation v2)
await _ctx.Database.ExecuteSqlRawAsync(
"ALTER TABLE hc900_map_master ADD COLUMN IF NOT EXISTS realtime_enabled BOOLEAN NOT NULL DEFAULT TRUE");
await _ctx.Database.ExecuteSqlRawAsync(
"ALTER TABLE hc900_map_master ADD COLUMN IF NOT EXISTS archive_enabled BOOLEAN NOT NULL DEFAULT FALSE");
// hc900_map_master: tagname is unique PER controller, not globally — the same
// SignalTag name can exist on several controllers (peer comms). Replace the old
// UNIQUE(tagname) with UNIQUE(controller_id, tagname).
await _ctx.Database.ExecuteSqlRawAsync("""
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = 'hc900_map_master'::regclass
AND conname = 'hc900_map_master_tagname_key'
) THEN
ALTER TABLE hc900_map_master DROP CONSTRAINT hc900_map_master_tagname_key;
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_indexes
WHERE tablename = 'hc900_map_master'
AND indexname = 'ux_hc900_map_ctrl_tag'
) THEN
CREATE UNIQUE INDEX ux_hc900_map_ctrl_tag
ON hc900_map_master(controller_id, tagname);
END IF;
END $$;
""");
// tag_metadata: same base_tag/attribute can exist per controller (peer comms),
// so the unique key must include controller_id. The register-map loader upserts
// with ON CONFLICT(base_tag, attribute, controller_id); the old UNIQUE(base_tag,
// attribute) made every metadata upsert fail. Replace it.
await _ctx.Database.ExecuteSqlRawAsync("""
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = 'tag_metadata'::regclass
AND conname = 'tag_metadata_base_tag_attribute_key'
) THEN
ALTER TABLE tag_metadata DROP CONSTRAINT tag_metadata_base_tag_attribute_key;
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = 'tag_metadata'::regclass
AND conname = 'tag_metadata_base_tag_attribute_controller_key'
) THEN
ALTER TABLE tag_metadata
ADD CONSTRAINT tag_metadata_base_tag_attribute_controller_key
UNIQUE (base_tag, attribute, controller_id);
END IF;
END $$;
""");
// realtime_table: UNIQUE(controller_id, tagname) for ON CONFLICT upsert
// Also drop the legacy tagname-only UNIQUE index/constraint that would
// conflict with peer-mirrored tags (same tagname, different controller_id).
await _ctx.Database.ExecuteSqlRawAsync("""
DO $$
BEGIN
-- Drop legacy tagname-only unique constraint if present
IF EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = 'realtime_table'::regclass
AND conname = 'realtime_table_tagname_key'
) THEN
ALTER TABLE realtime_table DROP CONSTRAINT realtime_table_tagname_key;
END IF;
-- Drop legacy tagname-only unique index if present
IF EXISTS (
SELECT 1 FROM pg_indexes
WHERE tablename = 'realtime_table'
AND indexname = 'realtime_table_tagname_key'
) THEN
DROP INDEX IF EXISTS realtime_table_tagname_key;
END IF;
-- Ensure composite unique index exists
IF NOT EXISTS (
SELECT 1 FROM pg_indexes
WHERE tablename = 'realtime_table'
AND indexname = 'idx_realtime_table_ctrl_tag_unique'
) THEN
CREATE UNIQUE INDEX idx_realtime_table_ctrl_tag_unique
ON realtime_table(controller_id, tagname);
END IF;
END $$;
""");
// v_tag_summary 뷰 재생성 (controller_id 포함)
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_tag_summary CASCADE");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_tag_summary AS
SELECT
rt_base.base_tag,
pv_rt.livevalue AS pv,
sp_rt.livevalue AS sp,
op_rt.livevalue AS op,
instate0_rt.livevalue AS instate0,
instate1_rt.livevalue AS instate1,
instate2_rt.livevalue AS instate2,
desc_md.value AS description,
area_md.value AS area,
sub_area_md.value AS sub_area,
rt_base.controller_id
FROM (SELECT DISTINCT split_part(tagname, '.', 1) AS base_tag, controller_id FROM realtime_table) rt_base
LEFT JOIN realtime_table pv_rt ON pv_rt.tagname = rt_base.base_tag || '.pv' AND pv_rt.controller_id = rt_base.controller_id
LEFT JOIN realtime_table sp_rt ON sp_rt.tagname = rt_base.base_tag || '.sp' AND sp_rt.controller_id = rt_base.controller_id
LEFT JOIN realtime_table op_rt ON op_rt.tagname = rt_base.base_tag || '.op' AND op_rt.controller_id = rt_base.controller_id
LEFT JOIN realtime_table instate0_rt ON instate0_rt.tagname = rt_base.base_tag || '.instate0' AND instate0_rt.controller_id = rt_base.controller_id
LEFT JOIN realtime_table instate1_rt ON instate1_rt.tagname = rt_base.base_tag || '.instate1' AND instate1_rt.controller_id = rt_base.controller_id
LEFT JOIN realtime_table instate2_rt ON instate2_rt.tagname = rt_base.base_tag || '.instate2' AND instate2_rt.controller_id = rt_base.controller_id
LEFT JOIN tag_metadata desc_md ON desc_md.base_tag = rt_base.base_tag AND desc_md.attribute = 'desc'
LEFT JOIN tag_metadata area_md ON area_md.base_tag = rt_base.base_tag AND area_md.attribute = 'area'
LEFT JOIN tag_metadata sub_area_md ON sub_area_md.base_tag = rt_base.base_tag AND sub_area_md.attribute = 'sub_area'
""");
// v_plant_running_state 뷰 — area별 펌프 RUN/STOP/TRIP 집계 (v_tag_summary 의존)
// view만 재생성 (DROP VIEW v_plant_running_state IF EXISTS + CREATE)
// v_tag_summary를 CASCADE로 drop했으므로 여기서 재생성 필요
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_plant_running_state");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_plant_running_state AS
WITH pump_state AS (
SELECT
trim(split_part(area, '|', 2)) AS area_code,
area AS area_raw,
base_tag,
pv,
controller_id
FROM v_tag_summary
WHERE area IS NOT NULL
AND (base_tag LIKE 'p-%' OR base_tag LIKE 'vp-%')
AND pv ~ '\|\s*(L-RUN|R-RUN|L-STOP|R-STOP|L-TRIP|R-TRIP)\s*\|'
)
SELECT
area_code,
MAX(area_raw) AS area_raw,
COUNT(*) AS total_pumps,
COUNT(*) FILTER (WHERE pv ~ '\|\s*[LR]-RUN\s*\|') AS running_pumps,
COUNT(*) FILTER (WHERE pv ~ '\|\s*[LR]-TRIP\s*\|') AS tripped_pumps,
COUNT(*) FILTER (WHERE pv ~ '\|\s*(L-STOP|R-STOP)\s*\|') AS stopped_pumps,
CASE
WHEN COUNT(*) FILTER (WHERE pv ~ '\|\s*[LR]-RUN\s*\|') > 0 THEN 'RUNNING'
WHEN COUNT(*) FILTER (WHERE pv ~ '\|\s*[LR]-TRIP\s*\|') > 0 THEN 'TRIPPED'
ELSE 'STOPPED'
END AS status,
array_agg(base_tag) FILTER (WHERE pv ~ '\|\s*[LR]-RUN\s*\|') AS running_pump_tags
FROM pump_state
WHERE area_code IS NOT NULL AND area_code <> ''
GROUP BY area_code
ORDER BY area_code
""");
// ── P&ID 테이블 ──────────────────────────────────────────────────
// PidEquipment/PidAuditLog/PidGraphStatus 엔티티는 OnModelCreating에
// 등록되어 있지만 EnsureCreatedAsync는 기존 DB에 새 테이블을 추가하지 않으므로
// DDL로 직접 보장한다.
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS pid_equipment (
id BIGSERIAL PRIMARY KEY,
tag_no VARCHAR(50) NOT NULL,
equipment_name VARCHAR(200),
instrument_type VARCHAR(10),
line_number VARCHAR(100),
pid_drawing_no VARCHAR(50),
confidence NUMERIC(4,3) NOT NULL DEFAULT 0,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
extracted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ,
experion_tag_id INTEGER REFERENCES realtime_table(id) ON DELETE SET NULL
)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE INDEX IF NOT EXISTS idx_pid_equipment_tag_no
ON pid_equipment(tag_no);
CREATE INDEX IF NOT EXISTS idx_pid_equipment_instrument_type
ON pid_equipment(instrument_type);
CREATE INDEX IF NOT EXISTS idx_pid_equipment_extracted_at
ON pid_equipment(extracted_at DESC);
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS pid_audit_log (
id BIGSERIAL PRIMARY KEY,
source VARCHAR(50) NOT NULL DEFAULT 'WebUI',
action VARCHAR(50) NOT NULL,
target_tag_no VARCHAR(50) NOT NULL,
old_value TEXT,
new_value TEXT,
logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE INDEX IF NOT EXISTS idx_pid_audit_log_logged_at
ON pid_audit_log(logged_at DESC);
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS pid_graph_status (
task_id TEXT PRIMARY KEY,
progress DOUBLE PRECISION NOT NULL DEFAULT 0,
status VARCHAR(20) NOT NULL DEFAULT 'Pending',
message VARCHAR(500) NOT NULL DEFAULT '',
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE INDEX IF NOT EXISTS idx_pid_graph_status_updated_at
ON pid_graph_status(updated_at DESC);
""");
// pid_equipment 카테고리 컬럼 추가 (migration 없이)
await _ctx.Database.ExecuteSqlRawAsync("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name='pid_equipment' AND column_name='category'
) THEN
ALTER TABLE pid_equipment
ADD COLUMN category TEXT,
ADD COLUMN role TEXT,
ADD COLUMN from_tag TEXT,
ADD COLUMN to_tag TEXT;
CREATE INDEX IF NOT EXISTS idx_pid_equipment_category
ON pid_equipment(category);
END IF;
END $$;
""");
// pid_equipment 계기 하위분류 컬럼 (field/system) — 기존 DB에도 안전 추가
await _ctx.Database.ExecuteSqlRawAsync(
"ALTER TABLE pid_equipment ADD COLUMN IF NOT EXISTS tag_class TEXT;");
// ── tag_dcs 마이그레이션 (DCS 함수블록 vs 현장 계기 구별) ─────────────────
// Step 1: pid_prefix_rules 컬럼 추가
await _ctx.Database.ExecuteSqlRawAsync("""
ALTER TABLE pid_prefix_rules
ADD COLUMN IF NOT EXISTS tag_dcs BOOLEAN NOT NULL DEFAULT FALSE;
""");
// Step 2: DCS prefix 마킹 (기본형 — compound형은 Step 4 StartsWith로 커버)
await _ctx.Database.ExecuteSqlRawAsync("""
UPDATE pid_prefix_rules
SET tag_dcs = TRUE
WHERE prefix IN ('FIC','TIC','PIC','LIC','FY','TY','PY','LY','FV','TV','PV','LV');
""");
// Step 3+4: pid_equipment 컬럼 추가 + backfill (최초 1회만 실행)
// ADD COLUMN IF NOT EXISTS는 멱등하지만 뒤따르는 UPDATE backfill이
// 매번 재실행되면 수동 변경(FIT-9128 tag_dcs=false)을 덮어쓰는 문제가 있음.
// → PL/pgSQL 블록으로 컬럼 존재 여부를 검사하여 최초 1회만 실행
await _ctx.Database.ExecuteSqlRawAsync("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name='pid_equipment' AND column_name='tag_dcs'
) THEN
ALTER TABLE pid_equipment
ADD COLUMN tag_dcs BOOLEAN NOT NULL DEFAULT FALSE;
UPDATE pid_equipment pe
SET tag_dcs = TRUE
FROM pid_prefix_rules pr
WHERE pe.instrument_type LIKE (pr.prefix || '%')
AND pr.tag_dcs = TRUE;
END IF;
END $$;
""");
// ─────────────────────────────────────────────────────────────────────────
// pid_equipment 좌표/파일명 컬럼
await _ctx.Database.ExecuteSqlRawAsync("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name='pid_equipment' AND column_name='pos_x'
) THEN
ALTER TABLE pid_equipment
ADD COLUMN pos_x DOUBLE PRECISION,
ADD COLUMN pos_y DOUBLE PRECISION,
ADD COLUMN drawing_file VARCHAR(255);
CREATE INDEX IF NOT EXISTS idx_pid_equipment_drawing_file
ON pid_equipment(drawing_file);
END IF;
END $$;
""");
// pid_prefix_rules 테이블
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS pid_prefix_rules (
id SERIAL PRIMARY KEY,
prefix TEXT NOT NULL UNIQUE,
category TEXT NOT NULL,
description TEXT,
sort_order INT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE INDEX IF NOT EXISTS idx_pid_prefix_rules_category
ON pid_prefix_rules(category);
""");
// pid_prefix_rules 시드 데이터 (SQL 주석 제거 — ExecuteSqlRawAsync에서 구문 오류 발생)
await _ctx.Database.ExecuteSqlRawAsync("""
INSERT INTO pid_prefix_rules (prefix, category, description, sort_order) VALUES
('FT', 'instrument', 'Flow Transmitter', 10),
('TT', 'instrument', 'Temperature Transmitter', 10),
('PT', 'instrument', 'Pressure Transmitter', 10),
('LT', 'instrument', 'Level Transmitter', 10),
('FCV', 'instrument', 'Flow Control Valve', 10),
('PCV', 'instrument', 'Pressure Control Valve', 10),
('LCV', 'instrument', 'Level Control Valve', 10),
('TCV', 'instrument', 'Temperature Control Valve', 10),
('PSV', 'instrument', 'Pressure Safety Valve', 10),
('XV', 'instrument', 'Shut-off Valve', 10),
('FIC', 'instrument', 'Flow Indicator Controller', 10),
('TIC', 'instrument', 'Temperature Indicator Controller', 10),
('PIC', 'instrument', 'Pressure Indicator Controller', 10),
('LIC', 'instrument', 'Level Indicator Controller', 10),
('DP', 'instrument', 'Differential Pressure', 10),
('FV', 'instrument', 'Flow Valve', 10),
('TV', 'instrument', 'Temperature Valve', 10),
('PV', 'instrument', 'Pressure Valve', 10),
('LV', 'instrument', 'Level Valve', 10),
('FG', 'instrument', 'Flow Gauge', 10),
('TG', 'instrument', 'Temperature Gauge', 10),
('PG', 'instrument', 'Pressure Gauge', 10),
('LG', 'instrument', 'Level Gauge', 10),
('FY', 'instrument', 'Flow Relay/Converter', 10),
('TY', 'instrument', 'Temperature Relay/Converter', 10),
('PY', 'instrument', 'Pressure Relay/Converter', 10),
('LY', 'instrument', 'Level Relay/Converter', 10),
('BV', 'instrument', 'Ball/Butterfly Valve', 10),
('VIP', 'instrument', 'Vibration Probe', 10),
('VIT', 'instrument', 'Vibration Transmitter', 10),
('P-', 'power_equipment', 'Pump', 20),
('C-', 'power_equipment', 'Compressor', 20),
('K-', 'power_equipment', 'Agitator/Mixer', 20),
('F-', 'power_equipment', 'Fan/Blower', 20),
('M-', 'power_equipment', 'Motor', 20),
('T-', 'storage_equipment', 'Tank', 30),
('D-', 'storage_equipment', 'Drum', 30),
('V-', 'storage_equipment', 'Vessel', 30),
('TK-', 'storage_equipment', 'Tank (alt)', 30),
('E-', 'process_equipment', 'Heat Exchanger', 40),
('H-', 'process_equipment', 'Heater/Furnace', 40),
('R-', 'process_equipment', 'Reactor', 40),
('COL-','process_equipment', 'Column', 40),
('S-', 'process_equipment', 'Separator', 40),
('FIL-','process_equipment', 'Filter', 40),
('SP-', 'process_equipment', 'Separator', 40),
('CWR-','utility_equipment', 'Cooling Water Return', 50),
('CWS-','utility_equipment', 'Cooling Water Supply', 50),
('IA-', 'utility_equipment', 'Instrument Air', 50),
('ST-', 'utility_equipment', 'Steam', 50),
('WW-', 'utility_equipment', 'Waste Water', 50),
('PW-', 'utility_equipment', 'Process/Potable Water', 50),
('SW-', 'utility_equipment', 'Service Water', 50),
('VG-', 'utility_equipment', 'Vent Gas', 50),
('CD-', 'utility_equipment', 'Condensate', 50),
('SAM-','utility_equipment', 'Sample Connection', 50),
('CHR-','utility_equipment', 'Chemical Return', 50),
('CHS-','utility_equipment', 'Chemical Supply', 50),
('CH-', 'utility_equipment', 'Chemical', 50),
('NG-', 'utility_equipment', 'Natural Gas', 50),
('BD-', 'utility_equipment', 'Blowdown', 50),
('FL-', 'utility_equipment', 'Flare', 50),
('LO-', 'utility_equipment', 'Lube Oil', 50)
ON CONFLICT (prefix) DO NOTHING;
""");
// history 테이블은 수동으로 하이퍼테이블 생성 필요
// CreateHypertableAsync() 메서드를 사용하여 수동 생성 가능
// 참고: 하이퍼테이블 생성 후 보존 정책, 압축 정책, 연속 집계 설정은
// CreateHypertableAsync() 메서드에서 선택적으로 설정 가능
// ── Knowledge Base (RAG) 테이블 ──────────────────────────────────
// 주의: KB DDL은 JSONB '{}' / TEXT[] '{}' / 시드 JSON 객체에 중괄호가 들어가는데
// EF Core의 ExecuteSqlRawAsync는 SQL을 String.Format으로 통과시켜 {}를 placeholder로 오해함.
// 이를 피하기 위해 raw NpgsqlCommand를 직접 사용한다.
await using (var kbConn = new NpgsqlConnection(_ctx.Database.GetConnectionString()))
{
await kbConn.OpenAsync();
async Task ExecKbAsync(string sql)
{
await using var cmd = new NpgsqlCommand(sql, kbConn);
await cmd.ExecuteNonQueryAsync();
}
await ExecKbAsync("CREATE EXTENSION IF NOT EXISTS \"pgcrypto\"");
await ExecKbAsync("""
CREATE TABLE IF NOT EXISTS kb_collections (
collection_key TEXT PRIMARY KEY,
display_name TEXT NOT NULL,
qdrant_name TEXT NOT NULL UNIQUE,
chunking_policy JSONB NOT NULL DEFAULT '{}'::jsonb,
description TEXT,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""");
await ExecKbAsync("""
CREATE TABLE IF NOT EXISTS kb_documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
collection_key TEXT NOT NULL REFERENCES kb_collections(collection_key),
title TEXT NOT NULL,
original_path TEXT NOT NULL,
file_sha256 TEXT NOT NULL,
file_size BIGINT,
mime_type TEXT,
tags TEXT[] NOT NULL DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'pending',
chunk_count INTEGER NOT NULL DEFAULT 0,
error_message TEXT,
uploaded_by TEXT,
uploaded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
indexed_at TIMESTAMPTZ,
disabled_at TIMESTAMPTZ
)
""");
await ExecKbAsync("""
CREATE INDEX IF NOT EXISTS idx_kb_docs_coll_status
ON kb_documents(collection_key, status, uploaded_at DESC);
CREATE INDEX IF NOT EXISTS idx_kb_docs_title
ON kb_documents(title);
""");
await ExecKbAsync("""
CREATE TABLE IF NOT EXISTS kb_ingest_jobs (
id BIGSERIAL PRIMARY KEY,
doc_id UUID NOT NULL REFERENCES kb_documents(id) ON DELETE CASCADE,
stage TEXT NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ
)
""");
await ExecKbAsync("""
CREATE INDEX IF NOT EXISTS idx_kb_jobs_pending
ON kb_ingest_jobs(stage, finished_at)
WHERE finished_at IS NULL;
""");
await ExecKbAsync("""
CREATE TABLE IF NOT EXISTS kb_admin_credential (
id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1),
password_hash TEXT NOT NULL,
salt TEXT NOT NULL,
algorithm TEXT NOT NULL DEFAULT 'argon2id',
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""");
await ExecKbAsync("""
CREATE TABLE IF NOT EXISTS kb_admin_sessions (
token TEXT PRIMARY KEY,
issued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
client_ip TEXT
)
""");
await ExecKbAsync("""
CREATE INDEX IF NOT EXISTS idx_kb_sessions_expires
ON kb_admin_sessions(expires_at);
""");
// ── 시드: kb_collections 5종 ─────────────────────────────────
await ExecKbAsync("""
INSERT INTO kb_collections (collection_key, display_name, qdrant_name, chunking_policy, description)
VALUES
('system_instrument', ' & ', 'kb_system_instrument',
'{"pdf":"section+table","xlsx":"row+sheet","docx":"heading"}'::jsonb,
' datasheet, P&ID, , '),
('plant_operation', ' ', 'kb_plant_operation',
'{"xlsx":"row","docx":"heading","md":"heading"}'::jsonb,
', , , '),
('procedure', '/SOP', 'kb_procedure',
'{"docx":"heading","md":"heading","pdf":"section"}'::jsonb,
'SOP, , '),
('report', '', 'kb_report',
'{"pdf":"section+table","docx":"heading"}'::jsonb,
'// , , '),
('vendor_doc', ' ', 'kb_vendor_doc',
'{"pdf":"section+table","docx":"heading"}'::jsonb,
', , ')
ON CONFLICT (collection_key) DO NOTHING
""");
}
// ── 운전판정 교차검증(Corroboration) — 펌프 RUN을 유량(kg/hr)·진공압(torr)으로 검증 ──
// 펌프 enum(RUN)만으론 deadhead·센서오류·frozen 데이터 등 '허위 운전'을 못 거른다.
// 신선도 게이트: realtime_table.timestamp가 NOW()-120s 이내일 때만 값 신뢰(아니면 STALE=판정보류)
// 매핑: 토폴로지(FT.from_tag=펌프 → 같은 번호 FICQ 컨트롤러, 1:N) + 수동 예외 테이블
// (pid_equipment·from_tag/category 컬럼이 모두 보장된 이 지점 이후에 생성)
// 수동 매핑 — 토폴로지로 못 잡는 예외 (펌프-FT 사이 중간설비, VP↔진공압). 운전원 편집 가능.
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS pump_corroboration_manual (
id BIGSERIAL PRIMARY KEY,
pump_base_tag TEXT NOT NULL,
signal_tag TEXT NOT NULL,
signal_kind TEXT NOT NULL CHECK (signal_kind IN ('flow','vacuum')),
unit TEXT,
note TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (pump_base_tag, signal_tag)
)
""");
// 수동 매핑 시드 (P6 알려진 예외). ON CONFLICT DO NOTHING → 부팅 시 운전원 수정 보존.
await _ctx.Database.ExecuteSqlRawAsync("""
INSERT INTO pump_corroboration_manual (pump_base_tag, signal_tag, signal_kind, unit, note) VALUES
('p-6102', 'ficq-6101.pv', 'flow', 'kg/hr', ' (P-6102FT-6101), 2-hop'),
('vp-6117', 'pica-6111.pv', 'vacuum', 'torr', 'C-6111 (PT-6111 PV)'),
('vp-6217', 'pica-6211.pv', 'vacuum', 'torr', 'C-6211 ')
ON CONFLICT (pump_base_tag, signal_tag) DO NOTHING
""");
// 펌프→신호 매핑 뷰 (토폴로지 유량 1:N + 수동)
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_pump_signal_map");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_pump_signal_map AS
SELECT lower(ft.from_tag) AS pump_base_tag,
'ficq-' || split_part(ft.tag_no, '-', 2) || '.pv' AS signal_tag,
'flow'::text AS signal_kind,
'kg/hr'::text AS unit,
'topology'::text AS mapping_source
FROM pid_equipment ft
WHERE ft.category = 'instrument'
AND ft.tag_no LIKE 'FT-%'
AND lower(ft.from_tag) LIKE 'p-%'
AND ft.from_tag NOT LIKE '%,%'
UNION ALL
SELECT pump_base_tag, signal_tag, signal_kind, unit, 'manual'
FROM pump_corroboration_manual
""");
// 계기 단위/레인지 — tag_metadata(EAV) 피벗·타입캐스트 (별도 테이블 없이 재사용).
// corroboration FS 5% 임계 + 향후 SP/제어 계산용. 단위/레인지는 메타갱신 시 자동 적재됨.
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_instrument_range");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_instrument_range AS
SELECT base_tag,
max(value) FILTER (WHERE attribute='units') AS unit,
max(CASE WHEN attribute='eulo' AND value ~ '^-?[0-9]+(\.[0-9]+)?$'
THEN value::double precision END) AS eu_lo,
max(CASE WHEN attribute='euhi' AND value ~ '^-?[0-9]+(\.[0-9]+)?$'
THEN value::double precision END) AS eu_hi
FROM tag_metadata
WHERE attribute IN ('units','eulo','euhi')
GROUP BY base_tag
""");
// ── 트렌드 워크스페이스 ────────────────────────────────────────────────
// trend_group: 멤버는 JSONB([{tag,color,axis}]). 운전원 그룹 영속(공유).
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS trend_group (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
members JSONB NOT NULL DEFAULT '[]',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""");
// v_analog_points: 숫자 livevalue = 아날로그 (enum/디지털 '{n | LABEL | }' 자동 제외).
// v_instrument_range(단위/레인지) + v_tag_summary(설명/구역) 조인으로 그룹빌더 표 정보 제공.
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_analog_points");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_analog_points AS
SELECT rt.tagname,
split_part(rt.tagname, '.', 1) AS base_tag,
rt.livevalue::double precision AS value,
rt.timestamp,
ir.unit,
ir.eu_lo,
ir.eu_hi,
ts.description,
ts.area
FROM realtime_table rt
LEFT JOIN v_instrument_range ir ON ir.base_tag = split_part(rt.tagname, '.', 1)
LEFT JOIN v_tag_summary ts ON ts.base_tag = split_part(rt.tagname, '.', 1)
WHERE rt.livevalue ~ '^-?[0-9]+(\.[0-9]+)?$'
ORDER BY rt.tagname
""");
// 펌프별 교차검증 상세 — 신선도 게이트(120s) + STALE + 유량(FS 5%, fallback 0.5 kg/hr)·진공(300 torr)
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_plant_running_state_corroborated");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_plant_running_state_corroborated AS
WITH pump_base AS (
SELECT trim(split_part(area, '|', 2)) AS area_code,
area AS area_raw, base_tag, pv, description, sub_area
FROM v_tag_summary
WHERE area IS NOT NULL
AND (base_tag LIKE 'p-%' OR base_tag LIKE 'vp-%')
AND pv ~ '\|\s*(L-RUN|R-RUN|L-STOP|R-STOP|L-TRIP|R-TRIP)\s*\|'
),
sig_eval AS (
SELECT m.pump_base_tag, m.signal_kind,
(rt.tagname IS NOT NULL) AS has_row,
(rt.tagname IS NOT NULL
AND (NOW() - rt.timestamp) < interval '120 seconds') AS fresh,
CASE WHEN rt.livevalue ~ '^-?[0-9]+(\.[0-9]+)?$'
THEN rt.livevalue::double precision END AS val,
-- '': + deadhead floor . = FS 5% [1~5 kg/hr]
-- ( : FS 2000 ~11 raw 5%FS=100 SUSPICIOUS로 )
(m.signal_kind='flow'
AND rt.tagname IS NOT NULL
AND (NOW() - rt.timestamp) < interval '120 seconds'
AND rt.livevalue ~ '^-?[0-9]+(\.[0-9]+)?$'
AND rt.livevalue::double precision > GREATEST(1.0, LEAST(COALESCE(ir.eu_hi, 100) * 0.05, 5.0))) AS flow_ok,
-- '': + (torr; , 300 fallback)
(m.signal_kind='vacuum'
AND rt.tagname IS NOT NULL
AND (NOW() - rt.timestamp) < interval '120 seconds'
AND rt.livevalue ~ '^-?[0-9]+(\.[0-9]+)?$'
AND rt.livevalue::double precision < 300) AS vac_ok
FROM v_pump_signal_map m
LEFT JOIN realtime_table rt ON rt.tagname = m.signal_tag
LEFT JOIN v_instrument_range ir ON ir.base_tag = split_part(m.signal_tag, '.', 1)
),
pump_sig AS (
SELECT pump_base_tag,
count(*) FILTER (WHERE signal_kind='flow') AS flow_mapped,
count(*) FILTER (WHERE signal_kind='flow' AND has_row) AS flow_have,
count(*) FILTER (WHERE signal_kind='flow' AND fresh) AS flow_fresh,
bool_or(flow_ok) AS any_flow_ok,
max(val) FILTER (WHERE signal_kind='flow' AND fresh) AS flow_max,
count(*) FILTER (WHERE signal_kind='vacuum') AS vac_mapped,
count(*) FILTER (WHERE signal_kind='vacuum' AND has_row) AS vac_have,
count(*) FILTER (WHERE signal_kind='vacuum' AND fresh) AS vac_fresh,
bool_or(vac_ok) AS any_vac_ok,
min(val) FILTER (WHERE signal_kind='vacuum' AND fresh) AS vac_min
FROM sig_eval GROUP BY pump_base_tag
)
SELECT
b.area_code, b.area_raw, b.base_tag, b.pv AS raw_pv, b.description, b.sub_area,
ps.flow_max AS flow_kg_hr, ps.vac_min AS vacuum_torr,
CASE
WHEN b.pv ~ '\|\s*[LR]-TRIP\s*\|' THEN 'TRIPPED'
WHEN b.pv ~ '\|\s*(L-STOP|R-STOP)\s*\|' THEN 'STOPPED'
WHEN b.base_tag LIKE 'vp-%' THEN
CASE
WHEN COALESCE(ps.vac_mapped,0)=0 OR COALESCE(ps.vac_have,0)=0 THEN 'INDETERMINATE_RUNNING'
WHEN ps.vac_fresh=0 THEN 'STALE'
WHEN ps.any_vac_ok THEN 'CONFIRMED_RUNNING'
ELSE 'SUSPICIOUS_RUNNING'
END
ELSE
CASE
WHEN COALESCE(ps.flow_mapped,0)=0 OR COALESCE(ps.flow_have,0)=0 THEN 'INDETERMINATE_RUNNING'
WHEN ps.flow_fresh=0 THEN 'STALE'
WHEN ps.any_flow_ok THEN 'CONFIRMED_RUNNING'
ELSE 'SUSPICIOUS_RUNNING'
END
END AS corroborated_status
FROM pump_base b
LEFT JOIN pump_sig ps ON ps.pump_base_tag = b.base_tag
WHERE b.area_code IS NOT NULL AND b.area_code <> ''
""");
// area별 집계 — overall은 CONFIRMED 기준 RUNNING, suspicious/stale는 부가 카운트(전체 오염 금지)
await _ctx.Database.ExecuteSqlRawAsync("DROP VIEW IF EXISTS v_plant_running_state_agg");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE VIEW v_plant_running_state_agg AS
SELECT
area_code, MAX(area_raw) AS area_raw, COUNT(*) AS total_pumps,
COUNT(*) FILTER (WHERE corroborated_status='CONFIRMED_RUNNING') AS confirmed_running,
COUNT(*) FILTER (WHERE corroborated_status='SUSPICIOUS_RUNNING') AS suspicious_running,
COUNT(*) FILTER (WHERE corroborated_status='STALE') AS stale_running,
COUNT(*) FILTER (WHERE corroborated_status='INDETERMINATE_RUNNING') AS indeterminate_running,
COUNT(*) FILTER (WHERE corroborated_status='TRIPPED') AS tripped_pumps,
COUNT(*) FILTER (WHERE corroborated_status='STOPPED') AS stopped_pumps,
CASE
WHEN COUNT(*) FILTER (WHERE corroborated_status IN
('CONFIRMED_RUNNING','SUSPICIOUS_RUNNING','STALE','INDETERMINATE_RUNNING')) > 0 THEN 'RUNNING'
WHEN COUNT(*) FILTER (WHERE corroborated_status='TRIPPED') > 0 THEN 'TRIPPED'
ELSE 'STOPPED'
END AS status,
array_agg(base_tag) FILTER (WHERE corroborated_status='CONFIRMED_RUNNING') AS confirmed_tags,
array_agg(base_tag) FILTER (WHERE corroborated_status='SUSPICIOUS_RUNNING') AS suspicious_tags,
array_agg(base_tag) FILTER (WHERE corroborated_status='STALE') AS stale_tags,
array_agg(DISTINCT sub_area) FILTER (WHERE sub_area IS NOT NULL) AS sub_areas
FROM v_plant_running_state_corroborated
WHERE area_code IS NOT NULL AND area_code <> ''
GROUP BY area_code ORDER BY area_code
""");
// ── Feedforward advisory engine config tables ─────────────────────
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS ff_column_config (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
enabled BOOLEAN NOT NULL DEFAULT FALSE,
feed_tag TEXT NOT NULL,
pressure_tag TEXT,
level_tags TEXT,
scan_sec DOUBLE PRECISION NOT NULL DEFAULT 2,
feed_filter_tau_sec DOUBLE PRECISION NOT NULL DEFAULT 300,
feed_move_thr_per_min DOUBLE PRECISION NOT NULL DEFAULT 0,
press_filter_tau_sec DOUBLE PRECISION NOT NULL DEFAULT 60,
pressure_band DOUBLE PRECISION NOT NULL DEFAULT 1e9,
settle_sec DOUBLE PRECISION NOT NULL DEFAULT 0,
stale_sec DOUBLE PRECISION NOT NULL DEFAULT 120,
product_key TEXT NOT NULL DEFAULT 'P',
advisory_only BOOLEAN NOT NULL DEFAULT TRUE
);
""");
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS ff_stream_config (
id SERIAL PRIMARY KEY,
column_id INTEGER NOT NULL REFERENCES ff_column_config(id) ON DELETE CASCADE,
key TEXT NOT NULL,
flow_tag TEXT NOT NULL,
role TEXT NOT NULL,
target_coeff DOUBLE PRECISION NOT NULL DEFAULT 0,
theta_up_sec DOUBLE PRECISION NOT NULL DEFAULT 0,
theta_dn_sec DOUBLE PRECISION NOT NULL DEFAULT 0,
tau_sec DOUBLE PRECISION NOT NULL DEFAULT 0,
sp_min DOUBLE PRECISION NOT NULL DEFAULT 0,
sp_max DOUBLE PRECISION NOT NULL DEFAULT 1e9,
rate_up_per_min DOUBLE PRECISION NOT NULL DEFAULT 1e9,
rate_dn_per_min DOUBLE PRECISION NOT NULL DEFAULT 1e9,
reflux_from_product BOOLEAN NOT NULL DEFAULT FALSE,
grade TEXT NOT NULL DEFAULT 'A',
level_tag TEXT
);
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS level_tag TEXT;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS is_reflux BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS recovery_sp DOUBLE PRECISION;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS sp_node_id TEXT;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS temp_tags TEXT;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS sensitive_tray_tag TEXT;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS dtdp DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS p_ref DOUBLE PRECISION;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS steam_op_tag TEXT;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS theta_auto_tune BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS bias_ma_window_sec DOUBLE PRECISION NOT NULL DEFAULT 21600;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS recovery_enabled BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS recovery_auto_arm BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS imbalance_trigger_frac DOUBLE PRECISION NOT NULL DEFAULT 0.10;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS imbalance_trigger_sec DOUBLE PRECISION NOT NULL DEFAULT 600;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS recovery_settle_sec DOUBLE PRECISION NOT NULL DEFAULT 1800;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS return_ramp_sec DOUBLE PRECISION NOT NULL DEFAULT 600;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS feed_recovery_sp DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS delta_p_tag TEXT;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS delta_p_flood_limit DOUBLE PRECISION NOT NULL DEFAULT 1e9;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS temp_high_limit DOUBLE PRECISION NOT NULL DEFAULT 1e9;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS temp_low_limit DOUBLE PRECISION NOT NULL DEFAULT -1e9;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS tc_return_reb_target DOUBLE PRECISION;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS tc_return_reb_band DOUBLE PRECISION NOT NULL DEFAULT 0.5;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS tc_return_delta_ad_ref DOUBLE PRECISION;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS tc_return_delta_ad_band DOUBLE PRECISION NOT NULL DEFAULT 0.4;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS tc_return_tc_target DOUBLE PRECISION;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS tc_return_tc_band DOUBLE PRECISION NOT NULL DEFAULT 1.0;
-- : SP + FEED
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS controller_id TEXT NOT NULL DEFAULT 'C1';
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS feed_sp_node_id TEXT;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS feed_sp_min DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS feed_sp_max DOUBLE PRECISION NOT NULL DEFAULT 1e9;
-- migration: missing columns from the original CREATE TABLE (schema was json-based)
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS feed_tag TEXT NOT NULL DEFAULT '';
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS pressure_tag TEXT;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS level_tags TEXT;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS feed_filter_tau_sec DOUBLE PRECISION NOT NULL DEFAULT 300;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS feed_move_thr_per_min DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS press_filter_tau_sec DOUBLE PRECISION NOT NULL DEFAULT 60;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS pressure_band DOUBLE PRECISION NOT NULL DEFAULT 1e9;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS settle_sec DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS stale_sec DOUBLE PRECISION NOT NULL DEFAULT 120;
ALTER TABLE ff_column_config ADD COLUMN IF NOT EXISTS product_key TEXT NOT NULL DEFAULT 'P';
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema='hc900' AND table_name='ff_stream_config' AND column_name='stream_key')
AND NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema='hc900' AND table_name='ff_stream_config' AND column_name='key') THEN
ALTER TABLE hc900.ff_stream_config RENAME COLUMN stream_key TO key;
END IF;
END $$;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS flow_tag TEXT NOT NULL DEFAULT '';
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS role TEXT NOT NULL DEFAULT 'Monitor';
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS target_coeff DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS theta_up_sec DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS theta_dn_sec DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS tau_sec DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS sp_min DOUBLE PRECISION NOT NULL DEFAULT 0;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS sp_max DOUBLE PRECISION NOT NULL DEFAULT 1e9;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS rate_up_per_min DOUBLE PRECISION NOT NULL DEFAULT 1e9;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS rate_dn_per_min DOUBLE PRECISION NOT NULL DEFAULT 1e9;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS reflux_from_product BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE ff_stream_config ADD COLUMN IF NOT EXISTS grade TEXT NOT NULL DEFAULT 'A';
""");
// ── FF operator action audit log ────────────────────────────────
await _ctx.Database.ExecuteSqlRawAsync("""
CREATE TABLE IF NOT EXISTS ff_operator_action (
id BIGSERIAL PRIMARY KEY,
column_id INTEGER NOT NULL,
stream_key VARCHAR(50),
action_type VARCHAR(50) NOT NULL,
sp_value DOUBLE PRECISION,
node_id VARCHAR(255),
result VARCHAR(50) NOT NULL,
writeguard_reason TEXT,
operator_name VARCHAR(100),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_ff_op_action_column ON ff_operator_action(column_id);
CREATE INDEX IF NOT EXISTS idx_ff_op_action_created ON ff_operator_action(created_at DESC);
""");
_logger.LogInformation("[ExperionDb] 데이터베이스 초기화 완료 (TimeScaleDB 활성화)");
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "[ExperionDb] 초기화 실패");
return false;
}
}
/// <summary>
/// Creates the history table as a TimescaleDB hypertable, or converts an existing table into one.
/// </summary>
/// <param name="hypertableName">Name of the hypertable to create (e.g. history_hypertable).</param>
/// <param name="tableName">Name of an existing table to convert (e.g. history_table); when null, or when no such table exists, a new table is created instead.</param>
/// <param name="timeColumn">Name of the time (partitioning) column.</param>
/// <param name="interval">Chunk interval (e.g. '1 day').</param>
/// <returns>true when the hypertable was created or a table with that name already exists; false on validation failure or any error.</returns>
private async Task<bool> CreateHistoryHypertableIfNotExistsAsync(
    string hypertableName, string? tableName, string timeColumn, string interval)
{
    try
    {
        // Identifiers end up in DDL text further down, so they must be validated first.
        if (!IsValidSqlIdentifier(hypertableName))
        {
            _logger.LogWarning("[ExperionDb] 하이퍼테이블 이름 '{HypertableName}'이(가) 유효하지 않음", hypertableName);
            return false;
        }

        // 1) Nothing to do when a table with the target name already exists.
        await using var conn = new NpgsqlConnection(_ctx.Database.GetConnectionString());
        await conn.OpenAsync();

        await using var targetExistsCmd = new NpgsqlCommand(
            "SELECT 1 FROM pg_catalog.pg_tables WHERE tablename = @name LIMIT 1", conn);
        targetExistsCmd.Parameters.AddWithValue("name", hypertableName);
        if (await targetExistsCmd.ExecuteScalarAsync() != null)
        {
            _logger.LogInformation("[ExperionDb] 하이퍼테이블 '{HypertableName}' 이미 존재함", hypertableName);
            return true;
        }

        // 2) Validate the remaining identifiers before they are used.
        if (tableName != null && !IsValidSqlIdentifier(tableName))
        {
            _logger.LogWarning("[ExperionDb] 테이블 이름 '{TableName}'이(가) 유효하지 않음", tableName);
            return false;
        }
        if (!IsValidSqlIdentifier(timeColumn))
        {
            _logger.LogWarning("[ExperionDb] 시간 컬럼 이름 '{TimeColumn}'이(가) 유효하지 않음", timeColumn);
            return false;
        }

        // Values (as opposed to identifiers) can be bound. The interval must be cast from a
        // parameter: the literal-prefix form "INTERVAL <param>" is not valid PostgreSQL syntax.
        const string convertSql =
            "SELECT create_hypertable(CAST(@tableName AS regclass), CAST(@timeColumn AS name), " +
            "chunk_time_interval => CAST(@interval AS interval), create_default_indexes => true, migrate_data => true)";
        const string createHypertableSql =
            "SELECT create_hypertable(CAST(@tableName AS regclass), CAST(@timeColumn AS name), " +
            "chunk_time_interval => CAST(@interval AS interval), create_default_indexes => true)";

        // 3) An existing source table is converted in place (migrate_data keeps its rows).
        if (tableName != null)
        {
            await using var sourceExistsCmd = new NpgsqlCommand(
                "SELECT 1 FROM pg_catalog.pg_tables WHERE tablename = @name LIMIT 1", conn);
            // Bound values must not be quote-escaped by hand; the driver sends them out of band.
            sourceExistsCmd.Parameters.AddWithValue("name", tableName);
            if (await sourceExistsCmd.ExecuteScalarAsync() != null)
            {
                await using var convertCmd = new NpgsqlCommand(convertSql, conn);
                convertCmd.Parameters.AddWithValue("tableName", tableName);
                convertCmd.Parameters.AddWithValue("timeColumn", timeColumn);
                convertCmd.Parameters.AddWithValue("interval", interval);
                await convertCmd.ExecuteNonQueryAsync();
                _logger.LogInformation("[ExperionDb] 테이블 '{TableName}'을(를) 하이퍼테이블로 변환 완료", tableName);
                return true;
            }
        }

        // 4) No source table: create a fresh one. Table and column names cannot be bind
        //    parameters in DDL, so the validated identifiers are placed into the statement text.
        //    NOTE(review): this relies on IsValidSqlIdentifier rejecting everything except plain
        //    identifiers - confirm against its implementation.
        //    The partitioning column is part of the primary key, as the original DDL intended.
        var createTableSql = $"""
            CREATE TABLE IF NOT EXISTS {hypertableName} (
                id SERIAL,
                tagname TEXT NOT NULL,
                node_id TEXT,
                value TEXT,
                livevalue TEXT,
                {timeColumn} TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                PRIMARY KEY (id, {timeColumn})
            )
            """;
        await using var createTableCmd = new NpgsqlCommand(createTableSql, conn);
        await createTableCmd.ExecuteNonQueryAsync();

        await using var createHypertableCmd = new NpgsqlCommand(createHypertableSql, conn);
        createHypertableCmd.Parameters.AddWithValue("tableName", hypertableName);
        createHypertableCmd.Parameters.AddWithValue("timeColumn", timeColumn);
        createHypertableCmd.Parameters.AddWithValue("interval", interval);
        await createHypertableCmd.ExecuteNonQueryAsync();

        _logger.LogInformation("[ExperionDb] 새 하이퍼테이블 '{HypertableName}' 생성 완료", hypertableName);
        return true;
    }
    catch (Exception ex)
    {
        // Best effort: the caller continues with the plain table when this fails.
        _logger.LogWarning(ex, "[ExperionDb] 하이퍼테이블 '{HypertableName}' 생성 실패 (기본 테이블 사용 계속)", hypertableName);
        return false;
    }
}
/// <summary>
/// Not supported in HC900Crawler. The returned task always faults with
/// <see cref="NotImplementedException"/>.
/// </summary>
public async Task<int> ClearRecordsAsync()
    => throw new NotImplementedException("OPC UA method not applicable in HC900Crawler");
/// <summary>
/// Not supported in HC900Crawler. The returned task always faults with
/// <see cref="NotImplementedException"/>; <paramref name="truncate"/> is ignored.
/// </summary>
public async Task<int> BuildMasterFromRawAsync(bool truncate = false)
    => throw new NotImplementedException("OPC UA method not applicable in HC900Crawler");
/// <summary>
/// Always reports a total count of zero; no database access is performed.
/// </summary>
public Task<int> GetTotalCountAsync()
{
    return Task.FromResult(0);
}
/// <summary>
/// Not supported in HC900Crawler. The returned task always faults with
/// <see cref="NotImplementedException"/>.
/// </summary>
public async Task<IEnumerable<string>> GetNameListAsync()
    => throw new NotImplementedException("OPC UA method not applicable in HC900Crawler");
/// <summary>
/// Not supported in HC900Crawler. The returned task always faults with
/// <see cref="NotImplementedException"/>.
/// </summary>
public async Task<NodeMapStats> GetMasterStatsAsync()
    => throw new NotImplementedException("OPC UA method not applicable in HC900Crawler");
// ── RealtimeTable ─────────────────────────────────────────────────────────
/// <summary>
/// Returns the part of <paramref name="nodeId"/> that follows its last ':' character,
/// or the whole string when it contains no ':'.
/// </summary>
private static string ExtractTagName(string nodeId)
{
    int lastColon = nodeId.LastIndexOf(':');
    if (lastColon < 0)
    {
        return nodeId;
    }

    return nodeId.Substring(lastColon + 1);
}
/// <summary>
/// Not supported in HC900Crawler. The returned task always faults with
/// <see cref="NotImplementedException"/>; <paramref name="groups"/> is ignored.
/// </summary>
public async Task<int> BuildRealtimeTableAsync(IEnumerable<PointBuilderGroupDto> groups)
    => throw new NotImplementedException("OPC UA method not applicable in HC900Crawler");
/// <summary>
/// Not supported in HC900Crawler. The returned task always faults with
/// <see cref="NotImplementedException"/>; <paramref name="groups"/> is ignored.
/// </summary>
public async Task<PointBuilderPreviewResult> PreviewRealtimeBuildAsync(
    IEnumerable<(string GroupKey, PointBuilderGroupDto Group)> groups)
    => throw new NotImplementedException("OPC UA method not applicable in HC900Crawler");
/// <summary>
/// Replaces the entire contents of realtime_table with one row per selected node ID.
/// </summary>
/// <param name="selectedNodeIds">Node IDs to register; null/empty entries and repeated IDs are dropped.</param>
/// <returns>Number of rows inserted, or 0 when the selection is empty (the table is then left untouched).</returns>
public async Task<int> ApplySelectedPointsAsync(IEnumerable<string> selectedNodeIds)
{
    // De-duplicate up front: the table is truncated before the insert, so a repeated ID must
    // not be able to produce duplicate rows (or a failed insert) afterwards.
    var nodeIds = selectedNodeIds
        .Where(n => !string.IsNullOrEmpty(n))
        .Distinct()
        .ToList();
    if (nodeIds.Count == 0) return 0;

    // NOTE(review): the truncate and the insert below are separate statements with no explicit
    // transaction in this method - confirm whether callers need them to be atomic.
    await _ctx.Database.ExecuteSqlRawAsync(
        "TRUNCATE TABLE realtime_table RESTART IDENTITY");

    // One timestamp for the whole batch.
    var createdAt = DateTime.UtcNow;
    var points = nodeIds.Select(nodeId => new RealtimePoint
    {
        TagName = ExtractTagName(nodeId),
        NodeId = nodeId,
        LiveValue = null,
        Timestamp = createdAt
    }).ToList();

    await _ctx.RealtimePoints.AddRangeAsync(points);
    await _ctx.SaveChangesAsync();

    _logger.LogInformation("[ExperionDb] realtime_table 적용: {Count}건 (선택)", points.Count);
    return points.Count;
}
/// <summary>
/// Adds realtime_table rows for the given node IDs, skipping IDs that are already registered.
/// </summary>
/// <param name="nodeIds">Node IDs to add; null/empty entries and repeated IDs are dropped.</param>
/// <returns>Number of rows actually inserted.</returns>
public async Task<int> AppendPointsAsync(IEnumerable<string> nodeIds)
{
    // Collapse repeats inside the request itself; otherwise the same new ID would be inserted twice.
    var nodeIdsList = nodeIds
        .Where(n => !string.IsNullOrEmpty(n))
        .Distinct()
        .ToList();
    if (nodeIdsList.Count == 0) return 0;

    var existingNodeIds = (await _ctx.RealtimePoints
            .Where(p => nodeIdsList.Contains(p.NodeId))
            .Select(p => p.NodeId)
            .ToListAsync())
        .ToHashSet(); // O(1) membership test in the filter below

    var createdAt = DateTime.UtcNow;
    var newPoints = nodeIdsList
        .Where(n => !existingNodeIds.Contains(n))
        .Select(nodeId => new RealtimePoint
        {
            TagName = ExtractTagName(nodeId),
            NodeId = nodeId,
            LiveValue = null,
            Timestamp = createdAt
        }).ToList();
    if (newPoints.Count == 0) return 0;

    await _ctx.RealtimePoints.AddRangeAsync(newPoints);
    await _ctx.SaveChangesAsync();

    _logger.LogInformation("[ExperionDb] realtime_table 추가: {Count}건 (중복 제외)", newPoints.Count);
    return newPoints.Count;
}
/// <summary>
/// Loads every realtime point ordered by tag name.
/// </summary>
/// <returns>The points, or an empty sequence when the query fails (the failure is logged, not rethrown).</returns>
public async Task<IEnumerable<RealtimePoint>> GetRealtimePointsAsync()
{
    try
    {
        var byTagName =
            from point in _ctx.RealtimePoints
            orderby point.TagName
            select point;

        var loaded = await byTagName.ToListAsync();
        _logger.LogInformation("[Realtime] 포인트 조회 완료: {Count}건", loaded.Count);
        return loaded;
    }
    catch (Exception queryFailure)
    {
        _logger.LogError(queryFailure, "[Realtime] 포인트 조회 실패");
        return Enumerable.Empty<RealtimePoint>();
    }
}
/// <summary>
/// Registers a single realtime point for <paramref name="nodeId"/>.
/// </summary>
/// <returns>The already-stored point when one has the same node ID; otherwise the newly saved point.</returns>
public async Task<RealtimePoint> AddRealtimePointAsync(string nodeId)
{
    // Return the stored row as-is when this node ID is already registered.
    if (await _ctx.RealtimePoints.FirstOrDefaultAsync(row => row.NodeId == nodeId) is { } alreadyStored)
    {
        return alreadyStored;
    }

    var created = new RealtimePoint
    {
        TagName = ExtractTagName(nodeId),
        NodeId = nodeId,
        LiveValue = null,
        Timestamp = DateTime.UtcNow
    };
    _ctx.RealtimePoints.Add(created);
    await _ctx.SaveChangesAsync();

    _logger.LogInformation("[ExperionDb] 수동 추가: {NodeId}", nodeId);
    return created;
}
/// <summary>
/// Deletes one realtime point and cleans up data that depended on it.
/// </summary>
/// <param name="id">Primary key of the realtime point.</param>
/// <param name="purgeHistory">When true, history_table rows with the point's tag name are deleted as well.</param>
/// <returns>A result whose Deleted flag is false when no point has that id; otherwise the outcome of each cleanup step.</returns>
public async Task<PointDeleteResult> DeleteRealtimePointAsync(int id, bool purgeHistory = false)
{
    var target = await _ctx.RealtimePoints.FindAsync(id);
    if (target == null)
    {
        return new PointDeleteResult { Deleted = false };
    }

    // Full tag looks like "FICQ-6101.pv"; the base tag is everything before the first '.'.
    var fullTag = target.TagName;
    var firstDot = fullTag.IndexOf('.');
    var baseTag = firstDot >= 0 ? fullTag[..firstDot] : fullTag;

    _ctx.RealtimePoints.Remove(target);
    await _ctx.SaveChangesAsync();
    // (pid_equipment.experion_tag_id is handled automatically by FK ON DELETE SET NULL.)

    var outcome = new PointDeleteResult { Deleted = true, BaseTag = baseTag };

    // When no realtime row is left for this base tag, its metadata is orphaned - remove it.
    // v_tag_summary is a view, so it needs no separate handling.
    var siblingsLeft = await _ctx.RealtimePoints
        .CountAsync(p => p.TagName == baseTag || p.TagName.StartsWith(baseTag + "."));
    if (siblingsLeft == 0)
    {
        var purgedRows = await _ctx.Database.ExecuteSqlRawAsync(
            "DELETE FROM tag_metadata WHERE base_tag = {0}", baseTag);
        outcome.MetadataPurged = purgedRows > 0;
    }

    // Deleting history cannot be undone, so it only happens on explicit opt-in.
    if (!purgeHistory)
    {
        return outcome;
    }

    outcome.HistoryRowsDeleted = await _ctx.Database.ExecuteSqlRawAsync(
        "DELETE FROM history_table WHERE tagname = {0}", fullTag);
    return outcome;
}
/// <summary>
/// Writes a new live value and timestamp to every realtime point with the given node ID.
/// </summary>
/// <returns>Number of rows updated.</returns>
public async Task<int> UpdateLiveValueAsync(string nodeId, string? value, DateTime timestamp)
{
    var matchingPoints = _ctx.RealtimePoints.Where(point => point.NodeId == nodeId);

    var rowsUpdated = await matchingPoints.ExecuteUpdateAsync(setters => setters
        .SetProperty(point => point.LiveValue, value)
        .SetProperty(point => point.Timestamp, timestamp));

    return rowsUpdated;
}
/// <summary>
/// Applies a batch of live-value updates one after another on this context.
/// </summary>
/// <param name="updates">Updates carrying node ID, value and timestamp.</param>
/// <returns>Total number of rows updated across all entries (0 for an empty batch).</returns>
public async Task<int> BatchUpdateLiveValuesAsync(IEnumerable<LiveValueUpdate> updates)
{
    var pending = updates.ToList();
    var rowsUpdated = 0;

    // Sequential on purpose: everything runs through the single DbContext (one DB connection)
    // to avoid a connection explosion. An empty batch simply skips the loop.
    foreach (var change in pending)
    {
        var matchingPoints = _ctx.RealtimePoints.Where(point => point.NodeId == change.NodeId);
        rowsUpdated += await matchingPoints.ExecuteUpdateAsync(setters => setters
            .SetProperty(point => point.LiveValue, change.Value)
            .SetProperty(point => point.Timestamp, change.Timestamp));
    }

    return rowsUpdated;
}
// ── HistoryTable ──────────────────────────────────────────────────────────
/// <summary>
/// Copies the current realtime values into history records stamped with a single capture time.
/// </summary>
/// <param name="controllerId">When non-empty, only points of this controller are considered.</param>
/// <param name="includeDigital">When false, points listed as digital are left out.</param>
/// <returns>The value returned by SaveChangesAsync for the inserted rows, or 0 when nothing qualified.</returns>
public async Task<int> SnapshotToHistoryAsync(string? controllerId = null, bool includeDigital = false)
{
    var capturedAt = DateTime.UtcNow;

    IQueryable<RealtimePoint> source = _ctx.RealtimePoints.AsQueryable();
    if (!string.IsNullOrEmpty(controllerId))
    {
        source = source.Where(point => point.ControllerId == controllerId);
    }

    var candidates = await source.ToListAsync();
    if (candidates.Count == 0)
    {
        return 0;
    }

    // Keep only (controller_id, tagname) pairs with archive_enabled = TRUE.
    // EF cannot translate a tuple Contains to SQL, so the rows are materialized first and
    // filtered in memory (realtime_table is only a few thousand rows). An empty pair set
    // (unconfigured / pre-migration) means "archive everything" - the legacy fallback.
    var archivePairs = await GetArchiveEnabledTagPairsCachedAsync();
    if (archivePairs.Count > 0)
    {
        candidates = candidates
            .Where(point => archivePairs.Contains((point.ControllerId, point.TagName)))
            .ToList();
    }

    // Digital points are excluded with the same per-controller pair matching.
    if (!includeDigital)
    {
        var digitalPairs = await GetDigitalTagPairsCachedAsync();
        if (digitalPairs.Count > 0)
        {
            candidates = candidates
                .Where(point => !digitalPairs.Contains((point.ControllerId, point.TagName)))
                .ToList();
        }
    }

    if (candidates.Count == 0)
    {
        return 0;
    }

    var historyRows = candidates.ConvertAll(point => new HistoryRecord
    {
        TagName = point.TagName,
        NodeId = point.NodeId,
        Value = point.LiveValue,
        RecordedAt = capturedAt,
        ControllerId = point.ControllerId
    });

    await _ctx.HistoryRecords.AddRangeAsync(historyRows);
    var saved = await _ctx.SaveChangesAsync();

    _logger.LogDebug("[ExperionDb] history 스냅샷: {Count}건 @ {Time:HH:mm:ss}", saved, capturedAt);
    return saved;
}
// For the history-query dropdown: returns only tags that actually exist in history_table.
// (Following realtime_table instead would also expose polled tags that have no history.
//  With idx_hist_tag the DISTINCT takes a few ms.)
public async Task<IEnumerable<string>> GetTagNamesAsync()
{
    var distinctTags = _ctx.HistoryRecords
        .Select(record => record.TagName)
        .Distinct()
        .OrderBy(tag => tag);

    return await distinctTags.ToListAsync();
}
/// <summary>
/// Queries history rows and pivots them into one row per recorded timestamp.
/// </summary>
/// <param name="tagNames">Tags to include; null/empty entries are ignored, and no usable entry means "all tags".</param>
/// <param name="from">Inclusive lower bound on the recorded time, or null for none.</param>
/// <param name="to">Inclusive upper bound on the recorded time, or null for none.</param>
/// <param name="limit">Maximum number of history rows to read; capped at 5000.</param>
/// <returns>The tag list used for the columns and the pivoted rows ordered by recorded time.</returns>
public async Task<HistoryQueryResult> QueryHistoryAsync(
    IEnumerable<string> tagNames, DateTime? from, DateTime? to, int limit)
{
    var tags = tagNames.Where(t => !string.IsNullOrEmpty(t)).ToList();

    var q = _ctx.HistoryRecords.AsQueryable();
    if (tags.Count > 0) q = q.Where(x => tags.Contains(x.TagName));

    // Normalize the bounds to UTC, matching QueryHistoryWithIntervalAsync, so that a
    // Local/offset-bearing input does not shift the window.
    if (from.HasValue)
    {
        var fromUtc = from.Value.ToUniversalTime();
        q = q.Where(x => x.RecordedAt >= fromUtc);
    }
    if (to.HasValue)
    {
        var toUtc = to.Value.ToUniversalTime();
        q = q.Where(x => x.RecordedAt <= toUtc);
    }

    var rows = await q.OrderBy(x => x.RecordedAt)
        .Take(Math.Min(limit, 5000))
        .ToListAsync();

    // Pivot: group rows by recorded_at; within a timestamp the last row of each tag wins.
    var grouped = rows
        .GroupBy(x => x.RecordedAt)
        .Select(g => new HistoryRow(
            g.Key,
            g.GroupBy(r => r.TagName)
                .ToDictionary(tg => tg.Key, tg => tg.Last().Value)
                as IReadOnlyDictionary<string, string?>))
        .ToList();

    // Without an explicit tag filter, report the tags that actually occurred.
    var usedTags = tags.Count > 0
        ? tags
        : rows.Select(x => x.TagName).Distinct().OrderBy(x => x).ToList();

    return new HistoryQueryResult(usedTags, grouped);
}
// ── History Interval Query ──────────────────────────────────────────────────
/// <summary>
/// Queries history aggregated into caller-specified time intervals.
/// </summary>
/// <param name="request">Tag names, optional time bounds, interval text and row limit (capped at 5000).</param>
/// <returns>The tag list, one row per time bucket, the base storage interval in seconds and the requested interval.</returns>
/// <exception cref="ArgumentException">The table-name check fails.</exception>
/// <exception cref="InvalidOperationException">The context's connection is not an Npgsql connection.</exception>
public async Task<HistoryIntervalQueryResult> QueryHistoryWithIntervalAsync(
    HistoryIntervalQueryRequest request)
{
    const int BaseIntervalSeconds = 60; // base storage interval of history_table (60 s)

    var tags = request.TagNames.Where(t => !string.IsNullOrEmpty(t)).ToList();
    var limit = Math.Min(request.Limit, 5000);

    // Identifier validation as an SQL-injection guard.
    if (!IsValidSqlIdentifier("history_table"))
    {
        throw new ArgumentException("Invalid table name");
    }

    // Interval parsing (e.g. "1 minute", "5 minutes", "1 hour", "10 seconds").
    var intervalStr = ParseIntervalToPostgresInterval(request.Interval);

    // Fail with a clear message instead of handing a null connection to the command.
    if (_ctx.Database.GetDbConnection() is not NpgsqlConnection connection)
    {
        throw new InvalidOperationException("QueryHistoryWithIntervalAsync requires an Npgsql connection.");
    }

    // Let EF open/close the shared connection: opening it directly throws when it is already
    // open, and closing it directly would close a connection EF itself may still be using.
    await _ctx.Database.OpenConnectionAsync();
    try
    {
        // Interval aggregation query (built by BuildHistoryIntervalQuerySql).
        var sql = BuildHistoryIntervalQuerySql(tags, request.From, request.To, intervalStr, limit);
        await using var cmd = new NpgsqlCommand(sql, connection);

        // Parameter binding - only the parameters whose filter is active are added.
        if (tags.Count > 0)
        {
            cmd.Parameters.AddWithValue("tagNames", tags.ToArray());
        }
        if (request.From.HasValue)
        {
            // Normalize to UTC regardless of the input's time-zone form
            // (prevents a 9h shift for '+00:00'/Local inputs).
            cmd.Parameters.AddWithValue("fromTime", request.From.Value.ToUniversalTime());
        }
        if (request.To.HasValue)
        {
            cmd.Parameters.AddWithValue("toTime", request.To.Value.ToUniversalTime());
        }

        await using var reader = await cmd.ExecuteReaderAsync();

        var rows = new List<HistoryIntervalRow>();
        var allTagNames = new HashSet<string>();
        while (await reader.ReadAsync())
        {
            // Column 0 is the time bucket; every further column is named after a tag.
            var timeBucket = reader.GetDateTime(0);
            var values = new Dictionary<string, string?>();
            for (int i = 1; i < reader.FieldCount; i++)
            {
                var tagName = reader.GetName(i);
                allTagNames.Add(tagName);
                values[tagName] = reader.IsDBNull(i) ? null : reader.GetString(i);
            }
            rows.Add(new HistoryIntervalRow(timeBucket, values));
        }

        // Without an explicit tag filter, report the columns the query produced.
        var usedTags = tags.Count > 0
            ? tags
            : allTagNames.OrderBy(x => x).ToList();

        return new HistoryIntervalQueryResult(
            usedTags,
            rows,
            BaseIntervalSeconds,
            request.Interval);
    }
    finally
    {
        await _ctx.Database.CloseConnectionAsync();
    }
}
/// <summary>
/// Builds the SQL for the interval history query: one date_trunc bucket column followed by one
/// pivot column (MAX + CASE) per requested tag.
/// </summary>
/// <remarks>
/// NOTE(review): only the unit of <paramref name="intervalStr"/> is used, so "5 minutes" is
/// bucketed per minute. Confirm whether multi-unit buckets are expected by callers.
/// </remarks>
/// <param name="tags">Tag names to pivot; an empty list produces only the bucket column and no tag filter.</param>
/// <param name="from">When set, adds a lower bound that expects the @fromTime parameter.</param>
/// <param name="to">When set, adds an upper bound that expects the @toTime parameter.</param>
/// <param name="intervalStr">Interval text such as "1 minute"; see the review note above.</param>
/// <param name="limit">Maximum number of bucket rows.</param>
/// <returns>The complete SELECT statement.</returns>
private string BuildHistoryIntervalQuerySql(
    List<string> tags, DateTime? from, DateTime? to, string intervalStr, int limit)
{
    var selectParts = new List<string>();

    // The unit comes from a fixed whitelist inside ParseIntervalToTruncUnit, so it is safe to inline.
    var truncUnit = ParseIntervalToTruncUnit(intervalStr);
    selectParts.Add($"date_trunc('{truncUnit}', recorded_at) AS time_bucket");

    // One pivot column per tag. The tag appears twice and needs a different escaping each time:
    // doubled single quotes inside the string literal, doubled double quotes inside the quoted
    // alias. Without the second one a tag containing '"' could break out of the identifier.
    foreach (var tag in tags)
    {
        var literal = tag.Replace("'", "''");
        var alias = tag.Replace("\"", "\"\"");
        selectParts.Add(
            $"MAX(CASE WHEN tagname = '{literal}' THEN value END) AS \"{alias}\"");
    }

    var sql = $"SELECT {string.Join(", ", selectParts)} FROM history_table WHERE 1=1";
    if (tags.Count > 0)
    {
        sql += $" AND tagname = ANY(ARRAY[{string.Join(", ", tags.Select(t => $"'{t.Replace("'", "''")}'"))}])";
    }
    if (from.HasValue)
    {
        sql += " AND recorded_at >= @fromTime";
    }
    if (to.HasValue)
    {
        sql += " AND recorded_at <= @toTime";
    }
    sql += $" GROUP BY time_bucket ORDER BY time_bucket LIMIT {limit}";
    return sql;
}
/// <summary>
/// Extracts the date_trunc unit from an interval text
/// (e.g. "1 minute" gives "minute", "5 minutes" gives "minute").
/// </summary>
/// <param name="interval">Interval text in the form "count unit".</param>
/// <returns>One of second/minute/hour/day/week; "minute" when the text is blank or not recognized.</returns>
private static string ParseIntervalToTruncUnit(string interval)
{
    const string FallbackUnit = "minute";

    if (string.IsNullOrWhiteSpace(interval))
    {
        return FallbackUnit;
    }

    var normalized = interval.ToLower().Trim();
    var unitMatch = System.Text.RegularExpressions.Regex.Match(
        normalized,
        @"^\d+\s+(second|minute|hour|day|week)");

    if (!unitMatch.Success)
    {
        return FallbackUnit;
    }

    return unitMatch.Groups[1].Value;
}
/// <summary>
/// Converts a user-supplied interval into PostgreSQL INTERVAL text.
/// </summary>
/// <remarks>
/// Accepted inputs are "count unit" texts (unit = second/minute/hour/day/week, optional plural
/// "s") and bare numbers, which are taken as seconds and folded into whole hours or minutes
/// when they divide evenly. Zero, negative or unrecognized input falls back to "1 minute",
/// because a bucket width must be positive. Parsing is culture-invariant.
/// </remarks>
/// <param name="interval">Interval text, e.g. "5 minutes", "1 hour" or "120".</param>
/// <returns>Lower-case interval text such as "5 minutes"; "1 minute" as the default.</returns>
private static string ParseIntervalToPostgresInterval(string interval)
{
    const string DefaultInterval = "1 minute";

    if (string.IsNullOrWhiteSpace(interval))
        return DefaultInterval;

    var lower = interval.Trim().ToLowerInvariant();

    // Already in PostgreSQL form (e.g. "1 minute", "5 minutes", "1 hour").
    var unitForm = System.Text.RegularExpressions.Regex.Match(
        lower, @"^([0-9]+)\s+(second|minute|hour|day|week)s?$");
    if (unitForm.Success)
    {
        // A count made only of zeros ("0 minutes") is not a usable bucket width.
        var hasNonZeroDigit = unitForm.Groups[1].Value.TrimStart('0').Length > 0;
        return hasNonZeroDigit ? lower : DefaultInterval;
    }

    // Bare number: seconds.
    if (int.TryParse(lower, NumberStyles.AllowLeadingSign, CultureInfo.InvariantCulture, out var seconds)
        && seconds > 0)
    {
        if (seconds % 3600 == 0)
            return Describe(seconds / 3600, "hour");
        if (seconds % 60 == 0)
            return Describe(seconds / 60, "minute");
        return Describe(seconds, "second");
    }

    return DefaultInterval;

    // Formats "count unit" and pluralizes the unit for counts above one.
    static string Describe(int count, string unit)
        => count.ToString(CultureInfo.InvariantCulture) + " " + unit + (count > 1 ? "s" : "");
}
/// <summary>
/// Not available in this crawler: the returned task always faults with
/// <see cref="NotImplementedException"/>. None of the arguments are read.
/// </summary>
public Task<NodeMapQueryResult> QueryMasterAsync(
    int? minLevel, int? maxLevel, string? nodeClass,
    IEnumerable<string>? names, string? nodeId, string? dataType,
    int limit, int offset)
{
    var notApplicable = new NotImplementedException("OPC UA method not applicable in HC900Crawler");
    return Task.FromException<NodeMapQueryResult>(notApplicable);
}
/// <summary>
/// Declared to return a nodeId-to-dataType dictionary (realtime_table joined with
/// node_map_master), but not available in this crawler: the returned task always faults with
/// <see cref="NotImplementedException"/>.
/// </summary>
public Task<IReadOnlyDictionary<string, string>> GetRealtimeNodeDataTypesAsync()
{
    var notApplicable = new NotImplementedException("OPC UA method not applicable in HC900Crawler");
    return Task.FromException<IReadOnlyDictionary<string, string>>(notApplicable);
}
/// <summary>
/// Resolves the controller id of a tag from the realtime points.
/// </summary>
/// <param name="tagName">Tag name, with or without a ".parameter" suffix.</param>
/// <returns>
/// The controller id of the first realtime point whose name equals the base tag or starts with
/// "base."; null when the name is blank, nothing matches, or the lookup throws (logged as a warning).
/// </returns>
public async Task<string?> GetControllerIdForTagAsync(string tagName)
{
    if (string.IsNullOrWhiteSpace(tagName))
    {
        return null;
    }

    // Match on the base tag (text before the first dot): per the original note, every
    // parameter of the same loop belongs to the same controller.
    var firstDot = tagName.IndexOf('.');
    var baseLower = (firstDot < 0 ? tagName : tagName[..firstDot]).ToLowerInvariant();
    var childPrefix = baseLower + ".";

    try
    {
        var candidates = _ctx.RealtimePoints.Where(point =>
            point.TagName.ToLower() == baseLower || point.TagName.ToLower().StartsWith(childPrefix));

        return await candidates
            .Select(point => point.ControllerId)
            .FirstOrDefaultAsync();
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "[Realtime] 태그 {Tag} 컨트롤러 해석 실패", tagName);
        return null;
    }
}
/// <summary>
/// Loads the realtime points for the given tag names, compared case-insensitively.
/// </summary>
/// <param name="tagNames">Tag names to look up.</param>
/// <returns>
/// The matching points; an empty sequence when no names were given or when anything inside the
/// lookup throws (the failure is logged as an error).
/// </returns>
public async Task<IEnumerable<RealtimePoint>> GetRealtimeRecordsByTagNamesAsync(IEnumerable<string> tagNames)
{
    try
    {
        var requested = tagNames.ToList();
        if (requested.Count == 0)
        {
            return Enumerable.Empty<RealtimePoint>();
        }

        // Lower-case both sides so the comparison ignores case.
        var loweredNames = new List<string>(requested.Count);
        foreach (var name in requested)
        {
            loweredNames.Add(name.ToLowerInvariant());
        }

        var matches = await _ctx.RealtimePoints
            .Where(point => loweredNames.Contains(point.TagName.ToLower()))
            .ToListAsync();

        _logger.LogDebug("[Realtime] 태그 {Count}개의 라이브 데이터 조회 완료", requested.Count);
        return matches;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "[Realtime] 태그 라이브 데이터 조회 실패");
        return Enumerable.Empty<RealtimePoint>();
    }
}
// ── FastSession / FastRecord ─────────────────────────────────────────────────
/// <summary>
/// Creates and saves a new fast-sampling session in the "Running" state.
/// </summary>
/// <param name="request">Name, sampling period, duration, tag list and retention of the session.</param>
/// <returns>The saved session entity.</returns>
public async Task<FastSession> CreateFastSessionAsync(FastSessionCreateRequest request)
{
    // The tag list is stored as serialized JSON (original note: string[] to JSONB).
    var serializedTags = JsonSerializer.Serialize(request.TagList);

    var created = new FastSession
    {
        Name = request.Name,
        SamplingMs = request.SamplingMs,
        DurationSec = request.DurationSec,
        RetentionDays = request.RetentionDays,
        TagList = serializedTags,
        StartedAt = DateTime.UtcNow,
        Status = "Running",
        RowCount = 0,
        Pinned = false
    };

    _ctx.FastSessions.Add(created);
    await _ctx.SaveChangesAsync();

    return created;
}
/// <summary>
/// Sets the status of a fast session and stamps the end time when the status is a final one.
/// </summary>
/// <param name="sessionId">Primary key of the session; nothing happens when it does not exist.</param>
/// <param name="status">New status text.</param>
public async Task UpdateFastSessionStatusAsync(int sessionId, string status)
{
    var target = await _ctx.FastSessions.FindAsync(sessionId);
    if (target == null)
    {
        return;
    }

    target.Status = status;

    // These four statuses end the session.
    switch (status)
    {
        case "Completed":
        case "Cancelled":
        case "Failed":
        case "RowLimitReached":
            target.EndedAt = DateTime.UtcNow;
            break;
    }

    await _ctx.SaveChangesAsync();
}
/// <summary>
/// Stores the current row count of a fast session.
/// </summary>
/// <param name="sessionId">Primary key of the session; nothing happens when it does not exist.</param>
/// <param name="rowCount">Row count to store.</param>
public async Task UpdateFastSessionRowCountAsync(int sessionId, int rowCount)
{
    var target = await _ctx.FastSessions.FindAsync(sessionId);
    if (target != null)
    {
        target.RowCount = rowCount;
        await _ctx.SaveChangesAsync();
    }
}
/// <summary>
/// Sets or clears the pinned flag of a fast session.
/// </summary>
/// <param name="sessionId">Primary key of the session; nothing happens when it does not exist.</param>
/// <param name="pinned">New value of the flag.</param>
public async Task UpdateFastSessionPinnedAsync(int sessionId, bool pinned)
{
    var target = await _ctx.FastSessions.FindAsync(sessionId);
    if (target != null)
    {
        target.Pinned = pinned;
        await _ctx.SaveChangesAsync();
    }
}
/// <summary>
/// Looks up a fast session by primary key.
/// </summary>
/// <param name="sessionId">Primary key of the session.</param>
/// <returns>The session, or null when it does not exist.</returns>
public async Task<FastSession?> GetFastSessionAsync(int sessionId)
{
    var found = await _ctx.FastSessions.FindAsync(sessionId);
    return found;
}
/// <summary>
/// Lists all fast sessions, most recently started first.
/// </summary>
/// <returns>All sessions ordered by start time, descending.</returns>
public async Task<IEnumerable<FastSession>> GetFastSessionsAsync()
{
    var newestFirst = _ctx.FastSessions.OrderByDescending(session => session.StartedAt);
    return await newestFirst.ToListAsync();
}
/// <summary>
/// Deletes a fast session.
/// </summary>
/// <param name="sessionId">Primary key of the session; nothing happens when it does not exist.</param>
public async Task DeleteFastSessionAsync(int sessionId)
{
    var doomed = await _ctx.FastSessions.FindAsync(sessionId);
    if (doomed != null)
    {
        _ctx.FastSessions.Remove(doomed);
        await _ctx.SaveChangesAsync();
    }
}
/// <summary>
/// Finds the fast sessions whose retention period has run out.
/// </summary>
/// <returns>
/// Sessions that have ended, are not pinned, have a retention period, and whose end time plus
/// that many days lies before the current UTC time; oldest end time first.
/// </returns>
public async Task<IEnumerable<FastSession>> GetExpiredFastSessionsAsync()
{
    var cutoff = DateTime.UtcNow;

    // Only ended, unpinned sessions with a retention period can expire.
    var candidates = _ctx.FastSessions
        .Where(session => session.EndedAt != null
            && !session.Pinned
            && session.RetentionDays.HasValue);

    var expired = candidates
        .Where(session => session.EndedAt.Value.AddDays(session.RetentionDays.Value) < cutoff)
        .OrderBy(session => session.EndedAt);

    return await expired.ToListAsync();
}
/// <summary>
/// Loads the records of a fast session, optionally limited to a time window.
/// </summary>
/// <param name="sessionId">Session whose records are read.</param>
/// <param name="from">Inclusive lower bound on the record time, or null for none.</param>
/// <param name="to">Inclusive upper bound on the record time, or null for none.</param>
/// <returns>
/// The records in time order with the distinct tag names and the total count. From/To echo the
/// given bounds; a missing bound is replaced by the earliest/latest record time, or by the
/// current UTC time when there are no records.
/// </returns>
public async Task<FastQueryResult> GetFastRecordsAsync(int sessionId, DateTime? from, DateTime? to)
{
    var filtered = _ctx.FastRecords.Where(rec => rec.SessionId == sessionId);
    if (from.HasValue)
    {
        filtered = filtered.Where(rec => rec.RecordedAt >= from.Value);
    }
    if (to.HasValue)
    {
        filtered = filtered.Where(rec => rec.RecordedAt <= to.Value);
    }

    var ordered = await filtered.OrderBy(rec => rec.RecordedAt).ToListAsync();

    var distinctTags = ordered.Select(rec => rec.TagName).Distinct().ToArray();

    // Hand out copies instead of the tracked entities (evaluated lazily on enumeration).
    var copies =
        from rec in ordered
        select new FastRecord
        {
            Id = rec.Id,
            SessionId = rec.SessionId,
            RecordedAt = rec.RecordedAt,
            TagName = rec.TagName,
            Value = rec.Value
        };

    // The list is sorted by time, so its ends carry the earliest and latest timestamps.
    var effectiveFrom = from ?? (ordered.Count > 0 ? ordered[0].RecordedAt : DateTime.UtcNow);
    var effectiveTo = to ?? (ordered.Count > 0 ? ordered[^1].RecordedAt : DateTime.UtcNow);

    return new FastQueryResult(
        SessionId: sessionId,
        From: effectiveFrom,
        To: effectiveTo,
        TagNames: distinctTags,
        Items: copies,
        TotalCount: ordered.Count
    );
}
/// <summary>
/// Adds a batch of fast records and saves them with a single SaveChanges call.
/// </summary>
/// <param name="records">Records to insert.</param>
public async Task BatchInsertFastRecordsAsync(IEnumerable<FastRecord> records)
{
    var fastRecords = _ctx.FastRecords;
    await fastRecords.AddRangeAsync(records);

    await _ctx.SaveChangesAsync();
}
/// <summary>
/// Writes the records of a fast session to a stream as CSV: one row per timestamp, one column
/// per tag.
/// </summary>
/// <remarks>
/// Values are always quoted and embedded quotes are doubled (RFC 4180). Header fields are
/// quoted only when they contain a comma, quote or line break, so ordinary tag names are written
/// as before. If a tag occurs more than once for the same timestamp, the last record wins.
/// The stream is left open for the caller.
/// </remarks>
/// <param name="sessionId">Session whose records are exported.</param>
/// <param name="stream">Destination stream.</param>
/// <param name="from">Inclusive lower bound on the record time, or null for none.</param>
/// <param name="to">Inclusive upper bound on the record time, or null for none.</param>
public async Task ExportFastRecordsToCsvAsync(int sessionId, Stream stream, DateTime? from, DateTime? to)
{
    var query = _ctx.FastRecords.Where(x => x.SessionId == sessionId);
    if (from.HasValue) query = query.Where(x => x.RecordedAt >= from.Value);
    if (to.HasValue) query = query.Where(x => x.RecordedAt <= to.Value);

    var records = await query.OrderBy(x => x.RecordedAt).ThenBy(x => x.TagName).ToListAsync();
    var tagNames = records.Select(x => x.TagName).Distinct().OrderBy(x => x).ToArray();

    using var writer = new StreamWriter(stream, leaveOpen: true);
    await writer.WriteLineAsync("recorded_at," + string.Join(",", tagNames.Select(QuoteIfNeeded)));

    foreach (var sameTime in records.GroupBy(x => x.RecordedAt).OrderBy(grp => grp.Key))
    {
        // Grouping by tag first tolerates duplicate (timestamp, tag) pairs, which would make a
        // plain ToDictionary throw.
        var values = sameTime
            .GroupBy(r => r.TagName)
            .ToDictionary(perTag => perTag.Key, perTag => perTag.Last().Value);

        // A tag without a record at this timestamp becomes an empty, unquoted cell.
        var cells = tagNames.Select(t => values.TryGetValue(t, out var v)
            ? Quote(Convert.ToString(v, CultureInfo.InvariantCulture))
            : "");

        await writer.WriteLineAsync(sameTime.Key.ToString("o") + "," + string.Join(",", cells));
    }
    await writer.FlushAsync();

    // Wraps a field in quotes and doubles any embedded quote.
    static string Quote(string? text)
        => "\"" + (text ?? string.Empty).Replace("\"", "\"\"") + "\"";

    // Quotes a field only when it contains a character that would break the CSV structure.
    static string QuoteIfNeeded(string text)
        => text.IndexOfAny(new[] { ',', '"', '\r', '\n' }) >= 0 ? Quote(text) : text;
}
/// <summary>
/// Looks up the node id of a realtime point by exact tag name.
/// </summary>
/// <param name="tagName">Tag name to match exactly.</param>
/// <returns>The node id of the first match, or null when there is none.</returns>
public async Task<string?> GetNodeIdByTagNameAsync(string tagName)
{
    var nodeIds =
        from point in _ctx.RealtimePoints
        where point.TagName == tagName
        select point.NodeId;

    return await nodeIds.FirstOrDefaultAsync();
}
// ── Digital Event History ──────────────────────────────────────────────────
/// <summary>
/// Returns the realtime tag names that represent digital/state points.
/// </summary>
/// <remarks>
/// Primary source: base tags in tag_metadata that carry a non-empty state label (attribute
/// starting with "state"); ".PV" is appended to each. Original author note: value N maps to the
/// state{N} label (L-STOP/L-RUN/R-TRIP etc.), and the old 'i=7594' OPC UA marker is no longer
/// produced by the new register-map loader, so having a state label is the criterion.
/// Fallback when no such metadata exists: realtime points whose live value starts with "{".
/// </remarks>
/// <returns>Distinct digital tag names.</returns>
public async Task<IEnumerable<string>> GetDigitalTagNamesAsync()
{
    var labelledBaseTags = await _ctx.TagMetadata
        .Where(meta => meta.Attribute.StartsWith("state") && meta.Value != null && meta.Value != "")
        .Select(meta => meta.BaseTag)
        .Distinct()
        .ToListAsync();

    if (labelledBaseTags.Count > 0)
    {
        // Realtime tag names have the form {base}.PV, so the suffix is appended here in memory
        // (no SQL translation needed); callers can then match with a plain Contains(p.TagName).
        var pvNames = new List<string>(labelledBaseTags.Count);
        foreach (var baseTag in labelledBaseTags)
        {
            pvNames.Add(baseTag + ".PV");
        }
        return pvNames;
    }

    // Fallback: live values in the digital format '{N | label | }' produced by FormatValue.
    var braceFormatted = _ctx.RealtimePoints
        .Where(point => point.LiveValue != null && point.LiveValue.StartsWith("{"))
        .Select(point => point.TagName)
        .Distinct();

    return await braceFormatted.ToListAsync();
}
/// <summary>
/// Loads the realtime points whose tag name is one of the digital tag names.
/// </summary>
/// <returns>The matching realtime points; empty when there are no digital tag names.</returns>
public async Task<IEnumerable<RealtimePoint>> GetDigitalPointsAsync()
{
    var digitalNames = new HashSet<string>(await GetDigitalTagNamesAsync());
    if (digitalNames.Count == 0)
    {
        return Enumerable.Empty<RealtimePoint>();
    }

    // The names already carry the .PV suffix, so a plain Contains is enough. (Original note:
    // substring/LastIndexOf is avoided because Npgsql cannot translate LastIndexOf to SQL and
    // fails at run time.)
    var digitalPoints = await _ctx.RealtimePoints
        .Where(point => digitalNames.Contains(point.TagName))
        .ToListAsync();

    return digitalPoints;
}
/// <summary>
/// Resolves the area code of a tag from its "area" metadata.
/// </summary>
/// <param name="tagName">Tag name; everything from the last dot on is stripped to get the base tag.</param>
/// <returns>
/// The word between the first two '|' separators of a value shaped like "{ number | CODE | ...";
/// null when there is no metadata or the value does not have that shape.
/// </returns>
public async Task<string?> GetAreaByTagNameAsync(string tagName)
{
    var lastDot = tagName.LastIndexOf('.');
    var baseTag = lastDot >= 0 ? tagName[..lastDot] : tagName;

    var areaValues =
        from meta in _ctx.TagMetadata
        where meta.BaseTag == baseTag && meta.Attribute == "area"
        select meta.Value;
    var rawArea = await areaValues.FirstOrDefaultAsync();

    if (string.IsNullOrEmpty(rawArea))
    {
        return null;
    }

    var parsed = System.Text.RegularExpressions.Regex.Match(rawArea, @"{\s*\d+\s*\|\s*(\w+)\s*\|");
    if (!parsed.Success)
    {
        return null;
    }
    return parsed.Groups[1].Value;
}
// ── Sub-Area (detailed area) ────────────────────────────────────────────────
// Stored in tag_metadata as attribute='sub_area' (EAV). Value format: a single code "P6-1" or a
// shared list "P6-1,P6-2". Shared values are matched everywhere by token, using
// `<code> = ANY(string_to_array(value, ','))`.
/// <summary>
/// Returns the raw sub_area metadata value of a tag.
/// </summary>
/// <param name="tagName">Tag name; everything from the last dot on is stripped to get the base tag.</param>
/// <returns>The stored sub_area value, or null when none is stored.</returns>
public async Task<string?> GetSubAreaByTagNameAsync(string tagName)
{
    var lastDot = tagName.LastIndexOf('.');
    var baseTag = lastDot >= 0 ? tagName[..lastDot] : tagName;

    var subAreaValues =
        from meta in _ctx.TagMetadata
        where meta.BaseTag == baseTag && meta.Attribute == "sub_area"
        select meta.Value;

    return await subAreaValues.FirstOrDefaultAsync();
}
/// <summary>
/// Lists the tags of an area or sub-area from v_tag_summary, one page at a time.
/// </summary>
/// <param name="area">Area code (e.g. "P6") or, when it contains '-', a sub-area code (e.g. "P6-1").</param>
/// <param name="page">1-based page number; values below 1 are treated as 1.</param>
/// <param name="pageSize">Rows per page; clamped to 1..500.</param>
/// <returns>The tags on the requested page and the total number of matching tags.</returns>
public async Task<(List<SubAreaTagDto> tags, int total)> GetSubAreaListByAreaAsync(
    string area, int page, int pageSize)
{
    page = Math.Max(1, page);
    pageSize = Math.Clamp(pageSize, 1, 500);
    // Computed in 64 bits so a very large page number cannot wrap into a negative OFFSET.
    var offset = (long)(page - 1) * pageSize;
    var isSubArea = area.Contains('-'); // the "P6-1" form marks a sub_area code

    // v_tag_summary already exposes area/sub_area per base_tag (original note: via LEFT JOIN).
    // A sub_area code is matched by token (shared values included), an area code exactly.
    var where = isSubArea
        ? "@area = ANY(string_to_array(sub_area, ','))"
        : "trim(split_part(area, '|', 2)) = @area";
    var listSql = $@"SELECT base_tag, trim(split_part(area, '|', 2)) AS area, sub_area, description
FROM v_tag_summary
WHERE {where}
ORDER BY base_tag
LIMIT @limit OFFSET @offset";
    var countSql = $"SELECT COUNT(*) FROM v_tag_summary WHERE {where}";

    // Open the context connection only when needed and close it only if opened here.
    var conn = (NpgsqlConnection)_ctx.Database.GetDbConnection();
    var mustClose = conn.State != System.Data.ConnectionState.Open;
    if (mustClose) await conn.OpenAsync();
    try
    {
        int total;
        await using (var countCmd = new NpgsqlCommand(countSql, conn))
        {
            countCmd.Parameters.AddWithValue("area", area);
            total = Convert.ToInt32(await countCmd.ExecuteScalarAsync());
        }

        var tags = new List<SubAreaTagDto>();
        await using (var cmd = new NpgsqlCommand(listSql, conn))
        {
            cmd.Parameters.AddWithValue("area", area);
            cmd.Parameters.AddWithValue("limit", pageSize);
            cmd.Parameters.AddWithValue("offset", offset);
            await using var reader = await cmd.ExecuteReaderAsync();
            while (await reader.ReadAsync())
            {
                tags.Add(new SubAreaTagDto
                {
                    BaseTag = reader.GetString(0),
                    Area = reader.IsDBNull(1) ? null : reader.GetString(1),
                    SubArea = reader.IsDBNull(2) ? null : reader.GetString(2),
                    Description = reader.IsDBNull(3) ? null : reader.GetString(3),
                });
            }
        }
        return (tags, total);
    }
    finally
    {
        if (mustClose) await conn.CloseAsync();
    }
}
/// <summary>
/// Sets or clears the sub_area metadata of a base tag.
/// </summary>
/// <remarks>
/// The value is normalized before it is stored: it is split on commas, each code is trimmed,
/// and empty or repeated codes are dropped. Readers match codes by exact comma-separated token,
/// so "P6-1, P6-2" stored with the space would never match "P6-2".
/// </remarks>
/// <param name="baseTag">Base tag whose sub_area is changed.</param>
/// <param name="subArea">A single code, a comma-separated list, or null/blank to delete the entry.</param>
/// <returns>True when a row was deleted, inserted or updated.</returns>
public async Task<bool> UpdateSubAreaAsync(string baseTag, string? subArea)
{
    var codes = (subArea ?? string.Empty)
        .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
        .Distinct(StringComparer.Ordinal);
    var normalized = string.Join(",", codes);

    // Nothing usable left (null, blank, or only separators): remove the entry.
    if (normalized.Length == 0)
    {
        var deleted = await _ctx.Database.ExecuteSqlRawAsync(
            "DELETE FROM tag_metadata WHERE base_tag = {0} AND attribute = 'sub_area'", baseTag);
        return deleted > 0;
    }

    var affected = await _ctx.Database.ExecuteSqlRawAsync(
        @"INSERT INTO tag_metadata (base_tag, attribute, value)
VALUES ({0}, 'sub_area', {1})
ON CONFLICT (base_tag, attribute) DO UPDATE SET value = EXCLUDED.value, loaded_at = NOW()",
        baseTag, normalized);
    return affected > 0;
}
// Classifies every base_tag into a sub_area with a single CASE: area-scoped number-prefix rules
// plus detection of shared tags.
// A shared tag (used by several sub_areas) gets both codes, comma-separated, so that it shows up
// under either sub_area filter.
// Result columns: base_tag, area, sub_area (NULL when no rule matches), ordered by area, base_tag.
// Only areas P6, P9, P10, P1 and P2 are classified.
private const string SubAreaClassifySql = @"
WITH base AS (
SELECT DISTINCT
base_tag,
regexp_replace(value, '.*\|\s*(\w+)\s*\|.*', '\1') AS area,
regexp_replace(base_tag, '[^0-9]', '', 'g') AS digits
FROM tag_metadata
WHERE attribute = 'area'
-- 가드: realtime_table에 실재하는 base_tag만 (삭제된 포인트의 고아 메타데이터 제외)
AND EXISTS (SELECT 1 FROM realtime_table r WHERE split_part(r.tagname, '.', 1) = tag_metadata.base_tag)
),
shared_role AS (
SELECT DISTINCT LOWER(tag_no) AS base_tag
FROM pid_equipment
WHERE role ILIKE '%공용%' OR role ILIKE '%공통%'
),
classified AS (
SELECT b.base_tag, b.area,
CASE
-- 1) 공용(여러 sub_area 공유) → 두 코드 모두 부여
WHEN b.base_tag IN (SELECT base_tag FROM shared_role)
OR (b.area = 'P6' AND b.digits LIKE '6201%')
OR (b.area = 'P2' AND (b.digits LIKE '2127%' OR b.digits LIKE '2128%' OR b.digits LIKE '2129%'))
THEN CASE b.area
WHEN 'P6' THEN 'P6-1,P6-2'
WHEN 'P9' THEN 'P9-1,P9-2'
WHEN 'P10' THEN 'P10-1,P10-2'
WHEN 'P1' THEN 'P1-1,P1-2'
WHEN 'P2' THEN 'P2-1,P2-2'
END
-- 2) area-scoped 번호 prefix (긴 prefix 먼저)
WHEN b.area = 'P10' AND b.digits LIKE '101%' THEN 'P10-1'
WHEN b.area = 'P10' AND b.digits LIKE '102%' THEN 'P10-2'
WHEN b.area = 'P2' AND b.digits LIKE '211%' THEN 'P2-1'
WHEN b.area = 'P2' AND b.digits LIKE '212%' THEN 'P2-2'
WHEN b.area = 'P2' AND b.digits LIKE '213%' THEN 'P2-2'
WHEN b.area = 'P6' AND b.digits LIKE '61%' THEN 'P6-1'
WHEN b.area = 'P6' AND b.digits LIKE '62%' THEN 'P6-2'
WHEN b.area = 'P9' AND b.digits LIKE '91%' THEN 'P9-1'
WHEN b.area = 'P9' AND b.digits LIKE '92%' THEN 'P9-2'
WHEN b.area = 'P1' AND b.digits LIKE '11%' THEN 'P1-1'
WHEN b.area = 'P1' AND b.digits LIKE '12%' THEN 'P1-2'
WHEN b.area = 'P1' AND b.digits LIKE '13%' THEN 'P1-2'
ELSE NULL
END AS sub_area
FROM base b
WHERE b.area IN ('P6','P9','P10','P1','P2')
)
SELECT base_tag, area, sub_area FROM classified ORDER BY area, base_tag";
/// <summary>
/// Classifies every base tag into a sub_area and, unless this is a dry run, stores the result.
/// </summary>
/// <remarks>
/// The report and the insert both use SubAreaClassifySql, so they cannot drift apart. Existing
/// sub_area entries are never overwritten (ON CONFLICT DO NOTHING), which means the report can
/// list tags as assigned or shared even though their stored value is left untouched.
/// </remarks>
/// <param name="dryRun">When true, only the report is produced and nothing is written.</param>
/// <returns>Counters per outcome and per sub_area, plus one detail entry per classified tag.</returns>
public async Task<SubAreaSeedResultDto> SeedSubAreaAsync(bool dryRun)
{
    var result = new SubAreaSeedResultDto { DryRun = dryRun };

    // Open the context connection only when needed and close it only if opened here.
    var conn = (NpgsqlConnection)_ctx.Database.GetDbConnection();
    var mustClose = conn.State != System.Data.ConnectionState.Open;
    if (mustClose) await conn.OpenAsync();
    try
    {
        // Pass 1: read the classification and build the report.
        await using (var cmd = new NpgsqlCommand(SubAreaClassifySql, conn))
        await using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                var baseTag = reader.GetString(0);
                var area = reader.GetString(1);
                var subArea = reader.IsDBNull(2) ? null : reader.GetString(2);

                // NULL = no rule matched; a comma marks a tag shared by several sub_areas.
                string action;
                if (subArea == null) { action = "unmatched"; result.Unmatched++; }
                else if (subArea.Contains(',')) { action = "shared"; result.Shared++; }
                else { action = "assign"; result.Assigned++; }

                if (subArea != null)
                    result.BySubArea[subArea] = result.BySubArea.GetValueOrDefault(subArea) + 1;

                result.Details.Add(new SubAreaSeedDetailDto
                {
                    BaseTag = baseTag, Area = area, Action = action, SubArea = subArea,
                    Reason = action == "shared" ? "여러 sub_area 공용" : null,
                });
            }
        }

        // Pass 2: store the same classification. The reader above is already disposed here.
        if (!dryRun)
        {
            const string insertSql =
                "INSERT INTO tag_metadata (base_tag, attribute, value)\n" +
                "SELECT base_tag, 'sub_area', sub_area FROM (" + SubAreaClassifySql + ") AS classified_rows\n" +
                "WHERE sub_area IS NOT NULL\n" +
                "ON CONFLICT (base_tag, attribute) DO NOTHING";
#pragma warning disable EF1002
            await _ctx.Database.ExecuteSqlRawAsync(insertSql);
#pragma warning restore EF1002
        }
        return result;
    }
    finally
    {
        if (mustClose) await conn.CloseAsync();
    }
}
/// <summary>
/// Stores a single digital event in the event history.
/// </summary>
/// <remarks>
/// Copies the same fields as BatchRecordDigitalEventsAsync, including ControllerId, so an event
/// saved one at a time is stored the same way as one saved in a batch.
/// </remarks>
/// <param name="record">Event to store.</param>
/// <returns>The number of state entries written, as reported by SaveChangesAsync.</returns>
public async Task<int> RecordDigitalEventAsync(DigitalEventRecord record)
{
    var row = new EventHistoryRecord
    {
        ControllerId = record.ControllerId,
        TagName = record.TagName,
        NodeId = record.NodeId,
        PrevValue = record.PrevValue,
        CurrValue = record.CurrValue,
        EventType = record.EventType,
        EventTime = record.EventTime,
        DurationSeconds = record.DurationSeconds,
        Area = record.Area,
        SubArea = record.SubArea,
        Metadata = record.Metadata
    };
    await _ctx.EventHistoryRecords.AddAsync(row);
    return await _ctx.SaveChangesAsync();
}
/// <summary>
/// Stores a batch of digital events in the event history with a single SaveChanges call.
/// </summary>
/// <param name="records">Events to store.</param>
/// <returns>The number of state entries written, as reported by SaveChangesAsync.</returns>
public async Task<int> BatchRecordDigitalEventsAsync(IEnumerable<DigitalEventRecord> records)
{
    var pending = new List<EventHistoryRecord>();
    foreach (var source in records)
    {
        pending.Add(new EventHistoryRecord
        {
            ControllerId = source.ControllerId,
            TagName = source.TagName,
            NodeId = source.NodeId,
            PrevValue = source.PrevValue,
            CurrValue = source.CurrValue,
            EventType = source.EventType,
            EventTime = source.EventTime,
            DurationSeconds = source.DurationSeconds,
            Area = source.Area,
            SubArea = source.SubArea,
            Metadata = source.Metadata
        });
    }

    await _ctx.EventHistoryRecords.AddRangeAsync(pending);
    var written = await _ctx.SaveChangesAsync();
    return written;
}
// Lazily built lookup from a tag name (or its name without the ".PV" suffix) to the tag name
// as it is stored in the active map entries; keys compare case-insensitively.
private Dictionary<string, string>? _tagNameMap;

/// <summary>
/// Returns the tag-name lookup, building it from the active map entries on first use.
/// </summary>
/// <returns>The cached lookup.</returns>
private async Task<Dictionary<string, string>> GetTagNameMapAsync()
{
    if (_tagNameMap != null)
    {
        return _tagNameMap;
    }

    var activeTagNames = await _ctx.Hc900MapEntries
        .Where(entry => entry.IsActive)
        .Select(entry => entry.TagName)
        .Distinct()
        .ToListAsync();

    var lookup = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
    foreach (var tagName in activeTagNames)
    {
        lookup[tagName] = tagName;

        // Also register the name without ".PV"; the first entry to claim that key keeps it.
        var hasPvSuffix = tagName.EndsWith(".PV", StringComparison.OrdinalIgnoreCase);
        var withoutSuffix = hasPvSuffix ? tagName[..^3] : tagName;
        lookup.TryAdd(withoutSuffix, tagName);
    }

    _tagNameMap = lookup;
    return lookup;
}
/// <summary>
/// Translates a tag name through the lookup: first as given, then without a trailing ".pv"
/// (any casing).
/// </summary>
/// <param name="tagName">Tag name to translate.</param>
/// <param name="map">Lookup from known names to canonical names.</param>
/// <returns>The canonical name, or <paramref name="tagName"/> unchanged when neither form is known.</returns>
private static string MapTagName(string tagName, Dictionary<string, string> map)
{
    if (map.TryGetValue(tagName, out var direct))
    {
        return direct;
    }

    var hasPvSuffix = tagName.EndsWith(".pv", StringComparison.OrdinalIgnoreCase);
    if (hasPvSuffix && map.TryGetValue(tagName[..^3], out var viaBase))
    {
        return viaBase;
    }

    return tagName;
}
/// <summary>
/// Queries the digital event history within a time window, newest first.
/// </summary>
/// <param name="tagName">Exact tag name filter, or null/empty for none.</param>
/// <param name="area">Exact area filter, or null/empty for none.</param>
/// <param name="subArea">Sub-area code; matches when it is one of the comma-separated codes stored on the event.</param>
/// <param name="eventType">Exact event type filter, or null/empty for none.</param>
/// <param name="from">Inclusive start of the window.</param>
/// <param name="to">Inclusive end of the window.</param>
/// <param name="limit">Maximum number of events returned.</param>
/// <returns>The events, with each tag name translated through the tag-name lookup.</returns>
public async Task<IEnumerable<EventHistoryRow>> QueryEventHistoryAsync(
    string? tagName, string? area, string? subArea,
    string? eventType, DateTime from, DateTime to, int limit = 500)
{
    IQueryable<EventHistoryRecord> filtered = _ctx.EventHistoryRecords
        .Where(ev => ev.EventTime >= from && ev.EventTime <= to);

    if (!string.IsNullOrEmpty(tagName))
    {
        filtered = filtered.Where(ev => ev.TagName == tagName);
    }
    if (!string.IsNullOrEmpty(area))
    {
        filtered = filtered.Where(ev => ev.Area == area);
    }
    if (!string.IsNullOrEmpty(subArea))
    {
        // Token match against a comma-separated list: sole code, first, last, or in the middle.
        var asFirst = subArea + ",";
        var asLast = "," + subArea;
        var asMiddle = "," + subArea + ",";
        filtered = filtered.Where(ev => ev.SubArea == subArea
            || ev.SubArea!.StartsWith(asFirst)
            || ev.SubArea.EndsWith(asLast)
            || ev.SubArea.Contains(asMiddle));
    }
    if (!string.IsNullOrEmpty(eventType))
    {
        filtered = filtered.Where(ev => ev.EventType == eventType);
    }

    var newestFirst = await filtered
        .OrderByDescending(ev => ev.EventTime)
        .Take(limit)
        .ToListAsync();

    var nameLookup = await GetTagNameMapAsync();

    return
        from ev in newestFirst
        select new EventHistoryRow
        {
            Id = ev.Id,
            TagName = MapTagName(ev.TagName, nameLookup),
            NodeId = ev.NodeId,
            PrevValue = ev.PrevValue,
            CurrValue = ev.CurrValue,
            EventType = ev.EventType,
            EventTime = ev.EventTime,
            Area = ev.Area,
            SubArea = ev.SubArea,
            DurationSeconds = ev.DurationSeconds,
            Metadata = ev.Metadata
        };
}
// ── Cache of archive_enabled (controller_id, tagname) pairs ─────────────────
/// <summary>
/// Returns the archive-enabled (controller_id, tagname) pairs, refreshing the cache when it is
/// older than DigitalTagCacheTtlSeconds.
/// </summary>
/// <remarks>
/// Callers that arrive while a refresh is running share that refresh task. The task slot is
/// cleared in a finally block: without it, a refresh that failed once would stay in the slot and
/// every later call would re-await the same faulted task and never retry.
/// </remarks>
/// <returns>The cached pair set.</returns>
/// <exception cref="Exception">Whatever the refresh throws is passed on to the caller.</exception>
private async Task<HashSet<(string, string)>> GetArchiveEnabledTagPairsCachedAsync()
{
    if ((DateTime.UtcNow - _archiveTagCacheTime).TotalSeconds < DigitalTagCacheTtlSeconds)
        return _archiveTagPairCache;

    Task refreshTask;
    lock (_archiveCacheLock)
    {
        // Start a refresh unless one is already in flight.
        if (_archiveCacheRefreshTask == null)
            _archiveCacheRefreshTask = RefreshArchiveTagCacheAsync();
        refreshTask = _archiveCacheRefreshTask;
    }

    try
    {
        await refreshTask;
    }
    finally
    {
        lock (_archiveCacheLock)
        {
            // Clear only our own task, so a newer refresh started meanwhile is left alone.
            if (ReferenceEquals(_archiveCacheRefreshTask, refreshTask))
                _archiveCacheRefreshTask = null;
        }
    }
    return _archiveTagPairCache;
}
/// <summary>
/// Reloads the archive-enabled pair cache from the active map entries and stamps the load time.
/// </summary>
private async Task RefreshArchiveTagCacheAsync()
{
    var archivedEntries = await _ctx.Hc900MapEntries
        .Where(entry => entry.ArchiveEnabled && entry.IsActive)
        .Select(entry => new { entry.ControllerId, entry.TagName })
        .Distinct()
        .ToListAsync();

    var pairs =
        from entry in archivedEntries
        select (entry.ControllerId, entry.TagName);

    _archiveTagPairCache = pairs.ToHashSet();
    _archiveTagCacheTime = DateTime.UtcNow;
}
// ── Cache of digital-point (controller_id, name) pairs ──────────────────────
/// <summary>
/// Returns the digital (controller_id, name) pairs, refreshing the cache when it is older than
/// DigitalTagCacheTtlSeconds.
/// </summary>
/// <remarks>
/// Callers that arrive while a refresh is running share that refresh task. The task slot is
/// cleared in a finally block: without it, a refresh that failed once would stay in the slot and
/// every later call would re-await the same faulted task and never retry.
/// </remarks>
/// <returns>The cached pair set.</returns>
/// <exception cref="Exception">Whatever the refresh throws is passed on to the caller.</exception>
private async Task<HashSet<(string, string)>> GetDigitalTagPairsCachedAsync()
{
    if ((DateTime.UtcNow - _digitalTagPairCacheTime).TotalSeconds < DigitalTagCacheTtlSeconds)
        return _digitalTagPairCache;

    Task refreshTask;
    lock (_digitalPairCacheLock)
    {
        // Start a refresh unless one is already in flight.
        if (_digitalPairCacheRefreshTask == null)
            _digitalPairCacheRefreshTask = RefreshDigitalTagPairCacheAsync();
        refreshTask = _digitalPairCacheRefreshTask;
    }

    try
    {
        await refreshTask;
    }
    finally
    {
        lock (_digitalPairCacheLock)
        {
            // Clear only our own task, so a newer refresh started meanwhile is left alone.
            if (ReferenceEquals(_digitalPairCacheRefreshTask, refreshTask))
                _digitalPairCacheRefreshTask = null;
        }
    }
    return _digitalTagPairCache;
}
/// <summary>
/// Reloads the digital pair cache and stamps the load time.
/// </summary>
private async Task RefreshDigitalTagPairCacheAsync()
{
    var freshPairs = await GetDigitalTagPairsAsync();

    _digitalTagPairCache = freshPairs;
    _digitalTagPairCacheTime = DateTime.UtcNow;
}
/// <summary>
/// Loads the digital tags as (controller_id, name) pairs.
/// </summary>
/// <remarks>
/// Same sources as GetDigitalTagNamesAsync with the controller id added: base tags that carry a
/// non-empty state label in the metadata (returned as base + ".PV"), otherwise realtime points
/// whose live value starts with "{".
/// </remarks>
/// <returns>The distinct pairs.</returns>
private async Task<HashSet<(string, string)>> GetDigitalTagPairsAsync()
{
    var labelled = await _ctx.TagMetadata
        .Where(meta => meta.Attribute.StartsWith("state") && meta.Value != null && meta.Value != "")
        .Select(meta => new { meta.ControllerId, meta.BaseTag })
        .Distinct()
        .ToListAsync();

    if (labelled.Count > 0)
    {
        var pvPairs =
            from meta in labelled
            select (meta.ControllerId, meta.BaseTag + ".PV");
        return pvPairs.ToHashSet();
    }

    // Fallback: realtime points in the brace-delimited digital value format.
    var braceFormatted = await _ctx.RealtimePoints
        .Where(point => point.LiveValue != null && point.LiveValue.StartsWith("{"))
        .Select(point => new { point.ControllerId, point.TagName })
        .Distinct()
        .ToListAsync();

    var realtimePairs =
        from point in braceFormatted
        select (point.ControllerId, point.TagName);
    return realtimePairs.ToHashSet();
}
/// <summary>
/// Reports the TimescaleDB status of history_table: whether it is a hypertable, its record
/// count, and whether a retention policy, compression and a continuous aggregate are set up.
/// </summary>
/// <remarks>
/// The record count is only read when the table is a hypertable and is capped at int.MaxValue.
/// The retention, compression and continuous-aggregate probes are optional: if one of them
/// fails, it is logged at debug level and reported as false. The context connection is opened
/// only when needed and closed again if it was opened here.
/// </remarks>
/// <returns>
/// The status; when the hypertable check itself fails, a status with all flags false and the
/// error text in StatusMessage.
/// </returns>
public async Task<HypertableStatusInfo> GetHypertableStatusAsync()
{
    var conn = _ctx.Database.GetDbConnection();
    var openedHere = false;
    try
    {
        if (conn.State != System.Data.ConnectionState.Open)
        {
            await conn.OpenAsync();
            openedHere = true;
        }

        // Is history_table registered as a hypertable?
        var isHypertableResult = Convert.ToBoolean(await ScalarAsync(@"
SELECT EXISTS (
SELECT 1 FROM timescaledb_information.hypertables
WHERE hypertable_name = 'history_table'
)"));

        // Record count. COUNT(*) is 64-bit, so cap it instead of overflowing the int.
        var recordCount = 0;
        if (isHypertableResult)
        {
            var total = Convert.ToInt64(await ScalarAsync("SELECT COUNT(*) FROM history_table"));
            recordCount = (int)Math.Min(total, int.MaxValue);
        }

        // Retention policy: listed as a 'policy_retention' job in the jobs view.
        var hasRetentionPolicy = await ProbeAsync("retention policy", @"
SELECT EXISTS (
SELECT 1 FROM timescaledb_information.jobs
WHERE proc_name = 'policy_retention'
AND hypertable_name = 'history_table'
)");

        // Compression: boolean column of the hypertables view (no row gives false).
        var hasCompression = await ProbeAsync("compression", @"
SELECT compression_enabled
FROM timescaledb_information.hypertables
WHERE hypertable_name = 'history_table'");

        // Continuous aggregate defined on top of history_table.
        var hasContinuousAggregate = await ProbeAsync("continuous aggregate", @"
SELECT EXISTS (
SELECT 1 FROM timescaledb_information.continuous_aggregates
WHERE hypertable_name = 'history_table'
)");

        return new HypertableStatusInfo
        {
            IsHypertable = isHypertableResult,
            TableName = "history_table",
            StatusMessage = isHypertableResult
                ? "하이퍼테이블이 활성화되어 있습니다."
                : "일반 테이블입니다. CreateHypertableAsync()를 사용하여 하이퍼테이블로 변환할 수 있습니다.",
            RecordCount = recordCount,
            HasRetentionPolicy = hasRetentionPolicy,
            HasCompression = hasCompression,
            HasContinuousAggregate = hasContinuousAggregate
        };
    }
    catch (Exception ex)
    {
        return new HypertableStatusInfo
        {
            IsHypertable = false,
            TableName = "history_table",
            StatusMessage = $"상태 확인 중 오류 발생: {ex.Message}",
            RecordCount = 0,
            HasRetentionPolicy = false,
            HasCompression = false,
            HasContinuousAggregate = false
        };
    }
    finally
    {
        if (openedHere) await conn.CloseAsync();
    }

    // Runs a single-value query on the context connection.
    async Task<object?> ScalarAsync(string sql)
    {
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = sql;
        return await cmd.ExecuteScalarAsync();
    }

    // Runs an optional boolean probe; a failure or a missing row counts as false.
    async Task<bool> ProbeAsync(string what, string sql)
    {
        try
        {
            return await ScalarAsync(sql) is bool flag && flag;
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "[ExperionDb] hypertable status probe failed: {Probe}", what);
            return false;
        }
    }
}
/// <summary>
/// 수동으로 하이퍼테이블을 생성합니다.
/// 테이블이 이미 존재하거나 하이퍼테이블로 변환된 경우 예외를 throw합니다.
/// </summary>
public async Task<HypertableCreateResult> CreateHypertableAsync(HypertableCreateRequest request)
{
// 식별자 검증 - SQL injection 방지
if (!IsValidSqlIdentifier(request.TableName))
{
return HypertableCreateResult.Failed($"테이블 이름 '{request.TableName}'은 유효하지 않습니다. 영문, 숫자, 언더스코어, 하이픈, 마침표만 사용 가능합니다.");
}
if (!IsValidSqlIdentifier(request.TimeColumn))
{
return HypertableCreateResult.Failed($"시간 컬럼 이름 '{request.TimeColumn}'은 유효하지 않습니다. 영문, 숫자, 언더스코어, 하이픈, 마침표만 사용 가능합니다.");
}
var timeInterval = request.TimeInterval ?? "";
if (!IsValidSqlIdentifier(timeInterval.Replace(" ", "")))
{
return HypertableCreateResult.Failed($"시간 간격 '{request.TimeInterval ?? "(null)"}'은 유효하지 않습니다.");
}
bool connectionOpenedHere = false;
try
{
if (_ctx.Database.GetDbConnection().State != System.Data.ConnectionState.Open)
{
await _ctx.Database.OpenConnectionAsync();
connectionOpenedHere = true;
}
// 1. 테이블 존재 여부 확인
await using var tableCheckCmd = _ctx.Database.GetDbConnection().CreateCommand();
tableCheckCmd.CommandText = @"
SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = @tableName)";
tableCheckCmd.Parameters.Add(new NpgsqlParameter("@tableName", request.TableName));
var tableExists = Convert.ToBoolean(await tableCheckCmd.ExecuteScalarAsync());
if (!tableExists)
{
return HypertableCreateResult.Failed($"테이블 '{request.TableName}'이 존재하지 않습니다.");
}
// 2. 이미 하이퍼테이블인지 확인
await using var hypertableCheckCmd = _ctx.Database.GetDbConnection().CreateCommand();
hypertableCheckCmd.CommandText = @"
SELECT EXISTS (
SELECT 1 FROM timescaledb_information.hypertables
WHERE hypertable_name = @tableName)";
hypertableCheckCmd.Parameters.Add(new NpgsqlParameter("@tableName", request.TableName));
var isHypertable = Convert.ToBoolean(await hypertableCheckCmd.ExecuteScalarAsync());
if (isHypertable)
{
return HypertableCreateResult.AlreadyExists($"테이블 '{request.TableName}'은 이미 하이퍼테이블입니다.");
}
// 3. TimescaleDB 확장 활성화
await _ctx.Database.ExecuteSqlRawAsync("CREATE EXTENSION IF NOT EXISTS timescaledb");
// 3-1. 기존 SERIAL PRIMARY KEY 제약사항 제거 (TimeScaleDB 호환성)
var dropPrimaryKeySql = $"ALTER TABLE {request.TableName} DROP CONSTRAINT IF EXISTS {request.TableName}_pkey";
_logger.LogDebug("[ExperionDb] 기본키 제약 제거: {Sql}", dropPrimaryKeySql);
#pragma warning disable EF1002
await _ctx.Database.ExecuteSqlRawAsync(dropPrimaryKeySql);
#pragma warning restore EF1002
// 4. 하이퍼테이블 생성 (기존 데이터 마이그레이션 포함)
var createHypertableSql = $"SELECT create_hypertable('{request.TableName}'::regclass, '{request.TimeColumn}'::text, if_not_exists => TRUE, migrate_data => TRUE)";
_logger.LogDebug("[ExperionDb] 하이퍼테이블 생성: {Sql}", createHypertableSql);
#pragma warning disable EF1002
await _ctx.Database.ExecuteSqlRawAsync(createHypertableSql);
#pragma warning restore EF1002
// 4-1. TimeScaleDB 하이퍼테이블에 적합한 새로운 기본키 생성
var addPrimaryKeySql = $"ALTER TABLE {request.TableName} ADD CONSTRAINT {request.TableName}_pkey PRIMARY KEY ({request.TimeColumn}, id)";
_logger.LogDebug("[ExperionDb] 새 기본키 생성: {Sql}", addPrimaryKeySql);
#pragma warning disable EF1002
await _ctx.Database.ExecuteSqlRawAsync(addPrimaryKeySql);
#pragma warning restore EF1002
// 6. 보존 정책 설정 (요청된 경우)
if (request.SetRetentionPolicy && !string.IsNullOrEmpty(request.RetentionPeriod))
{
if (!IsValidSqlInterval(request.RetentionPeriod))
{
return HypertableCreateResult.Failed($"보존 기간 '{request.RetentionPeriod}'은 유효하지 않습니다. (예: '30 days', '12 hours')");
}
var retentionSql = $"SELECT add_retention_policy('{request.TableName}'::regclass, INTERVAL '{request.RetentionPeriod}')";
_logger.LogDebug("[ExperionDb] 보존 정책 적용: {Sql}", retentionSql);
#pragma warning disable EF1002
await _ctx.Database.ExecuteSqlRawAsync(retentionSql);
#pragma warning restore EF1002
}
// 7. 압축 정책 설정 (요청된 경우)
if (request.EnableCompression && !string.IsNullOrEmpty(request.CompressionPeriod))
{
if (!IsValidSqlInterval(request.CompressionPeriod))
{
return HypertableCreateResult.Failed($"압축 기간 '{request.CompressionPeriod}'은 유효하지 않습니다. (예: '7 days', '24 hours')");
}
var compressionSql = $"SELECT add_compression_policy('{request.TableName}'::regclass, INTERVAL '{request.CompressionPeriod}')";
_logger.LogDebug("[ExperionDb] 압축 정책 적용: {Sql}", compressionSql);
#pragma warning disable EF1002
await _ctx.Database.ExecuteSqlRawAsync(compressionSql);
#pragma warning restore EF1002
}
// 7-1. history_table 컬럼명 검증 (tag_name vs tagname)
if (request.TableName == "history_table")
{
await using var columnCheckCmd = _ctx.Database.GetDbConnection().CreateCommand();
columnCheckCmd.CommandText = @"
SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'history_table'
AND column_name IN ('tag_name', 'tagname')
ORDER BY column_name;";
var columns = new List<string>();
await using var reader = await columnCheckCmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
columns.Add(reader.GetString(0));
}
_logger.LogDebug("[ExperionDb] history_table 컬럼명 검증: {Columns}", string.Join(", ", columns));
}
// 8. 연속 집계 생성 (요청된 경우)
if (request.CreateContinuousAggregate)
{
// 8-1. 기존 MATERIALIZED VIEW 삭제 (별도 실행)
var dropViewSql = "DROP MATERIALIZED VIEW IF EXISTS history_5min_agg";
_logger.LogDebug("[ExperionDb] 연속 집계 DROP VIEW: {Sql}", dropViewSql);
#pragma warning disable EF1002
await _ctx.Database.ExecuteSqlRawAsync(dropViewSql);
#pragma warning restore EF1002
// 8-2. 연속 집계 MATERIALIZED VIEW 생성 (별도 실행)
var createViewSql = $@"
CREATE MATERIALIZED VIEW history_5min_agg
WITH (timescaledb.continuous) AS
SELECT
time_bucket(INTERVAL '5 minutes', {request.TimeColumn}) AS time_bucket,
tagname,
AVG(value) AS avg_value,
first(value, {request.TimeColumn}) AS min_value,
last(value, {request.TimeColumn}) AS max_value
FROM {request.TableName}
GROUP BY time_bucket, tagname;";
_logger.LogDebug("[ExperionDb] 연속 집계 CREATE VIEW: {Sql}", createViewSql);
#pragma warning disable EF1002
await _ctx.Database.ExecuteSqlRawAsync(createViewSql);
#pragma warning restore EF1002
_logger.LogDebug("[ExperionDb] 연속 집계 CREATE VIEW 성공");
// 8-3. 연속 집계 정책 추가
var aggregatePolicySql = $"SELECT add_continuous_aggregate_policy('history_5min_agg', '10 minutes', '1 minute', '5 minutes')";
_logger.LogDebug("[ExperionDb] 연속 집계 정책: {Sql}", aggregatePolicySql);
#pragma warning disable EF1002
await _ctx.Database.ExecuteSqlRawAsync(aggregatePolicySql);
#pragma warning restore EF1002
}
return HypertableCreateResult.Ok();
}
catch (Exception ex)
{
_logger.LogError(ex, "[ExperionDb] 하이퍼테이블 생성 실패 - 테이블: {Table}", request.TableName);
return HypertableCreateResult.Failed($"하이퍼테이블 생성 실패: {ex.Message}");
}
finally
{
if (connectionOpenedHere)
{
try { await _ctx.Database.CloseConnectionAsync(); }
catch (Exception ex) { _logger.LogWarning(ex, "[ExperionDb] 커넥션 종료 중 예외 (무시)"); }
}
}
}
/// <summary>
/// Checks whether <paramref name="identifier"/> may be spliced into a DDL statement as a SQL identifier.
/// Only ASCII letters, digits, underscore, hyphen and period are accepted, and the whole string must
/// consist of those characters. DDL statements cannot take identifiers as bind parameters, so every
/// identifier interpolated into raw SQL (EF1002) must pass this check first.
/// </summary>
/// <param name="identifier">Candidate table, column or constraint name.</param>
/// <returns>
/// <c>true</c> when the value is non-empty, at most 63 characters long and contains only allowed
/// characters; otherwise <c>false</c>.
/// </returns>
/// <remarks>
/// NOTE(review): hyphens are still accepted for backward compatibility, but a hyphenated name is not a
/// valid unquoted PostgreSQL identifier and "--" starts a SQL comment. Consider rejecting hyphens if no
/// caller quotes the identifier.
/// </remarks>
private static bool IsValidSqlIdentifier(string identifier)
{
    if (string.IsNullOrEmpty(identifier))
    {
        return false;
    }

    // PostgreSQL's maximum identifier length.
    if (identifier.Length > 63)
    {
        return false;
    }

    // Anchor with \z rather than $: in .NET, $ also matches just before a trailing '\n',
    // which would let "name\n" through and break statements built as "{name}_pkey".
    return System.Text.RegularExpressions.Regex.IsMatch(
        identifier,
        @"^[a-zA-Z0-9_\-\.]+\z");
}
// Validation for INTERVAL strings that are interpolated into TimescaleDB
// add_retention_policy / add_compression_policy calls.
// Accepted shape: one or more "<digits><optional whitespace><unit>" groups, e.g. "30 days",
// "12 hours 30 minutes", "1 year", "1day". Nothing else is allowed, which rules out quotes,
// semicolons, comment markers and every other SQL metacharacter.
// [0-9] is used instead of \d (which also matches non-ASCII Unicode digits), and CultureInvariant
// keeps the case-insensitive unit match independent of the process culture (e.g. Turkish 'I').
private static readonly System.Text.RegularExpressions.Regex _intervalRegex =
    new(@"^\s*([0-9]+\s*(second|seconds|minute|minutes|hour|hours|day|days|week|weeks|month|months|year|years)\s*)+\z",
        System.Text.RegularExpressions.RegexOptions.IgnoreCase
        | System.Text.RegularExpressions.RegexOptions.CultureInvariant
        | System.Text.RegularExpressions.RegexOptions.Compiled);

/// <summary>
/// Checks whether <paramref name="interval"/> is a plain "number unit [number unit ...]" interval
/// that can be embedded in an <c>INTERVAL '...'</c> literal.
/// </summary>
/// <param name="interval">Candidate interval text; may be null.</param>
/// <returns>
/// <c>true</c> when the text is non-blank, at most 100 characters, pure ASCII and matches the
/// interval pattern; otherwise <c>false</c>.
/// </returns>
private static bool IsValidSqlInterval(string? interval)
{
    if (string.IsNullOrWhiteSpace(interval))
    {
        return false;
    }

    if (interval.Length > 100)
    {
        return false;
    }

    // Reject anything outside ASCII up front: Unicode whitespace and case-folding look-alikes
    // (such as the Kelvin sign for 'k') would otherwise satisfy \s / IgnoreCase and only
    // fail later inside PostgreSQL, after earlier DDL steps have already run.
    foreach (var ch in interval)
    {
        if (ch > '\u007F')
        {
            return false;
        }
    }

    return _intervalRegex.IsMatch(interval);
}
}
/// <summary>
/// Result record carrying hypertable status information for a table.
/// </summary>
/// <remarks>
/// All properties are init-only. NOTE(review): the code that fills this record is not part of this
/// section, so the per-property notes below are inferred from the names — confirm against the producer.
/// </remarks>
public record HypertableStatusInfo
{
// Presumably true when the table is registered as a TimescaleDB hypertable — confirm.
public bool IsHypertable { get; init; }
// Name of the inspected table; nullable, so consumers must handle a missing name.
public string? TableName { get; init; }
// Optional human-readable status text.
public string? StatusMessage { get; init; }
// NOTE(review): 32-bit counter — confirm row counts of the inspected table cannot exceed int.MaxValue.
public int RecordCount { get; init; }
// Presumably indicates that a retention policy exists for the table — confirm.
public bool HasRetentionPolicy { get; init; }
// Presumably indicates that compression is configured for the table — confirm.
public bool HasCompression { get; init; }
// Presumably indicates that a continuous aggregate exists on the table — confirm.
public bool HasContinuousAggregate { get; init; }
}
/// <summary>
/// Request record describing how a table should be converted into a TimescaleDB hypertable.
/// Every property has a default, so <c>new HypertableCreateRequest()</c> targets
/// <c>history_table</c> / <c>recorded_at</c> with retention, compression and a continuous aggregate enabled.
/// </summary>
/// <remarks>
/// <see cref="TableName"/> and <see cref="TimeColumn"/> are interpolated into raw DDL by the create
/// routine, so they must be validated as SQL identifiers before use.
/// </remarks>
public record HypertableCreateRequest
{
// Table to convert; spliced into ALTER TABLE / create_hypertable statements.
public string TableName { get; init; } = "history_table";
// Time partitioning column passed to create_hypertable and used in the new composite primary key.
public string TimeColumn { get; init; } = "recorded_at";
// NOTE(review): not read by the visible part of the create routine (create_hypertable is called
// without a chunk interval) — confirm whether this is applied elsewhere.
public string TimeInterval { get; init; } = "1 day";
// NOTE(review): the visible create_hypertable call hard-codes migrate_data => TRUE, so this flag
// appears to have no effect there — confirm.
public bool MigrateData { get; init; } = true;
// When true and RetentionPeriod is non-empty, add_retention_policy is applied.
public bool SetRetentionPolicy { get; init; } = true;
// INTERVAL text for the retention policy (e.g. "30 days"); validated before being used in SQL.
public string RetentionPeriod { get; init; } = "90 days";
// When true and CompressionPeriod is non-empty, add_compression_policy is applied.
public bool EnableCompression { get; init; } = true;
// INTERVAL text for the compression policy (e.g. "7 days"); validated before being used in SQL.
public string CompressionPeriod { get; init; } = "1 day";
// When true, the history_5min_agg continuous aggregate is dropped and recreated along with its refresh policy.
public bool CreateContinuousAggregate { get; init; } = true;
}
/// <summary>
/// Outcome of a hypertable creation attempt. Instances are normally built through the
/// <see cref="Ok"/>, <see cref="Failed"/> and <see cref="AlreadyExists"/> factories.
/// </summary>
public record HypertableCreateResult
{
    /// <summary>True only for results produced by <see cref="Ok"/>.</summary>
    public bool Success { get; init; }

    /// <summary>Human-readable description of the outcome; never null, empty by default.</summary>
    public string Message { get; init; } = "";

    /// <summary>Name of the affected table, when the caller supplied one.</summary>
    public string? TableName { get; init; }

    /// <summary>Builds a successful result.</summary>
    /// <param name="tableName">Optional name of the table that was converted.</param>
    /// <param name="message">Optional message; a default success message is used when null.</param>
    /// <returns>A result with <see cref="Success"/> set to true.</returns>
    public static HypertableCreateResult Ok(string? tableName = null, string? message = null)
    {
        var text = message is null
            ? "하이퍼테이블이 성공적으로 생성되었습니다."
            : message;

        return new HypertableCreateResult
        {
            Success = true,
            TableName = tableName,
            Message = text,
        };
    }

    /// <summary>Builds a failed result carrying <paramref name="message"/>.</summary>
    /// <param name="message">Description of the failure.</param>
    /// <returns>A result with <see cref="Success"/> set to false and no table name.</returns>
    public static HypertableCreateResult Failed(string message)
    {
        return new HypertableCreateResult
        {
            Success = false,
            Message = message,
        };
    }

    /// <summary>
    /// Builds the result used when the table is already a hypertable. It is reported with
    /// <see cref="Success"/> set to false, exactly like <see cref="Failed"/>.
    /// </summary>
    /// <param name="message">Description of the existing state.</param>
    /// <returns>A result with <see cref="Success"/> set to false and no table name.</returns>
    public static HypertableCreateResult AlreadyExists(string message)
    {
        return Failed(message);
    }
}