Files
dbserver/OpcPksPlatform/Services/OpcUaCollector.cs

140 lines
5.1 KiB
C#

using Npgsql;
using Opc.Ua;
using Opc.Ua.Client;
using OpcPks.Core.Services; // OpcSessionManager 참조
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace OpcPks.Collector.Services
{
public class OpcUaCollector
{
private readonly OpcSessionManager _sessionManager;
private ISession _session;
private Subscription _subscription;
private readonly string _dbConnString = "Host=localhost;Username=postgres;Password=postgres;Database=opcdb";
public OpcUaCollector(OpcSessionManager sessionManager)
{
_sessionManager = sessionManager;
}
public async Task StartCollection()
{
try
{
// 1. 공통 세션 관리자로부터 하니웰 세션 가져오기
_session = await _sessionManager.GetSessionAsync();
// 2. DB(tag_master)에서 엔지니어가 등록한 감시 대상 노드 리스트 조회
var nodesToMonitor = await GetNodesFromMaster();
if (nodesToMonitor.Count == 0)
{
Console.WriteLine("⚠️ 감시 대상으로 등록된 태그가 없습니다.");
return;
}
// 3. Subscription 생성 (1초 주기 감시)
_subscription = new Subscription(_session.DefaultSubscription) { PublishingInterval = 1000 };
foreach (var nodeId in nodesToMonitor)
{
var item = new MonitoredItem(_subscription.DefaultItem)
{
DisplayName = nodeId,
StartNodeId = nodeId
};
// 중요: 값이 변했을 때만 실행될 이벤트 핸들러 등록
item.Notification += OnDataChangeReceived;
_subscription.AddItem(item);
}
_session.AddSubscription(_subscription);
_subscription.Create();
Console.WriteLine($"🚀 수집 엔진 가동 중: {nodesToMonitor.Count}개의 태그 감시 시작.");
}
catch (Exception ex)
{
Console.WriteLine($"❌ 수집 시작 중 오류 발생: {ex.Message}");
}
}
private void OnDataChangeReceived(MonitoredItem item, MonitoredItemNotificationEventArgs e)
{
foreach (var value in item.DequeueValues())
{
// 하니웰에서 값이 변경되어 통보(COV)가 오면 DB 업데이트 수행
UpdateLiveDb(item.StartNodeId.ToString(), value.Value?.ToString() ?? "NULL", value.StatusCode.ToString());
}
}
private void UpdateLiveDb(string nodeId, string val, string quality)
{
try
{
using var conn = new NpgsqlConnection(_dbConnString);
// IS DISTINCT FROM: 실제 값이 다를 때만 쓰기(Write)를 수행하여 SSD 부하 감소
var sql = @"
UPDATE tag_live_data
SET live_value = @val, quality = @q, last_updated = NOW()
WHERE full_node_id = @id AND live_value IS DISTINCT FROM @val";
using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("id", nodeId);
cmd.Parameters.AddWithValue("val", val);
cmd.Parameters.AddWithValue("q", quality);
conn.Open();
int affectedRows = cmd.ExecuteNonQuery();
if (affectedRows > 0)
{
Console.WriteLine($"[변경수집] {nodeId} -> {val} ({quality})");
}
}
catch (Exception ex)
{
Console.WriteLine($"❌ DB 업데이트 오류: {ex.Message}");
}
}
private async Task<List<string>> GetNodesFromMaster()
{
var nodes = new List<string>();
try
{
using var conn = new NpgsqlConnection(_dbConnString);
await conn.OpenAsync();
// tag_master에서 활성화된(is_active) 태그들만 가져옴
var sql = "SELECT full_node_id FROM tag_master WHERE is_active = TRUE";
using var cmd = new NpgsqlCommand(sql, conn);
using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
nodes.Add(reader.GetString(0));
}
}
catch (Exception ex)
{
Console.WriteLine($"❌ 마스터 노드 조회 실패: {ex.Message}");
}
return nodes;
}
public async Task StopCollection()
{
if (_subscription != null)
{
_subscription.Delete(true);
}
Console.WriteLine("⏹️ 수집 엔진 정지.");
}
}
}