FastAPI app.on_event -> lifespan 변경

This commit is contained in:
Wind
2026-02-14 09:05:16 +09:00
parent d4f1ca87ab
commit fc1462a3f3
3 changed files with 289 additions and 132 deletions

View File

@@ -4,13 +4,15 @@ import asyncio
import httpx
from datetime import datetime, timedelta
from typing import Dict
from contextlib import asynccontextmanager # lifespan용 추가
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# [변경] 비동기용 스케줄러로 교체
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.orm import Session
from sqlalchemy import update, or_
from pydantic import BaseModel
from dotenv import load_dotenv
@@ -24,36 +26,6 @@ load_dotenv()
# 데이터베이스 테이블 생성
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Asset Pilot - Orange Pi Edition", version="1.2.0")
# 1. 현재 main.py 파일의 절대 경로를 가져옵니다.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 2. Static 파일 경로 설정 (절대 경로 사용)
static_path = os.path.join(BASE_DIR, "static")
if os.path.exists(static_path):
app.mount("/static", StaticFiles(directory=static_path), name="static")
print(f"✅ Static 마운트 성공: {static_path}")
else:
print(f"❌ Static 폴더를 찾을 수 없습니다: {static_path}")
# 3. 템플릿 설정 (절대 경로 사용)
templates_path = os.path.join(BASE_DIR, "templates")
templates = Jinja2Templates(directory=templates_path)
# [변경] 비동기 스케줄러 설정
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
# 전역 상태 관리
connected_clients = 0
clients_lock = asyncio.Lock()
last_alert_time = {}
# [신규] 시스템 상태 모니터링 변수 (Heartbeat)
system_status = {
"last_fetch_time": None,
"status": "initializing"
}
# ==================== Pydantic 모델 ====================
class UserAssetUpdate(BaseModel):
symbol: str
@@ -64,6 +36,19 @@ class UserAssetUpdate(BaseModel):
class AlertSettingUpdate(BaseModel):
settings: Dict
# ==================== 전역 변수 및 설정 ====================
# [변경] 비동기 스케줄러 설정
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
connected_clients = 0
clients_lock = asyncio.Lock()
last_alert_time = {}
system_status = {
"last_fetch_time": None,
"status": "initializing"
}
# ==================== 유틸리티 함수 ====================
async def send_telegram_msg_async(text: str):
"""비동기 방식으로 텔레그램 메시지 전송"""
@@ -80,7 +65,6 @@ async def send_telegram_msg_async(text: str):
if resp.status_code != 200: print(f"❌ 텔레그램 실패: {resp.text}")
except Exception as e: print(f"❌ 텔레그램 오류: {e}")
# ==================== DB 초기화 ====================
def init_db_data():
db = SessionLocal()
try:
@@ -113,7 +97,7 @@ def init_db_data():
finally:
db.close()
# ==================== 백그라운드 태스크 (Watchdog & 알림 통합) ====================
# ==================== 백그라운드 태스크 ====================
async def background_fetch():
"""비동기 수집 루프: DB 업데이트 + Heartbeat + 알림"""
while True:
@@ -123,14 +107,10 @@ async def background_fetch():
db = SessionLocal()
try:
# 1. 수집 및 DB 업데이트
current_data = await fetcher.update_realtime_prices(db)
# [성공] Heartbeat 기록
system_status["last_fetch_time"] = datetime.now()
system_status["status"] = "healthy"
# 2. 알림 로직
settings_raw = db.query(AlertSetting).all()
sets = {s.setting_key: json.loads(s.setting_value) for s in settings_raw}
user_assets = db.query(Asset, UserAsset).join(UserAsset).all()
@@ -145,7 +125,6 @@ async def background_fetch():
prev_c = float(ua.previous_close) if ua.previous_close else 0
avg_p = float(ua.average_price) if ua.average_price else 0
# 급등락 체크
if sets.get("급등락_감지") and prev_c > 0:
change = ((price - prev_c) / prev_c) * 100
if abs(change) >= float(sets.get("급등락_임계값", 3.0)):
@@ -154,7 +133,6 @@ async def background_fetch():
await send_telegram_msg_async(f"<b>[{icon}] {symbol}</b>\n현재가: {price:,.2f}\n변동률: {change:+.2f}%")
last_alert_time[f"{symbol}_vol"] = now_ts
# 수익률 체크
if sets.get("목표수익률_감지") and avg_p > 0:
profit = ((price - avg_p) / avg_p) * 100
if profit >= float(sets.get("목표수익률", 10.0)):
@@ -162,7 +140,6 @@ async def background_fetch():
await send_telegram_msg_async(f"<b>💰 수익 목표달성! ({symbol})</b>\n수익률: {profit:+.2f}%\n현재가: {price:,.2f}")
last_alert_time[f"{symbol}_profit"] = now_ts
# 특정가격 감지
if sets.get("특정가격_감지"):
if symbol == "KRX/GLD" and price >= float(sets.get("금_목표가격", 0)):
if now_ts - last_alert_time.get("gold_hit", 0) > 43200:
@@ -172,7 +149,6 @@ async def background_fetch():
if now_ts - last_alert_time.get("btc_hit", 0) > 43200:
await send_telegram_msg_async(f"<b>₿ BTC 목표가 돌파!</b>\n현재가: {price:,.0f}")
last_alert_time["btc_hit"] = now_ts
finally:
db.close()
except Exception as e:
@@ -181,28 +157,73 @@ async def background_fetch():
await asyncio.sleep(interval)
# ==================== 앱 생명주기 (AsyncIOScheduler 적용) ====================
@app.on_event("startup")
async def startup_event():
# ==================== [신규] Lifespan 핸들러 ====================
@asynccontextmanager
async def lifespan(app: FastAPI):
# [Startup] 앱 시작 시 실행
init_db_data()
# [변경] 7시 10분 비동기 전용 스케줄러 작업
async def daily_job():
print(f"🌅 [기준가 업데이트 시작] {datetime.now()}")
# 서버 재시작 시 DB의 종가를 메모리로 복구 (fetcher에 해당 함수가 있다고 가정)
db = SessionLocal()
try:
# DB에서 기존 종가를 불러와 메모리 fetcher.daily_closing_prices 채우기
assets = db.query(Asset, UserAsset).join(UserAsset).all()
for a, ua in assets:
if ua.previous_close:
fetcher.daily_closing_prices[a.symbol] = float(ua.previous_close)
finally:
db.close()
# 1. 비트코인 전용 자정 스케줄러 (00:00:05)
async def btc_daily_job():
print(f"🕛 [BTC 자정 스냅샷] {datetime.now()}")
db = SessionLocal()
try:
await fetcher.update_closing_prices(db)
finally:
db.close()
assets = db.query(Asset).filter(Asset.symbol.like("BTC/%")).all()
for asset in assets:
if asset.current_price:
db.execute(update(UserAsset).where(UserAsset.asset_id == asset.id).values(previous_close=asset.current_price))
fetcher.daily_closing_prices[asset.symbol] = asset.current_price
db.commit()
finally: db.close()
scheduler.add_job(daily_job, 'cron', hour=7, minute=10, id='daily_snapshot')
# 2. 전통 자산용 아침 스케줄러 (07:10:00)
async def legacy_daily_job():
print(f"🌅 [전통자산 아침 스냅샷] {datetime.now()}")
db = SessionLocal()
try:
assets = db.query(Asset).filter(~Asset.symbol.like("BTC/%")).all()
for asset in assets:
if asset.current_price:
db.execute(update(UserAsset).where(UserAsset.asset_id == asset.id).values(previous_close=asset.current_price))
fetcher.daily_closing_prices[asset.symbol] = asset.current_price
db.commit()
finally: db.close()
scheduler.add_job(btc_daily_job, 'cron', hour=0, minute=0, second=5, id='btc_snapshot')
scheduler.add_job(legacy_daily_job, 'cron', hour=7, minute=10, id='legacy_snapshot')
scheduler.start()
asyncio.create_task(background_fetch())
fetch_task = asyncio.create_task(background_fetch())
yield # 서버 운영 중
@app.on_event("shutdown")
def stop_scheduler():
# [Shutdown] 서버 종료 시 실행
scheduler.shutdown()
fetch_task.cancel()
print("👋 서버가 종료되었습니다.")
# ==================== FastAPI 앱 선언 ====================
app = FastAPI(title="Asset Pilot - Orange Pi Edition", version="1.2.0", lifespan=lifespan)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
static_path = os.path.join(BASE_DIR, "static")
if os.path.exists(static_path):
app.mount("/static", StaticFiles(directory=static_path), name="static")
print(f"✅ Static 마운트 성공: {static_path}")
templates_path = os.path.join(BASE_DIR, "templates")
templates = Jinja2Templates(directory=templates_path)
# ==================== API 엔드포인트 ====================
@app.get("/", response_class=HTMLResponse)
@@ -211,10 +232,7 @@ async def read_root(request: Request):
@app.get("/api/prices")
async def get_prices(db: Session = Depends(get_db)):
"""[개선] 데이터 신선도 상태와 서버 시각을 포함하여 반환"""
assets = db.query(Asset).all()
# 지연 판별 (마지막 성공 후 60초 경과 시 stale)
is_stale = False
if system_status["last_fetch_time"]:
if datetime.now() - system_status["last_fetch_time"] > timedelta(seconds=60):
@@ -300,8 +318,4 @@ async def update_alert_settings(data: AlertSettingUpdate, db: Session = Depends(
@app.get("/health")
async def health_check():
return {"status": "healthy", "last_fetch": system_status["last_fetch_time"]}
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)
return {"status": "healthy", "last_fetch": system_status["last_fetch_time"]}