Files
AssetPilot/.TemporaryDocument/main.py.good2
2026-02-13 18:48:14 +09:00

296 lines
12 KiB
Plaintext

import os
import json
import asyncio
import httpx
from datetime import datetime, timedelta
from typing import Dict
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# [변경] 비동기용 스케줄러로 교체
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.orm import Session
from pydantic import BaseModel
from dotenv import load_dotenv
from app.database import get_db, engine, SessionLocal
from app.models import Base, Asset, UserAsset, AlertSetting
from app.fetcher import fetcher
from app.calculator import Calculator
# Load variables from a local .env file into the process environment
# (TELEGRAM_TOKEN / TELEGRAM_CHAT_ID are read later through os.getenv).
load_dotenv()

# Create the database tables declared on Base if they do not exist yet.
Base.metadata.create_all(bind=engine)

app = FastAPI(title="Asset Pilot - Orange Pi Edition", version="1.2.0")

# Asyncio-based scheduler so scheduled jobs can be coroutines.
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")

app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# ---- Global runtime state ----
# Number of currently open /api/stream connections; guarded by clients_lock.
connected_clients = 0
clients_lock = asyncio.Lock()
# Alert key -> POSIX timestamp of the last time that alert was sent.
last_alert_time = {}

# Heartbeat of the background collection loop.
system_status = {
    "last_fetch_time": None,
    "status": "initializing",
}
# ==================== Pydantic 모델 ====================
class UserAssetUpdate(BaseModel):
    """Request body for ``POST /api/assets``: new holding figures for one symbol."""

    # Symbol of the asset whose holding is being updated (e.g. "BTC/KRW").
    symbol: str
    # Reference price used for the surge/plunge percentage.
    previous_close: float
    # Average purchase price used for the profit percentage.
    average_price: float
    # Amount held.
    quantity: float
class AlertSettingUpdate(BaseModel):
    """Request body for ``POST /api/alerts/settings``."""

    # Maps a setting key to its new value; values are JSON-encoded on save.
    settings: Dict
# ==================== 유틸리티 함수 ====================
async def send_telegram_msg_async(text: str):
    """Send ``text`` to the configured Telegram chat asynchronously.

    The bot token and chat id are read from the ``TELEGRAM_TOKEN`` and
    ``TELEGRAM_CHAT_ID`` environment variables. When either one is missing
    the call returns immediately without contacting Telegram.

    Delivery is best-effort: a non-200 response, or any exception raised
    while posting or inspecting the response, is printed and not re-raised.

    Args:
        text: Message body; sent with ``parse_mode`` set to ``HTML``.
    """
    bot_token = os.getenv("TELEGRAM_TOKEN")
    target_chat = os.getenv("TELEGRAM_CHAT_ID")
    if not (bot_token and target_chat):
        return

    endpoint = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    body = {"chat_id": target_chat, "text": text, "parse_mode": "HTML"}

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(endpoint, json=body, timeout=5)
            delivered = response.status_code == 200
            if not delivered:
                print(f"❌ 텔레그램 실패: {response.text}")
        except Exception as e:
            print(f"❌ 텔레그램 오류: {e}")
# ==================== DB 초기화 ====================
def init_db_data():
    """Seed the database with default assets, holdings and alert settings.

    Safe to call repeatedly: an asset (matched by symbol), a holding row
    (matched by asset id) or a setting (matched by key) that already exists
    is left untouched. The session is always closed, even on error.
    """
    seed_assets = (
        ("XAU/USD", "금/달러", "귀금속"),
        ("XAU/CNY", "금/위안", "귀금속"),
        ("XAU/GBP", "금/파운드", "귀금속"),
        ("USD/DXY", "달러인덱스", "환율"),
        ("USD/KRW", "달러/원", "환율"),
        ("BTC/USD", "비트코인/달러", "암호화폐"),
        ("BTC/KRW", "비트코인/원", "암호화폐"),
        ("KRX/GLD", "금 현물", "귀금속"),
        ("XAU/KRW", "금/원", "귀금속"),
    )
    seed_settings = {
        "급등락_감지": False,
        "급등락_임계값": 3.0,
        "목표수익률_감지": False,
        "목표수익률": 10.0,
        "특정가격_감지": False,
        "금_목표가격": 100000,
        "BTC_목표가격": 100000000,
    }

    session = SessionLocal()
    try:
        # 1) Assets: insert every symbol that is not stored yet.
        for symbol, name, category in seed_assets:
            if session.query(Asset).filter(Asset.symbol == symbol).first():
                continue
            session.add(Asset(symbol=symbol, name=name, category=category))
        # Commit first so the asset rows exist before holdings reference them.
        session.commit()

        # 2) Holdings: one UserAsset row per asset.
        for asset in session.query(Asset).all():
            if session.query(UserAsset).filter(UserAsset.asset_id == asset.id).first():
                continue
            session.add(UserAsset(asset_id=asset.id))

        # 3) Alert settings: values are stored JSON-encoded.
        for setting_key, default_value in seed_settings.items():
            if session.query(AlertSetting).filter(AlertSetting.setting_key == setting_key).first():
                continue
            session.add(AlertSetting(setting_key=setting_key, setting_value=json.dumps(default_value)))
        session.commit()
    finally:
        session.close()
