using ExperionCrawler.Core.Application.Feedforward;
using ExperionCrawler.Core.Application.Interfaces;
using ExperionCrawler.Core.Domain.Entities;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Globalization;

namespace ExperionCrawler.Infrastructure.Control;

/// <summary>
/// Background service that, on every scan, loads the enabled column configurations,
/// builds a PV snapshot from the realtime records, runs <see cref="FeedforwardEngine"/>
/// for each column, optionally auto-writes the recommended setpoints, and publishes the
/// resulting advisory to the advisory store.
/// </summary>
/// <remarks>
/// NOTE(review): the single-argument generic type lists in this file
/// (<c>ILogger&lt;&gt;</c>, <c>Dictionary&lt;,&gt;</c>, <c>GetRequiredService&lt;&gt;</c>,
/// <c>List&lt;&gt;</c>, <c>Task&lt;&gt;</c>) were not visible in the reviewed text and have
/// been restored from usage — confirm against the original declarations.
/// </remarks>
public sealed class FeedforwardSupervisor : BackgroundService
{
    /// <summary>Scan period (seconds) used when no enabled column supplies a usable one.</summary>
    private const double DefaultScanSec = 2.0;

    private readonly IServiceScopeFactory _scopeFactory;
    private readonly FeedforwardEngine _engine;
    private readonly IFeedforwardAdvisoryStore _store;
    private readonly IFeedforwardWriteGuard _writeGuard;
    private readonly ILogger<FeedforwardSupervisor> _logger;
    private readonly Microsoft.Extensions.Configuration.IConfiguration _appConfig;

    // WP0 extension: substitutes engine snapshot inputs (DEMO).
    private readonly ISimOverrideStore _sim;

    // Per-column engine state. Guarded by lock (_states): it is touched both by the
    // supervisor loop and by Arm/Cancel, which are invoked from outside the loop.
    private readonly Dictionary<int, ColumnState> _states = new();

    // Phase II: last write time (per-stream rate limit) and last write result.
    private readonly ConcurrentDictionary<(int colId, string streamKey), DateTime> _lastWriteTimes = new();
    private readonly ConcurrentDictionary<(int colId, string streamKey), (double? sp, string? error, DateTime? at)> _lastWriteResults = new();

    /// <summary>Stores the injected collaborators; performs no other work.</summary>
    public FeedforwardSupervisor(
        IServiceScopeFactory scopeFactory,
        FeedforwardEngine engine,
        IFeedforwardAdvisoryStore store,
        IFeedforwardWriteGuard writeGuard,
        ILogger<FeedforwardSupervisor> logger,
        Microsoft.Extensions.Configuration.IConfiguration appConfig,
        ISimOverrideStore sim)
    {
        _scopeFactory = scopeFactory;
        _engine = engine;
        _store = store;
        _writeGuard = writeGuard;
        _logger = logger;
        _appConfig = appConfig;
        _sim = sim;
    }

    /// <summary>
    /// Phase II: returns the last recorded write attempt for a stream (used by the controller).
    /// </summary>
    /// <param name="colId">Column id.</param>
    /// <param name="streamKey">Stream key within the column.</param>
    /// <returns>
    /// The setpoint, error text and timestamp of the last recorded attempt, or
    /// <c>(null, null, null)</c> when nothing has been recorded for that stream.
    /// </returns>
    public (double? sp, string? error, DateTime? at) GetLastWrite(int colId, string streamKey)
        => _lastWriteResults.TryGetValue((colId, streamKey), out var r) ? r : (null, null, null);

    /// <summary>
    /// Supervisor loop: ticks every enabled column, then sleeps for the smallest configured
    /// scan period clamped to the range 1–10 seconds.
    /// </summary>
    /// <param name="ct">Host stopping token.</param>
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        await Task.Yield();

