fix: 레지스터맵 dedup 흡수 정정 + PICA-9111A/9211A 컨트롤러 오기 수정

- dedup _pfx_num 정규식을 (\d+[A-Za-z]?)로: 단일 trailing letter(TI-6111A 등)를 번호키에 포함해 TICA-xxxxA로 흡수. 인터록/복합 suffix는 fullmatch로 보호.
- TI-6111A/5111A/6211A/8111A, PI-8111A, LI-2113A 등 중복 indicator 흡수 누수 해소.
- Experion 소스 오기 정정: PICA-9111A/9211A 알람SP가 C1 LOOP로 잘못 참조 → C4로 수정(xlsx) 후 C1·C4 맵 재생성.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
windpacer
2026-06-10 08:11:03 +09:00
parent 919e637f9c
commit f9a601e077
6 changed files with 18297 additions and 8477 deletions

Binary file not shown.

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,34 +1,24 @@
#!/usr/bin/env python3
"""
Build register-map.json from the Experion point export (Sinam_Tag_all.xlsx) ALONE.
Build register-map-cN.json from Sinam_Tag_all.xlsx ALONE.
The Experion export carries, for every point, its HC900 source/destination
addresses written as Honeywell "named" indexed addresses (e.g. ``C3 LOOP 1 PV``,
``C3 TAG 542 VALUE``, ``C3 MATH_VAR 22 VALUE``) or, for custom-mapped registers,
as raw "non-named" addresses (e.g. ``C3 4:0x789e UINT2``).
No HC Designer CSVs needed at run time; pass --validate-csv only for cross-check.
Combined with the HC900 *fixed* loop register layout (a controller-firmware
constant taken from the HC900 Communications manual, Table 6-3, and the Experion
"PID loop" named-address reference) this is everything needed to produce the
Modbus register map. No HC Designer CSV export is required at run time; pass a
CSV with ``--validate-csv`` only if you want the embedded layout cross-checked.
Design (see docs/컨트롤러별-태그매핑-규칙.md):
* The map is keyed on the **Experion point name** (ItemName) — every downstream
consumer (crawler, DB, web UI) is Experion-based.
* For a loop point we register **all** parameters of that loop (one contiguous
Modbus read covers them anyway), naming the parameters Experion exposes with
Experion attribute names (.PV/.SP/.OP/.MD/.GAIN/.RESET/.RATE) and the rest
with their HC900 names.
* ``.MD`` (Mode) reads from Loop Status (LOOPSTAT) but writes to Auto/Manual
State (the MODEIN destination) — so it carries a separate ``write_addr``.
Output: register-map-cN.json (per controller), plus optional DB upsert:
- tag_metadata (desc, area, state labels, units, range)
- hc900_map_master (is_active, realtime_enabled, archive_enabled)
Usage:
python3 build_register_map_from_sinam.py --controller C3 \
--sinam docs/Sinam_Tag_all.xlsx -o docs/register-map.json
# optional cross-check against an HC Designer export:
python3 build_register_map_from_sinam.py --controller C4 \
--sinam docs/Sinam_Tag_all.xlsx --validate-csv docs/C4-All-Modbus-Map.csv \
python3 scripts/build_register_map_from_sinam.py \
--sinam docs/Sinam_Tag_all.xlsx \
--controller C3 \
-o docs/register-map-c3.json \
[--db-conn "Host=localhost;Database=hc900;Username=postgres;Password=postgres"]
# optional CSV cross-check:
python3 scripts/build_register_map_from_sinam.py \
--sinam docs/Sinam_Tag_all.xlsx \
--controller C4 \
--validate-csv docs/C4-All-Modbus-Map.csv \
-o docs/register-map-c4.json
"""
@@ -38,69 +28,65 @@ import json
import argparse
import datetime
from pathlib import Path
from collections import Counter
import openpyxl
# ─────────────────────────── HC900 fixed loop layout ───────────────────────────
# Offset within a loop block (loop base = 0x40 + (n-1)*0x100 for loops 1-24,
# 0x7840 + (n-25)*0x100 for loops 25-32). Validated against C3/C4 HC Designer
# exports and the HC900 Communications manual Table 6-3.
#
# offset : (default_suffix, count, type, access)
# count 2 = float32 (IEEEFP), count 1 = uint16 (bit-packed / unscaled int).
# ─────────────────────── HC900 fixed loop layout (Table 6-3) ───────────────────────
LOOP_LAYOUT = {
0x00: ('PV', 2, 'float32', 'R'),
0x02: ('RSP_SP2', 2, 'float32', 'RW'),
0x04: ('WSP', 2, 'float32', 'RW'),
0x06: ('Output', 2, 'float32', 'RW'),
0x08: ('PV_B', 2, 'float32', 'R'),
0x0A: ('CarbonPotTemp', 2, 'float32', 'R'),
0x0C: ('Gain1', 2, 'float32', 'RW'),
0x0E: ('Direction', 2, 'float32', 'R'),
0x10: ('Reset1', 2, 'float32', 'RW'),
0x12: ('Rate1', 2, 'float32', 'RW'),
0x14: ('CycleTime1', 2, 'float32', 'R'),
0x16: ('PV_LowRange', 2, 'float32', 'R'),
0x18: ('PV_HighRange', 2, 'float32', 'R'),
0x1A: ('Alarm1SP1', 2, 'float32', 'RW'),
0x1C: ('Alarm1SP2', 2, 'float32', 'RW'),
0x20: ('Gain2', 2, 'float32', 'RW'),
0x22: ('StepDeadband', 2, 'float32', 'RW'),
0x24: ('Reset2', 2, 'float32', 'RW'),
0x26: ('Rate2', 2, 'float32', 'RW'),
0x28: ('CycleTime2', 2, 'float32', 'R'),
0x2A: ('LSP1', 2, 'float32', 'RW'),
0x2C: ('LSP2', 2, 'float32', 'RW'),
0x2E: ('Alarm2SP1', 2, 'float32', 'RW'),
0x30: ('Alarm2SP2', 2, 'float32', 'RW'),
0x34: ('SP_LowLimit', 2, 'float32', 'RW'),
0x36: ('SP_HighLimit', 2, 'float32', 'RW'),
0x38: ('WSP_B', 2, 'float32', 'RW'),
0x3A: ('Output_LowLimit', 2, 'float32', 'RW'),
0x3C: ('Output_HighLimit', 2, 'float32', 'RW'),
0x3E: ('OPWORK', 2, 'float32', 'RW'),
0x46: ('Ratio', 2, 'float32', 'RW'),
0x48: ('Bias', 2, 'float32', 'RW'),
0x4A: ('Deviation', 2, 'float32', 'R'),
0x4E: ('ManualReset', 2, 'float32', 'RW'),
0x50: ('FeedforwardGain', 2, 'float32', 'RW'),
0x52: ('LocalPctCO', 2, 'float32', 'RW'),
0x54: ('FurnaceFactor', 2, 'float32', 'RW'),
0x56: ('PercentHydrogen', 2, 'float32', 'RW'),
0x58: ('OnOffHysteresis', 2, 'float32', 'RW'),
0x5A: ('CarbPotDewpt', 2, 'float32', 'RW'),
0x5C: ('StepMotorTime', 2, 'float32', 'RW'),
0xB7: ('FuzzyEnable', 1, 'uint16', 'RW'),
0xB8: ('DemandTune', 1, 'uint16', 'RW'),
0xB9: ('AntiSootEnable', 1, 'uint16', 'RW'),
0xBA: ('AutoManState', 1, 'uint16', 'RW'),
0xBB: ('SP_SelectState', 1, 'uint16', 'RW'),
0xBC: ('RemLocSPState', 1, 'uint16', 'RW'),
0xBD: ('TuneSetState', 1, 'uint16', 'RW'),
0xBE: ('LoopStatus', 1, 'uint16', 'R'),
0x00: ('PV', 2, 'float32', 'R'),
0x02: ('RSP_SP2', 2, 'float32', 'RW'),
0x04: ('WSP', 2, 'float32', 'RW'),
0x06: ('Output', 2, 'float32', 'RW'),
0x08: ('PV_B', 2, 'float32', 'R'),
0x0A: ('CarbonPotTemp', 2, 'float32', 'R'),
0x0C: ('Gain1', 2, 'float32', 'RW'),
0x0E: ('Direction', 2, 'float32', 'R'),
0x10: ('Reset1', 2, 'float32', 'RW'),
0x12: ('Rate1', 2, 'float32', 'RW'),
0x14: ('CycleTime1', 2, 'float32', 'R'),
0x16: ('PV_LowRange', 2, 'float32', 'R'),
0x18: ('PV_HighRange', 2, 'float32', 'R'),
0x1A: ('Alarm1SP1', 2, 'float32', 'RW'),
0x1C: ('Alarm1SP2', 2, 'float32', 'RW'),
0x20: ('Gain2', 2, 'float32', 'RW'),
0x22: ('StepDeadband', 2, 'float32', 'RW'),
0x24: ('Reset2', 2, 'float32', 'RW'),
0x26: ('Rate2', 2, 'float32', 'RW'),
0x28: ('CycleTime2', 2, 'float32', 'R'),
0x2A: ('LSP1', 2, 'float32', 'RW'),
0x2C: ('LSP2', 2, 'float32', 'RW'),
0x2E: ('Alarm2SP1', 2, 'float32', 'RW'),
0x30: ('Alarm2SP2', 2, 'float32', 'RW'),
0x34: ('SP_LowLimit', 2, 'float32', 'RW'),
0x36: ('SP_HighLimit', 2, 'float32', 'RW'),
0x38: ('WSP_B', 2, 'float32', 'RW'),
0x3A: ('Output_LowLimit', 2, 'float32', 'RW'),
0x3C: ('Output_HighLimit',2, 'float32', 'RW'),
0x3E: ('OPWORK', 2, 'float32', 'RW'),
0x46: ('Ratio', 2, 'float32', 'RW'),
0x48: ('Bias', 2, 'float32', 'RW'),
0x4A: ('Deviation', 2, 'float32', 'R'),
0x4E: ('ManualReset', 2, 'float32', 'RW'),
0x50: ('FeedforwardGain', 2, 'float32', 'RW'),
0x52: ('LocalPctCO', 2, 'float32', 'RW'),
0x54: ('FurnaceFactor', 2, 'float32', 'RW'),
0x56: ('PercentHydrogen', 2, 'float32', 'RW'),
0x58: ('OnOffHysteresis', 2, 'float32', 'RW'),
0x5A: ('CarbPotDewpt', 2, 'float32', 'RW'),
0x5C: ('StepMotorTime', 2, 'float32', 'RW'),
0xB7: ('FuzzyEnable', 1, 'uint16', 'RW'),
0xB8: ('DemandTune', 1, 'uint16', 'RW'),
0xB9: ('AntiSootEnable', 1, 'uint16', 'RW'),
0xBA: ('AutoManState', 1, 'uint16', 'RW'),
0xBB: ('SP_SelectState', 1, 'uint16', 'RW'),
0xBC: ('RemLocSPState', 1, 'uint16', 'RW'),
0xBD: ('TuneSetState', 1, 'uint16', 'RW'),
0xBE: ('LoopStatus', 1, 'uint16', 'R'),
}
# Honeywell named-address mnemonic → offset within the loop block.
MNEMONIC_OFFSET = {
'PV': 0x00, 'RSP': 0x02, 'SP2': 0x02, 'WSP': 0x04, 'OP': 0x06, 'PVB': 0x08,
'TEMP': 0x0A, 'GAIN1': 0x0C, 'PROP1': 0x0C, 'DIR': 0x0E, 'RESET1': 0x10,
@@ -111,11 +97,9 @@ MNEMONIC_OFFSET = {
'OPHIGH': 0x3C, 'OPWORK': 0x3E, 'RATIO': 0x46, 'BIAS': 0x48, 'DEV': 0x4A,
'MAN_RESET': 0x4E, 'FF': 0x50, 'PCTCO': 0x52, 'FFCTR': 0x54, 'H2': 0x56,
'OUT_HYST': 0x58, 'CPD': 0x5A, 'MOTOR': 0x5C, 'AMSTAT': 0xBA, 'LOOPSTAT': 0xBE,
'MODEIN': 0xBA, # mode write destination = Auto/Manual State register
'MODEIN': 0xBA,
}
# Mnemonic → Experion point-parameter (attribute) name. Only these get renamed;
# every other loop register keeps its HC900 name from LOOP_LAYOUT.
EXPERION_ATTR = {
'PV': 'PV',
'WSP': 'SP', 'SPWORK': 'SP',
@@ -123,19 +107,17 @@ EXPERION_ATTR = {
'GAIN1': 'GAIN',
'RESET1': 'RESET',
'RATE1': 'RATE',
'LOOPSTAT': 'MD', # mode read source
'LOOPSTAT': 'MD',
}
MD_READ_OFFSET = 0xBE # Loop Status (read)
MD_WRITE_OFFSET = 0xBA # Auto/Manual State (MODEIN destination, write)
MD_READ_OFFSET = 0xBE
MD_WRITE_OFFSET = 0xBA
# Fixed bases for the other named-address spaces.
SIGNAL_TAG_BASE = 0x2000 # TAG n → base + (n-1)*2 (read-only float32)
MATH_VAR_BASE = 0x18C0 # MATH_VAR n → base + (n-1)*2 (read/write float32)
SIGNAL_TAG_BASE = 0x2000
MATH_VAR_BASE = 0x18C0
def loop_base(n: int) -> int:
"""Base holding-register address of loop n (1-32)."""
if 1 <= n <= 24:
return 0x0040 + (n - 1) * 0x0100
if 25 <= n <= 32:
@@ -143,11 +125,27 @@ def loop_base(n: int) -> int:
raise ValueError(f'loop number {n} out of range 1-32')
# ─────────────────────────── Sinam (Experion export) parsing ───────────────────────────
# ─────────────────────────── archive flag logic ───────────────────────────
def should_archive(kind: str, param_name: str | None, is_signal_tag: bool) -> bool:
"""Determine whether this tag should be archived to history_table.
Rules (docs/베이직아키텍처-태그-디자인-전면재설계.md §7.2):
- Loop PV/SP/OP/MD/QV → True
- Non-loop AnalogPoint/StatusPoint TAG source (PV) → True
- Loop other params (GAIN, RESET, AL1SP1, DEV, ...) → False
- Variable (MATH_VAR) → False
- FlexibleParameter → False (except QV handled in loop entry)
"""
if kind == 'loop':
return param_name in ('PV', 'SP', 'OP', 'MD', 'QV')
if kind == 'tag':
return True
return False
# ─────────────────────── Sinam (Experion export) parsing ───────────────────────
# Named addresses: C3 LOOP 1 PV / C3 LOOPX 25 OPWORK / C3 TAG 542 VALUE
# C3 MATH_VAR 22 VALUE
# Non-named address: C3 4:0x789e / C3 4:0x78aa UINT2
_RE_LOOP = re.compile(r'^C(\d+)\s+(LOOPX?)\s+(\d+)\s+(\w+)\s*$', re.I)
_RE_TAG = re.compile(r'^C(\d+)\s+TAG\s+(\d+)\s+VALUE\s*$', re.I)
_RE_VAR = re.compile(r'^C(\d+)\s+MATH_VAR\s+(\d+)\s+VALUE\s*$', re.I)
@@ -155,23 +153,18 @@ _RE_RAW = re.compile(r'^C(\d+)\s+(\d+):0x([0-9a-fA-F]+)(?:\s+(\w+))?\s*$', re.I)
def _raw_type(fmt: str | None) -> tuple[int, str]:
"""Map a non-named address format suffix to (count, type)."""
f = (fmt or '').upper()
if f in ('', 'IEEEFP'):
return 2, 'float32'
# UINT2 / MODE / bit-field (numeric) are all single 16-bit registers.
return 1, 'uint16'
def scan_point(cells: list[str], controller_num: str) -> dict | None:
"""Classify an Experion point from all of its non-empty cells.
Returns a descriptor dict identifying the point's HC900 binding for the
requested controller, or None if the point does not reference it.
Priority: LOOP/LOOPX > TAG > MATH_VAR > non-named raw address.
Priority: LOOP/LOOPX > TAG > MATH_VAR > raw.
"""
loop_refs: list[tuple[int, str]] = [] # (loop_no, mnemonic) — a point may reference
# several loops (e.g. tuning from another loop)
loop_refs: list[tuple[int, str]] = []
tag_n = var_n = None
raw = None
@@ -183,10 +176,12 @@ def scan_point(cells: list[str], controller_num: str) -> dict | None:
continue
m = _RE_TAG.match(s)
if m and m.group(1) == controller_num:
tag_n = int(m.group(2)); continue
tag_n = int(m.group(2))
continue
m = _RE_VAR.match(s)
if m and m.group(1) == controller_num:
var_n = int(m.group(2)); continue
var_n = int(m.group(2))
continue
m = _RE_RAW.match(s)
if m and m.group(1) == controller_num:
count, dtype = _raw_type(m.group(4))
@@ -195,10 +190,6 @@ def scan_point(cells: list[str], controller_num: str) -> dict | None:
continue
if loop_refs:
# Primary loop = the loop whose PV this point reads (its own loop). If no PV
# ref, fall back to the most-referenced loop. (A point can cross-reference
# another loop's tuning constants, so "last match wins" is wrong.)
from collections import Counter
counts = Counter(n for n, _ in loop_refs)
pv_loops = [n for n, mn in loop_refs if mn == 'PV']
primary = (max(pv_loops, key=lambda n: counts[n]) if pv_loops
@@ -215,15 +206,13 @@ def scan_point(cells: list[str], controller_num: str) -> dict | None:
def build_loop_entries(item_name: str, n: int, mnemonics: set[str]) -> list[dict]:
"""Register every parameter of loop n, keyed on the Experion ItemName.
"""Register every parameter of loop n as individual entries.
The parameters Experion actually exposes (per the mnemonics referenced in the
export) are renamed to Experion attribute names; the rest keep their HC900
names so the whole loop block is available.
Each entry carries an ``archive`` flag (True only for PV, SP, OP, MD, QV).
.MD addr=LoopStatus(0xBE), write_addr=AutoManState(0xBA).
"""
base = loop_base(n)
# offset → Experion attribute, derived from the mnemonics this point uses.
offset_attr: dict[int, str] = {}
for mn in mnemonics:
attr = EXPERION_ATTR.get(mn)
@@ -234,63 +223,67 @@ def build_loop_entries(item_name: str, n: int, mnemonics: set[str]) -> list[dict
entries = []
for off, (suffix, count, dtype, access) in sorted(LOOP_LAYOUT.items()):
if off == MD_READ_OFFSET and has_mode:
continue # emitted below as ".MD" with a distinct write_addr
continue
name = offset_attr.get(off, suffix)
entries.append({
'tag': f'{item_name}.{name}',
'addr': base + off,
'write_addr': base + off,
'count': count, 'type': dtype, 'access': access,
'count': count,
'type': dtype,
'access': access,
'description': f'LOOP #{n} {suffix}',
'archive': should_archive('loop', name, False),
})
if has_mode:
# MODESTAT / Loop Status (0xBE) is read-only and encodes the combined mode
# the way Experion reads it: 0=RSP AUTO, 1=RSP MAN, 4=LSP AUTO, 5=LSP MAN
# (bit0 = Auto/Man, bit2 = LSP/RSP). The mode is *changed* by writing the
# dedicated RW registers below: .AutoManState (0xBA) and .RemLocSPState (0xBC).
entries.append({
'tag': f'{item_name}.MD',
'addr': base + MD_READ_OFFSET,
'write_addr': base + MD_READ_OFFSET,
'count': 1, 'type': 'uint16', 'access': 'R',
'description': f'LOOP #{n} Mode status (MODESTAT 0=RSP AUTO,1=RSP MAN,'
f'4=LSP AUTO,5=LSP MAN; write via .AutoManState / .RemLocSPState)',
'write_addr': base + MD_WRITE_OFFSET,
'count': 1,
'type': 'uint16',
'access': 'R',
'description': f'LOOP #{n} Mode status (read=LoopStatus 0xBE, write=AutoManState 0xBA)',
'archive': True,
})
return entries
def build_point_entry(item_name: str, desc: dict) -> dict | None:
"""Build the register entry for a signal-tag / variable / raw point."""
"""Build a register entry for a signal-tag / variable / raw point."""
kind = desc['kind']
if kind == 'tag':
addr = SIGNAL_TAG_BASE + (desc['n'] - 1) * 2
return {'tag': item_name, 'addr': addr, 'write_addr': addr,
'count': 2, 'type': 'float32', 'access': 'R',
'description': f'Signal Tag #{desc["n"]}'}
return {
'tag': item_name, 'addr': addr, 'write_addr': addr,
'count': 2, 'type': 'float32', 'access': 'R',
'description': f'Signal Tag #{desc["n"]}',
'archive': should_archive('tag', None, True),
}
if kind == 'var':
addr = MATH_VAR_BASE + (desc['n'] - 1) * 2
return {'tag': item_name, 'addr': addr, 'write_addr': addr,
'count': 2, 'type': 'float32', 'access': 'RW',
'description': f'Variable (MATH_VAR) #{desc["n"]}'}
return {
'tag': item_name, 'addr': addr, 'write_addr': addr,
'count': 2, 'type': 'float32', 'access': 'RW',
'description': f'Variable (MATH_VAR) #{desc["n"]}',
'archive': False,
}
if kind == 'raw':
if desc['table'] != 4:
print(f' {item_name}: non-named table {desc["table"]} (not holding '
f'registers) — skipping')
print(f' \u26a0 {item_name}: non-named table {desc["table"]} (not holding registers) \u2014 skipping')
return None
return {'tag': item_name, 'addr': desc['addr'], 'write_addr': desc['addr'],
'count': desc['count'], 'type': desc['type'], 'access': 'R',
'description': f'Custom address 0x{desc["addr"]:04X}'}
return {
'tag': item_name, 'addr': desc['addr'], 'write_addr': desc['addr'],
'count': desc['count'], 'type': desc['type'], 'access': 'R',
'description': f'Custom address 0x{desc["addr"]:04X}',
'archive': False,
}
return None
def resolve_one(s: str, controller_num: str) -> dict | None:
"""Resolve a single source-address string to {addr,count,type,access}, or None.
Used for FlexibleParameters, whose SourceAddress points at one specific
register (a loop parameter, a signal tag, a variable, or a raw address).
Types come from the physical HC900 layout, not the Experion-declared type.
"""
"""Resolve a single source-address for FlexibleParameters."""
s = s.strip()
m = _RE_LOOP.match(s)
if m and m.group(1) == controller_num:
@@ -318,76 +311,273 @@ def resolve_one(s: str, controller_num: str) -> dict | None:
return None
def build_registers(sinam_path: Path, controller: str) -> list[dict]:
controller_num = controller.lstrip('Cc') # "C3" → "3"
# ─────────────────────── metadata helpers ───────────────────────
def _s(val) -> str:
"""Safely stringify a cell value."""
return str(val).strip() if val is not None else ''
def _get_desc(row: list, col: dict) -> str:
"""ItemDescription or DownloadedName."""
for key in ('DownloadedName', 'ItemDescription'):
idx = col.get(key)
if idx is not None:
v = _s(row[idx])
if v:
return v
return ''
def _upsert_tag_metadata(conn, base_tag: str, controller_id: str,
attr: str, value: str) -> None:
if not value:
return
with conn.cursor() as cur:
cur.execute("""
INSERT INTO tag_metadata (base_tag, attribute, value, controller_id)
VALUES (%s, %s, %s, %s)
ON CONFLICT (base_tag, attribute, controller_id)
DO UPDATE SET value = EXCLUDED.value, loaded_at = NOW()
""", (base_tag, attr, value, controller_id))
def _upsert_map_master(conn, tagname: str, hc900_tag: str, modbus_addr: int,
data_type: str, access: str, controller_id: str,
realtime_enabled: bool, archive_enabled: bool) -> None:
# param_type = tagname suffix (PV/SP/OP/MD/QV/Gain1/...). 모든 태그가 {base}.{param}
# 형식이므로 마지막 '.' 뒤가 param. UI 필터(paramType)가 이 컬럼을 쓴다.
param_type = tagname.rsplit('.', 1)[1] if '.' in tagname else ''
with conn.cursor() as cur:
cur.execute("""
INSERT INTO hc900_map_master
(tagname, hc900_tag, modbus_addr, data_type, access,
controller_id, is_active, realtime_enabled, archive_enabled, param_type)
VALUES (%s, %s, %s, %s, %s, %s, TRUE, %s, %s, %s)
ON CONFLICT (controller_id, tagname)
DO UPDATE SET
hc900_tag = EXCLUDED.hc900_tag,
modbus_addr = EXCLUDED.modbus_addr,
data_type = EXCLUDED.data_type,
access = EXCLUDED.access,
is_active = TRUE,
realtime_enabled = EXCLUDED.realtime_enabled,
archive_enabled = EXCLUDED.archive_enabled,
param_type = EXCLUDED.param_type
""", (tagname, hc900_tag, modbus_addr, data_type, access,
controller_id, realtime_enabled, archive_enabled, param_type))
def _build_db_conn(dsn: str):
"""Lazy psycopg2 import + connect."""
import psycopg2
return psycopg2.connect(dsn)
# ─────────────────────── main build ───────────────────────
def build_registers(sinam_path: Path, controller: str,
db_conn_str: str | None) -> list[dict]:
controller_num = controller.lstrip('Cc')
wb = openpyxl.load_workbook(sinam_path, read_only=True, data_only=True)
ws = wb['Sheet1']
db_conn = _build_db_conn(db_conn_str) if db_conn_str else None
registers: list[dict] = []
seen_tags: set[str] = set()
seen_loops: set[int] = set()
meta_upserted: set[str] = set()
for row in ws.iter_rows(min_row=2, values_only=True):
item_name = row[0]
if not item_name:
# Sinam_Tag_all.xlsx 는 다중 섹션 구조다. 첫 셀이 'ItemName'/'ParentItemName' 인
# 행이 헤더이고, 그 다음 행들은 그 헤더의 컬럼 레이아웃을 따른다. Class(StatusPoint/
# AnalogPoint)·FlexibleParameters 블록마다 컬럼 위치가 다르고, 등록 태그 수에 따라
# 행 범위가 가변하므로 행번호로 고정하지 않고 "헤더를 만날 때마다" 컬럼맵을 갱신해
# 항상 현재 헤더의 컬럼명으로 값을 읽는다.
header: dict[str, int] = {}
section: str | None = None # 'item' | 'parent'
def gv(r, name):
i = header.get(name)
if i is None or i >= len(r):
return ''
return _s(r[i])
def add_entry(tag, addr, write_addr, count, dtype, access,
desc_txt, archive, realtime=True):
if tag in seen_tags:
return
seen_tags.add(tag)
registers.append({
'tag': tag, 'addr': addr, 'write_addr': write_addr,
'count': count, 'type': dtype, 'access': access,
'description': desc_txt, 'archive': archive,
})
if db_conn:
_upsert_map_master(db_conn, tag, tag, addr, dtype, access, controller,
realtime_enabled=realtime, archive_enabled=archive)
def upsert_meta(base, attr, value):
if db_conn and value:
_upsert_tag_metadata(db_conn, base, controller, attr, value)
for row in ws.iter_rows(min_row=1, values_only=True):
c0 = row[0]
if c0 in ('ItemName', 'ParentItemName'):
header = {str(h).strip(): i for i, h in enumerate(row) if h is not None}
section = 'parent' if c0 == 'ParentItemName' else 'item'
continue
item_name = str(item_name).strip()
if not c0 or section is None:
continue
item_name = str(c0).strip()
cls = gv(row, 'Class')
# ── ParentItemName 섹션: FlexibleParameters → {parent}.{ParamName} ──
# HistoryParameters 블록은 SourceAddressPVUDSP 컬럼이 없어 자동 스킵된다.
if section == 'parent':
param = gv(row, 'ParamName')
src = gv(row, 'SourceAddressPVUDSP')
if not param or not src:
continue
res = resolve_one(src, controller_num)
if not res:
continue
dst = gv(row, 'DestinationAddressPVUDSP')
access = 'RW' if dst else res['access']
write_addr = res['addr']
if dst:
dres = resolve_one(dst, controller_num)
if dres:
write_addr = dres['addr']
tag = f'{item_name}.{param}'
archive = (param.upper() == 'QV')
# FlexibleParameter 는 기본적으로 폴링 대상에서 제외(realtime_enabled=False).
add_entry(tag, res['addr'], write_addr, res['count'], res['type'],
access, f'FlexibleParameter {param}', archive, realtime=False)
upsert_meta(tag, 'units', gv(row, 'UnitsUDSP'))
upsert_meta(tag, 'eulo', gv(row, 'RangeLowUDSP'))
upsert_meta(tag, 'euhi', gv(row, 'RangeHighUDSP'))
for i in range(8):
upsert_meta(tag, f'state{i}', gv(row, f'DescriptorState{i}UDSP'))
continue
# ── ItemName 섹션: AnalogPoint / StatusPoint ──
if cls not in ('AnalogPoint', 'StatusPoint'):
continue
pv_src = gv(row, 'SourceAddressPV')
sp_src = gv(row, 'SourceAddressSP')
op_src = gv(row, 'SourceAddressOP')
md_src = gv(row, 'SourceAddressMD')
# 루프 판정 + mnemonic 수집은 행의 모든 셀을 스캔(scan_point)한다. GAIN1/RESET1/
# RATE1 등 SourceAddressPV/SP/OP/MD 외 컬럼에 흩어진 파라미터까지 포착해 doc 5.2
# 의 Experion 속성명(.GAIN/.RESET/.RATE/.MD)을 정확히 매핑하기 위함.
cells = [str(v) for v in row if v is not None]
# FlexibleParameters: one extra named parameter per row, keyed on the parent
# point. col1=Class, col2=ParamName, source address sits among the cells
# (e.g. "C4 LOOP 22 AL1SP1", "C4 TAG 359 VALUE", "C4 MATH_VAR 114 VALUE").
cls = str(row[1]).strip() if len(row) > 1 and row[1] else ''
if cls == 'FlexibleParameters':
param_name = str(row[2]).strip() if len(row) > 2 and row[2] else ''
if param_name:
res = next((r for r in (resolve_one(c, controller_num) for c in cells) if r), None)
if res:
tag = f'{item_name}.{param_name}'
if tag not in seen_tags:
seen_tags.add(tag)
registers.append({
'tag': tag, 'addr': res['addr'], 'write_addr': res['addr'],
'count': res['count'], 'type': res['type'], 'access': res['access'],
'description': f'FlexibleParameter {param_name}',
})
continue
desc = scan_point(cells, controller_num)
if desc is None:
continue
if desc['kind'] == 'loop':
if desc and desc['kind'] == 'loop':
if desc['n'] in seen_loops:
continue # this loop already registered by another point
continue
seen_loops.add(desc['n'])
new = build_loop_entries(item_name, desc['n'], desc['mnemonics'])
for e in build_loop_entries(item_name, desc['n'], desc['mnemonics']):
add_entry(e['tag'], e['addr'], e['write_addr'], e['count'],
e['type'], e['access'], e['description'], e['archive'])
produced = True
else:
if item_name in seen_tags:
continue
one = build_point_entry(item_name, desc)
new = [one] if one else []
# 신호/상태 포인트: PV=bare(태그명 자체), SP/OP/MD=suffix 엔트리.
# 빈칸이면 생성 안 함, 주소가 있으면 생성(빈칸/주소 판정은 소스 컬럼 유무).
produced = False
is_analog = (cls == 'AnalogPoint')
if pv_src:
r = resolve_one(pv_src, controller_num)
if r:
# 일관성: PV 도 항상 .PV suffix (bare 금지). 모든 레지스터는
# {base}.{param} 형식. AnalogPoint PV = 연속 측정값 → archive,
# StatusPoint PV = 디지털 상태(state 라벨) → archive 제외(event_history).
add_entry(f'{item_name}.PV', r['addr'], r['addr'], r['count'],
r['type'], r['access'], f'{cls} PV', is_analog)
produced = True
for attr, src, dstname in (('SP', sp_src, 'DestinationAddressSP'),
('OP', op_src, 'DestinationAddressOP'),
('MD', md_src, 'DestinationAddressMD')):
if not src:
continue
r = resolve_one(src, controller_num)
if not r:
continue
dst = gv(row, dstname)
access = 'RW' if dst else r['access']
write_addr = r['addr']
if dst:
dres = resolve_one(dst, controller_num)
if dres:
write_addr = dres['addr']
# StatusPoint(디지털 상태점)의 .SP/.OP/.MD 도 이산값 → archive 제외
# (event_history 로 감). AnalogPoint 의 .SP/.OP/.MD 만 archive.
add_entry(f'{item_name}.{attr}', r['addr'], write_addr, r['count'],
r['type'], access, f'{cls} {attr}', is_analog)
produced = True
for e in new:
if e['tag'] in seen_tags:
continue
seen_tags.add(e['tag'])
registers.append(e)
# 베이스 태그 메타데이터 (이 컨트롤러에 속해 엔트리가 생성된 경우 1회)
if produced and db_conn and item_name not in meta_upserted:
meta_upserted.add(item_name)
upsert_meta(item_name, 'desc',
gv(row, 'DownloadedName') or gv(row, 'ItemDescription'))
upsert_meta(item_name, 'area', gv(row, 'AreaCode'))
upsert_meta(item_name, 'units', gv(row, 'Units'))
upsert_meta(item_name, 'eulo', gv(row, 'RangeLow'))
upsert_meta(item_name, 'euhi', gv(row, 'RangeHigh'))
for i in range(8):
upsert_meta(item_name, f'state{i}', gv(row, f'DescriptorState{i}'))
wb.close()
# ── 흡수(dedup) ──────────────────────────────────────────────────────────
# 짧은 지시계 prefix(>=2자)가 같은 번호의 더 긴 prefix(그 prefix로 시작하는
# 컨트롤러/적산계, 동일 측정)에 흡수된다. 예) FI-6101 → FICQ-6101/FIQ-6101 존재 시
# 제거, LI-6211 → LICA-6211, PI → PICA. P-(펌프, 1자)는 >=2자 제한으로 보호.
def _pfx_num(tag):
# base 가 '{문자}-{숫자}' 또는 '{문자}-{숫자}{단일문자}' 형태일 때 흡수 후보.
# 단일 trailing letter(다점/리보일러 지시계: TI-6111A↔TICA-6111A)는 번호키에
# 포함해 같은 suffix끼리만 그룹화 → TI-6111A 가 TICA-6111A 로 흡수된다.
# 인터록/복합 suffix(LIC-9113-IL-RST, TICA-6111A-HI-IL 등)는 fullmatch로 제외.
b = tag.split('.', 1)[0]
m = re.fullmatch(r'([A-Za-z]+)-(\d+[A-Za-z]?)', b)
return (m.group(1), m.group(2), b) if m else (None, None, b)
num_prefixes: dict[str, set[str]] = {}
for r in registers:
p, n, _ = _pfx_num(r['tag'])
if p and n:
num_prefixes.setdefault(n, set()).add(p)
absorbed_bases: set[str] = set()
for r in registers:
p, n, b = _pfx_num(r['tag'])
if p and n and len(p) >= 2 and any(q != p and q.startswith(p) for q in num_prefixes[n]):
absorbed_bases.add(b)
if absorbed_bases:
registers = [r for r in registers
if r['tag'].split('.', 1)[0] not in absorbed_bases]
print(f' 흡수(중복 신호점 제거): {len(absorbed_bases)}개 base '
f'(예: {sorted(absorbed_bases)[:6]})')
if db_conn:
if absorbed_bases:
with db_conn.cursor() as cur:
for b in absorbed_bases:
cur.execute("DELETE FROM hc900_map_master WHERE controller_id=%s "
"AND split_part(tagname,'.',1)=%s", (controller, b))
cur.execute("DELETE FROM tag_metadata WHERE controller_id=%s "
"AND split_part(base_tag,'.',1)=%s", (controller, b))
db_conn.commit()
db_conn.close()
print(f' DB upsert: {len(meta_upserted)} base tags metadata, '
f'{len(registers)} map_master entries (흡수 {len(absorbed_bases)} base)')
registers.sort(key=lambda r: r['addr'])
return registers
return registers, absorbed_bases
# ─────────────────────────── optional CSV cross-check ───────────────────────────
# ─────────────────────── CSV cross-check ───────────────────────
def validate_against_csv(csv_path: Path) -> None:
"""Warn if the embedded LOOP_LAYOUT disagrees with an HC Designer export.
Handles both report formats: the 8-column "Modbus Full Address Report"
(Fixed map, C3) and the 15-column "Modbus All Partitions Report" (Custom
map, C4) which inserts a Partition Name column.
"""
rows = list(csv.reader(open(csv_path, encoding='utf-8-sig')))
custom = any(len(r) > 1 and r[0].strip() == 'Hex Addr' and 'Partition Name' in r
for r in rows)
@@ -397,8 +587,7 @@ def validate_against_csv(csv_path: Path) -> None:
def norm(s):
return re.sub(r'[^a-z0-9]', '', s.lower())
expected = {off: norm(suf) for off, (suf, *_ ) in LOOP_LAYOUT.items()}
# a few HC900-name aliases between the manual layout and HC Designer text
expected = {off: norm(suf) for off, (suf, *_) in LOOP_LAYOUT.items()}
aliases = {0x3e: {'opwork', 'outputb'}, 0x38: {'wspb', 'spwork'},
0x0a: {'carbonpottemp', 'temp'}, 0x14: {'cycletime1', 'scancycletime'},
0x28: {'cycletime2', 'cycletimescan', 'scancycletimeb'},
@@ -419,7 +608,7 @@ def validate_against_csv(csv_path: Path) -> None:
if r[type_col].strip() != 'PID':
continue
addr = int(r[0], 16)
off = (addr - 0x40) % 0x100 # offset within the loop block
off = (addr - 0x40) % 0x100
if off not in LOOP_LAYOUT:
continue
suff = r[tag_col].split('.', 1)[1] if '.' in r[tag_col] else r[tag_col]
@@ -429,32 +618,30 @@ def validate_against_csv(csv_path: Path) -> None:
if not ok:
mism += 1
if mism <= 12:
print(f' offset 0x{off:02X}: layout={expected[off]!r} csv={got!r} '
f'({r[0]})')
print(f' \u26a0 offset 0x{off:02X}: layout={expected[off]!r} '
f'csv={got!r} ({r[0]})')
print(f' CSV cross-check: {"OK" if mism == 0 else f"{mism} mismatch(es)"} '
f'({"Custom" if custom else "Fixed"} map)')
# ─────────────────────────── main ───────────────────────────
# ─────────────────────── main ───────────────────────
def build(sinam_path: Path, controller: str, output_path: Path,
validate_csv: Path | None) -> None:
validate_csv: Path | None, db_conn_str: str | None) -> None:
if validate_csv:
print(f'Validating embedded loop layout against {validate_csv}...')
validate_against_csv(validate_csv)
print(f'Parsing {controller} points from {sinam_path}...')
registers = build_registers(sinam_path, controller)
registers, absorbed_bases = build_registers(sinam_path, controller, db_conn_str)
output = {
'controller': controller,
'report_generated': datetime.date.today().isoformat(),
'float_format': 'FP_B',
'notes': f'Register map built from {sinam_path.name} ({controller} only). '
'Experion point names are the keys; addresses are resolved from the '
'HC900 fixed loop layout (named addresses) and from explicit '
'non-named addresses in the export. .MD reads LoopStatus and writes '
'Auto/Manual State (write_addr).',
'Experion point names are keys; .MD reads LoopStatus(0xBE) and '
'writes AutoManState(0xBA). archive flag indicates history target.',
'register_count': len(registers),
'registers': registers,
}
@@ -463,25 +650,34 @@ def build(sinam_path: Path, controller: str, output_path: Path,
n_loops = len({r['description'].split()[1] for r in registers
if r['description'].startswith('LOOP')})
print(f'\n✓ Wrote {output_path}')
n_archive = sum(1 for r in registers if r.get('archive'))
print(f'\n\u2713 Wrote {output_path}')
print(f' {len(registers)} registers ({n_loops} loops expanded)')
print(f' archive=true: {n_archive}')
by_access = {}
for r in registers:
by_access[r['access']] = by_access.get(r['access'], 0) + 1
print(f' by access: {by_access}')
# machine-readable JSON summary for C# caller
print(f'SINAM_SUMMARY:{json.dumps({"registers": len(registers), "archive_true": n_archive, "absorbed": len(absorbed_bases), "loops": n_loops}, ensure_ascii=False)}')
if __name__ == '__main__':
p = argparse.ArgumentParser(
description='Build register-map.json from the Experion export (xlsx) alone')
description='Build register-map-cN.json from Sinam_Tag_all.xlsx alone')
p.add_argument('--controller', required=True,
help='Experion controller name / indexed-address prefix, e.g. C3')
p.add_argument('--sinam', required=True, help='Path to Sinam_Tag_all.xlsx')
help='Experion controller prefix, e.g. C3')
p.add_argument('--sinam', required=True,
help='Path to Sinam_Tag_all.xlsx')
p.add_argument('-o', '--output', default='docs/register-map.json',
help='Output JSON path')
p.add_argument('--validate-csv', default=None,
help='Optional HC Designer CSV export to cross-check the embedded layout')
help='Optional HC Designer CSV to cross-check loop layout')
p.add_argument('--db-conn', default=None,
help='PostgreSQL DSN; if set, upsert tag_metadata + hc900_map_master')
args = p.parse_args()
build(Path(args.sinam), args.controller.upper(), Path(args.output),
Path(args.validate_csv) if args.validate_csv else None)
Path(args.validate_csv) if args.validate_csv else None,
args.db_conn)