feat: Sinam_xlsx 기반 register-map 재생성 + C1-C4 컨트롤러 추가

- register-map.json 재생성 (C3: 328→1974 registers)
  - .PV/.SP/.OP 등 suffix 추가, write_addr 필드 도입
  - LOOP_LAYOUT 기반 고정 레이아웃 전개 + 명명되지 않은 레지스터 보존
  - FC16 쓰기를 위한 write_addr 분리 (.MD는 LOOPSTAT 읽기/MODEIN 쓰기)
- build_register_map_from_sinam.py 리팩터
  - --controller 인자 추가 (C3/C4/C5 복수 컨트롤러 지원)
  - --validate-csv 옵션 추가 (HC Designer CSV 교차검증)
  - tag명 대문자 유지 (ToLower 금지)
  - 출력 경로 Path 객체 사용
- gateway-config.json: C1-C4 컨트롤러 설정 추가
This commit is contained in:
windpacer
2026-06-04 09:43:14 +09:00
parent e0598f5775
commit f9f6a04054
3 changed files with 16424 additions and 1087 deletions

View File

@@ -6,14 +6,44 @@
}, },
"controllers": [ "controllers": [
{ {
"id": "C3", "id": "C1",
"name": "HC900 C3 Controller", "name": "HC900 C1 Controller",
"controllerIp": "192.168.0.230", "controllerIp": "192.168.0.250",
"controllerPort": 502, "controllerPort": 502,
"grpcPort": 50051, "grpcPort": 50051,
"pollIntervalMs": 1000, "pollIntervalMs": 500,
"registerMapPath": "/home/windpacer/projects/hc900_ax/docs/register-map.json", "registerMapPath": "/home/windpacer/projects/hc900_ax/docs/register-map-c1.json",
"enabled": false
},
{
"id": "C2",
"name": "HC900 C2 Controller",
"controllerIp": "192.168.0.230",
"controllerPort": 502,
"grpcPort": 50052,
"pollIntervalMs": 500,
"registerMapPath": "/home/windpacer/projects/hc900_ax/docs/register-map-c2.json",
"enabled": false
},
{
"id": "C3",
"name": "HC900 C3 Controller",
"controllerIp": "192.168.0.240",
"controllerPort": 502,
"grpcPort": 50053,
"pollIntervalMs": 500,
"registerMapPath": "/home/windpacer/projects/hc900_ax/docs/register-map-c3.json",
"enabled": true "enabled": true
},
{
"id": "C4",
"name": "HC900 C4 Controller",
"controllerIp": "192.168.0.220",
"controllerPort": 502,
"grpcPort": 50054,
"pollIntervalMs": 500,
"registerMapPath": "/home/windpacer/projects/hc900_ax/docs/register-map-c4.json",
"enabled": false
} }
] ]
} }

File diff suppressed because it is too large Load Diff

View File