        while (!ct.IsCancellationRequested)
        {
            double minScan = DefaultScanSec;
            try
            {
                using var scope = _scopeFactory.CreateScope();

                // NOTE(review): IFeedforwardConfigStore is assumed from the LoadAllAsync usage;
                // the type argument was not visible in the reviewed text — confirm against the
                // DI registration. The other three follow from the parameter types of
                // BuildSnapshotAsync and AutoWriteAsync.
                var cfgStore = scope.ServiceProvider.GetRequiredService<IFeedforwardConfigStore>();
                var db = scope.ServiceProvider.GetRequiredService<IExperionDbService>();
                var writeClient = scope.ServiceProvider.GetService<IExperionOpcWriteClient>();
                var auditService = scope.ServiceProvider.GetService<IFeedforwardAuditService>();

                var columns = await cfgStore.LoadAllAsync(ct);
                var enabled = columns.Where(c => c.Enabled).ToList();
                if (enabled.Count > 0)
                {
                    minScan = enabled.Min(c => c.ScanSec);
                }

                foreach (var cfg in enabled)
                {
                    try
                    {
                        // Sample the override flag before AND after the snapshot is built: if the
                        // override is switched off mid-tick, the snapshot may still contain
                        // overridden inputs and must not drive a real write.
                        bool simBefore = _sim.Enabled;
                        var snap = await BuildSnapshotAsync(db, cfg);
                        var st = GetState(cfg.Id);
                        var res = _engine.Tick(cfg, snap, st, DateTime.UtcNow);
                        bool simActive = simBefore || _sim.Enabled;

                        // Phase II: auto-write.
                        // Safety guard: while Sim Override is active the inputs are fake, so real
                        // writes are forbidden (the column is demoted to advisory-only).
                        if (!cfg.AdvisoryOnly && writeClient is not null && auditService is not null && !simActive)
                        {
                            await AutoWriteAsync(cfg, res, st, writeClient, auditService, ct);
                            res = res with { AutoWriteActive = true };
                        }
                        else if (!cfg.AdvisoryOnly && simActive)
                        {
                            _logger.LogWarning("[FF] Sim Override 활성 — col {Id} auto-write 억제(가짜 입력)", cfg.Id);
                        }

                        _store.Set(res);
                    }
                    catch (Exception ex) when (!IsShutdownCancellation(ex, ct))
                    {
                        _logger.LogWarning(ex, "FF tick 실패: column {Id}", cfg.Id);
                    }
                }
            }
            catch (Exception ex) when (!IsShutdownCancellation(ex, ct))
            {
                _logger.LogError(ex, "FF supervisor 루프 오류");
            }

            // A NaN scan period would make TimeSpan.FromSeconds throw outside the try block and
            // terminate the service, so fall back to the default.
            if (double.IsNaN(minScan))
            {
                minScan = DefaultScanSec;
            }

            await Task.Delay(TimeSpan.FromSeconds(Math.Clamp(minScan, 1.0, 10.0)), ct);
        }
    }

    /// <summary>
    /// True when <paramref name="ex"/> is a cancellation observed after the stopping token was
    /// signalled, so it is not reported as a tick or loop failure.
    /// </summary>
    private static bool IsShutdownCancellation(Exception ex, CancellationToken ct)
        => ex is OperationCanceledException && ct.IsCancellationRequested;

    // ── Phase II: auto-write ─────────────────────────────────────────────

    /// <summary>
    /// Writes the recommended setpoint of every commanded stream that has a target node,
    /// subject to the write guard and a per-stream rate limit, and audits each attempt.
    /// </summary>
    /// <param name="cfg">Column configuration whose streams are considered.</param>
    /// <param name="column">Advisory result produced by the engine for this column.</param>
    /// <param name="st">Column state (not read by this method).</param>
    /// <param name="writeClient">Client used to write the setpoint tag.</param>
    /// <param name="audit">Audit sink for blocked and attempted writes.</param>
    /// <param name="ct">Cancellation token passed to the write and audit calls.</param>
    private async Task AutoWriteAsync(ColumnConfig cfg, AdvisoryResult column, ColumnState st,
        IExperionOpcWriteClient writeClient, IFeedforwardAuditService audit, CancellationToken ct)
    {
        if (column.Transient)
        {
            return;
        }

        foreach (var s in cfg.Streams)
        {
            if (s.Role != StreamRole.Commanded)
            {
                continue;
            }

            if (string.IsNullOrWhiteSpace(s.SpNodeId))
            {
                continue; // no write target configured
            }

            var adv = column.Streams.FirstOrDefault(a => a.Key == s.Key);
            if (adv is null)
            {
                continue;
            }

            // 1) WriteGuard validation.
            var check = _writeGuard.Check(cfg, adv, s, column);
            if (!check.Allowed)
            {
                // Record and audit the blocked attempt.
                _lastWriteResults[(cfg.Id, s.Key)] = (adv.RecommendedSp, check.Reason, DateTime.UtcNow);
                await audit.LogAsync(new FfActionLogEntry(cfg.Id, "sp_write",
                    StreamKey: s.Key, SpValue: adv.RecommendedSp, NodeId: s.SpNodeId,
                    Result: "blocked", WriteguardReason: check.Reason), ct);
                continue;
            }

            // 2) Rate limit: at least ScanSec*2 (and never less than 2 s) between writes.
            var lastWrite = _lastWriteTimes.GetValueOrDefault((cfg.Id, s.Key), DateTime.MinValue);
            var minInterval = TimeSpan.FromSeconds(Math.Max(cfg.ScanSec * 2, 2.0));
            if (DateTime.UtcNow - lastWrite < minInterval)
            {
                continue;
            }

            // A missing recommendation must not throw: that would abort the remaining streams
            // and prevent the caller from publishing the advisory. Skip this stream instead.
            if (adv.RecommendedSp is not double spVal)
            {
                _logger.LogWarning("[FF] 권장 SP 없음 — col={Col} stream={Key} 쓰기 생략", cfg.Id, s.Key);
                continue;
            }

            // 3) OPC UA write.
            var result = await writeClient.WriteTagAsync(LoadServerConfig(), s.SpNodeId, spVal, ct);

            // 4) Store the outcome. The write time is updated on failure too, so a failing
            //    write is also rate-limited.
            _lastWriteTimes[(cfg.Id, s.Key)] = DateTime.UtcNow;
            if (result.Success)
            {
                _lastWriteResults[(cfg.Id, s.Key)] = (spVal, null, DateTime.UtcNow);
                _logger.LogInformation("[FF] SP 쓰기 성공 col={Col} stream={Key} node={Node} val={Val:F2}",
                    cfg.Id, s.Key, s.SpNodeId, spVal);
            }
            else
            {
                _lastWriteResults[(cfg.Id, s.Key)] = (spVal, result.Error, DateTime.UtcNow);
                _logger.LogWarning("[FF] SP 쓰기 실패 col={Col} stream={Key} node={Node} err={Err}",
                    cfg.Id, s.Key, s.SpNodeId, result.Error);
            }

            await audit.LogAsync(new FfActionLogEntry(cfg.Id, "sp_write",
                StreamKey: s.Key, SpValue: spVal, NodeId: s.SpNodeId,
                Result: result.Success ? "success" : $"error: {result.Error}"), ct);
        }
    }

    /// <summary>
    /// Builds the server connection settings from the <c>Experion:Default</c> configuration
    /// section, falling back to built-in defaults for any missing value.
    /// </summary>
    private ExperionServerConfig LoadServerConfig()
    {
        var section = _appConfig.GetSection("Experion:Default");

        // NOTE(review): the fallback host name, user name and password are hard-coded in source.
        // They are kept for backward compatibility; consider requiring them from configuration
        // or a secret store instead.
        return new ExperionServerConfig
        {
            ServerHostName = section["ServerHostName"] ?? "192.168.0.20",
            Port = int.TryParse(section["Port"], out var p) ? p : 4840,
            ClientHostName = section["ClientHostName"] ?? "dbsvr",
            UserName = section["UserName"] ?? "mngr",
            Password = section["Password"] ?? "mngr"
        };
    }

    /// <summary>
    /// Returns the state object for a column, creating it on first use.
    /// </summary>
    /// <remarks>
    /// Takes <c>lock (_states)</c> itself because the supervisor loop calls it without holding
    /// the lock while <see cref="Arm"/> and <see cref="Cancel"/> mutate the same dictionary.
    /// The monitor is reentrant, so calling it from inside those locks is safe.
    /// </remarks>
    private ColumnState GetState(int id)
    {
        lock (_states)
        {
            if (!_states.TryGetValue(id, out var s))
            {
                s = new ColumnState();
                _states[id] = s;
            }

            return s;
        }
    }

    // WO-6: operator ARM / cancel (flags used for mode determination only — not a write).
    // Per the original author's note, the flag is consumed on the next Tick.

    /// <summary>Sets the operator-armed flag on the column's state. Always returns true.</summary>
    public bool Arm(int columnId)
    {
        lock (_states)
        {
            GetState(columnId).OperatorArmed = true;
        }

        return true;
    }

    /// <summary>Sets the operator-cancel flag on the column's state. Always returns true.</summary>
    public bool Cancel(int columnId)
    {
        lock (_states)
        {
            GetState(columnId).OperatorCancel = true;
        }

        return true;
    }

    /// <summary>
    /// Reads the realtime records for every tag the column references and assembles the
    /// snapshot passed to the engine. Sim-override values take precedence over DB values.
    /// </summary>
    /// <param name="db">Realtime record source.</param>
    /// <param name="cfg">Column configuration naming the tags to read.</param>
    private async Task<PvSnapshot> BuildSnapshotAsync(IExperionDbService db, ColumnConfig cfg)
    {
        // Lower-cases a tag and appends ".pv" unless it is already present.
        static string PvTag(string baseTag)
        {
            var t = baseTag.ToLowerInvariant();
            return t.EndsWith(".pv", StringComparison.Ordinal) ? t : t + ".pv";
        }

        var feedTag = PvTag(cfg.FeedTag);
        var tags = new List<string> { feedTag };
        if (cfg.PressureTag is not null)
        {
            tags.Add(PvTag(cfg.PressureTag));
        }

        tags.AddRange(cfg.LevelTags.Select(PvTag));
        tags.AddRange(cfg.Streams.Where(s => s.LevelTag is not null).Select(s => PvTag(s.LevelTag!)));
        tags.AddRange(cfg.Streams.Select(s => PvTag(s.FlowTag)));
        tags.AddRange(cfg.TempTags.Select(PvTag)); // WO-2 temperature profile
        if (cfg.SteamOpTag is not null)
        {
            tags.Add(cfg.SteamOpTag.ToLowerInvariant()); // WO-3 steam OP (.op kept as is)
        }

        if (cfg.DeltaPTag is not null)
        {
            tags.Add(PvTag(cfg.DeltaPTag)); // WO-6 differential pressure (.pv)
        }

        // Group before building the lookup: ToDictionary throws on duplicate keys, and two rows
        // can collide once tag names are lower-cased. The newest row wins.
        var rows = (await db.GetRealtimeRecordsByTagNamesAsync(tags))
            .GroupBy(r => r.TagName.ToLowerInvariant())
            .ToDictionary(g => g.Key, g => g.OrderByDescending(r => r.Timestamp).First());

        // Reads one already-normalised (lower-case) tag.
        TagSample Read(string tag)
        {
            // WP0 extension: an override wins and is treated as fresh.
            if (_sim.Enabled && _sim.TryGet(tag, out var sov))
            {
                return new TagSample(tag, sov, Good: true, DateTime.UtcNow);
            }

            if (rows.TryGetValue(tag, out var r)
                && double.TryParse(r.LiveValue, NumberStyles.Float, CultureInfo.InvariantCulture, out var v))
            {
                bool fresh = (DateTime.UtcNow - r.Timestamp.ToUniversalTime()).TotalSeconds <= cfg.StaleSec;

                // NumberStyles.Float also parses "NaN"/"Infinity"; never flag those as good.
                return new TagSample(tag, v, Good: fresh && double.IsFinite(v), r.Timestamp);
            }

            return new TagSample(tag, double.NaN, Good: false, DateTime.MinValue);
        }

        // Reads a tag with the ".pv" suffix enforced.
        TagSample Sample(string baseTag) => Read(PvTag(baseTag));

        // WO-3: reads non-.pv tags (such as .op) as given, without forcing a suffix.
        TagSample SampleExact(string rawTag) => Read(rawTag.ToLowerInvariant());

        var feed = Sample(cfg.FeedTag);
        var press = cfg.PressureTag is null ? null : Sample(cfg.PressureTag);
        var levels = cfg.LevelTags.Select(Sample).ToList();
        var streams = cfg.Streams.ToDictionary(s => s.Key, s => Sample(s.FlowTag));
        var temps = cfg.TempTags.Count > 0 ? cfg.TempTags.Select(Sample).ToList() : null;
        var steam = cfg.SteamOpTag is not null ? SampleExact(cfg.SteamOpTag) : null;
        var deltaP = cfg.DeltaPTag is not null ? Sample(cfg.DeltaPTag) : null;

        return new PvSnapshot(feed, press, levels, streams)
        {
            Temps = temps,
            SteamOp = steam,
            DeltaP = deltaP
        };
    }
}