"""Asset Pilot (Orange Pi Edition) FastAPI application.

Serves the dashboard page and JSON/SSE price endpoints, and runs two
background activities on the event loop:

* ``background_fetch`` - a polling loop that refreshes prices, records a
  heartbeat in ``system_status`` and sends Telegram alerts.
* a daily APScheduler cron job (07:10, Asia/Seoul) that updates closing prices.
"""
import os
import json
import asyncio
from datetime import datetime, timedelta
from typing import Dict

import httpx
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# Asyncio-based scheduler so jobs can be coroutines.
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.orm import Session
from pydantic import BaseModel
from dotenv import load_dotenv

from app.database import get_db, engine, SessionLocal
from app.models import Base, Asset, UserAsset, AlertSetting
from app.fetcher import fetcher
from app.calculator import Calculator

load_dotenv()

# Create the database tables for every model registered on Base.
Base.metadata.create_all(bind=engine)

app = FastAPI(title="Asset Pilot - Orange Pi Edition", version="1.2.0")

# 1. Absolute path of the directory that contains this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# 2. Static files (absolute path); only mounted when the folder exists.
static_path = os.path.join(BASE_DIR, "static")
if os.path.exists(static_path):
    app.mount("/static", StaticFiles(directory=static_path), name="static")
    print(f"✅ Static 마운트 성공: {static_path}")
else:
    print(f"❌ Static 폴더를 찾을 수 없습니다: {static_path}")

# 3. Templates (absolute path).
templates_path = os.path.join(BASE_DIR, "templates")
templates = Jinja2Templates(directory=templates_path)

# Asyncio scheduler used for the daily closing-price job.
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")

# Global state.
connected_clients = 0            # number of open /api/stream connections
clients_lock = asyncio.Lock()    # guards connected_clients
last_alert_time = {}             # alert key -> POSIX timestamp of the last send

# Heartbeat written by background_fetch and read by /api/prices and /health.
system_status = {
    "last_fetch_time": None,
    "status": "initializing",
}

# Handle of the background_fetch task. The event loop only keeps weak
# references to tasks, so a strong reference must be held here.
_background_task = None


# ==================== Pydantic models ====================

class UserAssetUpdate(BaseModel):
    """Request body for POST /api/assets."""

    symbol: str
    previous_close: float
    average_price: float
    quantity: float


class AlertSettingUpdate(BaseModel):
    """Request body for POST /api/alerts/settings (setting key -> value)."""

    settings: Dict


# ==================== Utility functions ====================

def _to_float(value):
    """Return ``float(value)``, or ``0.0`` when the value is ``None``/falsy."""
    return float(value) if value else 0.0


async def send_telegram_msg_async(text: str):
    """Send ``text`` to Telegram via the Bot API ``sendMessage`` method.

    Reads ``TELEGRAM_TOKEN`` and ``TELEGRAM_CHAT_ID`` from the environment and
    returns silently when either is missing. Failures are printed, never
    raised, so a notification problem cannot break the caller.
    """
    token = os.getenv("TELEGRAM_TOKEN")
    chat_id = os.getenv("TELEGRAM_CHAT_ID")
    if not token or not chat_id:
        return

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(url, json=payload, timeout=5)
            if resp.status_code != 200:
                print(f"❌ 텔레그램 실패: {resp.text}")
        except Exception as e:
            print(f"❌ 텔레그램 오류: {e}")


# ==================== DB initialisation ====================

def init_db_data():
    """Seed assets, per-asset user rows and default alert settings.

    Each row is inserted only when no row with the same key exists, so the
    function can run on every startup without creating duplicates.
    """
    db = SessionLocal()
    try:
        assets_data = [
            ("XAU/USD", "금/달러", "귀금속"),
            ("XAU/CNY", "금/위안", "귀금속"),
            ("XAU/GBP", "금/파운드", "귀금속"),
            ("USD/DXY", "달러인덱스", "환율"),
            ("USD/KRW", "달러/원", "환율"),
            ("BTC/USD", "비트코인/달러", "암호화폐"),
            ("BTC/KRW", "비트코인/원", "암호화폐"),
            ("KRX/GLD", "금 현물", "귀금속"),
            ("XAU/KRW", "금/원", "귀금속"),
        ]
        for symbol, name, category in assets_data:
            if not db.query(Asset).filter(Asset.symbol == symbol).first():
                db.add(Asset(symbol=symbol, name=name, category=category))
        # Commit first so the new assets have ids for the UserAsset rows below.
        db.commit()

        assets = db.query(Asset).all()
        for asset in assets:
            if not db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first():
                db.add(UserAsset(asset_id=asset.id))

        default_settings = {
            "급등락_감지": False,
            "급등락_임계값": 3.0,
            "목표수익률_감지": False,
            "목표수익률": 10.0,
            "특정가격_감지": False,
            "금_목표가격": 100000,
            "BTC_목표가격": 100000000,
        }
        for key, val in default_settings.items():
            if not db.query(AlertSetting).filter(AlertSetting.setting_key == key).first():
                # Setting values are stored as JSON text.
                db.add(AlertSetting(setting_key=key, setting_value=json.dumps(val)))
        db.commit()
    finally:
        db.close()


