"""
Plot data JSON export for web dashboard.

Usage:
    python3 export_plotdata.py --data c61_data.pkl --prefix c61
    python3 export_plotdata.py --data c81_data.pkl --prefix c81

Output:
    {prefix}_plotdata.json in this script's directory, unless --output is
    given. The file is strict JSON: NaN / infinite values are written as
    null so that browser-side JSON.parse can read it.
"""
import argparse
import json
import math
import os
import sys

import numpy as np
import pandas as pd

BASE = os.path.dirname(os.path.abspath(__file__))

# Model inputs for the shadow / advisory replays.
FEATURES = ["feed", "product", "T_C"]
# Model inputs for the production-map regression.
PRODMAP_FEATURES = ["feed", "product", "vacuum", "feed_preheat", "T_C", "T_D"]
# Resampling rule used to aggregate rows into operating points.
OP_RESAMPLE = "6h"


def _load_data(data_path):
    """Load the pickled DataFrame and return it sorted by ``dtat``.

    The index is reset to 0..n-1 after sorting; ``_milestones`` relies on
    index labels and positions being the same.

    NOTE(review): ``pd.read_pickle`` can execute arbitrary code from the
    file; only point ``--data`` at trusted pickles.
    """
    df = pd.read_pickle(data_path)
    return df.sort_values("dtat").reset_index(drop=True)


def _round_or_none(value, ndigits):
    """Round ``value`` to ``ndigits``; return None for None / NaN / inf.

    Used for scalar statistics (std of a single sample, R² of a degenerate
    split, ...) that would otherwise be serialised as a bare ``NaN``.
    """
    if value is None:
        return None
    value = float(value)
    return round(value, ndigits) if math.isfinite(value) else None


