Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL. 기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체. - industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버) - src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer) - mcp-server: Python FastMCP (RAG/NL2SQL/P&ID) - 다중 컨트롤러(N-Controller) 지원 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
102 lines
3.0 KiB
Python
102 lines
3.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
HC900 Modbus TCP Simulator
|
|
Loads register-map.json and serves dummy values via pymodbus.
|
|
"""
|
|
import json
|
|
import struct
|
|
import logging
|
|
from pymodbus.server import StartTcpServer
|
|
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
|
|
from pymodbus.device import ModbusDeviceIdentification
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(message)s")
|
|
|
|
REG_MAP_PATH = "docs/register-map.json"
|
|
|
|
def build_datastore(map_path: str) -> ModbusSlaveContext:
|
|
with open(map_path) as f:
|
|
data = json.load(f)
|
|
|
|
# store up to 0x8000 (32768) holding registers
|
|
store = [0] * (0x8000)
|
|
|
|
seed = 0.0
|
|
for r in data["registers"]:
|
|
a = r["addr"]
|
|
t = r["type"]
|
|
tag = r["tag"]
|
|
|
|
# generate dummy value based on tag name
|
|
seed += 100.0
|
|
if "MODE" in tag or "FUZZY" in tag or "EN" in tag or "STATE" in tag or "_REQ" in tag:
|
|
val = 1 # integer status
|
|
elif "SP_LO" in tag or "OP_LO" in tag:
|
|
val = 0.0
|
|
elif "SP_HI" in tag or "OP_HI" in tag:
|
|
val = 100.0
|
|
elif "PV" in tag or "SP" in tag or "OP" in tag:
|
|
val = seed * 0.01
|
|
elif "DIR" in tag:
|
|
val = 1.0
|
|
elif "LOOP_STATUS" in tag:
|
|
val = 0x8000
|
|
elif "PV_LO" in tag:
|
|
val = 0.0
|
|
elif "PV_HI" in tag:
|
|
val = 150.0
|
|
elif "DEV" in tag:
|
|
val = 2.5
|
|
elif "ALM" in tag:
|
|
val = 80.0
|
|
elif "LSP" in tag:
|
|
val = 50.0
|
|
elif "GAIN" in tag:
|
|
val = 1.0
|
|
elif "RESET" in tag:
|
|
val = 10.0
|
|
elif "RATE" in tag:
|
|
val = 0.0
|
|
elif "RSP" in tag:
|
|
val = -1.0 # none
|
|
elif "TRIP" in tag or "ESD" in tag or "IL" in tag:
|
|
val = 0.0
|
|
elif "_HS" in tag:
|
|
val = 0.0
|
|
else:
|
|
val = 42.0
|
|
|
|
if t == "float32":
|
|
raw = struct.pack(">f", val) # FP_B = big-endian IEEE 754
|
|
store[a] = struct.unpack(">H", raw[:2])[0]
|
|
store[a + 1] = struct.unpack(">H", raw[2:])[0]
|
|
elif t == "uint16":
|
|
store[a] = int(val) & 0xFFFF
|
|
|
|
ctx = ModbusSlaveContext(
|
|
zero_mode=True, # 0-based addressing (hw addr = array index)
|
|
di=None, co=None, ir=None,
|
|
hr=store,
|
|
)
|
|
print(f"Datastore built: {len(data['registers'])} registers, {len(store)} words")
|
|
return ctx
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys, os
|
|
os.chdir(os.path.join(os.path.dirname(__file__), ".."))
|
|
ctx = build_datastore(REG_MAP_PATH)
|
|
identity = ModbusDeviceIdentification()
|
|
identity.VendorName = "HC900 Sim"
|
|
identity.ProductCode = "HC900"
|
|
identity.ModelName = "HC900-C70 Simulator"
|
|
identity.MajorMinorRevision = "4.4x"
|
|
|
|
print("Starting HC900 Modbus TCP simulator on 0.0.0.0:5020 ...")
|
|
print(" (use port 5020 to avoid root)")
|
|
StartTcpServer(
|
|
context=ModbusServerContext(slaves=ctx, single=True),
|
|
identity=identity,
|
|
address=("0.0.0.0", 5020),
|
|
)
|