# ==================== Background task (watchdog + alerts) ====================

async def _notify_with_cooldown(key, cooldown_seconds, text, now_ts):
    """Send ``text`` unless an alert for ``key`` was sent within the cooldown."""
    if now_ts - last_alert_time.get(key, 0) > cooldown_seconds:
        await send_telegram_msg_async(text)
        last_alert_time[key] = now_ts


async def _run_alert_checks(db):
    """Evaluate every alert rule against the prices currently stored in the DB."""
    settings_raw = db.query(AlertSetting).all()
    sets = {s.setting_key: json.loads(s.setting_value) for s in settings_raw}
    user_assets = db.query(Asset, UserAsset).join(UserAsset).all()
    now_ts = datetime.now().timestamp()

    for asset, ua in user_assets:
        symbol = asset.symbol
        price = asset.current_price
        # No usable price for this asset yet: nothing to compare.
        if price is None or price <= 0:
            continue

        prev_c = _to_float(ua.previous_close)
        avg_p = _to_float(ua.average_price)

        # Sharp move versus the previous close (at most one alert per hour).
        if sets.get("급등락_감지") and prev_c > 0:
            change = ((price - prev_c) / prev_c) * 100
            if abs(change) >= float(sets.get("급등락_임계값", 3.0)):
                icon = "🚀 급등" if change > 0 else "📉 급락"
                await _notify_with_cooldown(
                    f"{symbol}_vol",
                    3600,
                    f"[{icon}] {symbol}\n현재가: {price:,.2f}\n변동률: {change:+.2f}%",
                    now_ts,
                )

        # Target return versus the average purchase price (once per 24 hours).
        if sets.get("목표수익률_감지") and avg_p > 0:
            profit = ((price - avg_p) / avg_p) * 100
            if profit >= float(sets.get("목표수익률", 10.0)):
                await _notify_with_cooldown(
                    f"{symbol}_profit",
                    86400,
                    f"💰 수익 목표달성! ({symbol})\n수익률: {profit:+.2f}%\n현재가: {price:,.2f}",
                    now_ts,
                )

        # Absolute target price for KRX gold and BTC/KRW (once per 12 hours).
        if sets.get("특정가격_감지"):
            if symbol == "KRX/GLD" and price >= float(sets.get("금_목표가격", 0)):
                await _notify_with_cooldown(
                    "gold_hit",
                    43200,
                    f"✨ 금 목표가 돌파!\n현재가: {price:,.0f}원",
                    now_ts,
                )
            elif symbol == "BTC/KRW" and price >= float(sets.get("BTC_목표가격", 0)):
                await _notify_with_cooldown(
                    "btc_hit",
                    43200,
                    f"₿ BTC 목표가 돌파!\n현재가: {price:,.0f}원",
                    now_ts,
                )


async def background_fetch():
    """Poll prices forever: DB update, heartbeat, then alert checks.

    Polls every 5 seconds while at least one SSE client is connected and every
    15 seconds otherwise. Any exception inside one cycle is printed and flips
    ``system_status["status"]`` to ``"error"``; the loop then continues.
    """
    while True:
        # Computed outside the try block so the sleep below always has a value,
        # whatever fails inside the cycle.
        async with clients_lock:
            interval = 5 if connected_clients > 0 else 15

        try:
            db = SessionLocal()
            try:
                # 1. Fetch and store prices (implemented in app.fetcher).
                await fetcher.update_realtime_prices(db)

                # Fetch succeeded: record the heartbeat.
                system_status["last_fetch_time"] = datetime.now()
                system_status["status"] = "healthy"

                # 2. Alert logic.
                await _run_alert_checks(db)
            finally:
                db.close()
        except Exception as e:
            system_status["status"] = "error"
            print(f"❌ 수집 루프 에러: {e}")

        await asyncio.sleep(interval)


# ==================== App lifecycle (AsyncIOScheduler) ====================

@app.on_event("startup")
async def startup_event():
    """Seed the DB, register the daily cron job and start the polling loop."""
    global _background_task
    init_db_data()

    # Daily closing-price update at 07:10 in the scheduler's timezone.
    async def daily_job():
        print(f"🌅 [기준가 업데이트 시작] {datetime.now()}")
        db = SessionLocal()
        try:
            await fetcher.update_closing_prices(db)
        finally:
            db.close()

    scheduler.add_job(daily_job, 'cron', hour=7, minute=10, id='daily_snapshot')
    scheduler.start()
    # Keep the handle: an unreferenced task may be garbage-collected mid-run.
    _background_task = asyncio.create_task(background_fetch())


@app.on_event("shutdown")
def stop_scheduler():
    """Cancel the polling task and stop the scheduler."""
    if _background_task is not None:
        _background_task.cancel()
    scheduler.shutdown()


# ==================== API endpoints ====================

