Honeywell HC900을 Modbus TCP로 직접 폴링 → gRPC → C# 크롤러 → PostgreSQL. 기존 Experion OPC UA 데이터 경로를 HC900 직접 통신으로 대체. - industrial-comm/cpp: C++ Modbus 게이트웨이 (gRPC 서버) - src: C# .NET 8 ASP.NET Core 크롤러 + 웹 UI (3-Layer) - mcp-server: Python FastMCP (RAG/NL2SQL/P&ID) - 다중 컨트롤러(N-Controller) 지원 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
272 lines
12 KiB
Python
272 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
C4 컨트롤러 Experion 태그 → HC900 Modbus 레지스터 매핑 CSV 생성
|
||
|
||
입력:
|
||
docs/C4-All-Modbus-Map.csv - HC900 C4 Modbus 레지스터 전체 맵
|
||
docs/Sinam_Tag_all.xlsx - Experion 태그 목록 (SourceAddressPV/OP/MD 컬럼)
|
||
|
||
출력:
|
||
docs/c4_tag_mapping.csv - hc900_map_master 적재용 매핑 테이블
|
||
|
||
전략:
|
||
AnalogPoint (PID Loop) → 루프 번호로 표준 파라미터 세트 생성
|
||
(PV / SP / OP / RSP / MODE / STATUS)
|
||
StatusPoint (Signal Tag) → C4 TAG N → Signal Tag 주소 직접 매핑
|
||
MATH_VAR (Variable) → C4 MATH_VAR N → Variable 주소 직접 매핑
|
||
"""
|
||
|
||
import csv, re, openpyxl, argparse
|
||
from pathlib import Path
|
||
from collections import OrderedDict
|
||
|
||
# ── Path settings ─────────────────────────────────────────────────────────
# Default input/output files live in the `docs` directory that sits beside
# this script's own directory.
BASE = Path(__file__).parent.parent / 'docs'

# ── Standard parameter set for an AnalogPoint (PID loop) ───────────────────
# Each entry is:
#   (tagname_suffix, hc900_suffix, offset_hex, dtype, access, param_type, description)
# `offset_hex` is added to the loop's base register address.
ANALOG_PARAMS = [
    ('pv',     'PV',                   0x0000, 'float32', 'R',   'PV',     'Process Variable'),
    ('sp',     'WSP',                  0x0004, 'float32', 'R/W', 'SP',     'Working Set Point'),
    ('op',     'Output',               0x0006, 'float32', 'R/W', 'OP',     'Output'),
    ('rsp',    'RSP_SP2',              0x0002, 'float32', 'R/W', 'RSP',    'Remote Set Point (SP2)'),
    ('lsp1',   'LSP1',                 0x002A, 'float32', 'R/W', 'LSP1',   'Local SP 1'),
    ('lsp2',   'LSP2',                 0x002C, 'float32', 'R/W', 'LSP2',   'Local SP 2'),
    ('dev',    'Deviation',            0x004A, 'float32', 'R',   'DEV',    'Deviation (SP-PV)'),
    ('pv_lo',  'PV_low_range',         0x0016, 'float32', 'R',   'PV_LO',  'PV Low Range'),
    ('pv_hi',  'PV_high_range',        0x0018, 'float32', 'R',   'PV_HI',  'PV High Range'),
    ('sp_lo',  'SP_low_limit',         0x0034, 'float32', 'R/W', 'SP_LO',  'SP Low Limit'),
    ('sp_hi',  'SP_high_limit',        0x0036, 'float32', 'R/W', 'SP_HI',  'SP High Limit'),
    ('op_lo',  'Output_Low_Limit',     0x003A, 'float32', 'R/W', 'OP_LO',  'Output Low Limit'),
    ('op_hi',  'Output_High_Limit',    0x003C, 'float32', 'R/W', 'OP_HI',  'Output High Limit'),
    ('alm1',   'Alarm_1_SP1',          0x001A, 'float32', 'R/W', 'ALM1',   'Alarm 1 SP1'),
    ('alm2',   'Alarm_2_SP1',          0x002E, 'float32', 'R/W', 'ALM2',   'Alarm 2 SP1'),
    ('mode',   'Auto_Man_State',       0x00BA, 'uint16',  'R/W', 'MODE',   'Auto/Manual State (0=Man,1=Auto)'),
    ('status', 'Loop_Status_Register', 0x00BE, 'uint16',  'R',   'STATUS', 'Loop Status Register'),
]

# Experion source parameter name → HC900 register details, used when a
# non-AnalogPoint row references a loop parameter directly.
# Each value is: (offset_hex, hc900_suffix, dtype, access, param_type, description)
EXP_PARAM_MAP = {
    'RESET1':   (0x0010, 'Reset_1',              'float32', 'R/W', 'RESET1', 'Reset 1'),
    'RATE1':    (0x0012, 'Rate_1',               'float32', 'R/W', 'RATE1',  'Rate 1'),
    'AMSTAT':   (0x00BA, 'Auto_Man_State',       'uint16',  'R/W', 'MODE',   'Auto/Manual State'),
    'LOOPSTAT': (0x00BE, 'Loop_Status_Register', 'uint16',  'R',   'STATUS', 'Loop Status Register'),
    'PV':       (0x0000, 'PV',                   'float32', 'R',   'PV',     'Process Variable'),
    'WSP':      (0x0004, 'WSP',                  'float32', 'R/W', 'SP',     'Working Set Point'),
    'OP':       (0x0006, 'Output',               'float32', 'R/W', 'OP',     'Output'),
}
|
||
|
||
def load_modbus_map(csv_path: Path):
    """Parse C4-All-Modbus-Map.csv into loop, signal-tag and variable lookups.

    Rows before (and including) the row whose first cell is ``Hex Addr`` are
    treated as preamble/header. Each following row is read positionally:
    column 0 = hex address, 2 = partition, 3 = tag name, 4 = description,
    7 = data type, 8 = access. Rows with fewer than 9 columns, an empty tag
    name, or a non-hex address are skipped.

    Args:
        csv_path: Path of the Modbus map CSV (read as UTF-8, BOM tolerated).

    Returns:
        A ``(loop_map, sig_map, var_map)`` tuple of dicts:
          * loop_map: loop_num → {'tag', 'base', 'desc'}; built only from
            rows whose tag ends in ``.PV``; 'base' is that row's address.
          * sig_map:  tag_num → {'tag', 'addr', 'dtype', 'access', 'desc'}
          * var_map:  var_num → {'tag', 'addr', 'desc'}

    Raises:
        ValueError: If no ``Hex Addr`` header row is present.
    """
    with open(csv_path, encoding='utf-8-sig') as f:
        rows = list(csv.reader(f))

    header_idx = next(
        (i for i, r in enumerate(rows) if r and r[0].strip() == 'Hex Addr'),
        None,
    )
    if header_idx is None:
        # Previously this surfaced as a bare StopIteration from next().
        raise ValueError(f"'Hex Addr' header row not found in {csv_path}")

    # partition name → (address of the first loop's PV register, first loop number)
    loop_partitions = {
        'Loops 1-24': (0x0040, 1),
        'Loops 25-32': (0x7840, 25),
    }

    loop_map = {}
    sig_map = {}
    var_map = {}

    for r in rows[header_idx + 1:]:
        if len(r) < 9:
            continue
        tag_name = r[3].strip()
        if not tag_name:
            continue
        try:
            addr = int(r[0].strip(), 16)
        except ValueError:
            continue

        partition = r[2].strip()
        desc = r[4].strip()
        dtype = r[7].strip()
        access = r[8].strip()

        if partition in loop_partitions:
            # Only the PV row identifies a loop; its address is the loop base.
            if tag_name.endswith('.PV'):
                first_addr, first_no = loop_partitions[partition]
                n = (addr - first_addr) // 0x0100 + first_no
                # Strip only the trailing '.PV'; str.replace would also eat
                # any '.PV' that happens to appear inside the tag name.
                loop_map[n] = {
                    'tag': tag_name.removesuffix('.PV'),
                    'base': addr,
                    'desc': desc,
                }

        elif partition == 'Signal Tags 1-1000':
            # The index is derived with a stride of 2 registers per tag.
            n = (addr - 0x2000) // 2 + 1
            sig_map[n] = {
                'tag': tag_name,
                'addr': addr,
                'dtype': 'float32' if 'float' in dtype.lower() else 'uint16',
                'access': access,
                'desc': desc,
            }

        elif partition == 'Variables 1-600':
            n = (addr - 0x18C0) // 2 + 1
            var_map[n] = {'tag': tag_name, 'addr': addr, 'desc': desc}

    return loop_map, sig_map, var_map
|
||
|
||
|
||
def parse_indexed(src: str):
    """Split an indexed source address into ``(partition, n, param)``.

    Recognised forms (the first whitespace-delimited token is ignored):
      * ``<x> LOOP|LOOPX <n> <param>`` → (``'LOOP'``/``'LOOPX'``, n, param)
      * ``<x> TAG <n> VALUE``          → (``'TAG'``, n, ``'VALUE'``)
      * ``<x> MATH_VAR <n> VALUE``     → (``'MATH_VAR'``, n, ``'VALUE'``)

    Anything else yields ``(None, None, None)``.
    """
    text = src.strip()

    loop_hit = re.match(r'\S+\s+(LOOP|LOOPX)\s+(\d+)\s+(\w+)', text)
    if loop_hit is not None:
        kind, number, param = loop_hit.groups()
        return kind, int(number), param

    # The two value-only forms differ just in keyword, so try them in turn.
    value_forms = (
        ('TAG', r'\S+\s+TAG\s+(\d+)\s+VALUE'),
        ('MATH_VAR', r'\S+\s+MATH_VAR\s+(\d+)\s+VALUE'),
    )
    for kind, pattern in value_forms:
        hit = re.match(pattern, text)
        if hit is not None:
            return kind, int(hit.group(1)), 'VALUE'

    return None, None, None
|
||
|
||
|
||
def get_loop_num(spv: str, smd: str) -> tuple[str|None, int|None]:
|
||
"""SourceAddressPV/MD 에서 루프 번호 추출 → (partition, n)"""
|
||
for src in [spv, smd]:
|
||
if not src: continue
|
||
part, n, _ = parse_indexed(src)
|
||
if part in ('LOOP', 'LOOPX'):
|
||
return part, n
|
||
return None, None
|
||
|
||
|
||
def build_mapping(modbus_csv: Path, sinam_xlsx: Path) -> list[dict]:
    """Build the Experion-tag → HC900 Modbus register mapping rows.

    Reads the Modbus map CSV via ``load_modbus_map`` and the active sheet of
    the Experion tag workbook. The second sheet row is taken as the header
    row and data starts on the third. Only rows that have an ItemName, a
    Class, and at least one source address containing ``'C4 '`` are
    processed:

      * Class ``AnalogPoint``: the loop number is taken from
        SourceAddressPV/MD and one row is emitted per ``ANALOG_PARAMS``
        entry, named ``<itemname>.<suffix>``.
      * Any other class: each distinct C4 source address is parsed and
        mapped to a signal tag, a variable, or a single loop parameter
        (via ``EXP_PARAM_MAP``).

    A tagname is emitted at most once; later duplicates are dropped.

    Args:
        modbus_csv: Path of the HC900 Modbus map CSV.
        sinam_xlsx: Path of the Experion tag list workbook.

    Returns:
        A list of ``OrderedDict`` rows with keys tagname, hc900_tag,
        modbus_addr, modbus_addr_hex, data_type, access, loop_no,
        param_type, description, experion_src, is_active.

    Raises:
        ValueError: If the sheet has no header row or lacks a required column.
    """
    loop_map, sig_map, var_map = load_modbus_map(modbus_csv)

    # A read-only workbook keeps the file handle open until close() is
    # called, so materialise the rows and release it right away.
    wb = openpyxl.load_workbook(sinam_xlsx, read_only=True, data_only=True)
    try:
        xlsx_rows = list(wb.active.iter_rows(values_only=True))
    finally:
        wb.close()

    if len(xlsx_rows) < 2:
        raise ValueError(f'{sinam_xlsx}: header row (2nd sheet row) not found')
    headers = xlsx_rows[1]
    col = {h: i for i, h in enumerate(headers) if h}

    required = ('ItemName', 'Class',
                'SourceAddressPV', 'SourceAddressOP', 'SourceAddressMD')
    missing = [name for name in required if name not in col]
    if missing:
        raise ValueError(f'{sinam_xlsx}: missing column(s): {", ".join(missing)}')

    def cell(row, name):
        """Return the named column of *row*, or None if the row is too short."""
        idx = col[name]
        return row[idx] if idx < len(row) else None

    rows_out = []
    seen_tagnames = set()

    def emit(tagname: str, hc900_tag: str, modbus_addr: int,
             dtype: str, access: str, loop_no, param_type: str,
             description: str, experion_src: str):
        """Append one output row unless *tagname* was already emitted."""
        if tagname in seen_tagnames:
            return
        seen_tagnames.add(tagname)
        rows_out.append(OrderedDict([
            ('tagname', tagname),
            ('hc900_tag', hc900_tag),
            ('modbus_addr', modbus_addr),
            ('modbus_addr_hex', f'0x{modbus_addr:04X}'),
            ('data_type', dtype),
            ('access', access),
            ('loop_no', loop_no if loop_no else ''),
            ('param_type', param_type),
            ('description', description),
            ('experion_src', experion_src),
            ('is_active', 'TRUE'),
        ]))

    for row in xlsx_rows[2:]:
        item_name = cell(row, 'ItemName')
        cls = cell(row, 'Class')
        spv = str(cell(row, 'SourceAddressPV') or '').strip()
        sop = str(cell(row, 'SourceAddressOP') or '').strip()
        smd = str(cell(row, 'SourceAddressMD') or '').strip()

        if not item_name or not cls:
            continue
        # Only rows that reference the C4 controller are handled.
        if not any('C4 ' in s for s in [spv, sop, smd]):
            continue

        base_name = str(item_name).strip().lower()

        # ── AnalogPoint (PID loop) ─────────────────────────────────────────
        if cls == 'AnalogPoint':
            loop_part, loop_n = get_loop_num(spv, smd)
            if loop_n is None:
                continue
            # NOTE(review): LOOP and LOOPX indices are looked up identically;
            # confirm LOOPX uses the same numbering as loop_map's keys.
            linfo = loop_map.get(loop_n)
            if linfo is None:
                continue

            base_tag = linfo['tag']
            base_addr = linfo['base']
            src_str = f'{loop_part} {loop_n} → {base_tag}'

            # Expand the loop into the standard parameter set.
            for suffix, hc_suffix, offset, dtype, access, ptype, pdesc in ANALOG_PARAMS:
                emit(f'{base_name}.{suffix}', f'{base_tag}.{hc_suffix}',
                     base_addr + offset, dtype, access, loop_n, ptype, pdesc,
                     src_str)

        # ── StatusPoint / everything else ──────────────────────────────────
        else:
            processed_srcs = set()
            for src in [spv, sop, smd]:
                if not src or 'C4 ' not in src or src in processed_srcs:
                    continue
                processed_srcs.add(src)
                part, n, param = parse_indexed(src)
                if part is None:
                    continue

                if part == 'TAG':
                    info = sig_map.get(n)
                    hc900_tag = info['tag'] if info else f'SIG_TAG_{n}'
                    # Address is computed from the index, so it is available
                    # even when the tag is absent from the Modbus map.
                    addr = 0x2000 + (n - 1) * 2
                    dtype = info['dtype'] if info else 'float32'
                    access = 'R'
                    pdesc = info['desc'] if info else ''
                    emit(base_name, hc900_tag, addr, dtype, access, None,
                         'SIG', pdesc, src)

                elif part == 'MATH_VAR':
                    info = var_map.get(n)
                    hc900_tag = info['tag'] if info else f'VAR_{n}'
                    addr = 0x18C0 + (n - 1) * 2
                    pdesc = info['desc'] if info else ''
                    emit(base_name, hc900_tag, addr, 'float32', 'R/W', None,
                         'VAR', pdesc, src)

                elif part in ('LOOP', 'LOOPX'):
                    # A non-AnalogPoint row that references a loop parameter
                    # (rare, but handled).
                    linfo = loop_map.get(n)
                    if linfo is None:
                        continue
                    pinfo = EXP_PARAM_MAP.get(param)
                    if pinfo is None:
                        continue
                    off, hc_suffix, dtype, access, ptype, pdesc = pinfo
                    addr = linfo['base'] + off
                    # The PV source keeps the bare item name; other sources
                    # get the parameter name appended.
                    tagname = f'{base_name}' if src == spv else f'{base_name}.{param.lower()}'
                    hc900_tag = f"{linfo['tag']}.{hc_suffix}"
                    emit(tagname, hc900_tag, addr, dtype, access, n, ptype,
                         pdesc, src)

    return rows_out
|
||
|
||
|
||
def main():
    """CLI entry point: build the C4 tag mapping, write it as CSV, print stats.

    Options ``--modbus-csv``, ``--sinam-xlsx`` and ``-o/--output`` default to
    files under ``BASE``. When no mapping rows are produced, a header-only
    CSV is written instead of failing.
    """
    parser = argparse.ArgumentParser(description='C4 태그 매핑 CSV 생성')
    parser.add_argument('--modbus-csv', default=str(BASE/'C4-All-Modbus-Map.csv'))
    parser.add_argument('--sinam-xlsx', default=str(BASE/'Sinam_Tag_all.xlsx'))
    parser.add_argument('-o', '--output', default=str(BASE/'c4_tag_mapping.csv'))
    args = parser.parse_args()

    rows = build_mapping(Path(args.modbus_csv), Path(args.sinam_xlsx))

    # rows[0] does not exist when nothing was mapped; fall back to the column
    # order that build_mapping uses for its rows.
    if rows:
        fieldnames = list(rows[0].keys())
    else:
        fieldnames = [
            'tagname', 'hc900_tag', 'modbus_addr', 'modbus_addr_hex',
            'data_type', 'access', 'loop_no', 'param_type', 'description',
            'experion_src', 'is_active',
        ]

    with open(args.output, 'w', newline='', encoding='utf-8') as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(rows)

    # Summary statistics
    from collections import Counter
    ptypes = Counter(r['param_type'] for r in rows)
    loop_rows = sum(1 for r in rows if r['loop_no'])
    sig_rows = sum(1 for r in rows if r['param_type'] == 'SIG')
    var_rows = sum(1 for r in rows if r['param_type'] == 'VAR')

    print(f'총 매핑: {len(rows)}개')
    print(f' Loop 파라미터: {loop_rows}개 ({len(set(r["loop_no"] for r in rows if r["loop_no"]))} loops × 최대 {len(ANALOG_PARAMS)} params)')
    print(f' Signal Tag: {sig_rows}개')
    print(f' Variable: {var_rows}개')
    print(f' param_type 분포: {dict(ptypes.most_common())}')
    print(f'저장: {args.output}')


if __name__ == '__main__':
    main()
|