@@ -1,322 +1,487 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Build register-map.json from Sinam_Tag_all.xlsx SourceAddress columns. Build register-map.json from the Experion point export (Sinam_Tag_all.xlsx) ALONE.
Reads Sinam_Tag_all.xlsx (Experion tag→HC900 address mapping), filters The Experion export carries, for every point, its HC900 source/destination
for C3 controller entries, converts Experion indexed addresses to addresses written as Honeywell "named" indexed addresses (e.g. ``C3 LOOP 1 PV``,
fixed Modbus addresses, and writes register-map.json. ``C3 TAG 542 VALUE``, ``C3 MATH_VAR 22 VALUE``) or, for custom-mapped registers,
as raw "non-named" addresses (e.g. ``C3 4:0x789e UINT2``).
Combined with the HC900 *fixed* loop register layout (a controller-firmware
constant taken from the HC900 Communications manual, Table 6-3, and the Experion
"PID loop" named-address reference) this is everything needed to produce the
Modbus register map. No HC Designer CSV export is required at run time; pass a
CSV with ``--validate-csv`` only if you want the embedded layout cross-checked.
Design (see docs/컨트롤러별-태그매핑-규칙.md):
* The map is keyed on the **Experion point name** (ItemName) — every downstream
consumer (crawler, DB, web UI) is Experion-based.
* For a loop point we register **all** parameters of that loop (one contiguous
Modbus read covers them anyway), naming the parameters Experion exposes with
Experion attribute names (.PV/.SP/.OP/.MD/.GAIN/.RESET/.RATE) and the rest
with their HC900 names.
* ``.MD`` (Mode) reads from Loop Status (LOOPSTAT) but writes to Auto/Manual
State (the MODEIN destination) — so it carries a separate ``write_addr``.
Usage: Usage:
python3 build_register_map_from_sinam.py \ python3 build_register_map_from_sinam.py --controller C3 \
--sinam docs/Sinam_Tag_all.xlsx \ --sinam docs/Sinam_Tag_all.xlsx -o docs/register-map.json
--csv docs/C3-All-Modbus-Map.csv \ # optional cross-check against an HC Designer export:
-o docs/register-map.json python3 build_register_map_from_sinam.py --controller C4 \
--sinam docs/Sinam_Tag_all.xlsx --validate-csv docs/C4-All-Modbus-Map.csv \
-o docs/register-map-c4.json
""" """
import re import re
import csv
import json import json
import argparse import argparse
import csv import datetime
from pathlib import Path from pathlib import Path
import openpyxl import openpyxl
# ─── Experion indexed address → Modbus address ─── # ─────────────────────────── HC900 fixed loop layout ───────────────────────────
# Offset within a loop block (loop base = 0x40 + (n-1)*0x100 for loops 1-24,
PARAM_MAP = { # 0x7840 + (n-25)*0x100 for loops 25-32). Validated against C3/C4 HC Designer
'PV': (0x00, 2, 'float32'), # exports and the HC900 Communications manual Table 6-3.
'RSP': (0x02, 2, 'float32'), #
'WSP': (0x04, 2, 'float32'), # offset : (default_suffix, count, type, access)
'SP': (0x04, 2, 'float32'), # count 2 = float32 (IEEEFP), count 1 = uint16 (bit-packed / unscaled int).
'OP': (0x06, 2, 'float32'), LOOP_LAYOUT = {
'OPWORK': (0x06, 2, 'float32'), 0x00: ('PV', 2, 'float32', 'R'),
'LSP1': (0x2A, 2, 'float32'), 0x02: ('RSP_SP2', 2, 'float32', 'RW'),
'LSP2': (0x2C, 2, 'float32'), 0x04: ('WSP', 2, 'float32', 'RW'),
'AMSTAT': (0xBA, 1, 'uint16'), 0x06: ('Output', 2, 'float32', 'RW'),
'LOOPSTAT': (0xBE, 1, 'uint16'), 0x08: ('PV_B', 2, 'float32', 'R'),
0x0A: ('CarbonPotTemp', 2, 'float32', 'R'),
0x0C: ('Gain1', 2, 'float32', 'RW'),
0x0E: ('Direction', 2, 'float32', 'R'),
0x10: ('Reset1', 2, 'float32', 'RW'),
0x12: ('Rate1', 2, 'float32', 'RW'),
0x14: ('CycleTime1', 2, 'float32', 'R'),
0x16: ('PV_LowRange', 2, 'float32', 'R'),
0x18: ('PV_HighRange', 2, 'float32', 'R'),
0x1A: ('Alarm1SP1', 2, 'float32', 'RW'),
0x1C: ('Alarm1SP2', 2, 'float32', 'RW'),
0x20: ('Gain2', 2, 'float32', 'RW'),
0x22: ('StepDeadband', 2, 'float32', 'RW'),
0x24: ('Reset2', 2, 'float32', 'RW'),
0x26: ('Rate2', 2, 'float32', 'RW'),
0x28: ('CycleTime2', 2, 'float32', 'R'),
0x2A: ('LSP1', 2, 'float32', 'RW'),
0x2C: ('LSP2', 2, 'float32', 'RW'),
0x2E: ('Alarm2SP1', 2, 'float32', 'RW'),
0x30: ('Alarm2SP2', 2, 'float32', 'RW'),
0x34: ('SP_LowLimit', 2, 'float32', 'RW'),
0x36: ('SP_HighLimit', 2, 'float32', 'RW'),
0x38: ('WSP_B', 2, 'float32', 'RW'),
0x3A: ('Output_LowLimit', 2, 'float32', 'RW'),
0x3C: ('Output_HighLimit', 2, 'float32', 'RW'),
0x3E: ('OPWORK', 2, 'float32', 'RW'),
0x46: ('Ratio', 2, 'float32', 'RW'),
0x48: ('Bias', 2, 'float32', 'RW'),
0x4A: ('Deviation', 2, 'float32', 'R'),
0x4E: ('ManualReset', 2, 'float32', 'RW'),
0x50: ('FeedforwardGain', 2, 'float32', 'RW'),
0x52: ('LocalPctCO', 2, 'float32', 'RW'),
0x54: ('FurnaceFactor', 2, 'float32', 'RW'),
0x56: ('PercentHydrogen', 2, 'float32', 'RW'),
0x58: ('OnOffHysteresis', 2, 'float32', 'RW'),
0x5A: ('CarbPotDewpt', 2, 'float32', 'RW'),
0x5C: ('StepMotorTime', 2, 'float32', 'RW'),
0xB7: ('FuzzyEnable', 1, 'uint16', 'RW'),
0xB8: ('DemandTune', 1, 'uint16', 'RW'),
0xB9: ('AntiSootEnable', 1, 'uint16', 'RW'),
0xBA: ('AutoManState', 1, 'uint16', 'RW'),
0xBB: ('SP_SelectState', 1, 'uint16', 'RW'),
0xBC: ('RemLocSPState', 1, 'uint16', 'RW'),
0xBD: ('TuneSetState', 1, 'uint16', 'RW'),
0xBE: ('LoopStatus', 1, 'uint16', 'R'),
} }
# Honeywell named-address mnemonic → offset within the loop block.
MNEMONIC_OFFSET = {
'PV': 0x00, 'RSP': 0x02, 'SP2': 0x02, 'WSP': 0x04, 'OP': 0x06, 'PVB': 0x08,
'TEMP': 0x0A, 'GAIN1': 0x0C, 'PROP1': 0x0C, 'DIR': 0x0E, 'RESET1': 0x10,
'RATE1': 0x12, 'CYCLE1': 0x14, 'PVLOW': 0x16, 'PVHIGH': 0x18, 'AL1SP1': 0x1A,
'AL1SP2': 0x1C, 'GAIN2': 0x20, 'PROP2': 0x20, 'DB': 0x22, 'RESET2': 0x24,
'RATE2': 0x26, 'CYCLE2': 0x28, 'LSP1': 0x2A, 'LSP2': 0x2C, 'AL2SP1': 0x2E,
'AL2SP2': 0x30, 'SPLOW': 0x34, 'SPHIGH': 0x36, 'SPWORK': 0x38, 'OPLOW': 0x3A,
'OPHIGH': 0x3C, 'OPWORK': 0x3E, 'RATIO': 0x46, 'BIAS': 0x48, 'DEV': 0x4A,
'MAN_RESET': 0x4E, 'FF': 0x50, 'PCTCO': 0x52, 'FFCTR': 0x54, 'H2': 0x56,
'OUT_HYST': 0x58, 'CPD': 0x5A, 'MOTOR': 0x5C, 'AMSTAT': 0xBA, 'LOOPSTAT': 0xBE,
'MODEIN': 0xBA, # mode write destination = Auto/Manual State register
}
def parse_indexed_addr(addr_str: str, csv_lookup: dict) -> dict | None: # Mnemonic → Experion point-parameter (attribute) name. Only these get renamed;
"""Convert an Experion indexed address (e.g. 'C3 LOOP 1 OP') # every other loop register keeps its HC900 name from LOOP_LAYOUT.
to a dict with modbus_addr, count, type, and optional description. EXPERION_ATTR = {
'PV': 'PV',
'WSP': 'SP', 'SPWORK': 'SP',
'OP': 'OP', 'OPWORK': 'OP',
'GAIN1': 'GAIN',
'RESET1': 'RESET',
'RATE1': 'RATE',
'LOOPSTAT': 'MD', # mode read source
}
csv_lookup: {(partition, index): {'addr': int, 'dtype': str, 'tag': str}} MD_READ_OFFSET = 0xBE # Loop Status (read)
MD_WRITE_OFFSET = 0xBA # Auto/Manual State (MODEIN destination, write)
# Fixed bases for the other named-address spaces.
SIGNAL_TAG_BASE = 0x2000 # TAG n → base + (n-1)*2 (read-only float32)
MATH_VAR_BASE = 0x18C0 # MATH_VAR n → base + (n-1)*2 (read/write float32)
def loop_base(n: int) -> int:
"""Base holding-register address of loop n (1-32)."""
if 1 <= n <= 24:
return 0x0040 + (n - 1) * 0x0100
if 25 <= n <= 32:
return 0x7840 + (n - 25) * 0x0100
raise ValueError(f'loop number {n} out of range 1-32')
# ─────────────────────────── Sinam (Experion export) parsing ───────────────────────────
# Named addresses: C3 LOOP 1 PV / C3 LOOPX 25 OPWORK / C3 TAG 542 VALUE
# C3 MATH_VAR 22 VALUE
# Non-named address: C3 4:0x789e / C3 4:0x78aa UINT2
_RE_LOOP = re.compile(r'^C(\d+)\s+(LOOPX?)\s+(\d+)\s+(\w+)\s*$', re.I)
_RE_TAG = re.compile(r'^C(\d+)\s+TAG\s+(\d+)\s+VALUE\s*$', re.I)
_RE_VAR = re.compile(r'^C(\d+)\s+MATH_VAR\s+(\d+)\s+VALUE\s*$', re.I)
_RE_RAW = re.compile(r'^C(\d+)\s+(\d+):0x([0-9a-fA-F]+)(?:\s+(\w+))?\s*$', re.I)
def _raw_type(fmt: str | None) -> tuple[int, str]:
"""Map a non-named address format suffix to (count, type)."""
f = (fmt or '').upper()
if f in ('', 'IEEEFP'):
return 2, 'float32'
# UINT2 / MODE / bit-field (numeric) are all single 16-bit registers.
return 1, 'uint16'
def scan_point(cells: list[str], controller_num: str) -> dict | None:
"""Classify an Experion point from all of its non-empty cells.
Returns a descriptor dict identifying the point's HC900 binding for the
requested controller, or None if the point does not reference it.
Priority: LOOP/LOOPX > TAG > MATH_VAR > non-named raw address.
""" """
addr_str = addr_str.strip() loop_refs: list[tuple[int, str]] = [] # (loop_no, mnemonic) — a point may reference
# several loops (e.g. tuning from another loop)
tag_n = var_n = None
raw = None
# LOOP / LOOPX for cell in cells:
m = re.match(r'C3 (LOOP|LOOPX) (\d+) (\w+)', addr_str) s = cell.strip()
if m: m = _RE_LOOP.match(s)
loop_type, n_str, param = m.group(1), int(m.group(2)), m.group(3) if m and m.group(1) == controller_num:
if loop_type == 'LOOP': loop_refs.append((int(m.group(3)), m.group(4).upper()))
assert 1 <= n_str <= 24, f'LOOP number {n_str} out of range 1-24' continue
base = 0x0040 + (n_str - 1) * 0x0100 m = _RE_TAG.match(s)
else: if m and m.group(1) == controller_num:
assert 25 <= n_str <= 32, f'LOOPX number {n_str} out of range 25-32' tag_n = int(m.group(2)); continue
base = 0x7840 + (n_str - 25) * 0x0100 m = _RE_VAR.match(s)
param_info = PARAM_MAP.get(param) if m and m.group(1) == controller_num:
if param_info is None: var_n = int(m.group(2)); continue
print(f' ⚠ Unknown param "{param}" in "{addr_str}", skipping') m = _RE_RAW.match(s)
return None if m and m.group(1) == controller_num:
off, count, dtype = param_info count, dtype = _raw_type(m.group(4))
addr = base + off raw = {'addr': int(m.group(3), 16), 'table': int(m.group(2)),
return { 'count': count, 'type': dtype}
'modbus_addr': addr, continue
'count': count,
'type': dtype,
'access': 'R',
'description': f'{loop_type} #{n_str} {param}',
}
# TAG (Signal Tag) if loop_refs:
m = re.match(r'C3 TAG (\d+) VALUE', addr_str) # Primary loop = the loop whose PV this point reads (its own loop). If no PV
if m: # ref, fall back to the most-referenced loop. (A point can cross-reference
n = int(m.group(1)) # another loop's tuning constants, so "last match wins" is wrong.)
addr = 0x2000 + (n - 1) * 2 from collections import Counter
info = csv_lookup.get(('TAG', n), {}) counts = Counter(n for n, _ in loop_refs)
dtype = info.get('dtype', 'float32') pv_loops = [n for n, mn in loop_refs if mn == 'PV']
count = 2 if dtype == 'float32' else 1 primary = (max(pv_loops, key=lambda n: counts[n]) if pv_loops
tag_name = info.get('tag', f'SIG_TAG_{n}') else counts.most_common(1)[0][0])
return { mnems = {mn for n, mn in loop_refs if n == primary}
'modbus_addr': addr, return {'kind': 'loop', 'n': primary, 'mnemonics': mnems}
'count': count, if tag_n is not None:
'type': dtype, return {'kind': 'tag', 'n': tag_n}
'access': 'R', if var_n is not None:
'description': f'Signal Tag #{n} ({tag_name})', return {'kind': 'var', 'n': var_n}
} if raw is not None:
return {'kind': 'raw', **raw}
# MATH_VAR (Variable)
m = re.match(r'C3 MATH_VAR (\d+) VALUE', addr_str)
if m:
n = int(m.group(1))
addr = 0x18C0 + (n - 1) * 2
info = csv_lookup.get(('MATH_VAR', n), {})
dtype = info.get('dtype', 'float32')
count = 2 if dtype == 'float32' else 1
tag_name = info.get('tag', f'VAR_{n}')
return {
'modbus_addr': addr,
'count': count,
'type': dtype,
'access': 'RW',
'description': f'Variable #{n} ({tag_name})',
}
# Raw address format: C3 4:0x789e
m = re.match(r'C3 (\d+):0x([0-9a-fA-F]+)', addr_str)
if m:
addr = int(m.group(2), 16)
dtype_str = addr_str.split()[-1] if 'UINT' in addr_str.upper() else 'uint16'
return {
'modbus_addr': addr,
'count': 1,
'type': 'uint16',
'access': 'R',
'description': f'Raw addr 0x{addr:04X}',
}
print(f' ⚠ Could not parse indexed address: "{addr_str}", skipping')
return None return None
# ─── CSV lookup ─── def build_loop_entries(item_name: str, n: int, mnemonics: set[str]) -> list[dict]:
"""Register every parameter of loop n, keyed on the Experion ItemName.
def build_csv_lookup(csv_path: Path) -> dict: The parameters Experion actually exposes (per the mnemonics referenced in the
"""Build a lookup dict from C3-All-Modbus-Map.csv. export) are renamed to Experion attribute names; the rest keep their HC900
names so the whole loop block is available.
Returns: {(partition, index): {'tag': str, 'dtype': str, 'addr': int}}
""" """
lookup = {} base = loop_base(n)
with open(csv_path, 'r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
rows = list(reader)
for row in rows: # offset → Experion attribute, derived from the mnemonics this point uses.
if len(row) < 6: offset_attr: dict[int, str] = {}
continue for mn in mnemonics:
hex_addr_raw = row[0].strip() attr = EXPERION_ATTR.get(mn)
if not hex_addr_raw.startswith('0x'): if attr and mn in MNEMONIC_OFFSET:
continue offset_attr[MNEMONIC_OFFSET[mn]] = attr
addr = int(hex_addr_raw, 16) has_mode = 'LOOPSTAT' in mnemonics or 'MODEIN' in mnemonics
tag = row[2].strip()
typ = row[4].strip()
num_str = row[5].strip()
dtype_raw = row[6].strip() if len(row) > 6 else 'float 32'
# Normalize dtype entries = []
if 'unsigned 16' in dtype_raw.lower() or 'signed 16' in dtype_raw.lower() or 'integer' in dtype_raw.lower(): for off, (suffix, count, dtype, access) in sorted(LOOP_LAYOUT.items()):
dtype = 'uint16' if off == MD_READ_OFFSET and has_mode:
continue # emitted below as ".MD" with a distinct write_addr
name = offset_attr.get(off, suffix)
entries.append({
'tag': f'{item_name}.{name}',
'addr': base + off,
'write_addr': base + off,
'count': count, 'type': dtype, 'access': access,
'description': f'LOOP #{n} {suffix}',
})
if has_mode:
# MODESTAT / Loop Status (0xBE) is read-only and encodes the combined mode
# the way Experion reads it: 0=RSP AUTO, 1=RSP MAN, 4=LSP AUTO, 5=LSP MAN
# (bit0 = Auto/Man, bit2 = LSP/RSP). The mode is *changed* by writing the
# dedicated RW registers below: .AutoManState (0xBA) and .RemLocSPState (0xBC).
entries.append({
'tag': f'{item_name}.MD',
'addr': base + MD_READ_OFFSET,
'write_addr': base + MD_READ_OFFSET,
'count': 1, 'type': 'uint16', 'access': 'R',
'description': f'LOOP #{n} Mode status (MODESTAT 0=RSP AUTO,1=RSP MAN,'
f'4=LSP AUTO,5=LSP MAN; write via .AutoManState / .RemLocSPState)',
})
return entries
def build_point_entry(item_name: str, desc: dict) -> dict | None:
"""Build the register entry for a signal-tag / variable / raw point."""
kind = desc['kind']
if kind == 'tag':
addr = SIGNAL_TAG_BASE + (desc['n'] - 1) * 2
return {'tag': item_name, 'addr': addr, 'write_addr': addr,
'count': 2, 'type': 'float32', 'access': 'R',
'description': f'Signal Tag #{desc["n"]}'}
if kind == 'var':
addr = MATH_VAR_BASE + (desc['n'] - 1) * 2
return {'tag': item_name, 'addr': addr, 'write_addr': addr,
'count': 2, 'type': 'float32', 'access': 'RW',
'description': f'Variable (MATH_VAR) #{desc["n"]}'}
if kind == 'raw':
if desc['table'] != 4:
print(f'{item_name}: non-named table {desc["table"]} (not holding '
f'registers) — skipping')
return None
return {'tag': item_name, 'addr': desc['addr'], 'write_addr': desc['addr'],
'count': desc['count'], 'type': desc['type'], 'access': 'R',
'description': f'Custom address 0x{desc["addr"]:04X}'}
return None
def resolve_one(s: str, controller_num: str) -> dict | None:
"""Resolve a single source-address string to {addr,count,type,access}, or None.
Used for FlexibleParameters, whose SourceAddress points at one specific
register (a loop parameter, a signal tag, a variable, or a raw address).
Types come from the physical HC900 layout, not the Experion-declared type.
"""
s = s.strip()
m = _RE_LOOP.match(s)
if m and m.group(1) == controller_num:
n, mn = int(m.group(3)), m.group(4).upper()
off = MNEMONIC_OFFSET.get(mn)
if off is None:
return None
if off in LOOP_LAYOUT:
_suf, count, dtype, access = LOOP_LAYOUT[off]
else: else:
dtype = 'float32' count, dtype, access = 2, 'float32', 'RW'
return {'addr': loop_base(n) + off, 'count': count, 'type': dtype, 'access': access}
# PID loop → map loop number m = _RE_TAG.match(s)
if typ == 'PID' and num_str.startswith('#'): if m and m.group(1) == controller_num:
n = int(num_str[1:]) return {'addr': SIGNAL_TAG_BASE + (int(m.group(2)) - 1) * 2,
lookup[('LOOP', n)] = {'tag': tag, 'dtype': dtype, 'addr': addr} 'count': 2, 'type': 'float32', 'access': 'R'}
m = _RE_VAR.match(s)
# Signal Tag if m and m.group(1) == controller_num:
if typ == 'Signal Tag' and num_str.isdigit(): return {'addr': MATH_VAR_BASE + (int(m.group(2)) - 1) * 2,
n = int(num_str) 'count': 2, 'type': 'float32', 'access': 'RW'}
lookup[('TAG', n)] = {'tag': tag, 'dtype': dtype, 'addr': addr} m = _RE_RAW.match(s)
if m and m.group(1) == controller_num and int(m.group(2)) == 4:
# Variable / Math Variable count, dtype = _raw_type(m.group(4))
if typ in ('Variable', 'Math Variable') and num_str.isdigit(): return {'addr': int(m.group(3), 16), 'count': count, 'type': dtype, 'access': 'R'}
n = int(num_str) return None
lookup[('MATH_VAR', n)] = {'tag': tag, 'dtype': dtype, 'addr': addr}
return lookup
# ─── Sinam parsing ─── def build_registers(sinam_path: Path, controller: str) -> list[dict]:
controller_num = controller.lstrip('Cc') # "C3" → "3"
def iter_sinam_c3_entries(sinam_path: Path, csv_lookup: dict) -> list[dict]:
"""Iterate over Sinam_Tag_all.xlsx rows with C3 source addresses.
Yields register entries dicts ready for JSON output.
"""
wb = openpyxl.load_workbook(sinam_path, read_only=True, data_only=True) wb = openpyxl.load_workbook(sinam_path, read_only=True, data_only=True)
ws = wb['Sheet1'] ws = wb['Sheet1']
registers = [] registers: list[dict] = []
seen_global = set() # {(tag_name, addr)} — global dedup seen_tags: set[str] = set()
seen_loops: set[int] = set()
for row in ws.iter_rows(min_row=2, values_only=True): for row in ws.iter_rows(min_row=2, values_only=True):
item_name = row[0] item_name = row[0]
cls = row[1]
if not item_name: if not item_name:
continue continue
item_name = str(item_name).strip() item_name = str(item_name).strip()
if not cls: cells = [str(v) for v in row if v is not None]
continue
cls = str(cls).strip()
pv = str(row[28]).strip() if row[28] else '' # FlexibleParameters: one extra named parameter per row, keyed on the parent
op = str(row[29]).strip() if row[29] else '' # point. col1=Class, col2=ParamName, source address sits among the cells
md = str(row[30]).strip() if row[30] else '' # (e.g. "C4 LOOP 22 AL1SP1", "C4 TAG 359 VALUE", "C4 MATH_VAR 114 VALUE").
cls = str(row[1]).strip() if len(row) > 1 and row[1] else ''
# Collect C3 source addresses if cls == 'FlexibleParameters':
sources = [] param_name = str(row[2]).strip() if len(row) > 2 and row[2] else ''
for field, val in [('PV', pv), ('OP', op), ('MD', md)]: if param_name:
if val and val.upper().startswith('C3'): res = next((r for r in (resolve_one(c, controller_num) for c in cells) if r), None)
sources.append((field, val)) if res:
tag = f'{item_name}.{param_name}'
if not sources: if tag not in seen_tags:
seen_tags.add(tag)
registers.append({
'tag': tag, 'addr': res['addr'], 'write_addr': res['addr'],
'count': res['count'], 'type': res['type'], 'access': res['access'],
'description': f'FlexibleParameter {param_name}',
})
continue continue
# Per-row dedup: track addresses to avoid OP/MD pointing to same reg desc = scan_point(cells, controller_num)
row_addrs = set() if desc is None:
continue
# Determine primary field: the main source that drops the suffix if desc['kind'] == 'loop':
# AnalogPoint: PV = main analog value, OP = loop status (supplement) if desc['n'] in seen_loops:
# StatusPoint: OP = digital state (main), MD = supplement (if different) continue # this loop already registered by another point
if cls == 'AnalogPoint': seen_loops.add(desc['n'])
primary_field = 'PV' new = build_loop_entries(item_name, desc['n'], desc['mnemonics'])
elif cls == 'StatusPoint':
primary_field = 'OP'
else: else:
primary_field = sources[0][0] if item_name in seen_tags:
# For each source address, create register entry
for field, addr_str in sources:
result = parse_indexed_addr(addr_str, csv_lookup)
if result is None:
continue continue
one = build_point_entry(item_name, desc)
new = [one] if one else []
addr = result['modbus_addr'] for e in new:
if e['tag'] in seen_tags:
# Per-row address dedup (OP/MD often point to same register)
if addr in row_addrs:
continue continue
row_addrs.add(addr) seen_tags.add(e['tag'])
registers.append(e)
# Tag naming:
# single source → ItemName
# multi-source, primary field → ItemName (no suffix)
# multi-source, other field → ItemName.field
if len(sources) == 1 or field == primary_field:
tag_name = item_name
else:
tag_name = f'{item_name}.{field}'
# Global dedup by (tag_name, addr)
key = (tag_name, addr)
if key in seen_global:
continue
seen_global.add(key)
entry = {
'tag': tag_name,
'addr': result['modbus_addr'],
'count': result['count'],
'type': result['type'],
'access': result['access'],
'description': result['description'],
}
registers.append(entry)
wb.close() wb.close()
registers.sort(key=lambda r: r['addr'])
return registers return registers
# ─── main ─── # ─────────────────────────── optional CSV cross-check ───────────────────────────
def build(sinam_path: Path, csv_path: Path | None, output_path: Path): def validate_against_csv(csv_path: Path) -> None:
csv_lookup = build_csv_lookup(csv_path) if csv_path else {} """Warn if the embedded LOOP_LAYOUT disagrees with an HC Designer export.
print(f'Building CSV lookup from {csv_path}...') Handles both report formats: the 8-column "Modbus Full Address Report"
print(f' LOOP entries: {sum(1 for k in csv_lookup if k[0] == "LOOP")}') (Fixed map, C3) and the 15-column "Modbus All Partitions Report" (Custom
print(f' TAG entries: {sum(1 for k in csv_lookup if k[0] == "TAG")}') map, C4) which inserts a Partition Name column.
print(f' MATH_VAR entries: {sum(1 for k in csv_lookup if k[0] == "MATH_VAR")}') """
rows = list(csv.reader(open(csv_path, encoding='utf-8-sig')))
custom = any(len(r) > 1 and r[0].strip() == 'Hex Addr' and 'Partition Name' in r
for r in rows)
tag_col = 3 if custom else 2
type_col = 5 if custom else 4
print(f'Parsing C3 entries from {sinam_path}...') def norm(s):
registers = iter_sinam_c3_entries(sinam_path, csv_lookup) return re.sub(r'[^a-z0-9]', '', s.lower())
registers.sort(key=lambda r: r['addr']) expected = {off: norm(suf) for off, (suf, *_ ) in LOOP_LAYOUT.items()}
# a few HC900-name aliases between the manual layout and HC Designer text
aliases = {0x3e: {'opwork', 'outputb'}, 0x38: {'wspb', 'spwork'},
0x0a: {'carbonpottemp', 'temp'}, 0x14: {'cycletime1', 'scancycletime'},
0x28: {'cycletime2', 'cycletimescan', 'scancycletimeb'},
0x52: {'localpctco', 'localpercentcarbmonoxide'},
0x58: {'onoffhysteresis', 'onoffouthysterisis'},
0x5a: {'carbpotdewpt', 'carbpotdewpt'}, 0x5c: {'stepmotortime', '3posstepmotortime'},
0x22: {'stepdeadband', '3posstepdeadband'},
0xbb: {'spselectstate', 'lspselectstate'},
0xbe: {'loopstatus', 'loopstatusregister'},
0xb9: {'antisootenable', 'antisootsplimenable'},
0xb7: {'fuzzyenable', 'enabledisablefuzzy'},
0xb8: {'demandtune', 'demandtunereq'}, 0x4e: {'manualreset'},
0x50: {'feedforwardgain'}, 0x08: {'pvb', 'pv'}}
mism = 0
for r in rows:
if len(r) <= type_col or not r[0].startswith('0x'):
continue
if r[type_col].strip() != 'PID':
continue
addr = int(r[0], 16)
off = (addr - 0x40) % 0x100 # offset within the loop block
if off not in LOOP_LAYOUT:
continue
suff = r[tag_col].split('.', 1)[1] if '.' in r[tag_col] else r[tag_col]
got = norm(suff)
ok = got == expected[off] or got in aliases.get(off, set()) \
or expected[off] in got or got in expected[off]
if not ok:
mism += 1
if mism <= 12:
print(f' ⚠ offset 0x{off:02X}: layout={expected[off]!r} csv={got!r} '
f'({r[0]})')
print(f' CSV cross-check: {"OK" if mism == 0 else f"{mism} mismatch(es)"} '
f'({"Custom" if custom else "Fixed"} map)')
# ─────────────────────────── main ───────────────────────────
def build(sinam_path: Path, controller: str, output_path: Path,
validate_csv: Path | None) -> None:
if validate_csv:
print(f'Validating embedded loop layout against {validate_csv}...')
validate_against_csv(validate_csv)
print(f'Parsing {controller} points from {sinam_path}...')
registers = build_registers(sinam_path, controller)
output = { output = {
'controller': 'C3', 'controller': controller,
'report_generated': '2026-06-03', 'report_generated': datetime.date.today().isoformat(),
'float_format': 'FP_B', 'float_format': 'FP_B',
'notes': 'Register map built from Sinam_Tag_all.xlsx SourceAddress columns (C3 only). ' 'notes': f'Register map built from {sinam_path.name} ({controller} only). '
'Tags are mapped from Experion indexed addresses to fixed Modbus addresses.', 'Experion point names are the keys; addresses are resolved from the '
'HC900 fixed loop layout (named addresses) and from explicit '
'non-named addresses in the export. .MD reads LoopStatus and writes '
'Auto/Manual State (write_addr).',
'register_count': len(registers), 'register_count': len(registers),
'registers': registers, 'registers': registers,
} }
output_path.write_text(json.dumps(output, indent=2, ensure_ascii=False),
encoding='utf-8')
with open(output_path, 'w', encoding='utf-8') as f: n_loops = len({r['description'].split()[1] for r in registers
json.dump(output, f, indent=2, ensure_ascii=False) if r['description'].startswith('LOOP')})
print(f'\n✓ Wrote {output_path}')
print(f'\n✓ Register map written to {output_path}') print(f' {len(registers)} registers ({n_loops} loops expanded)')
print(f' Total C3 registers from Sinam: {len(registers)}') by_access = {}
# Summary by type
by_field = {}
for r in registers: for r in registers:
suffix = r['tag'].split('.')[-1] if '.' in r['tag'] else '(main)' by_access[r['access']] = by_access.get(r['access'], 0) + 1
by_field[suffix] = by_field.get(suffix, 0) + 1 print(f' by access: {by_access}')
print(' By suffix:')
for s, c in sorted(by_field.items(), key=lambda x: -x[1]):
print(f' {c:4d} {s}')
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser( p = argparse.ArgumentParser(
description='Build register-map.json from Sinam_Tag_all.xlsx (C3 controller)') description='Build register-map.json from the Experion export (xlsx) alone')
parser.add_argument('--sinam', required=True, p.add_argument('--controller', required=True,
help='Path to Sinam_Tag_all.xlsx') help='Experion controller name / indexed-address prefix, e.g. C3')
parser.add_argument('--csv', p.add_argument('--sinam', required=True, help='Path to Sinam_Tag_all.xlsx')
default='docs/C3-All-Modbus-Map.csv', p.add_argument('-o', '--output', default='docs/register-map.json',
help='Path to C3-All-Modbus-Map.csv for type/tag lookup (optional)') help='Output JSON path')
parser.add_argument('-o', '--output', default='docs/register-map.json', p.add_argument('--validate-csv', default=None,
help='Output JSON path') help='Optional HC Designer CSV export to cross-check the embedded layout')
parser.add_argument('--no-csv', action='store_true', args = p.parse_args()
help='Skip CSV lookup (use defaults for type)')
args = parser.parse_args()
csv_path = None if args.no_csv else Path(args.csv) build(Path(args.sinam), args.controller.upper(), Path(args.output),
if csv_path and not csv_path.exists(): Path(args.validate_csv) if args.validate_csv else None)
print(f'Warning: CSV path {csv_path} not found, continuing without lookup')
csv_path = None
build(Path(args.sinam), csv_path, Path(args.output))