@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the dashboard page (index.html)."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/api/prices")
async def get_prices(db: Session = Depends(get_db)):
    """Return all stored prices together with data freshness and server time.

    ``fetch_status`` is ``"stale"`` when the last successful fetch is more than
    60 seconds old; otherwise it mirrors ``system_status["status"]``.
    """
    assets = db.query(Asset).all()
    last_fetch = system_status["last_fetch_time"]

    is_stale = False
    if last_fetch:
        if datetime.now() - last_fetch > timedelta(seconds=60):
            is_stale = True

    return {
        "server_time": datetime.now().isoformat(),
        "fetch_status": "stale" if is_stale else system_status["status"],
        "last_heartbeat": last_fetch.isoformat() if last_fetch else None,
        "prices": {
            a.symbol: {
                "가격": a.current_price,
                "상태": a.price_state,
                "업데이트": a.last_updated.isoformat() if a.last_updated else None,
            }
            for a in assets
        },
    }


@app.get("/api/assets")
async def get_assets(db: Session = Depends(get_db)):
    """Return every asset with the user's reference prices and quantity.

    Unset (``None``) numeric columns are reported as ``0.0``, matching how the
    alert loop treats them, instead of failing the request.
    """
    assets = db.query(Asset, UserAsset).join(UserAsset).all()
    return [
        {
            "symbol": a.symbol,
            "name": a.name,
            "category": a.category,
            "previous_close": _to_float(ua.previous_close),
            "average_price": _to_float(ua.average_price),
            "quantity": _to_float(ua.quantity),
        }
        for a, ua in assets
    ]


@app.get("/api/pnl")
async def get_pnl(db: Session = Depends(get_db)):
    """Return ``Calculator.calc_pnl`` for the KRX gold and BTC/KRW holdings.

    A missing asset contributes zeros for all of its arguments.
    """
    krx = db.query(Asset, UserAsset).join(UserAsset).filter(Asset.symbol == "KRX/GLD").first()
    btc = db.query(Asset, UserAsset).join(UserAsset).filter(Asset.symbol == "BTC/KRW").first()

    # NOTE(review): current_price is passed through unchanged and may be None
    # before the first successful fetch - confirm Calculator.calc_pnl copes.
    pnl = Calculator.calc_pnl(
        _to_float(krx[1].average_price) if krx else 0,
        _to_float(krx[1].quantity) if krx else 0,
        _to_float(btc[1].average_price) if btc else 0,
        _to_float(btc[1].quantity) if btc else 0,
        krx[0].current_price if krx else 0,
        btc[0].current_price if btc else 0,
    )
    return pnl


@app.get("/api/stream")
async def stream_prices(request: Request):
    """Stream prices as Server-Sent Events, one message every 5 seconds.

    Each open stream is counted in ``connected_clients`` for its lifetime;
    ``background_fetch`` uses that count to choose its polling interval.
    """
    async def event_generator():
        global connected_clients
        async with clients_lock:
            connected_clients += 1
        try:
            while True:
                if await request.is_disconnected():
                    break
                db = SessionLocal()
                try:
                    assets = db.query(Asset).all()
                    data = {a.symbol: {"가격": a.current_price, "상태": a.price_state} for a in assets}
                finally:
                    # Release the session before suspending at the yield.
                    db.close()
                yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
                await asyncio.sleep(5)
        finally:
            async with clients_lock:
                connected_clients = max(0, connected_clients - 1)

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@app.post("/api/assets")
async def update_asset(data: UserAssetUpdate, db: Session = Depends(get_db)):
    """Update previous close, average price and quantity for one symbol.

    Raises:
        HTTPException: 404 when the symbol is unknown or has no UserAsset row.
    """
    asset = db.query(Asset).filter(Asset.symbol == data.symbol).first()
    if asset is None:
        # Unknown symbol: answer 404 rather than failing on asset.id below.
        raise HTTPException(status_code=404)

    ua = db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first()
    if ua:
        ua.previous_close = data.previous_close
        ua.average_price = data.average_price
        ua.quantity = data.quantity
        db.commit()
        return {"status": "success"}
    raise HTTPException(status_code=404)


@app.get("/api/alerts/settings")
async def get_alert_settings(db: Session = Depends(get_db)):
    """Return all alert settings as ``{key: decoded JSON value}``."""
    settings = db.query(AlertSetting).all()
    return {s.setting_key: json.loads(s.setting_value) for s in settings}


@app.post("/api/alerts/settings")
async def update_alert_settings(data: AlertSettingUpdate, db: Session = Depends(get_db)):
    """Overwrite existing alert settings; keys with no stored row are ignored."""
    for key, value in data.settings.items():
        s = db.query(AlertSetting).filter(AlertSetting.setting_key == key).first()
        if s:
            s.setting_value = json.dumps(value)
    db.commit()
    return {"status": "success"}


@app.get("/health")
async def health_check():
    """Liveness probe.

    ``status`` is always ``"healthy"`` (the process is answering requests);
    ``fetch_status`` exposes the state recorded by the polling loop.
    """
    return {
        "status": "healthy",
        "last_fetch": system_status["last_fetch_time"],
        "fetch_status": system_status["status"],
    }


# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)