# ==================== 백그라운드 태스크 (Watchdog & 알림 통합) ====================
# Minimum number of seconds between two alerts that share the same key.
_SURGE_ALERT_COOLDOWN = 3600      # surge / plunge, per symbol
_PROFIT_ALERT_COOLDOWN = 86400    # profit target, per symbol
_PRICE_ALERT_COOLDOWN = 43200     # absolute price target (gold / BTC)


def _cooldown_elapsed(key, now_ts, cooldown):
    """Return True when more than ``cooldown`` seconds passed since alert ``key`` last fired."""
    return now_ts - last_alert_time.get(key, 0) > cooldown


async def _send_alert(key, now_ts, text):
    """Send ``text`` via Telegram, then stamp ``key`` so its cooldown restarts."""
    await send_telegram_msg_async(text)
    last_alert_time[key] = now_ts


async def _check_asset_alerts(asset, ua, sets, now_ts):
    """Evaluate the surge, profit and price-target rules for one asset/holding pair."""
    symbol = asset.symbol
    price = asset.current_price
    if price is None or price <= 0:
        return  # no usable price stored for this asset

    # Missing (falsy) holding figures are treated as 0, which disables the rule.
    prev_c = float(ua.previous_close) if ua.previous_close else 0
    avg_p = float(ua.average_price) if ua.average_price else 0

    # Surge / plunge relative to the previous close.
    if sets.get("급등락_감지") and prev_c > 0:
        change = ((price - prev_c) / prev_c) * 100
        if abs(change) >= float(sets.get("급등락_임계값", 3.0)):
            key = f"{symbol}_vol"
            if _cooldown_elapsed(key, now_ts, _SURGE_ALERT_COOLDOWN):
                icon = "🚀 급등" if change > 0 else "📉 급락"
                await _send_alert(
                    key, now_ts,
                    f"<b>[{icon}] {symbol}</b>\n현재가: {price:,.2f}\n변동률: {change:+.2f}%",
                )

    # Profit target relative to the average purchase price.
    if sets.get("목표수익률_감지") and avg_p > 0:
        profit = ((price - avg_p) / avg_p) * 100
        if profit >= float(sets.get("목표수익률", 10.0)):
            key = f"{symbol}_profit"
            if _cooldown_elapsed(key, now_ts, _PROFIT_ALERT_COOLDOWN):
                await _send_alert(
                    key, now_ts,
                    f"<b>💰 수익 목표달성! ({symbol})</b>\n수익률: {profit:+.2f}%\n현재가: {price:,.2f}",
                )

    # Absolute price targets, only defined for KRX/GLD and BTC/KRW.
    if sets.get("특정가격_감지"):
        if symbol == "KRX/GLD" and price >= float(sets.get("금_목표가격", 0)):
            if _cooldown_elapsed("gold_hit", now_ts, _PRICE_ALERT_COOLDOWN):
                await _send_alert("gold_hit", now_ts, f"<b>✨ 금 목표가 돌파!</b>\n현재가: {price:,.0f}원")
        elif symbol == "BTC/KRW" and price >= float(sets.get("BTC_목표가격", 0)):
            if _cooldown_elapsed("btc_hit", now_ts, _PRICE_ALERT_COOLDOWN):
                await _send_alert("btc_hit", now_ts, f"<b>₿ BTC 목표가 돌파!</b>\n현재가: {price:,.0f}원")


async def _run_alert_checks(db):
    """Load the alert settings and run every alert rule against all holdings."""
    settings_raw = db.query(AlertSetting).all()
    # Setting values are stored as JSON text.
    sets = {s.setting_key: json.loads(s.setting_value) for s in settings_raw}
    user_assets = db.query(Asset, UserAsset).join(UserAsset).all()
    now_ts = datetime.now().timestamp()
    for asset, ua in user_assets:
        await _check_asset_alerts(asset, ua, sets, now_ts)


