- c6111_extract: roles_for() 동적 생성, COLUMN_EXCEPTIONS per-prefix - c6111_prodmap/shadow/startup/rolling: --data/--prefix CLI 인자 지원 - run_column.py: 5개 컬럼 전 파이프라인 실행 래퍼 - c6111_shutdown.py: detect_cutoffs + shutdown_milestones (lookback 1200) - c6111_operator_assist.py: OOD 게이트 + shadow 리플레이 - c6111_export_model.py: 선형근사 JSON export - SteamAdvisor.cs: Predict+ClassifyMode+InEnvelope (NaN guard, Ood fix) - SteamAdvisorController: GET/POST /api/steam/predict - appsettings.json/Program.cs: DI 등록 - docs: 작업지시서 현황 갱신, 진단보고서 작성 (3 MED/8 LOW, 100% 정확도)
194 lines
7.6 KiB
Python
194 lines
7.6 KiB
Python
"""
|
|
Operator-assist 패키징 (작업3).
|
|
|
|
사용법:
|
|
python3 c6111_operator_assist.py --data c61_data.pkl --prefix c61
|
|
python3 c6111_operator_assist.py --data c61_data.pkl --prefix c61 --live '{"feed":500,"product":300,"T_C":84.7}'
|
|
"""
|
|
import argparse
|
|
import json
|
|
import numpy as np
|
|
import pandas as pd
|
|
from sklearn.ensemble import IsolationForest
|
|
|
|
BASE = "/home/windpacer/projects/hc900_ax/scripts/analysis/"
|
|
FEATURES = ["feed", "product", "T_C"]
|
|
PROD_SMOOTH = 40
|
|
|
|
|
|
class OperatorAssist:
|
|
def __init__(self, df):
|
|
self.df = df
|
|
self.mode = "UNKNOWN"
|
|
self.model = None
|
|
self.inv = None
|
|
self.ood = None
|
|
self.env_lo = None
|
|
self.env_hi = None
|
|
self._train()
|
|
|
|
def _train(self):
|
|
prod = self.df[self.df["mode"] == "PROD"].copy()
|
|
prod = prod[(prod["feed"] > 50) & (prod["steam_flow"] > 10) & (prod["steam_op"] > 1)]
|
|
prod = prod.dropna(subset=FEATURES + ["steam_op", "steam_flow"])
|
|
if len(prod) < 100:
|
|
print(" [WARN] PROD 데이터 부족 — advisory 신뢰도 낮음")
|
|
points = (prod.set_index("dtat").resample("6h").median(numeric_only=True)
|
|
.dropna(subset=["steam_flow", "feed"]))
|
|
points = points[points["feed"] > 50]
|
|
from sklearn.ensemble import GradientBoostingRegressor
|
|
self.model = GradientBoostingRegressor(n_estimators=200, max_depth=2,
|
|
learning_rate=0.05, random_state=0)
|
|
self.model.fit(points[FEATURES].values, points["steam_flow"].values)
|
|
self.inv = np.polyfit(prod["steam_flow"], prod["steam_op"], 3)
|
|
self.env_lo = points[FEATURES].quantile(0.01)
|
|
self.env_hi = points[FEATURES].quantile(0.99)
|
|
self.ood = IsolationForest(contamination=0.05, random_state=0).fit(points[FEATURES].values)
|
|
print(f" 학습 운전점: {len(points)}개 envelope:")
|
|
for c in FEATURES:
|
|
print(f" {c}: [{self.env_lo[c]:.0f}, {self.env_hi[c]:.1f}]")
|
|
|
|
def classify_mode(self, tags):
|
|
"""tags dict → mode 추정 (classify_phases 단순 replica).
|
|
|
|
steam_op 없으면 feed/product로 판단 (live advisory용).
|
|
"""
|
|
prod = tags.get("product", 0)
|
|
feed = tags.get("feed", 0)
|
|
steam = tags.get("steam_op", None)
|
|
reb = tags.get("reb_temp", 60)
|
|
if prod > 100:
|
|
if steam is None or steam > 10:
|
|
return "PROD"
|
|
if steam is not None:
|
|
if steam > 10 and reb > 60:
|
|
return "LINEOUT"
|
|
if steam > 10 and feed < 50:
|
|
return "STARTUP"
|
|
if feed > 50:
|
|
return "PROD" # fallback: steam_op 없이 feed>50 + product>100는 PROD
|
|
return "STOPPED"
|
|
|
|
def in_envelope(self, tags):
|
|
x = np.array([[tags[c] for c in FEATURES]])
|
|
return ((x >= self.env_lo.values) & (x <= self.env_hi.values)).all()
|
|
|
|
def ood_score(self, tags):
|
|
return self.ood.decision_function(np.array([[tags[c] for c in FEATURES]]))[0]
|
|
|
|
def predict(self, tags, smooth_history=None):
|
|
"""live_tags dict → advisory dict.
|
|
|
|
tags: {"feed": float, "product": float, "T_C": float}
|
|
smooth_history: optional list of prior tag dicts for causal smoothing
|
|
|
|
Returns:
|
|
{"rec_OP": float or None, "rec_steam": float, "confidence": str,
|
|
"mode": str, "ood": bool, "in_env": bool, "message": str}
|
|
"""
|
|
mode = self.classify_mode(tags)
|
|
self.mode = mode
|
|
env = self.in_envelope(tags)
|
|
ood = self.ood_score(tags) < 0
|
|
raw = np.array([[[tags[c] for c in FEATURES]]])
|
|
|
|
if mode != "PROD":
|
|
msg = f"운전모드={mode} — advisory는 PROD에서만 제공 (STARTUP/LINEOUT은 레시피 참조)"
|
|
return {"rec_OP": None, "rec_steam": None, "confidence": "N/A",
|
|
"mode": mode, "ood": ood, "in_env": env, "message": msg}
|
|
|
|
# smooth: causal trailing median over recent history
|
|
if smooth_history and len(smooth_history) >= PROD_SMOOTH:
|
|
buf = pd.DataFrame(smooth_history[-PROD_SMOOTH:])[FEATURES].median()
|
|
x = np.array([[buf[c] for c in FEATURES]])
|
|
else:
|
|
x = raw[0]
|
|
|
|
sf = self.model.predict(x)[0]
|
|
op = np.clip(np.polyval(self.inv, sf), 0, 100)
|
|
|
|
if not env:
|
|
confidence = "LOW_OOD"
|
|
msg = (f"⚠ 범위밖 입력 — 권장 OP={op:.1f}% (외삽, 신뢰도 낮음). "
|
|
"오퍼레이터 판단 우선")
|
|
elif ood:
|
|
confidence = "MEDIUM"
|
|
msg = f"권장 OP={op:.1f}% (신뢰: 구간내, IForest 이상감지 — 주의)"
|
|
else:
|
|
confidence = "HIGH"
|
|
msg = f"권장 OP={op:.1f}% (신뢰: 구간내)"
|
|
|
|
return {"rec_OP": round(op, 1), "rec_steam": round(sf, 1),
|
|
"confidence": confidence, "mode": mode, "ood": bool(ood),
|
|
"in_env": bool(env), "feed": float(x[0][0]),
|
|
"product": float(x[0][1]), "T_C": float(x[0][2]),
|
|
"message": msg}
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--data", default=BASE + "c6111_data.pkl")
|
|
parser.add_argument("--prefix", default="c6111")
|
|
parser.add_argument("--live", help='JSON live_tags for single predict test')
|
|
args = parser.parse_args()
|
|
df = pd.read_pickle(args.data)
|
|
assist = OperatorAssist(df)
|
|
|
|
if args.live:
|
|
tags = json.loads(args.live)
|
|
res = assist.predict(tags)
|
|
print(f"\n=== Operator Advisory ({args.prefix}) ===")
|
|
for k, v in res.items():
|
|
print(f" {k:15s}: {v}")
|
|
return
|
|
|
|
# 전체 shadow 리플레이: PROD 행 벡터화 처리
|
|
prod = df[df["mode"] == "PROD"].sort_values("dtat").copy()
|
|
prod = prod[(prod["feed"] > 50) & (prod["steam_flow"] > 10) & (prod["steam_op"] > 1)
|
|
& prod[FEATURES + ["steam_op"]].notna().all(axis=1)]
|
|
if len(prod) == 0:
|
|
print(" PROD 없음 — advisory 불가")
|
|
return
|
|
|
|
X = prod[FEATURES].values
|
|
sf = assist.model.predict(X)
|
|
op = np.clip(np.polyval(assist.inv, sf), 0, 100)
|
|
env_mask = ((X >= assist.env_lo.values) & (X <= assist.env_hi.values)).all(axis=1)
|
|
ood_mask = assist.ood.decision_function(X) < 0
|
|
errors = op - prod["steam_op"].values
|
|
|
|
ood_rate = np.mean(ood_mask) * 100
|
|
within_2 = np.mean(np.abs(errors) <= 2.0) * 100
|
|
print(f"\n=== Shadow Advisory Report ({args.prefix}) ===")
|
|
print(f" PROD 행수 : {len(prod)}")
|
|
print(f" OOD 비율 : {ood_rate:.1f}%")
|
|
print(f" OP MAE : {np.abs(errors).mean():.2f}%")
|
|
print(f" |Δ|≤2% : {within_2:.1f}% (검증기준: 90%+ in-envelope)")
|
|
env_only = errors[~ood_mask[:len(errors)]]
|
|
if len(env_only):
|
|
print(f" in-env MAE : {np.abs(env_only).mean():.2f}% "
|
|
f"|Δ|≤2%={np.mean(np.abs(env_only)<=2)*100:.1f}%")
|
|
|
|
# 권장 OP vs 실제 OP 시계열 플롯
|
|
import matplotlib
|
|
matplotlib.use("Agg")
|
|
import matplotlib.pyplot as plt
|
|
fig, ax = plt.subplots(2, 1, figsize=(14, 8))
|
|
s = prod.iloc[::10]
|
|
ax[0].plot(s["dtat"], s["steam_op"], lw=.6, label="actual OP")
|
|
ax[0].plot(s["dtat"], op[::10], lw=.6, c="r", label="advisory OP")
|
|
ax[0].set_ylabel("OP %"); ax[0].legend(fontsize=8)
|
|
ax[0].set_title(f"Operator Advisory vs Actual OP ({args.prefix})")
|
|
ax[1].hist(errors, bins=60)
|
|
ax[1].axvline(0, c="k", lw=.5)
|
|
ax[1].set_title(f"Advisory error (rec-actual): median {np.median(errors):+.2f}%, "
|
|
f"within 2%={within_2:.1f}%")
|
|
fig.tight_layout()
|
|
path = BASE + f"{args.prefix}_advisory.png"
|
|
fig.savefig(path, dpi=95)
|
|
print(f"\n 플롯 저장: {path}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|