Files
2026-02-14 09:05:16 +09:00

321 lines
14 KiB
Python

import os
import json
import asyncio
import httpx
from datetime import datetime, timedelta
from typing import Dict
from contextlib import asynccontextmanager # lifespan용 추가
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.orm import Session
from sqlalchemy import update, or_
from pydantic import BaseModel
from dotenv import load_dotenv
from app.database import get_db, engine, SessionLocal
from app.models import Base, Asset, UserAsset, AlertSetting
from app.fetcher import fetcher
from app.calculator import Calculator
load_dotenv()
# 데이터베이스 테이블 생성
Base.metadata.create_all(bind=engine)
# ==================== Pydantic 모델 ====================
class UserAssetUpdate(BaseModel):
symbol: str
previous_close: float
average_price: float
quantity: float
class AlertSettingUpdate(BaseModel):
settings: Dict
# ==================== 전역 변수 및 설정 ====================
# [변경] 비동기 스케줄러 설정
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
connected_clients = 0
clients_lock = asyncio.Lock()
last_alert_time = {}
system_status = {
"last_fetch_time": None,
"status": "initializing"
}
# ==================== 유틸리티 함수 ====================
async def send_telegram_msg_async(text: str):
"""비동기 방식으로 텔레그램 메시지 전송"""
token = os.getenv("TELEGRAM_TOKEN")
chat_id = os.getenv("TELEGRAM_CHAT_ID")
if not token or not chat_id: return
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}
async with httpx.AsyncClient() as client:
try:
resp = await client.post(url, json=payload, timeout=5)
if resp.status_code != 200: print(f"❌ 텔레그램 실패: {resp.text}")
except Exception as e: print(f"❌ 텔레그램 오류: {e}")
def init_db_data():
db = SessionLocal()
try:
assets_data = [
("XAU/USD", "금/달러", "귀금속"), ("XAU/CNY", "금/위안", "귀금속"),
("XAU/GBP", "금/파운드", "귀금속"), ("USD/DXY", "달러인덱스", "환율"),
("USD/KRW", "달러/원", "환율"), ("BTC/USD", "비트코인/달러", "암호화폐"),
("BTC/KRW", "비트코인/원", "암호화폐"), ("KRX/GLD", "금 현물", "귀금속"),
("XAU/KRW", "금/원", "귀금속"),
]
for symbol, name, category in assets_data:
if not db.query(Asset).filter(Asset.symbol == symbol).first():
db.add(Asset(symbol=symbol, name=name, category=category))
db.commit()
assets = db.query(Asset).all()
for asset in assets:
if not db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first():
db.add(UserAsset(asset_id=asset.id))
default_settings = {
"급등락_감지": False, "급등락_임계값": 3.0,
"목표수익률_감지": False, "목표수익률": 10.0,
"특정가격_감지": False, "금_목표가격": 100000, "BTC_목표가격": 100000000,
}
for key, val in default_settings.items():
if not db.query(AlertSetting).filter(AlertSetting.setting_key == key).first():
db.add(AlertSetting(setting_key=key, setting_value=json.dumps(val)))
db.commit()
finally:
db.close()
# ==================== 백그라운드 태스크 ====================
async def background_fetch():
"""비동기 수집 루프: DB 업데이트 + Heartbeat + 알림"""
while True:
try:
async with clients_lock:
interval = 5 if connected_clients > 0 else 15
db = SessionLocal()
try:
current_data = await fetcher.update_realtime_prices(db)
system_status["last_fetch_time"] = datetime.now()
system_status["status"] = "healthy"
settings_raw = db.query(AlertSetting).all()
sets = {s.setting_key: json.loads(s.setting_value) for s in settings_raw}
user_assets = db.query(Asset, UserAsset).join(UserAsset).all()
now_ts = datetime.now().timestamp()
for asset, ua in user_assets:
symbol = asset.symbol
price = asset.current_price
if price is None or price <= 0: continue
prev_c = float(ua.previous_close) if ua.previous_close else 0
avg_p = float(ua.average_price) if ua.average_price else 0
if sets.get("급등락_감지") and prev_c > 0:
change = ((price - prev_c) / prev_c) * 100
if abs(change) >= float(sets.get("급등락_임계값", 3.0)):
if now_ts - last_alert_time.get(f"{symbol}_vol", 0) > 3600:
icon = "🚀 급등" if change > 0 else "📉 급락"
await send_telegram_msg_async(f"<b>[{icon}] {symbol}</b>\n현재가: {price:,.2f}\n변동률: {change:+.2f}%")
last_alert_time[f"{symbol}_vol"] = now_ts
if sets.get("목표수익률_감지") and avg_p > 0:
profit = ((price - avg_p) / avg_p) * 100
if profit >= float(sets.get("목표수익률", 10.0)):
if now_ts - last_alert_time.get(f"{symbol}_profit", 0) > 86400:
await send_telegram_msg_async(f"<b>💰 수익 목표달성! ({symbol})</b>\n수익률: {profit:+.2f}%\n현재가: {price:,.2f}")
last_alert_time[f"{symbol}_profit"] = now_ts
if sets.get("특정가격_감지"):
if symbol == "KRX/GLD" and price >= float(sets.get("금_목표가격", 0)):
if now_ts - last_alert_time.get("gold_hit", 0) > 43200:
await send_telegram_msg_async(f"<b>✨ 금 목표가 돌파!</b>\n현재가: {price:,.0f}")
last_alert_time["gold_hit"] = now_ts
elif symbol == "BTC/KRW" and price >= float(sets.get("BTC_목표가격", 0)):
if now_ts - last_alert_time.get("btc_hit", 0) > 43200:
await send_telegram_msg_async(f"<b>₿ BTC 목표가 돌파!</b>\n현재가: {price:,.0f}")
last_alert_time["btc_hit"] = now_ts
finally:
db.close()
except Exception as e:
system_status["status"] = "error"
print(f"❌ 수집 루프 에러: {e}")
await asyncio.sleep(interval)
# ==================== [신규] Lifespan 핸들러 ====================
@asynccontextmanager
async def lifespan(app: FastAPI):
# [Startup] 앱 시작 시 실행
init_db_data()
# 서버 재시작 시 DB의 종가를 메모리로 복구 (fetcher에 해당 함수가 있다고 가정)
db = SessionLocal()
try:
# DB에서 기존 종가를 불러와 메모리 fetcher.daily_closing_prices 채우기
assets = db.query(Asset, UserAsset).join(UserAsset).all()
for a, ua in assets:
if ua.previous_close:
fetcher.daily_closing_prices[a.symbol] = float(ua.previous_close)
finally:
db.close()
# 1. 비트코인 전용 자정 스케줄러 (00:00:05)
async def btc_daily_job():
print(f"🕛 [BTC 자정 스냅샷] {datetime.now()}")
db = SessionLocal()
try:
assets = db.query(Asset).filter(Asset.symbol.like("BTC/%")).all()
for asset in assets:
if asset.current_price:
db.execute(update(UserAsset).where(UserAsset.asset_id == asset.id).values(previous_close=asset.current_price))
fetcher.daily_closing_prices[asset.symbol] = asset.current_price
db.commit()
finally: db.close()
# 2. 전통 자산용 아침 스케줄러 (07:10:00)
async def legacy_daily_job():
print(f"🌅 [전통자산 아침 스냅샷] {datetime.now()}")
db = SessionLocal()
try:
assets = db.query(Asset).filter(~Asset.symbol.like("BTC/%")).all()
for asset in assets:
if asset.current_price:
db.execute(update(UserAsset).where(UserAsset.asset_id == asset.id).values(previous_close=asset.current_price))
fetcher.daily_closing_prices[asset.symbol] = asset.current_price
db.commit()
finally: db.close()
scheduler.add_job(btc_daily_job, 'cron', hour=0, minute=0, second=5, id='btc_snapshot')
scheduler.add_job(legacy_daily_job, 'cron', hour=7, minute=10, id='legacy_snapshot')
scheduler.start()
fetch_task = asyncio.create_task(background_fetch())
yield # 서버 운영 중
# [Shutdown] 서버 종료 시 실행
scheduler.shutdown()
fetch_task.cancel()
print("👋 서버가 종료되었습니다.")
# ==================== FastAPI 앱 선언 ====================
app = FastAPI(title="Asset Pilot - Orange Pi Edition", version="1.2.0", lifespan=lifespan)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
static_path = os.path.join(BASE_DIR, "static")
if os.path.exists(static_path):
app.mount("/static", StaticFiles(directory=static_path), name="static")
print(f"✅ Static 마운트 성공: {static_path}")
templates_path = os.path.join(BASE_DIR, "templates")
templates = Jinja2Templates(directory=templates_path)
# ==================== API 엔드포인트 ====================
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/prices")
async def get_prices(db: Session = Depends(get_db)):
assets = db.query(Asset).all()
is_stale = False
if system_status["last_fetch_time"]:
if datetime.now() - system_status["last_fetch_time"] > timedelta(seconds=60):
is_stale = True
return {
"server_time": datetime.now().isoformat(),
"fetch_status": "stale" if is_stale else system_status["status"],
"last_heartbeat": system_status["last_fetch_time"].isoformat() if system_status["last_fetch_time"] else None,
"prices": {
a.symbol: {
"가격": a.current_price,
"상태": a.price_state,
"업데이트": a.last_updated.isoformat() if a.last_updated else None
} for a in assets
}
}
@app.get("/api/assets")
async def get_assets(db: Session = Depends(get_db)):
assets = db.query(Asset, UserAsset).join(UserAsset).all()
return [{
"symbol": a.symbol, "name": a.name, "category": a.category,
"previous_close": float(ua.previous_close),
"average_price": float(ua.average_price),
"quantity": float(ua.quantity)
} for a, ua in assets]
@app.get("/api/pnl")
async def get_pnl(db: Session = Depends(get_db)):
krx = db.query(Asset, UserAsset).join(UserAsset).filter(Asset.symbol == "KRX/GLD").first()
btc = db.query(Asset, UserAsset).join(UserAsset).filter(Asset.symbol == "BTC/KRW").first()
pnl = Calculator.calc_pnl(
float(krx[1].average_price) if krx else 0, float(krx[1].quantity) if krx else 0,
float(btc[1].average_price) if btc else 0, float(btc[1].quantity) if btc else 0,
krx[0].current_price if krx else 0, btc[0].current_price if btc else 0
)
return pnl
@app.get("/api/stream")
async def stream_prices(request: Request):
async def event_generator():
global connected_clients
async with clients_lock: connected_clients += 1
try:
while True:
if await request.is_disconnected(): break
db = SessionLocal()
try:
assets = db.query(Asset).all()
data = {a.symbol: {"가격": a.current_price, "상태": a.price_state} for a in assets}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
finally:
db.close()
await asyncio.sleep(5)
finally:
async with clients_lock: connected_clients = max(0, connected_clients - 1)
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.post("/api/assets")
async def update_asset(data: UserAssetUpdate, db: Session = Depends(get_db)):
asset = db.query(Asset).filter(Asset.symbol == data.symbol).first()
ua = db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first()
if ua:
ua.previous_close, ua.average_price, ua.quantity = data.previous_close, data.average_price, data.quantity
db.commit()
return {"status": "success"}
raise HTTPException(status_code=404)
@app.get("/api/alerts/settings")
async def get_alert_settings(db: Session = Depends(get_db)):
settings = db.query(AlertSetting).all()
return {s.setting_key: json.loads(s.setting_value) for s in settings}
@app.post("/api/alerts/settings")
async def update_alert_settings(data: AlertSettingUpdate, db: Session = Depends(get_db)):
for key, value in data.settings.items():
s = db.query(AlertSetting).filter(AlertSetting.setting_key == key).first()
if s: s.setting_value = json.dumps(value)
db.commit()
return {"status": "success"}
@app.get("/health")
async def health_check():
return {"status": "healthy", "last_fetch": system_status["last_fetch_time"]}