async def background_fetch():
    """Run the endless collection loop: price update, heartbeat, alerts.

    Each iteration:
      1. picks the polling interval (5 s while any stream client is
         connected, otherwise 15 s),
      2. asks ``fetcher`` to update the real-time prices in the database,
      3. records a successful heartbeat in ``system_status``,
      4. evaluates the alert rules and sends Telegram notifications.

    Any exception raised inside an iteration is printed and flips
    ``system_status["status"]`` to ``"error"``; the loop then sleeps and
    retries. The coroutine never returns on its own.
    """
    # Pre-bound so the sleep below always has a value, even if an error is
    # raised before the interval is recomputed in the first iteration.
    interval = 15
    while True:
        try:
            async with clients_lock:
                interval = 5 if connected_clients > 0 else 15
            db = SessionLocal()
            try:
                # 1. Collect prices and update the database.
                await fetcher.update_realtime_prices(db)
                # Success: record the heartbeat.
                system_status["last_fetch_time"] = datetime.now()
                system_status["status"] = "healthy"
                # 2. Alert logic.
                await _run_alert_checks(db)
            finally:
                db.close()
        except Exception as e:
            system_status["status"] = "error"
            print(f"❌ 수집 루프 에러: {e}")
        await asyncio.sleep(interval)
# ==================== 앱 생명주기 (AsyncIOScheduler 적용) ====================
@app.on_event("startup")
async def startup_event():
    """Initialise the application when the server starts.

    Seeds the database, registers the daily closing-price job on the
    scheduler, starts the scheduler and launches the background
    collection loop.
    """
    init_db_data()

    async def daily_job():
        """Update the stored closing (reference) prices; runs daily at 07:10."""
        print(f"🌅 [기준가 업데이트 시작] {datetime.now()}")
        db = SessionLocal()
        try:
            await fetcher.update_closing_prices(db)
        finally:
            db.close()

    # Cron trigger at 07:10 in the scheduler's configured timezone.
    scheduler.add_job(daily_job, 'cron', hour=7, minute=10, id='daily_snapshot')
    scheduler.start()

    # The event loop only keeps weak references to tasks, so a task whose
    # handle is dropped may be garbage-collected while still running. Keep a
    # strong reference on the application state for the process lifetime.
    app.state.background_fetch_task = asyncio.create_task(background_fetch())
@app.on_event("shutdown")
def stop_scheduler():
    """Shut the job scheduler down when the application stops."""
    scheduler.shutdown()
# ==================== API 엔드포인트 ====================
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the ``index.html`` template for the root URL."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/api/prices")
async def get_prices(db: Session = Depends(get_db)):
    """Return every stored price together with freshness status and server time.

    ``fetch_status`` is ``"stale"`` when the last successful fetch is more
    than 60 seconds old; otherwise it mirrors ``system_status["status"]``.
    ``last_heartbeat`` is the ISO timestamp of the last successful fetch, or
    ``None`` when no fetch has succeeded yet.
    """
    assets = db.query(Asset).all()

    # Stale means: a fetch succeeded once, but not within the last 60 seconds.
    last_fetch = system_status["last_fetch_time"]
    stale_after = timedelta(seconds=60)
    is_stale = bool(last_fetch and datetime.now() - last_fetch > stale_after)

    prices = {}
    for a in assets:
        updated_at = a.last_updated.isoformat() if a.last_updated else None
        prices[a.symbol] = {
            "가격": a.current_price,
            "상태": a.price_state,
            "업데이트": updated_at,
        }

    if is_stale:
        fetch_status = "stale"
    else:
        fetch_status = system_status["status"]

    return {
        "server_time": datetime.now().isoformat(),
        "fetch_status": fetch_status,
        "last_heartbeat": last_fetch.isoformat() if last_fetch else None,
        "prices": prices,
    }
@app.get("/api/assets")
async def get_assets(db: Session = Depends(get_db)):
    """Return every asset joined with the user's holding figures.

    Each item carries ``symbol``, ``name``, ``category`` and the holding's
    ``previous_close``, ``average_price`` and ``quantity`` as floats. A
    holding column that is NULL is reported as ``0.0`` instead of failing
    the request, the same convention the background alert loop uses for
    missing values.
    """
    rows = db.query(Asset, UserAsset).join(UserAsset).all()
    return [
        {
            "symbol": asset.symbol,
            "name": asset.name,
            "category": asset.category,
            # `or 0` guards against NULL columns; float(None) would raise.
            "previous_close": float(holding.previous_close or 0),
            "average_price": float(holding.average_price or 0),
            "quantity": float(holding.quantity or 0),
        }
        for asset, holding in rows
    ]
