#!/usr/bin/env python3 """ C4 컨트롤러 Experion 태그 → HC900 Modbus 레지스터 매핑 CSV 생성 입력: docs/C4-All-Modbus-Map.csv - HC900 C4 Modbus 레지스터 전체 맵 docs/Sinam_Tag_all.xlsx - Experion 태그 목록 (SourceAddressPV/OP/MD 컬럼) 출력: docs/c4_tag_mapping.csv - hc900_map_master 적재용 매핑 테이블 전략: AnalogPoint (PID Loop) → 루프 번호로 표준 파라미터 세트 생성 (PV / SP / OP / RSP / MODE / STATUS) StatusPoint (Signal Tag) → C4 TAG N → Signal Tag 주소 직접 매핑 MATH_VAR (Variable) → C4 MATH_VAR N → Variable 주소 직접 매핑 """ import csv, re, openpyxl, argparse from pathlib import Path from collections import OrderedDict # ── 경로 설정 ───────────────────────────────────────────────────────────── BASE = Path(__file__).parent.parent / 'docs' # ── AnalogPoint 표준 파라미터 세트 ────────────────────────────────────────── # (tagname_suffix, hc900_suffix, offset_hex, dtype, access, param_type, description) ANALOG_PARAMS = [ ('pv', 'PV', 0x0000, 'float32', 'R', 'PV', 'Process Variable'), ('sp', 'WSP', 0x0004, 'float32', 'R/W', 'SP', 'Working Set Point'), ('op', 'Output', 0x0006, 'float32', 'R/W', 'OP', 'Output'), ('rsp', 'RSP_SP2', 0x0002, 'float32', 'R/W', 'RSP', 'Remote Set Point (SP2)'), ('lsp1', 'LSP1', 0x002A, 'float32', 'R/W', 'LSP1', 'Local SP 1'), ('lsp2', 'LSP2', 0x002C, 'float32', 'R/W', 'LSP2', 'Local SP 2'), ('dev', 'Deviation', 0x004A, 'float32', 'R', 'DEV', 'Deviation (SP-PV)'), ('pv_lo', 'PV_low_range', 0x0016, 'float32', 'R', 'PV_LO', 'PV Low Range'), ('pv_hi', 'PV_high_range', 0x0018, 'float32', 'R', 'PV_HI', 'PV High Range'), ('sp_lo', 'SP_low_limit', 0x0034, 'float32', 'R/W', 'SP_LO', 'SP Low Limit'), ('sp_hi', 'SP_high_limit', 0x0036, 'float32', 'R/W', 'SP_HI', 'SP High Limit'), ('op_lo', 'Output_Low_Limit', 0x003A, 'float32', 'R/W', 'OP_LO', 'Output Low Limit'), ('op_hi', 'Output_High_Limit', 0x003C, 'float32', 'R/W', 'OP_HI', 'Output High Limit'), ('alm1', 'Alarm_1_SP1', 0x001A, 'float32', 'R/W', 'ALM1', 'Alarm 1 SP1'), ('alm2', 'Alarm_2_SP1', 0x002E, 'float32', 'R/W', 'ALM2', 'Alarm 2 SP1'), ('mode', 'Auto_Man_State', 0x00BA, 'uint16', 'R/W', 'MODE', 'Auto/Manual State (0=Man,1=Auto)'), ('status', 'Loop_Status_Register', 0x00BE, 'uint16', 'R', 'STATUS', 'Loop Status Register'), ] # Experion 소스 파라미터명 → HC900 오프셋 (SourceAddressMD 처리용) EXP_PARAM_MAP = { 'RESET1': (0x0010, 'Reset_1', 'float32', 'R/W', 'RESET1', 'Reset 1'), 'RATE1': (0x0012, 'Rate_1', 'float32', 'R/W', 'RATE1', 'Rate 1'), 'AMSTAT': (0x00BA, 'Auto_Man_State', 'uint16', 'R/W', 'MODE', 'Auto/Manual State'), 'LOOPSTAT': (0x00BE, 'Loop_Status_Register', 'uint16', 'R', 'STATUS', 'Loop Status Register'), 'PV': (0x0000, 'PV', 'float32', 'R', 'PV', 'Process Variable'), 'WSP': (0x0004, 'WSP', 'float32', 'R/W', 'SP', 'Working Set Point'), 'OP': (0x0006, 'Output', 'float32', 'R/W', 'OP', 'Output'), } def load_modbus_map(csv_path: Path): """C4-All-Modbus-Map.csv → loop_map, sig_map, var_map""" with open(csv_path, encoding='utf-8-sig') as f: rows = list(csv.reader(f)) header_idx = next(i for i,r in enumerate(rows) if r and r[0]=='Hex Addr') data = rows[header_idx+1:] loop_map = {} # loop_num → {'tag': str, 'base': int, 'desc': str} sig_map = {} # tag_num → {'tag': str, 'addr': int, 'dtype': str, 'desc': str} var_map = {} # var_num → {'tag': str, 'addr': int, 'desc': str} for r in data: if len(r) < 9: continue partition = r[2].strip() tag_name = r[3].strip() if not tag_name: continue addr_hex = r[0].strip() try: addr = int(addr_hex, 16) except ValueError: continue desc = r[4].strip() dtype = r[7].strip() access = r[8].strip() # PID Loop 1-24 if partition == 'Loops 1-24' and tag_name.endswith('.PV'): n = (addr - 0x0040) // 0x0100 + 1 base_tag = tag_name.replace('.PV', '') loop_map[n] = {'tag': base_tag, 'base': addr, 'desc': desc} # PID Loop 25-32 elif partition == 'Loops 25-32' and tag_name.endswith('.PV'): n = (addr - 0x7840) // 0x0100 + 25 base_tag = tag_name.replace('.PV', '') loop_map[n] = {'tag': base_tag, 'base': addr, 'desc': desc} # Signal Tags 1-1000 elif partition == 'Signal Tags 1-1000': n = (addr - 0x2000) // 2 + 1 dt = 'float32' if 'float' in dtype.lower() else 'uint16' sig_map[n] = {'tag': tag_name, 'addr': addr, 'dtype': dt, 'access': access, 'desc': desc} # Variables 1-600 elif partition == 'Variables 1-600': n = (addr - 0x18C0) // 2 + 1 var_map[n] = {'tag': tag_name, 'addr': addr, 'desc': desc} return loop_map, sig_map, var_map def parse_indexed(src: str): """Indexed Address → (partition, n, param)""" m = re.match(r'\S+\s+(LOOP|LOOPX)\s+(\d+)\s+(\w+)', src.strip()) if m: return m.group(1), int(m.group(2)), m.group(3) m = re.match(r'\S+\s+TAG\s+(\d+)\s+VALUE', src.strip()) if m: return 'TAG', int(m.group(1)), 'VALUE' m = re.match(r'\S+\s+MATH_VAR\s+(\d+)\s+VALUE', src.strip()) if m: return 'MATH_VAR', int(m.group(1)), 'VALUE' return None, None, None def get_loop_num(spv: str, smd: str) -> tuple[str|None, int|None]: """SourceAddressPV/MD 에서 루프 번호 추출 → (partition, n)""" for src in [spv, smd]: if not src: continue part, n, _ = parse_indexed(src) if part in ('LOOP', 'LOOPX'): return part, n return None, None def build_mapping(modbus_csv: Path, sinam_xlsx: Path) -> list[dict]: loop_map, sig_map, var_map = load_modbus_map(modbus_csv) wb = openpyxl.load_workbook(sinam_xlsx, read_only=True, data_only=True) ws = wb.active xlsx_rows = list(ws.iter_rows(values_only=True)) headers = xlsx_rows[1] col = {h: i for i, h in enumerate(headers) if h} rows_out = [] seen_tagnames = set() def emit(tagname: str, hc900_tag: str, modbus_addr: int, dtype: str, access: str, loop_no, param_type: str, description: str, experion_src: str): if tagname in seen_tagnames: return seen_tagnames.add(tagname) rows_out.append(OrderedDict([ ('tagname', tagname), ('hc900_tag', hc900_tag), ('modbus_addr', modbus_addr), ('modbus_addr_hex', f'0x{modbus_addr:04X}'), ('data_type', dtype), ('access', access), ('loop_no', loop_no if loop_no else ''), ('param_type', param_type), ('description', description), ('experion_src', experion_src), ('is_active', 'TRUE'), ])) for row in xlsx_rows[2:]: item_name = row[col['ItemName']] cls = row[col['Class']] spv = str(row[col['SourceAddressPV']] or '').strip() sop = str(row[col['SourceAddressOP']] or '').strip() smd = str(row[col['SourceAddressMD']] or '').strip() if not item_name or not cls: continue # C4 참조만 처리 if not any('C4 ' in s for s in [spv, sop, smd]): continue base_name = str(item_name).strip().lower() # ── AnalogPoint (PID Loop) ──────────────────────────────────────── if cls == 'AnalogPoint': loop_part, loop_n = get_loop_num(spv, smd) if loop_n is None: continue linfo = loop_map.get(loop_n) if linfo is None: continue base_tag = linfo['tag'] base_addr = linfo['base'] # 표준 파라미터 세트 생성 for suffix, hc_suffix, offset, dtype, access, ptype, pdesc in ANALOG_PARAMS: tagname = f'{base_name}.{suffix}' hc900_tag = f'{base_tag}.{hc_suffix}' addr = base_addr + offset src_str = f'{loop_part} {loop_n} → {base_tag}' emit(tagname, hc900_tag, addr, dtype, access, loop_n, ptype, pdesc, src_str) # ── StatusPoint / 기타 ──────────────────────────────────────────── else: processed_srcs = set() for src in [spv, sop, smd]: if not src or 'C4 ' not in src or src in processed_srcs: continue processed_srcs.add(src) part, n, param = parse_indexed(src) if part is None: continue if part == 'TAG': info = sig_map.get(n) hc900_tag = info['tag'] if info else f'SIG_TAG_{n}' addr = 0x2000 + (n-1)*2 dtype = info['dtype'] if info else 'float32' access = 'R' pdesc = info['desc'] if info else '' emit(base_name, hc900_tag, addr, dtype, access, None, 'SIG', pdesc, src) elif part == 'MATH_VAR': info = var_map.get(n) hc900_tag = info['tag'] if info else f'VAR_{n}' addr = 0x18C0 + (n-1)*2 pdesc = info['desc'] if info else '' emit(base_name, hc900_tag, addr, 'float32', 'R/W', None, 'VAR', pdesc, src) elif part in ('LOOP', 'LOOPX'): # AnalogPoint가 아닌데 LOOP 소스 참조 (드물지만 처리) linfo = loop_map.get(n) if linfo is None: continue pinfo = EXP_PARAM_MAP.get(param) if pinfo is None: continue off, hc_suffix, dtype, access, ptype, pdesc = pinfo addr = linfo['base'] + off tagname = f'{base_name}' if src == spv else f'{base_name}.{param.lower()}' hc900_tag = f"{linfo['tag']}.{hc_suffix}" emit(tagname, hc900_tag, addr, dtype, access, n, ptype, pdesc, src) return rows_out def main(): parser = argparse.ArgumentParser(description='C4 태그 매핑 CSV 생성') parser.add_argument('--modbus-csv', default=str(BASE/'C4-All-Modbus-Map.csv')) parser.add_argument('--sinam-xlsx', default=str(BASE/'Sinam_Tag_all.xlsx')) parser.add_argument('-o', '--output', default=str(BASE/'c4_tag_mapping.csv')) args = parser.parse_args() rows = build_mapping(Path(args.modbus_csv), Path(args.sinam_xlsx)) with open(args.output, 'w', newline='', encoding='utf-8') as f: w = csv.DictWriter(f, fieldnames=list(rows[0].keys())) w.writeheader() w.writerows(rows) # 통계 from collections import Counter ptypes = Counter(r['param_type'] for r in rows) loop_rows = sum(1 for r in rows if r['loop_no']) sig_rows = sum(1 for r in rows if r['param_type'] == 'SIG') var_rows = sum(1 for r in rows if r['param_type'] == 'VAR') print(f'총 매핑: {len(rows)}개') print(f' Loop 파라미터: {loop_rows}개 ({len(set(r["loop_no"] for r in rows if r["loop_no"]))} loops × 최대 {len(ANALOG_PARAMS)} params)') print(f' Signal Tag: {sig_rows}개') print(f' Variable: {var_rows}개') print(f' param_type 분포: {dict(ptypes.most_common())}') print(f'저장: {args.output}') if __name__ == '__main__': main()