""" 컬럼 데이터 추출 + 운전모드 1차 특성 분석. field_hist DB(shinam 실데이터, WIDE 포맷)에서 ptlist/mapping/tblist로 태그를 디코드해 tidy DataFrame을 만든다. 재사용 가능한 tag_frame() 추출기 포함. 근거: docs/학습형제어-오퍼레이터모방-플랜.md §15(디코드), §16(C-6111 토폴로지). 형제 컬럼 확장: roles_for(prefix, asset)로 파라미터화. - 6-1: prefix=61, asset=/ASSETS/P6 (기본) - 6-2: prefix=62, asset=/ASSETS/P6 - 8: prefix=81, asset=/ASSETS/P8 - 9: prefix=91, asset=/ASSETS/P9 (또는 92) - 10: prefix=101, asset=/ASSETS/P10 (또는 102) """ import sys import psycopg import pandas as pd DSN = "host=localhost port=5432 dbname=field_hist user=postgres password=postgres" ASSET = "/ASSETS/P6" # --- 형제 컬럼 역할 생성기 --- # DB 검증 결과(2026-06-05) 기반 예외 오버라이드: # P8(81): TICA에 A/B/C/D 접미사 없음, PICA-8111A (with A suffix) # P9(91): PICA-9111A (with A suffix). 92xx 2차 컬럼 존재 # P10(101): FICQ-10114A (not 10114), PICA-10111A, LIA-10111 (not LICA). 102xx 2차 컬럼 존재 COLUMN_EXCEPTIONS = { "51": { # P5: 민감단 TI-5111C 센서 없음(A/B/D만 존재) → T_C를 TI-5111B로 대체 (사용자 확정 2026-06-05). # startup 트리거는 reb-A·ΔT(A-D) 사용이라 영향 없음. "T_C": "TI-5111B.PV", }, "81": { "steam_op": "TICA-8111.OP", "reb_temp": "TICA-8111.PV", "vacuum": "PICA-8111A.PV", }, "91": { "vacuum": "PICA-9111A.PV", }, "92": { "vacuum": "PICA-9211A.PV", }, "101": { "light": "FICQ-10114A.PV", "vacuum": "PICA-10111A.PV", "reb_level": "LIA-10111.PV", # 10-1차 리보일러 레벨 (LI-10111 없음, 사용자 확정) # reflux_drum은 base 규칙 LICA-10113.PV 사용(실존). 기존 LIA-10111 매핑은 오류(=리보일러레벨)였음. }, "102": { "light": "FICQ-10214.PV", "vacuum": "PICA-10211A.PV", "reb_level": "LIA-10211.PV", # 10-2차 리보일러 레벨 }, } def roles_for(prefix, asset=ASSET): """{role: shorttag} dict 생성. prefix 예: '61', '62', '81', '91', '101'. Base 규칙(6-1 기준, docs/작업지시서-학습형제어-다음단계.md 작업1): feed=FICQ-{p}01, reflux=FICQ-{p}13, light(D)=FICQ-{p}14, heavy(B)=FICQ-{p}16, product(P)=FICQ-{p}18, steam_op=TICA-{p}11A.OP, reb_temp=TICA-{p}11A.PV, steam_flow=FIQ-{p}15, T_B=TI-{p}11B, T_C=TI-{p}11C, T_D=TI-{p}11D, vacuum=PICA-{p}11.PV, dp=PI-{p}11B.PV, reb_level=LI-{p}11.PV, reflux_drum=LICA-{p}13.PV, feed_preheat=TI-{p}03.PV COLUMN_EXCEPTIONS에 등록된 prefix는 자동 오버라이드. """ p = prefix roles = { "feed": f"FICQ-{p}01.PV", "steam_op": f"TICA-{p}11A.OP", "steam_flow": f"FIQ-{p}15.PV", "reb_temp": f"TICA-{p}11A.PV", "T_B": f"TI-{p}11B.PV", "T_C": f"TI-{p}11C.PV", "T_D": f"TI-{p}11D.PV", "feed_preheat": f"TI-{p}03.PV", "vacuum": f"PICA-{p}11.PV", "dp": f"PI-{p}11B.PV", "product": f"FICQ-{p}18.PV", "reflux": f"FICQ-{p}13.PV", "light": f"FICQ-{p}14.PV", "heavy": f"FICQ-{p}16.PV", "reb_level": f"LI-{p}11.PV", "reflux_drum": f"LICA-{p}13.PV", } ov = COLUMN_EXCEPTIONS.get(prefix, {}) roles.update(ov) return roles # C-6111 (6-1) 역할별 태그 — legacy 직접 참조 호환용 ROLES = roles_for("61", ASSET) def resolve(conn, shorttags, asset=ASSET): """shortptname 목록 -> {tag: (tblname, colnum)}""" with conn.cursor() as cur: cur.execute(""" SELECT p.shortptname, t.tblname, m.oit FROM ptlist p JOIN mapping m ON m.pid=p.pid JOIN tblist t ON t.tid=m.tid WHERE p.asset=%s AND p.shortptname = ANY(%s) """, (asset, list(shorttags))) out = {} for short, tbl, oit in cur.fetchall(): out[short] = (tbl, int(oit)) return out def tag_frame(conn, role_map, asset=ASSET): """{role: shorttag} -> dtat 인덱스 DataFrame(컬럼=role). 테이블별 1쿼리 후 merge.""" loc = resolve(conn, role_map.values(), asset) missing = [r for r, t in role_map.items() if t not in loc] if missing: print(f"[warn] 미해결 태그: {[(r, role_map[r]) for r in missing]}", file=sys.stderr) # 테이블별 그룹 by_tbl = {} for role, short in role_map.items(): if short not in loc: continue tbl, col = loc[short] by_tbl.setdefault(tbl, []).append((role, col)) df = None for tbl, cols in by_tbl.items(): sel = ", ".join([f'col{c:02d} AS "{role}"' for role, c in cols]) q = f"SELECT dtat, {sel} FROM {tbl}" part = pd.read_sql(q, conn) df = part if df is None else df.merge(part, on="dtat", how="outer") return df.sort_values("dtat").reset_index(drop=True) _RANGES = None def load_ranges(): """instrument_ranges.json (gen_instrument_ranges.py 산출) 로드. 없으면 빈 dict.""" global _RANGES if _RANGES is None: import json, os p = os.path.join(os.path.dirname(os.path.abspath(__file__)), "instrument_ranges.json") _RANGES = json.load(open(p)) if os.path.exists(p) else {} if not _RANGES: print("[warn] instrument_ranges.json 없음 — 계기범위 클린징 건너뜀 " "(gen_instrument_ranges.py 먼저 실행)", file=sys.stderr) return _RANGES def clip_to_ranges(df, role_map): """role 컬럼값이 계기 EU range[lo,hi] 밖이면 NaN(센서 스파이크 제거). range는 instrument_ranges.json(realtime 실측 우선, 9·10차는 xlsx)에서 태그별 조회. OP(밸브%)는 0-100 고정이라 스킵. range 미등록 태그(FIQ/TI signal)도 스킵. NaN 처리(행 제거 아님) → 다른 role은 유효값 보존, 다운스트림 notna() 필터가 흡수. """ import numpy as np ranges = load_ranges() if not ranges: return df total = 0 for role, short in role_map.items(): if role not in df.columns or short.endswith(".OP"): continue r = ranges.get(short.split(".")[0]) if not r: continue mask = (df[role] < r["lo"]) | (df[role] > r["hi"]) n = int(mask.sum()) if n: df.loc[mask, role] = np.nan total += n print(f" [clip] {role:12s}({short.split('.')[0]}) " f"range[{r['lo']:.0f},{r['hi']:.0f}] 밖 {n}개 → NaN") if total: print(f" [clip] 계기범위 밖 총 {total}개 값 제거") return df def classify_phases(df): """1차 운전모드 분류 (임계 기반, §16.3-2). 추후 정교화.""" import numpy as np reb, vac, steam, prod = df["reb_temp"], df["vacuum"], df["steam_op"], df["product"] hot_vac = (reb > 60) & (vac < 200) & (steam > 5) # 컬럼 가동(hot+진공) # 온도 추세(60분=120샘플 기울기)로 startup/shutdown 구분 slope = reb.diff().rolling(120, min_periods=10, center=True).mean() mode = np.where( hot_vac, np.where(prod < 80, "LINEOUT", "PROD"), # 제품≈0 → 전환류/라인아웃 np.where(slope > 0.02, "STARTUP", np.where(slope < -0.02, "SHUTDOWN", "STOPPED"))) return pd.Series(mode, index=df.index, name="mode") def plot_timeline(df, png): import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt d = df.iloc[::30].copy() # 15분 다운샘플 colors = {"PROD": "#2ca02c", "LINEOUT": "#ff7f0e", "STARTUP": "#1f77b4", "SHUTDOWN": "#d62728", "STOPPED": "#7f7f7f"} fig, ax = plt.subplots(5, 1, figsize=(16, 12), sharex=True) ax[0].plot(d.dtat, d.reb_temp, lw=.5, label="reb_temp(A)") ax[0].plot(d.dtat, d.T_C, lw=.5, label="T_C(민감단)") ax[0].plot(d.dtat, d.T_D, lw=.5, label="T_D(탑상)") ax[0].set_ylabel("온도"); ax[0].legend(loc="upper right", fontsize=7) ax[1].plot(d.dtat, d.feed, lw=.5, color="purple"); ax[1].set_ylabel("feed FICQ-6101") ax[2].plot(d.dtat, d["product"], lw=.5, color="orange"); ax[2].set_ylabel("측류제품 6118") ax[3].plot(d.dtat, d.steam_flow, lw=.5, color="red") ax[3].plot(d.dtat, d.steam_op * 10, lw=.5, color="brown", alpha=.5, label="OP×10") ax[3].set_ylabel("스팀유량/OP"); ax[3].legend(loc="upper right", fontsize=7) ax[4].plot(d.dtat, d.vacuum, lw=.5, color="teal"); ax[4].set_ylabel("진공 PICA-6111") ax[4].set_ylim(100, 130) # 모드 배경 음영 for a in ax: for m, c in colors.items(): seg = d[d["mode"] == m] a.scatter(seg.dtat, [a.get_ylim()[0]] * len(seg), c=c, s=2, marker="|") fig.suptitle("C-6111 (6-1차) 전체기간 — 운전모드별 (하단 컬러바)") fig.tight_layout() fig.savefig(png, dpi=90) print(f"플롯 저장: {png}") def main(): with psycopg.connect(DSN) as conn: df = tag_frame(conn, ROLES) print(f"행수={len(df)} 기간={df.dtat.min()} ~ {df.dtat.max()}") print("\n=== 핵심 신호 분포 (운전모드 임계 설정용) ===") show = ["feed", "reb_temp", "vacuum", "product", "reflux", "steam_op", "steam_flow", "T_C", "T_D", "dp"] desc = df[show].describe(percentiles=[.01, .05, .25, .5, .75, .95, .99]).T print(desc[["min", "1%", "5%", "50%", "95%", "99%", "max"]].round(2).to_string()) df["mode"] = classify_phases(df) print("\n=== 운전모드 분포 (30초 샘플 기준) ===") vc = df["mode"].value_counts() for m, n in vc.items(): print(f" {m:9s} {n:7d} {100*n/len(df):5.1f}% ≈ {n*30/3600:7.1f} h") out = "/home/windpacer/projects/hc900_ax/scripts/analysis/C-6111_data.pkl" df.to_pickle(out) plot_timeline(df, "/home/windpacer/projects/hc900_ax/scripts/analysis/c6111_timeline.png") print(f"저장: {out}") if __name__ == "__main__": main()