def _json_safe(obj):
    """Return a copy of ``obj`` that is safe for strict JSON output.

    Recurses into dicts, lists and tuples; converts numpy scalars to Python
    scalars and replaces non-finite floats with None. Other objects are
    returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: _json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_json_safe(v) for v in obj]
    if isinstance(obj, np.generic):
        obj = obj.item()
    if isinstance(obj, float) and not math.isfinite(obj):
        return None
    return obj


def _export_prodmap(df, prefix):
    """Production map: valve characteristic + operating points + regression.

    Returns a dict with ``valve_char``, ``operating_points``,
    ``pred_vs_actual``, ``feature_importance`` and ``n_prod_rows``, or a
    ``{"warning": ...}`` dict when fewer than 50 usable PROD rows remain.
    ``prefix`` is accepted for signature parity with the other exporters
    and is not used.
    """
    from sklearn.metrics import r2_score

    prod = df[df["mode"] == "PROD"].copy()
    usable = (
        (prod["feed"] > 50)
        & (prod["steam_flow"] > 10)
        & (prod["steam_op"] > 1)
        & prod[PRODMAP_FEATURES + ["steam_flow", "steam_op"]].notna().all(axis=1)
    )
    prod = prod[usable]
    if len(prod) < 50:
        return {"warning": "PROD 데이터 부족"}

    valve_char = _valve_char(prod)
    ops, gbm, Xte, yte, pred, imp = _regress(prod)

    return {
        "valve_char": valve_char,
        "operating_points": {
            "feed": _safelen(ops["feed"]),
            "steam_flow": _safelen(ops["steam_flow"]),
            "steam_op": _safelen(ops["steam_op"]),
            "n": len(ops),
        },
        "pred_vs_actual": {
            "actual": _safelen(yte),
            "predicted": _safelen(pred),
            # None instead of NaN when R² is undefined for the test split.
            "r2": _round_or_none(r2_score(yte, pred), 4),
            "n": len(yte),
        },
        "feature_importance": {
            "feature": [str(f) for f in PRODMAP_FEATURES],
            "gbm_importance": [round(float(v), 4) for v in imp.values],
        },
        "n_prod_rows": len(prod),
    }


def _valve_char(df):
    """OP (valve %) vs. steam-flow hysteresis characteristic.

    Described in the original as a replica of c6111_prodmap.py valve_char().

    Rows are grouped into 1-wide ``steam_op`` bins ``[lo, hi)``. Bins with
    fewer than 20 rows are skipped. For each kept bin the mean steam flow is
    reported overall, and separately for rows where ``steam_op`` rose by
    more than 0.05 (``flow_up``) or fell by more than 0.05 (``flow_dn``)
    relative to the previous row; a directional mean needs more than 5 rows,
    otherwise it is None.
    """
    op, fl = df["steam_op"].values, df["steam_flow"].values
    # Row-to-row change in OP; prepending the first value makes dop[0] == 0.
    dop = np.diff(op, prepend=op[0])
    up, dn = dop > 0.05, dop < -0.05
    bins = np.arange(np.floor(op.min()), np.ceil(op.max()) + 1, 1.0)

    rows = []
    for lo, hi in zip(bins[:-1], bins[1:]):
        m = (op >= lo) & (op < hi)
        if m.sum() < 20:
            continue
        fu = float(fl[m & up].mean()) if (m & up).sum() > 5 else None
        fd = float(fl[m & dn].mean()) if (m & dn).sum() > 5 else None
        rows.append({
            "op": float(lo + .5),
            "flow_mean": float(fl[m].mean()),
            "flow_up": fu,
            "flow_dn": fd,
            "n": int(m.sum()),
        })
    return rows


def _regress(df):
    """Aggregate 6h operating points, then fit a GBM regression.

    Described in the original as a replica of c6111_prodmap.py regress().

    Returns ``(ops, gbm, Xte, yte, pred, imp)``: the operating-point frame,
    the fitted model, the held-out features and targets (30% split,
    ``random_state=0``), the model predictions on the held-out features and
    a Series of feature importances indexed by ``PRODMAP_FEATURES``.
    """
    from sklearn.ensemble import GradientBoostingRegressor
    from sklearn.model_selection import train_test_split

    ops = (df.set_index("dtat").resample(OP_RESAMPLE).median(numeric_only=True)
           .dropna(subset=["steam_flow", "feed"]))
    ops = ops[ops["feed"] > 50]

    X, y = ops[PRODMAP_FEATURES].values, ops["steam_flow"].values
    Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=.3, random_state=0)

    # The scaler + linear model previously fitted here were never used or
    # returned, so they are no longer fitted; GBM results are unaffected.
    gbm = GradientBoostingRegressor(n_estimators=200, max_depth=2,
                                    learning_rate=0.05, random_state=0).fit(Xtr, ytr)
    pred = gbm.predict(Xte)
    imp = pd.Series(gbm.feature_importances_, index=PRODMAP_FEATURES)
    return ops, gbm, Xte, yte, pred, imp


def _export_startup(df, prefix):
    """Startup episodes + milestones.

    For every detected cut-in, exports the rows from 360 before to 360 after
    the cut-in row (clipped to the frame) together with the time relative to
    cut-in in minutes, plus median milestone durations and mean/std of the
    trigger values at cut-in. Statistics that are undefined (e.g. the std of
    a single episode) are exported as None rather than NaN. Returns a
    ``{"warning": ...}`` dict when no cut-in is found. ``prefix`` is unused.
    """
    cutins = _detect_cutins(df)
    if not cutins:
        return {"warning": "컷인 이벤트 없음"}

    episodes = []
    milestones_rows = []
    for ci in cutins:
        w = df.iloc[max(0, ci - 360):min(len(df), ci + 360)].copy()
        w["rel_min"] = (w["dtat"] - df["dtat"].iloc[ci]).dt.total_seconds() / 60
        episodes.append({
            "rel_min": _safelen(w["rel_min"]),
            "reb_temp": _safelen(w["reb_temp"]),
            "T_D": _safelen(w["T_D"]),
            "steam_flow": _safelen(w["steam_flow"]),
            "reflux": _safelen(w["reflux"]),
            "feed": _safelen(w["feed"]),
            "product": _safelen(w["product"]),
            "cutin_time": str(df["dtat"].iloc[ci]),
        })
        milestones_rows.append(_milestones(df, ci))

    M = pd.DataFrame(milestones_rows)

    def _stat(col, mean_digits, std_digits):
        # Series.std() is NaN for a single episode; export that as None.
        return {
            "mean": _round_or_none(M[col].mean(), mean_digits),
            "std": _round_or_none(M[col].std(), std_digits),
        }

    return {
        "episodes": episodes,
        "milestones": {
            "steam_to_cutin_min": _nanmid(M["steam_to_cutin"]),
            "reflux_to_cutin_min": _nanmid(M["reflux_to_cutin"]),
            "cutin_to_full_min": _nanmid(M["cutin_to_full"]),
            "cutin_triggers": {
                "reb_A": _stat("cutin_rebA", 1, 1),
                "T_C": _stat("cutin_TC", 1, 2),
                "dT_AD": _stat("cutin_dT_AD", 1, 1),
            },
        },
        "n_episodes": len(episodes),
    }


def _detect_cutins(df):
    """Return the row positions of detected cut-in events.

    Described in the original as a replica of c6111_startup.py
    detect_cutins().

    A cut-in is a row (position >= 60) where ``product`` crosses above 100
    from <= 100, the median ``product`` of the preceding 60 rows is below 50
    and ``reb_temp`` is above 75. After a hit the scan skips 720 rows.
    """
    prod = df["product"].values
    reb = df["reb_temp"].values
    outs = []
    i = 60
    n = len(df)
    while i < n:
        if prod[i] > 100 and prod[i - 1] <= 100:
            pre = prod[max(0, i - 60):i]
            if np.nanmedian(pre) < 50 and reb[i] > 75:
                outs.append(i)
                i += 720
                continue
        i += 1
    return outs


def _milestones(df, ci):
    """Compute start-up milestone durations around cut-in position ``ci``.

    Described in the original as a replica of c6111_startup.py milestones().

    Durations are in minutes; a milestone that is not found is None. Also
    returns the ``reb_temp``, ``T_C``, ``T_D`` and ``reb_temp - T_D`` values
    of the cut-in row.

    Index labels are used as positions below, so ``df`` must carry a
    0..n-1 index (as produced by ``_load_data``).
    """
    tc = df["dtat"].iloc[ci]

    # Steam start: the row after the last one with steam_op <= 10 in the
    # 1200 rows before cut-in, or the start of that window if none.
    back = df.iloc[max(0, ci - 1200):ci]
    off = back[back["steam_op"] <= 10]
    i_steam = off.index[-1] + 1 if len(off) else back.index[0]

    # Reflux start: first row with reflux > 100 between steam start and cut-in.
    aft = df.iloc[i_steam:ci]
    r_on = aft[aft["reflux"] > 100]
    i_refl = r_on.index[0] if len(r_on) else None

    # Full feed: first row with feed > 250 within 1200 rows from cut-in.
    fwd = df.iloc[ci:ci + 1200]
    f_on = fwd[fwd["feed"] > 250]
    i_full = f_on.index[0] if len(f_on) else None

    def mins(i):
        return None if i is None else (df["dtat"].iloc[i] - tc).total_seconds() / 60

    r = df.iloc[ci]
    return {
        "steam_to_cutin": -mins(i_steam) if i_steam is not None else None,
        "reflux_to_cutin": -mins(i_refl) if i_refl is not None else None,
        "cutin_to_full": mins(i_full) if i_full is not None else None,
        "cutin_rebA": float(r["reb_temp"]),
        "cutin_TC": float(r["T_C"]),
        "cutin_TD": float(r["T_D"]),
        "cutin_dT_AD": float(r["reb_temp"] - r["T_D"]),
    }


def _export_shadow(df, prefix):
    """Shadow advisory vs actual OP (6h downsampled) + error histogram.

    The usable PROD rows are split by time at the 70% quantile of ``dtat``.
    A GBM is fitted on 6h medians of the earlier part to predict steam flow
    from ``FEATURES``; a cubic polynomial fitted on the earlier part maps
    flow to OP. The later part is predicted from rolling-median-smoothed
    features. Rows of the later part outside the 1%..99% feature range of
    the earlier part are marked ``ood``. Returns a ``{"warning": ...}`` dict
    when there are too few rows. ``prefix`` is unused.
    """
    from sklearn.ensemble import GradientBoostingRegressor

    prod = df[df["mode"] == "PROD"].copy()
    usable = (
        (prod["feed"] > 50)
        & (prod["steam_flow"] > 10)
        & (prod["steam_op"] > 1)
        & prod[FEATURES + ["steam_op"]].notna().all(axis=1)
    )
    prod = prod[usable].sort_values("dtat")
    if len(prod) < 100:
        return {"warning": "PROD 부족 — shadow 불가"}

    # Smoothed copies of the features (centered 40-row rolling median).
    for c in FEATURES:
        prod[c + "_s"] = prod[c].rolling(40, min_periods=1, center=True).median()

    cut = prod["dtat"].quantile(0.70)
    tr, te = prod[prod["dtat"] <= cut], prod[prod["dtat"] > cut]
    if len(te) < 50:
        return {"warning": "held-out 데이터 부족"}

    ops = (tr.set_index("dtat").resample("6h").median(numeric_only=True)
           .dropna(subset=["steam_flow", "feed"]))
    ops = ops[ops["feed"] > 50]

    model = GradientBoostingRegressor(n_estimators=200, max_depth=2,
                                      learning_rate=0.05, random_state=0)
    model.fit(ops[FEATURES].values, ops["steam_flow"].values)
    # Cubic flow -> OP mapping, fitted on the training rows.
    inv = np.polyfit(tr["steam_flow"], tr["steam_op"], 3)
    lo, hi = tr[FEATURES].quantile(0.01), tr[FEATURES].quantile(0.99)

    Xs = te[[c + "_s" for c in FEATURES]].values
    pf = model.predict(Xs)
    po = np.clip(np.polyval(inv, pf), 0, 100)
    ao = te["steam_op"].values
    env_mask = ((te[FEATURES] >= lo) & (te[FEATURES] <= hi)).all(axis=1).values

    # 6h downsampled time series for chart
    te_plot = te.assign(pred_op=po, pred_flow=pf, ood=~env_mask)
    te_plot = te_plot.set_index("dtat")
    te_6h = te_plot.resample("6h").agg({
        "steam_op": "mean",
        "pred_op": "mean",
        "steam_flow": "mean",
        "pred_flow": "mean",
        "ood": "max",
    }).dropna(subset=["steam_op"]).reset_index()

    errors = po - ao
    hist_bins = np.linspace(errors.min(), errors.max(), 61)
    hist_counts, hist_edges = np.histogram(errors, bins=hist_bins)
    within_2 = float(np.mean(np.abs(errors) <= 2.0) * 100)

    return {
        "time_series": {
            "time": [str(t) for t in te_6h["dtat"]],
            "actual_op": _safelen(te_6h["steam_op"]),
            "predicted_op": _safelen(te_6h["pred_op"]),
            "actual_flow": _safelen(te_6h["steam_flow"]),
            "predicted_flow": _safelen(te_6h["pred_flow"]),
            "ood": [bool(x) for x in te_6h["ood"]],
            "n": len(te_6h),
        },
        "error_histogram": {
            "bin_edges": [round(float(x), 2) for x in hist_edges],
            "counts": [int(c) for c in hist_counts],
        },
        "summary": {
            "n_train": int(len(ops)),
            "n_test": int(len(te)),
            "mae": float(np.abs(errors).mean()),
            "within_2pct": within_2,
            "ood_rate": float(np.mean(~env_mask) * 100),
        },
    }


def _export_operator_assist(df, prefix):
    """Operator assist shadow replay (advisory vs actual OP across all PROD).

    A GBM is fitted on 6h medians of all usable PROD rows and then applied
    to every usable PROD row (no held-out split). An IsolationForest fitted
    on the same 6h points flags rows as ``ood``; ``in_env`` marks rows
    inside the 1%..99% feature range of the 6h points. Returns a
    ``{"warning": ...}`` dict when fewer than 100 usable rows remain.
    ``prefix`` is unused.
    """
    from sklearn.ensemble import GradientBoostingRegressor, IsolationForest

    prod = df[df["mode"] == "PROD"].copy()
    usable = (
        (prod["feed"] > 50)
        & (prod["steam_flow"] > 10)
        & (prod["steam_op"] > 1)
        & prod[FEATURES + ["steam_op"]].notna().all(axis=1)
    )
    prod = prod[usable]
    if len(prod) < 100:
        return {"warning": "PROD 부족 — advisory 불가"}

    points = (prod.set_index("dtat").resample("6h").median(numeric_only=True)
              .dropna(subset=["steam_flow", "feed"]))
    points = points[points["feed"] > 50]

    model = GradientBoostingRegressor(n_estimators=200, max_depth=2,
                                      learning_rate=0.05, random_state=0)
    model.fit(points[FEATURES].values, points["steam_flow"].values)
    # Cubic flow -> OP mapping, fitted on all usable PROD rows.
    inv = np.polyfit(prod["steam_flow"], prod["steam_op"], 3)
    env_lo = points[FEATURES].quantile(0.01)
    env_hi = points[FEATURES].quantile(0.99)
    ood = IsolationForest(contamination=0.05, random_state=0).fit(points[FEATURES].values)

    X = prod[FEATURES].values
    sf = model.predict(X)
    op = np.clip(np.polyval(inv, sf), 0, 100)
    env_mask = ((X >= env_lo.values) & (X <= env_hi.values)).all(axis=1)
    ood_mask = ood.decision_function(X) < 0
    errors = op - prod["steam_op"].values

    # downsampled time series
    prod_plot = prod.assign(pred_op=op, pred_flow=sf, ood=ood_mask, in_env=env_mask)
    prod_plot = prod_plot.set_index("dtat")
    prod_6h = prod_plot.resample("6h").agg({
        "steam_op": "mean",
        "pred_op": "mean",
        "steam_flow": "mean",
        "pred_flow": "mean",
        "ood": "max",
        "in_env": "min",
    }).dropna(subset=["steam_op"]).reset_index()

    hist_bins = np.linspace(errors.min(), errors.max(), 61)
    hist_counts, hist_edges = np.histogram(errors, bins=hist_bins)

    return {
        "time_series": {
            "time": [str(t) for t in prod_6h["dtat"]],
            "actual_op": _safelen(prod_6h["steam_op"]),
            "predicted_op": _safelen(prod_6h["pred_op"]),
            "actual_flow": _safelen(prod_6h["steam_flow"]),
            "predicted_flow": _safelen(prod_6h["pred_flow"]),
            "ood": [bool(x) for x in prod_6h["ood"]],
            "in_env": [bool(x) for x in prod_6h["in_env"]],
            "n": len(prod_6h),
        },
        "error_histogram": {
            "bin_edges": [round(float(x), 2) for x in hist_edges],
            "counts": [int(c) for c in hist_counts],
        },
        "summary": {
            "n_operating_points": len(points),
            "n_prod_rows": len(prod),
            "mae": float(np.abs(errors).mean()),
            "within_2pct": float(np.mean(np.abs(errors) <= 2.0) * 100),
            "ood_rate": float(np.mean(ood_mask) * 100),
        },
    }


def _safelen(x):
    """Convert a pd.Series / np.ndarray / iterable to a list, NaN -> None."""
    # Series and ndarrays both expose tolist(), which yields Python scalars.
    values = x.tolist() if hasattr(x, "tolist") else list(x)
    return [None if (isinstance(v, float) and np.isnan(v)) else v for v in values]


def _nanmid(s):
    """Median of the non-null values of ``s`` rounded to 1 digit, or None if empty."""
    v = s.dropna()
    return round(float(v.median()), 1) if len(v) else None


def main():
    """Parse CLI arguments, run all exporters and write the JSON file."""
    parser = argparse.ArgumentParser(description="Export plot data as JSON for web dashboard")
    parser.add_argument("--data", default=os.path.join(BASE, "c6111_data.pkl"))
    parser.add_argument("--prefix", default="c6111")
    parser.add_argument("--output", default=None,
                        help="Output path (default: data/{prefix}_plotdata.json)")
    args = parser.parse_args()

    df = _load_data(args.data)
    prefix = args.prefix
    # NOTE(review): the help text above says the default is under data/, but
    # the default actually used is next to this script. Behaviour is kept
    # as-is; confirm which one the dashboard expects.
    out_path = args.output or os.path.join(BASE, f"{prefix}_plotdata.json")

    result = {
        "prefix": prefix,
        "n_total_rows": len(df),
        "date_range": [str(df["dtat"].min()), str(df["dtat"].max())],
    }

    result["prodmap"] = _export_prodmap(df, prefix)
    print(f"[export] prodmap: {result['prodmap'].get('n_prod_rows', 'N/A')} PROD rows")

    result["startup"] = _export_startup(df, prefix)
    print(f"[export] startup: {result['startup'].get('n_episodes', 'N/A')} episodes")

    result["shadow"] = _export_shadow(df, prefix)
    s = result["shadow"].get("summary")
    if s:
        print(f"[export] shadow: MAE={s['mae']:.2f} within2%={s['within_2pct']:.1f}%")
    else:
        print(f"[export] shadow: {result['shadow'].get('warning', 'N/A')}")

    result["advisory"] = _export_operator_assist(df, prefix)
    s = result["advisory"].get("summary")
    if s:
        print(f"[export] advisory: MAE={s['mae']:.2f} within2%={s['within_2pct']:.1f}%")
    else:
        print(f"[export] advisory: {result['advisory'].get('warning', 'N/A')}")

    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        # allow_nan=False guarantees strict JSON; _json_safe has already
        # replaced every NaN / inf with None, so this never raises for them.
        json.dump(_json_safe(result), f, indent=2, ensure_ascii=False,
                  default=str, allow_nan=False)
    print(f"[export] 저장: {out_path} ({os.path.getsize(out_path) / 1024:.0f} KB)")


if __name__ == "__main__":
    main()