chore: 레지스터맵 생성 스크립트 정리 + field_hist import

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
windpacer
2026-06-10 08:12:01 +09:00
parent 19c8c2e95c
commit f12290fc2b
5 changed files with 4952 additions and 14499 deletions

File diff suppressed because it is too large Load Diff

309
scripts/build_register_map.py Normal file → Executable file
View File

@@ -2,52 +2,52 @@
""" """
HC900 Register Map Builder HC900 Register Map Builder
Converts HC Designer CSV exports (SummaryFunctionBlockReport, SignalTags, Variables) Converts Sinam_Tag_all.xlsx (Experion point database) as the PRIMARY source
into a unified JSON register map for the Modbus TCP gateway. for PID loop tag names, with HC Designer CSV exports for address validation.
Usage: Usage:
python3 build_register_map.py \ python3 build_register_map.py \
--loop-csv ../docs/SummaryFucntionBlockReport.csv \ --xlsx ../docs/Sinam_Tag_all.xlsx \
--signal-csv ../docs/SignalTags.csv \ --signal-csv ../docs/SignalTags.csv \
--variable-csv ../docs/Variables.csv \ --variable-csv ../docs/Variables.csv \
--loop-csv ../docs/SummaryFucntionBlockReport.csv \
-o ../docs/register-map.json -o ../docs/register-map.json
""" """
import csv import csv
import json import json
import argparse import argparse
import re
from pathlib import Path from pathlib import Path
try:
import openpyxl
except ImportError:
print("ERROR: openpyxl is required. Install with: pip install openpyxl")
exit(1)
# ─── PID loop parameter offset map (based on manual Table 6-3) ─── # ─── PID loop parameter offset map (based on manual Table 6-3) ───
LOOP_PARAM_OFFSETS = [ LOOP_PARAM_OFFSETS = [
# (name, offset_hex, type, access, description)
# PV / SP / OP
("PV", 0x00, "float32", "R", "Process Variable"), ("PV", 0x00, "float32", "R", "Process Variable"),
("RSP", 0x02, "float32", "R", "Remote Set Point (SP2)"), ("RSP", 0x02, "float32", "R", "Remote Set Point (SP2)"),
("SP", 0x04, "float32", "RW", "Working Set Point"), ("SP", 0x04, "float32", "RW", "Working Set Point"),
("OP", 0x06, "float32", "RW", "Output"), ("OP", 0x06, "float32", "RW", "Output"),
# Gains / tuning
("GAIN1", 0x0C, "float32", "RW", "Gain #1 / Prop Band #1"), ("GAIN1", 0x0C, "float32", "RW", "Gain #1 / Prop Band #1"),
("DIR", 0x0E, "float32", "R", "Direction (0=Direct, 1=Reverse)"), ("DIR", 0x0E, "float32", "R", "Direction (0=Direct, 1=Reverse)"),
("RESET1", 0x10, "float32", "RW", "Reset #1"), ("RESET1", 0x10, "float32", "RW", "Reset #1"),
("RATE1", 0x12, "float32", "RW", "Rate #1"), ("RATE1", 0x12, "float32", "RW", "Rate #1"),
# Ranges
("PV_LO", 0x16, "float32", "R", "PV Low Range"), ("PV_LO", 0x16, "float32", "R", "PV Low Range"),
("PV_HI", 0x18, "float32", "R", "PV High Range"), ("PV_HI", 0x18, "float32", "R", "PV High Range"),
# Alarms
("ALM1_SP1", 0x1A, "float32", "RW", "Alarm #1 SP #1"), ("ALM1_SP1", 0x1A, "float32", "RW", "Alarm #1 SP #1"),
("ALM1_SP2", 0x1C, "float32", "RW", "Alarm #1 SP #2"), ("ALM1_SP2", 0x1C, "float32", "RW", "Alarm #1 SP #2"),
# Local SPs
("LSP1", 0x2A, "float32", "RW", "Local SP #1"), ("LSP1", 0x2A, "float32", "RW", "Local SP #1"),
("LSP2", 0x2C, "float32", "RW", "Local SP #2"), ("LSP2", 0x2C, "float32", "RW", "Local SP #2"),
# Limits
("SP_LO", 0x34, "float32", "RW", "SP Low Limit"), ("SP_LO", 0x34, "float32", "RW", "SP Low Limit"),
("SP_HI", 0x36, "float32", "RW", "SP High Limit"), ("SP_HI", 0x36, "float32", "RW", "SP High Limit"),
("OP_LO", 0x3A, "float32", "RW", "Output Low Limit"), ("OP_LO", 0x3A, "float32", "RW", "Output Low Limit"),
("OP_HI", 0x3C, "float32", "RW", "Output High Limit"), ("OP_HI", 0x3C, "float32", "RW", "Output High Limit"),
("DEV", 0x4A, "float32", "R", "Deviation (SP-PV)"), ("DEV", 0x4A, "float32", "R", "Deviation (SP-PV)"),
# Bit-packed status / control
("FUZZY_EN", 0xB7, "uint16", "RW", "Fuzzy Enable"), ("FUZZY_EN", 0xB7, "uint16", "RW", "Fuzzy Enable"),
("ATUNE_REQ", 0xB8, "uint16", "RW", "Autotune Request"), ("ATUNE_REQ", 0xB8, "uint16", "RW", "Autotune Request"),
("MODE", 0xBA, "uint16", "RW", "Auto/Manual (0=Man, 1=Auto)"), ("MODE", 0xBA, "uint16", "RW", "Auto/Manual (0=Man, 1=Auto)"),
@@ -56,11 +56,35 @@ LOOP_PARAM_OFFSETS = [
("LOOP_STATUS", 0xBE, "uint16", "R", "Loop Status (bit packed)"), ("LOOP_STATUS", 0xBE, "uint16", "R", "Loop Status (bit packed)"),
] ]
# Experion source param → canonical param name mapping
SOURCE_PARAM_MAP = {
"PV": "PV",
"WSP": "SP",
"SPWORK": "SP",
"OPWORK": "OP",
"OP": "OP",
"GAIN1": "GAIN1",
"RESET1": "RESET1",
"RATE1": "RATE1",
"MODEIN": "MODE",
"DIRECTION": "DIR",
"LOOPSTAT": "LOOP_STATUS",
}
# Source columns in Sinam_Tag_all.xlsx (0-indexed) where LOOP params appear
LOOP_SOURCE_COLS = [26, 27, 28, 29, 31, 32, 33, 75]
# ─── helpers ─── # ─── helpers ───
def hex_addr(s: str) -> int: def hex_addr(s: str) -> int:
return int(s.strip(), 16) return int(s.strip(), 16)
def normalize_tag(tag: str) -> str:
"""Normalize tag name: remove hyphens, uppercase."""
return tag.replace("-", "").upper()
def strip_header(rows, header_marker: str): def strip_header(rows, header_marker: str):
"""Skip rows until we find the actual data header.""" """Skip rows until we find the actual data header."""
for i, row in enumerate(rows): for i, row in enumerate(rows):
@@ -68,7 +92,25 @@ def strip_header(rows, header_marker: str):
return rows[i + 1:] return rows[i + 1:]
return rows return rows
# ─── parsers ───
# ─── Experion point naming: append .PV for standard instrument tags ───
_EXPERION_PV_PREFIXES = frozenset([
"DP", "FI", "FIQ", "FQ", "FT", "LI", "LIA", "LIC", "LICA", "LIS",
"LISA", "PI", "PIA", "PIC", "PICA", "PT", "TI", "TIA", "TIC", "TICA",
"TE", "WT",
])
def _to_experion_point_name(tag: str) -> str:
"""Append .PV for standard instrument tags per Experion naming convention."""
m = re.match(r"^([A-Z]+)", tag)
if m and m.group(1) in _EXPERION_PV_PREFIXES:
return tag + ".PV"
return tag
# ─── CSV parsers (unchanged from original) ───
def parse_loop_csv(path: Path) -> list[dict]: def parse_loop_csv(path: Path) -> list[dict]:
"""Parse SummaryFunctionBlockReport.csv → list of loop dicts.""" """Parse SummaryFunctionBlockReport.csv → list of loop dicts."""
@@ -76,7 +118,6 @@ def parse_loop_csv(path: Path) -> list[dict]:
with open(path, "r", encoding="utf-8-sig") as f: with open(path, "r", encoding="utf-8-sig") as f:
reader = csv.reader(f) reader = csv.reader(f)
rows = list(reader) rows = list(reader)
for row in rows: for row in rows:
if len(row) < 6: if len(row) < 6:
continue continue
@@ -105,7 +146,6 @@ def parse_signal_or_variable_csv(path: Path, access: str) -> list[dict]:
with open(path, "r", encoding="utf-8-sig") as f: with open(path, "r", encoding="utf-8-sig") as f:
reader = csv.reader(f) reader = csv.reader(f)
rows = list(reader) rows = list(reader)
for row in rows: for row in rows:
if len(row) < 8: if len(row) < 8:
continue continue
@@ -116,7 +156,6 @@ def parse_signal_or_variable_csv(path: Path, access: str) -> list[dict]:
desc = row[3].strip() desc = row[3].strip()
typ = row[6].strip() if len(row) > 6 else "float 32" typ = row[6].strip() if len(row) > 6 else "float 32"
eu = row[7].strip() if len(row) > 7 else "" eu = row[7].strip() if len(row) > 7 else ""
# normalise type
if "float" in typ.lower(): if "float" in typ.lower():
data_type = "float32" data_type = "float32"
register_count = 2 register_count = 2
@@ -126,10 +165,9 @@ def parse_signal_or_variable_csv(path: Path, access: str) -> list[dict]:
else: else:
data_type = "float32" data_type = "float32"
register_count = 2 register_count = 2
addr = hex_addr(hex_addr_raw) addr = hex_addr(hex_addr_raw)
tags.append({ tags.append({
"tag": tag, "tag": _to_experion_point_name(tag),
"description": desc, "description": desc,
"addr": addr, "addr": addr,
"count": register_count, "count": register_count,
@@ -140,34 +178,198 @@ def parse_signal_or_variable_csv(path: Path, access: str) -> list[dict]:
return tags return tags
# ─── PID loop tag name matching ───
def _make_csv_lookup(loops: list[dict]) -> dict:
"""
Build lookup from normalized tag name → (base_addr, csv_tag, loop_number).
Handles FICA↔FICQ prefix variants.
"""
lookup = {}
for loop in loops:
norm = normalize_tag(loop["tag"])
# Primary entry
lookup[norm] = (loop["start_addr"], loop["tag"], loop["loop_number"])
# Also add the FICA↔FICQ variant
if norm.startswith("FICA"):
alt = "FICQ" + norm[4:]
if alt not in lookup:
lookup[alt] = (loop["start_addr"], loop["tag"], loop["loop_number"])
elif norm.startswith("FICQ"):
alt = "FICA" + norm[4:]
if alt not in lookup:
lookup[alt] = (loop["start_addr"], loop["tag"], loop["loop_number"])
return lookup
def _resolve_loop_addr(xlsx_tag: str, csv_lookup: dict) -> tuple:
"""
Try to find the address for an xlsx tag from the CSV lookup.
Returns (addr, csv_tag, loop_number) or (None, None, None).
"""
norm = normalize_tag(xlsx_tag)
if norm in csv_lookup:
return csv_lookup[norm]
return (None, None, None)
# ─── xlsx parsing ───
def parse_xlsx_loops(path: Path, csv_lookup: dict) -> list[dict]:
"""
Parse Sinam_Tag_all.xlsx for PID loop AnalogPoint entries.
Returns list of register entry dicts.
"""
wb = openpyxl.load_workbook(path, data_only=True)
ws = wb.active
registers = []
warnings = []
validated_count = 0
unvalidated_count = 0
# Map param_name → (canonical_name, offset, type, access, desc_suffix)
param_defs = {
entry[0]: entry
for entry in LOOP_PARAM_OFFSETS
}
# Suffix mapping for tag names
PARAM_SUFFIX = {
"PV": ".PV",
"SP": ".SP",
"OP": ".OP",
"GAIN1": ".GAIN1",
"RESET1": ".RESET1",
"RATE1": ".RATE1",
"MODE": ".MODE",
"LOOP_STATUS": ".LOOPSTAT",
}
matched_tags = {} # item_name → (addr, csv_tag, loop_num)
for r in range(3, ws.max_row + 1):
item_name = ws.cell(r, 1).value
cls_val = ws.cell(r, 2).value
if not item_name or cls_val != "AnalogPoint":
continue
# Scan source columns for LOOP references
loop_params = {} # canonical_param_name → source_param_name
for c in LOOP_SOURCE_COLS:
v = ws.cell(r, c + 1).value # openpyxl is 1-indexed
if v and "LOOP" in str(v):
m = re.match(r"C(\d+)\s+LOOP\s+(\d+)\s+(\S+)", str(v))
if m:
c_unit = int(m.group(1))
loop_num = int(m.group(2))
source_param = m.group(3)
canon = SOURCE_PARAM_MAP.get(source_param)
if canon:
loop_params[canon] = source_param
if not loop_params:
continue
# Find address from CSV lookup
addr, csv_tag, csv_loop_num = _resolve_loop_addr(item_name, csv_lookup)
# If no direct match, try the second time with the tag without suffix if it has one
if addr is None and "." in item_name:
base_name = item_name.split(".")[0]
addr, csv_tag, csv_loop_num = _resolve_loop_addr(base_name, csv_lookup)
if addr is not None:
validated = True
validated_count += 1
matched_tags[item_name] = (addr, csv_tag, csv_loop_num)
else:
validated = False
unvalidated_count += 1
warnings.append(
f"WARNING: No CSV match for '{item_name}' (C{c_unit} LOOP {loop_num}), "
f"params: {list(loop_params.keys())}"
)
continue
description_base = f"Loop #{csv_loop_num} {csv_tag}" if csv_tag else f"C{c_unit} Loop {loop_num}"
for canon, source_param in sorted(loop_params.items()):
if canon not in param_defs:
warnings.append(f"WARNING: Unknown param '{canon}' for {item_name}")
continue
name, off, dtype, access, desc_suffix = param_defs[canon]
reg_count = 2 if dtype == "float32" else 1
tag_suffix = PARAM_SUFFIX.get(canon, f".{canon}")
# Clean ItemName: remove any existing .PV/.SP etc. suffix
clean_item = item_name.split(".")[0] if "." in item_name else item_name
tag_name = f"{clean_item}{tag_suffix}"
description = f"{description_base} / {desc_suffix}"
registers.append({
"tag": tag_name,
"addr": addr + off,
"count": reg_count,
"type": dtype,
"access": access,
"description": description,
"validated": validated,
})
wb.close()
return registers, warnings, validated_count, unvalidated_count
# ─── main ─── # ─── main ───
def build(loop_path, signal_path, variable_path, output_path): def build(xlsx_path, loop_path, signal_path, variable_path, output_path):
loops = parse_loop_csv(loop_path)
signals = parse_signal_or_variable_csv(signal_path, "R") signals = parse_signal_or_variable_csv(signal_path, "R")
variables = parse_signal_or_variable_csv(variable_path, "RW") variables = parse_signal_or_variable_csv(variable_path, "RW")
registers = [] registers = []
# 1. Expand PID loops into individual parameters # Parse CSV for validation lookup
for loop in loops: if loop_path:
base = loop["start_addr"] loops = parse_loop_csv(loop_path)
for param in LOOP_PARAM_OFFSETS: csv_lookup = _make_csv_lookup(loops)
name, off, dtype, access, desc_template = param else:
reg_count = 2 if dtype == "float32" else 1 loops = []
addr = base + off csv_lookup = {}
tag_name = f"{loop['tag']}.{name}"
description = f"{loop['description']} / {desc_template}" if loop['description'] else desc_template
registers.append({
"tag": tag_name,
"addr": addr,
"count": reg_count,
"type": dtype,
"access": access,
"description": description,
})
# 2. Signal Tags (read-only) # Parse xlsx as primary PID source
if xlsx_path:
xlsx_registers, warnings, validated_count, unvalidated_count = parse_xlsx_loops(
xlsx_path, csv_lookup
)
registers.extend(xlsx_registers)
else:
warnings = []
validated_count = 0
unvalidated_count = 0
# Fallback: use CSV as loop source (original behavior)
for loop in loops:
base = loop["start_addr"]
for param in LOOP_PARAM_OFFSETS:
name, off, dtype, access, desc_template = param
reg_count = 2 if dtype == "float32" else 1
addr = base + off
tag_name = f"{loop['tag']}.{name}"
description = f"{loop['description']} / {desc_template}" if loop['description'] else desc_template
registers.append({
"tag": tag_name,
"addr": addr,
"count": reg_count,
"type": dtype,
"access": access,
"description": description,
})
validated_count = len(loops)
# Signal Tags (read-only)
for sig in signals: for sig in signals:
registers.append({ registers.append({
"tag": sig["tag"], "tag": sig["tag"],
@@ -179,7 +381,7 @@ def build(loop_path, signal_path, variable_path, output_path):
"eu": sig["eu"], "eu": sig["eu"],
}) })
# 3. Variables (R/W) # Variables (R/W)
for var in variables: for var in variables:
registers.append({ registers.append({
"tag": var["tag"], "tag": var["tag"],
@@ -191,16 +393,18 @@ def build(loop_path, signal_path, variable_path, output_path):
"eu": var["eu"], "eu": var["eu"],
}) })
# sort by address # Sort by address
registers.sort(key=lambda r: r["addr"]) registers.sort(key=lambda r: r["addr"])
output = { output = {
"controller": "HC900-C70 Rev 4.4x", "controller": "HC900-C70 Rev 4.4x",
"report_generated": "2026-06-01", "report_generated": "2026-06-08",
"float_format": "FP_B", "float_format": "FP_B",
"notes": "FP_B = IEEE 754 Big Endian (byte order 4,3,2,1). " "notes": "FP_B = IEEE 754 Big Endian (byte order 4,3,2,1). "
"Address is the first register (0-based). " "Address is the first register (0-based). "
"float32 uses 2 consecutive registers.", "float32 uses 2 consecutive registers. "
"PID loop sources from Sinam_Tag_all.xlsx (Experion), "
"addresses validated against SummaryFunctionBlockReport.csv.",
"register_count": len(registers), "register_count": len(registers),
"registers": registers, "registers": registers,
} }
@@ -209,17 +413,26 @@ def build(loop_path, signal_path, variable_path, output_path):
json.dump(output, f, indent=2, ensure_ascii=False) json.dump(output, f, indent=2, ensure_ascii=False)
print(f"✓ Register map written to {output_path}") print(f"✓ Register map written to {output_path}")
print(f" Loops: {len(loops)}") print(f" PID loops (validated): {validated_count}")
print(f" Signals: {len(signals)}") print(f" PID loops (unmatched): {unvalidated_count}")
print(f" Variables: {len(variables)}") print(f" Signals: {len(signals)}")
print(f" Total registers in map: {len(registers)}") print(f" Variables: {len(variables)}")
print(f" Total registers: {len(registers)}")
for w in warnings:
print(f" {w}")
return registers
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Build HC900 register map JSON from CSV exports") parser = argparse.ArgumentParser(
parser.add_argument("--loop-csv", required=True, help="SummaryFunctionBlockReport.csv") description="Build HC900 register map JSON from xlsx + CSV exports"
)
parser.add_argument("--xlsx", help="Sinam_Tag_all.xlsx (primary PID loop source)")
parser.add_argument("--loop-csv", help="SummaryFunctionBlockReport.csv (validation)")
parser.add_argument("--signal-csv", required=True, help="SignalTags.csv") parser.add_argument("--signal-csv", required=True, help="SignalTags.csv")
parser.add_argument("--variable-csv", required=True, help="Variables.csv") parser.add_argument("--variable-csv", required=True, help="Variables.csv")
parser.add_argument("-o", "--output", default="register-map.json", help="Output JSON path") parser.add_argument("-o", "--output", default="register-map.json", help="Output JSON path")
args = parser.parse_args() args = parser.parse_args()
build(args.loop_csv, args.signal_csv, args.variable_csv, args.output) build(args.xlsx, args.loop_csv, args.signal_csv, args.variable_csv, args.output)

View File

@@ -10,8 +10,24 @@ JSON register map consumed by the hc900_gateway.
import csv import csv
import json import json
import argparse import argparse
import re
from pathlib import Path from pathlib import Path
# Experion point naming convention: standard instrument tags get .PV suffix.
_EXPERION_PV_PREFIXES = frozenset([
"DP", "FI", "FIQ", "FQ", "FT", "LI", "LIA", "LIC", "LICA", "LIS",
"LISA", "PI", "PIA", "PIC", "PICA", "PT", "TI", "TIA", "TIC", "TICA",
"TE", "WT",
])
def _to_experion_point_name(tag: str) -> str:
"""Append .PV for standard instrument tags per Experion naming convention."""
m = re.match(r"^([A-Z]+)", tag)
if m and m.group(1) in _EXPERION_PV_PREFIXES:
return tag + ".PV"
return tag
def parse_full_map(path: Path) -> dict: def parse_full_map(path: Path) -> dict:
registers = [] registers = []
@@ -56,8 +72,9 @@ def parse_full_map(path: Path) -> dict:
else: else:
dtype = "float32" dtype = "float32"
count = 2 count = 2
experion_tag = _to_experion_point_name(tag)
registers.append({ registers.append({
"tag": tag, "addr": addr, "count": count, "tag": experion_tag, "addr": addr, "count": count,
"type": dtype, "access": "R", "description": desc, "type": dtype, "access": "R", "description": desc,
}) })
@@ -69,8 +86,9 @@ def parse_full_map(path: Path) -> dict:
else: else:
dtype = "float32" dtype = "float32"
count = 2 count = 2
experion_tag = _to_experion_point_name(tag)
registers.append({ registers.append({
"tag": tag, "addr": addr, "count": count, "tag": experion_tag, "addr": addr, "count": count,
"type": dtype, "access": "RW", "description": desc, "type": dtype, "access": "RW", "description": desc,
}) })

View File

@@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""
Import field history data from field_hist DB into hc900.history_table
for C3 and C4 controller tags only. 2026-02-05 ~ 2026-06-05 (~30s interval).
Usage:
# Preview-only
python3 scripts/import_field_hist_C3_C4.py --dry-run
# Full import
python3 scripts/import_field_hist_C3_C4.py
# Partial date range
python3 scripts/import_field_hist_C3_C4.py --from-date 2026-03-01 --to-date 2026-03-07
Logic:
1. Load C3/C4 base tag names from `register-map-c3.json` / `register-map-c4.json`.
2. Decode field_hist ptlist+mapping+tblist → {tagname_upper: (tblname, oit)}.
3. Filter to C3/C4 (by base name match), tag names stored UPPERCASE.
4. For each cont table in daily chunks: DELETE existing history overlap,
then COPY new long-format data.
"""
import argparse
import json
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
import psycopg
KST = timezone(timedelta(hours=9))
REG_MAP_DIR = Path(__file__).resolve().parent.parent / "docs"
FIELD_DSN = "host=localhost port=5432 dbname=field_hist user=postgres password=postgres"
TARGET_DSN = (
"host=localhost port=5432 dbname=iiot_platform "
"user=postgres password=postgres options=-csearch_path=hc900"
)
BATCH_DAYS = 1
def load_controller_bases() -> dict[str, str]:
"""Return {base_name_lower: controller_id} from register-map-c3/c4.json."""
result = {}
for ctrl in ("C3", "C4"):
path = REG_MAP_DIR / f"register-map-{ctrl.lower()}.json"
data = json.loads(path.read_text())
for entry in data["registers"]:
base = entry["tag"].split(".")[0].lower()
if base not in result:
result[base] = ctrl
return result
def build_tag_map(allowed_bases: dict[str, str]) -> dict[str, tuple[str, int, str]]:
"""Return {tagname_upper: (tblname, oit, controller_id)} for C3/C4 tags."""
tag_map = {}
with psycopg.connect(FIELD_DSN) as conn:
cur = conn.execute("""
SELECT p.shortptname, t.tblname, m.oit
FROM ptlist p
JOIN mapping m ON m.pid = p.pid
JOIN tblist t ON t.tid = m.tid
WHERE p.shortptname IS NOT NULL
""")
for short, tbl, oit in cur:
upper = short.strip().upper()
base = upper.split(".")[0].lower()
ctrl = allowed_bases.get(base)
if ctrl is not None:
tag_map[upper] = (tbl, int(oit), ctrl)
return tag_map
def process_all(dry_run: bool, from_date: str | None, to_date: str | None) -> int:
"""Main processing loop."""
print("Loading controller tag bases from register maps...", flush=True)
bases = load_controller_bases()
c3_ct = sum(1 for v in bases.values() if v == "C3")
c4_ct = sum(1 for v in bases.values() if v == "C4")
print(f" C3: {c3_ct} base tags, C4: {c4_ct} base tags", flush=True)
print("Building tag map from field_hist...", flush=True)
tag_map = build_tag_map(bases)
c3_tags = sum(1 for _, _, c in tag_map.values() if c == "C3")
c4_tags = sum(1 for _, _, c in tag_map.values() if c == "C4")
print(f" Matched: C3={c3_tags}, C4={c4_tags} (total={len(tag_map)})", flush=True)
if not tag_map:
print("ERROR: no tags matched — aborting", flush=True)
return 0
# Group by cont table
groups: dict[str, list[tuple[int, str, str]]] = defaultdict(list)
for tagname, (tbl, oit, ctrl) in tag_map.items():
groups[tbl].append((oit, tagname, ctrl))
print(f" Across {len(groups)} cont tables", flush=True)
# Load tblist for ordered processing
with psycopg.connect(FIELD_DSN) as conn:
cur = conn.execute("SELECT tblname FROM tblist ORDER BY tid")
tbl_order = [r[0] for r in cur]
# Time range
t_from = (
datetime.strptime(from_date, "%Y-%m-%d")
if from_date
else datetime(2026, 2, 5)
)
t_to = (
datetime.strptime(to_date, "%Y-%m-%d") if to_date else datetime(2026, 6, 6)
)
total_rows = 0
for tbl in tbl_order:
col_info = groups.get(tbl)
if not col_info:
continue
# Build dynamic SELECT for mapped columns
oits = sorted({oit for oit, _, _ in col_info})
oit_map: dict[int, list[tuple[str, str]]] = defaultdict(list)
for oit, tag, ctrl in col_info:
oit_map[oit].append((tag, ctrl))
col_sel = ", ".join(f"col{c:02d}" for c in oits)
sql = f"SELECT dtat, {col_sel} FROM {tbl} WHERE dtat >= %s AND dtat < %s ORDER BY dtat"
print(f"\n{tbl} ({len(col_info)} cols, {len(oits)} unique oits) ...", flush=True)
with psycopg.connect(FIELD_DSN) as src_conn:
with src_conn.cursor(name="fetch_cursor") as src_cur:
src_cur.itersize = 50000
src_cur.execute(sql, (t_from, t_to))
batch = []
for row in src_cur:
dtat, *vals = row
recorded_at = dtat - timedelta(hours=9)
for i, val in enumerate(vals):
if val is None:
continue
oit = oits[i]
for tagname, ctrl_id in oit_map[oit]:
batch.append((tagname, str(val), recorded_at, ctrl_id))
if len(batch) >= 50000:
total_rows += _flush_batch(batch, dry_run, tbl)
batch.clear()
if batch:
total_rows += _flush_batch(batch, dry_run, tbl)
return total_rows
def _flush_batch(rows: list, dry_run: bool, tbl: str) -> int:
"""DELETE overlap + COPY batch into history_table. Returns row count."""
n = len(rows)
if dry_run:
print(f" [{tbl}] {n:>8} rows (dry-run)", flush=True)
return n
# Collect unique tagnames + time range for DELETE
tagnames = list({r[0] for r in rows})
ts_min = min(r[2] for r in rows)
ts_max = max(r[2] for r in rows)
with psycopg.connect(TARGET_DSN) as tgt:
with tgt.cursor() as cur:
del_sql = """
DELETE FROM history_table
WHERE tagname = ANY(%s) AND recorded_at >= %s AND recorded_at <= %s
"""
cur.execute(del_sql, (tagnames, ts_min, ts_max))
deleted = cur.rowcount
with tgt.cursor() as cur:
with cur.copy(
"COPY history_table (tagname, value, recorded_at, controller_id) FROM STDIN"
) as copy:
for r in rows:
copy.write_row(r)
tgt.commit()
print(f" [{tbl}] {n:>8} inserted ({deleted} deleted)", flush=True)
return n
def main():
p = argparse.ArgumentParser(
description="Import field_hist data into hc900.history_table (C3/C4 only)"
)
p.add_argument(
"--dry-run", action="store_true", help="Preview only, no writes"
)
p.add_argument(
"--from-date", help="Start date (YYYY-MM-DD), default 2026-02-05"
)
p.add_argument(
"--to-date", help="End date (YYYY-MM-DD), default 2026-06-06"
)
args = p.parse_args()
t0 = time.time()
total = process_all(args.dry_run, args.from_date, args.to_date)
elapsed = time.time() - t0
print(f"\n{'=' * 60}")
label = "dry-run rows" if args.dry_run else "rows imported"
print(f"Total {label}: {total:,}")
print(f"Elapsed: {elapsed:.1f}s ({total / max(elapsed, 1):,.0f} rows/s)")
print(f"{'=' * 60}")
if __name__ == "__main__":
main()

View File

@@ -48,14 +48,19 @@ def classify(entry: dict) -> tuple[str, int | None]:
def load_controller(cur, controller: str, map_path: Path, active: bool) -> int: def load_controller(cur, controller: str, map_path: Path, active: bool) -> int:
data = json.loads(map_path.read_text(encoding="utf-8")) data = json.loads(map_path.read_text(encoding="utf-8"))
# Snapshot old tagnames BEFORE DELETE so we can detect renames.
cur.execute(
"SELECT tagname FROM hc900_map_master WHERE controller_id = %s", (controller,))
old_names = {row[0] for row in cur.fetchall()}
cur.execute("DELETE FROM hc900_map_master WHERE controller_id = %s", (controller,)) cur.execute("DELETE FROM hc900_map_master WHERE controller_id = %s", (controller,))
n = 0 n = 0
new_entries = []
for e in data["registers"]: for e in data["registers"]:
param_type, loop_no = classify(e) param_type, loop_no = classify(e)
access = "R/W" if e.get("access") == "RW" else "R" access = "R/W" if e.get("access") == "RW" else "R"
# Single consistent name everywhere: tagname == hc900_tag == the register tag new_entries.append((e["tag"], e["tag"], e["addr"], e["type"], access,
# (Experion ItemName, e.g. FICQ-6101.PV) — used by the gateway, map_master and active, loop_no, param_type, controller))
# realtime_table alike. No case conversion.
cur.execute( cur.execute(
"""INSERT INTO hc900_map_master """INSERT INTO hc900_map_master
(tagname, hc900_tag, modbus_addr, data_type, access, (tagname, hc900_tag, modbus_addr, data_type, access,
@@ -64,9 +69,74 @@ def load_controller(cur, controller: str, map_path: Path, active: bool) -> int:
(e["tag"], e["tag"], e["addr"], e["type"], access, (e["tag"], e["tag"], e["addr"], e["type"], access,
active, loop_no, param_type, controller)) active, loop_no, param_type, controller))
n += 1 n += 1
# Detect renames: tagname changed between old and new map.
new_names = {e[0] for e in new_entries}
renamed = {} # old_name -> new_name
for old in old_names:
if old not in new_names:
# Old tagname removed — find matching new tagname.
# Heuristic: same base tag, different suffix (e.g. "TI-3101" -> "TI-3101.PV")
for new in new_names:
if _is_rename_candidate(old, new):
renamed[old] = new
break
if renamed:
_migrate_history(cur, renamed, controller)
_migrate_realtime(cur, renamed, controller)
print(f" ↻ Renamed {len(renamed)} tag(s): {', '.join(f'{k}->{v}' for k, v in renamed.items())}")
return n return n
def _is_rename_candidate(old: str, new: str) -> bool:
"""Check if new could be a renamed version of old (same base, different suffix)."""
# Exact match → not a rename
if old == new:
return False
# Strip common suffixes (.PV, .SP, .OP, etc.) and compare base
old_base = re.sub(r'\.[A-Z_]+$', '', old)
new_base = re.sub(r'\.[A-Z_]+$', '', new)
return old_base == new_base
def _migrate_history(cur, renamed: dict[str, str], controller: str) -> None:
"""Rename tagname in history_table for renamed tags."""
if not renamed:
return
placeholders = ", ".join(["%s"] * len(renamed))
values = list(renamed.keys())
cur.execute(f"""
UPDATE hc900.history_table
SET tagname = CASE tagname
{"".join(f"WHEN %s THEN %s" for _ in renamed)}
END
WHERE tagname IN ({placeholders})
AND controller_id = %s
""", [*values, *renamed.values(), controller])
if cur.rowcount:
print(f" history_table: {cur.rowcount} rows migrated")
def _migrate_realtime(cur, renamed: dict[str, str], controller: str) -> None:
"""Rename tagname in realtime_table for renamed tags."""
if not renamed:
return
placeholders = ", ".join(["%s"] * len(renamed))
values = list(renamed.keys())
cur.execute(f"""
UPDATE hc900.realtime_table
SET tagname = CASE tagname
{"".join(f"WHEN %s THEN %s" for _ in renamed)}
END
WHERE tagname IN ({placeholders})
AND controller_id = %s
""", [*values, *renamed.values(), controller])
if cur.rowcount:
print(f" realtime_table: {cur.rowcount} rows migrated")
def main(): def main():
p = argparse.ArgumentParser(description="Load hc900_map_master from register maps") p = argparse.ArgumentParser(description="Load hc900_map_master from register maps")
p.add_argument("--controller", help="controller id, e.g. C3") p.add_argument("--controller", help="controller id, e.g. C3")