Files
HC900-Crawler/scripts/analysis/c6111_trim.py
windpacer 1bc46b1eb0 feat: 6-1차 컬럼 학습형 제어 오프라인 분석 (생산맵·shadow·startup)
신암정유 6-1차 측류 솔벤트 컬럼(C-6111) 실데이터로 오퍼레이터 모방 제어 분석:

- 데이터 추출기 tag_frame() (field_hist WIDE 포맷 디코드) + 운전모드 분류
- ① 생산맵: 스팀유량=f(피드,제품,목표T_C) 운전점 GBM R²0.99 (steam/feed≈0.73)
- shadow 백테스트: in-envelope 오퍼레이터 OP 94% 모방, OOD 게이트→폴백
- 롤링 재학습: 새 로드레짐 적응 (5월 OP MAE 3.9→1.2%)
- 캠페인내 트림: 컬럼 자기제어, 피드백 미미 → 전향 맵이 제어 지배
- ② START-UP 절차: 레시피 + 제품컷인 트리거(reb-A 84.5℃, ΔT 2℃, 조건기반)

문서: 설계·진행 플랜 + 남은 작업(형제확장·shutdown·assist·live포팅) 작업지시서.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 17:33:17 +09:00

93 lines
4.6 KiB
Python

"""
C-6111 캠페인내 온도 트림 = gentle 피드백 추출 (플랜 §5, §16.8-(3)).
정상맵(전향)은 운전점 단위 steam=f(부하)만 잡는다. 캠페인 내부에서 오퍼레이터가
T_C 작은 편차에 스팀(OP)을 어떻게 미세조정하는지 = 피드백 정책을 데이터에서 추출:
- 부하 일정 구간만(전향/부하응답과 분리)
- 목표 T_C = 느린 인과 베이스라인, 오차 err = T_C - 목표
- OP 변경 이벤트의 트리거 오차(데드밴드), 변경크기 vs 오차(게인), 이벤트 간격(dwell)
→ §5 anti-hunting 제어(데드밴드+게인+dwell)의 현장 파라미터.
"""
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
BASE = "/home/windpacer/projects/hc900_ax/scripts/analysis/"
TGT_WIN = 360 # 목표 T_C 베이스라인 3h(인과 trailing)
LOAD_WIN = 120 # 부하 일정 판정 1h
LOAD_STD_MAX = 15 # feed rolling std 임계(저변동=캠페인 내부)
MOVE = 0.1 # OP 변경 인식 임계(%)
def main():
df = pd.read_pickle(BASE + "c6111_data.pkl")
df = df[df["mode"] == "PROD"].copy().sort_values("dtat").reset_index(drop=True)
df = df[(df["feed"] > 50) & (df["steam_op"] > 1)]
# 목표 T_C(느린 인과 베이스라인) 및 오차
df["tc_tgt"] = df["T_C"].rolling(TGT_WIN, min_periods=30).median()
df["err"] = df["T_C"] - df["tc_tgt"]
df["dop"] = df["steam_op"].diff()
# 부하 일정 구간(전향 부하응답 제외 → 순수 피드백 트림)
df["feed_std"] = df["feed"].rolling(LOAD_WIN, min_periods=30).std()
steady = df[(df["feed_std"] < LOAD_STD_MAX) & df["err"].notna()].copy()
print(f"PROD {len(df)} → 부하일정 정상구간 {len(steady)} ({100*len(steady)/len(df):.0f}%)")
# 제어 성능: 목표 대비 T_C 유지 밴드
e = steady["err"]
print(f"\n[T_C 유지] 오차 std={e.std():.3f}℃ p5/p95=[{e.quantile(.05):+.2f},{e.quantile(.95):+.2f}]℃")
# OP 변경 이벤트
mv = steady[steady["dop"].abs() > MOVE].copy()
# 직전 오차(트리거)
mv["err_trig"] = steady["err"].shift(1).loc[mv.index]
print(f"[OP 이동] 정상구간 {len(steady)}샘플 중 이동 {len(mv)}"
f"(평탄율 {100*(1-len(mv)/len(steady)):.1f}%)")
# 데드밴드: 이동시 |오차| vs 비이동시 |오차|
nomv = steady[steady["dop"].abs() <= MOVE]
print(f"[데드밴드] 이동시 |오차| median={mv['err_trig'].abs().median():.3f}"
f"비이동시 |오차| median={nomv['err'].abs().median():.3f}")
print(f" 이동의 75%는 |오차|>{mv['err_trig'].abs().quantile(.25):.3f}℃ 에서 발생")
# 피드백 게인: dOP vs 트리거오차 (음의 기울기 기대)
g = mv.dropna(subset=["err_trig"])
g = g[g["err_trig"].abs() < 2] # 이상치 제외
if len(g) > 30:
a = np.polyfit(g["err_trig"], g["dop"], 1)
r = np.corrcoef(g["err_trig"], g["dop"])[0, 1]
print(f"[피드백 게인] dOP ≈ {a[0]:+.2f}·오차 {a[1]:+.2f} (corr={r:+.2f}, "
f"음수=음의피드백: 온도↑→스팀↓)")
# dwell: 이동 간격(샘플)
dwell = np.diff(mv.index.values)
dwell = dwell[dwell > 0]
if len(dwell):
print(f"[dwell] 이동 간격 중앙 {np.median(dwell)*30/60:.0f}"
f"p25/p75=[{np.percentile(dwell,25)*30/60:.0f},{np.percentile(dwell,75)*30/60:.0f}]분")
# 플롯
fig, ax = plt.subplots(2, 2, figsize=(14, 9))
ax[0, 0].hist(e.dropna(), bins=100); ax[0, 0].axvline(0, c="k", lw=.5)
ax[0, 0].set_title(f"T_C error band (std {e.std():.3f}C)"); ax[0, 0].set_xlim(-1, 1)
ax[0, 1].hist(mv["err_trig"].abs().dropna(), bins=60, alpha=.6, density=True, label="at move")
ax[0, 1].hist(nomv["err"].abs().dropna(), bins=60, alpha=.6, density=True, label="no move")
ax[0, 1].set_title("deadband: |error| at move vs no-move"); ax[0, 1].set_xlim(0, 1); ax[0, 1].legend()
if len(g) > 30:
ax[1, 0].scatter(g["err_trig"], g["dop"], s=5, alpha=.3)
xs = np.linspace(g["err_trig"].min(), g["err_trig"].max(), 50)
ax[1, 0].plot(xs, np.polyval(a, xs), "r-")
ax[1, 0].set_xlabel("T_C error trigger"); ax[1, 0].set_ylabel("dOP")
ax[1, 0].set_title(f"feedback gain dOP/err = {a[0]:+.2f}")
if len(dwell):
ax[1, 1].hist(dwell * 30 / 60, bins=60); ax[1, 1].set_xlabel("dwell min")
ax[1, 1].set_title("dwell between OP moves"); ax[1, 1].set_xlim(0, 300)
fig.tight_layout(); fig.savefig(BASE + "c6111_trim.png", dpi=95)
print(f"\n플롯 저장: {BASE}c6111_trim.png")
if __name__ == "__main__":
main()