@app.get("/api/pnl")
async def get_pnl(db: Session = Depends(get_db)):
    """Return the profit-and-loss figures computed by ``Calculator.calc_pnl``.

    Only the ``KRX/GLD`` and ``BTC/KRW`` holdings are considered. A missing
    row contributes zeros. A NULL ``average_price`` or ``quantity`` is
    treated as ``0.0`` rather than raising ``TypeError``. The current price
    is forwarded exactly as stored.
    """
    def _pnl_inputs(symbol):
        """Return (average_price, quantity, current_price) for ``symbol``."""
        row = (
            db.query(Asset, UserAsset)
            .join(UserAsset)
            .filter(Asset.symbol == symbol)
            .first()
        )
        if not row:
            return 0, 0, 0
        asset, holding = row
        # `or 0` guards against NULL columns; float(None) would raise.
        return (
            float(holding.average_price or 0),
            float(holding.quantity or 0),
            asset.current_price,
        )

    krx_avg, krx_qty, krx_price = _pnl_inputs("KRX/GLD")
    btc_avg, btc_qty, btc_price = _pnl_inputs("BTC/KRW")
    pnl = Calculator.calc_pnl(
        krx_avg, krx_qty,
        btc_avg, btc_qty,
        krx_price, btc_price,
    )
    return pnl
@app.get("/api/stream")
async def stream_prices(request: Request):
    """Stream the stored prices to the client as server-sent events.

    An event with all symbols' price and state is emitted every 5 seconds
    until the client disconnects. While the stream is open it is counted in
    ``connected_clients``, which the background loop reads to choose its
    polling interval.
    """
    async def event_generator():
        global connected_clients
        async with clients_lock:
            connected_clients += 1
        try:
            while True:
                if await request.is_disconnected():
                    break
                db = SessionLocal()
                try:
                    assets = db.query(Asset).all()
                    data = {a.symbol: {"가격": a.current_price, "상태": a.price_state} for a in assets}
                finally:
                    # Close the session before yielding so it is not held
                    # open while the chunk is being written to the client.
                    db.close()
                yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
                await asyncio.sleep(5)
        finally:
            # Always give the slot back; never let the counter go negative.
            async with clients_lock:
                connected_clients = max(0, connected_clients - 1)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.post("/api/assets")
async def update_asset(data: UserAssetUpdate, db: Session = Depends(get_db)):
    """Overwrite the holding figures of the asset identified by ``data.symbol``.

    Returns:
        ``{"status": "success"}`` once the change is committed.

    Raises:
        HTTPException: 404 when the symbol is unknown or the asset has no
            holding row.
    """
    asset = db.query(Asset).filter(Asset.symbol == data.symbol).first()
    if asset is None:
        # Unknown symbol: answer 404 instead of failing on `asset.id`.
        raise HTTPException(status_code=404)

    ua = db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first()
    if not ua:
        raise HTTPException(status_code=404)

    ua.previous_close = data.previous_close
    ua.average_price = data.average_price
    ua.quantity = data.quantity
    db.commit()
    return {"status": "success"}
@app.get("/api/alerts/settings")
async def get_alert_settings(db: Session = Depends(get_db)):
    """Return all alert settings as a ``{key: decoded value}`` mapping."""
    decoded = {}
    for row in db.query(AlertSetting).all():
        # Values are stored as JSON text.
        decoded[row.setting_key] = json.loads(row.setting_value)
    return decoded
@app.post("/api/alerts/settings")
async def update_alert_settings(data: AlertSettingUpdate, db: Session = Depends(get_db)):
    """Store new values for existing alert settings.

    Keys with no matching setting row are skipped. All changes are
    committed together and ``{"status": "success"}`` is returned.
    """
    for key, value in data.settings.items():
        row = db.query(AlertSetting).filter(AlertSetting.setting_key == key).first()
        if not row:
            continue
        # Values are persisted JSON-encoded.
        row.setting_value = json.dumps(value)
    db.commit()
    return {"status": "success"}
@app.get("/health")
async def health_check():
    """Report liveness plus the time of the last successful fetch.

    ``status`` is always ``"healthy"``; it does not reflect
    ``system_status["status"]``.
    """
    last_fetch = system_status["last_fetch_time"]
    return {"status": "healthy", "last_fetch": last_fetch}
if __name__ == "__main__":
    # Run the app with uvicorn when this file is executed directly.
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
    )