import os
import json
import asyncio
from datetime import datetime
from typing import Dict

from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from pydantic import BaseModel
from dotenv import load_dotenv

from app.database import get_db, engine
from app.models import Base, Asset, UserAsset, AlertSetting
from app.fetcher import fetcher
from app.calculator import Calculator

load_dotenv()

# Create the database tables declared on Base (no-op for tables that exist).
Base.metadata.create_all(bind=engine)

app = FastAPI(title="Asset Pilot - Orange Pi Edition", version="1.0.0")

# Static files and HTML templates.
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# Module-level cache of the most recent result of fetcher.fetch_all().
current_prices: Dict = {}

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_background_tasks: set = set()


# ==================== Pydantic models ====================

class UserAssetUpdate(BaseModel):
    """Request body for POST /api/assets."""

    symbol: str
    previous_close: float
    average_price: float
    quantity: float


class AlertSettingUpdate(BaseModel):
    """Request body for POST /api/alerts/settings (setting key -> value)."""

    settings: Dict


# ==================== Database initialisation ====================

def init_assets(db: Session):
    """Insert the asset master rows that are not present yet, then commit."""
    assets_data = [
        ("XAU/USD", "금/달러", "귀금속"),
        ("XAU/CNY", "금/위안", "귀금속"),
        ("XAU/GBP", "금/파운드", "귀금속"),
        ("USD/DXY", "달러인덱스", "환율"),
        ("USD/KRW", "달러/원", "환율"),
        ("BTC/USD", "비트코인/달러", "암호화폐"),
        ("BTC/KRW", "비트코인/원", "암호화폐"),
        ("KRX/GLD", "금 현물", "귀금속"),
        ("XAU/KRW", "금/원", "귀금속"),
    ]

    for symbol, name, category in assets_data:
        existing = db.query(Asset).filter(Asset.symbol == symbol).first()
        if not existing:
            db.add(Asset(symbol=symbol, name=name, category=category))

    db.commit()
    print("✅ 자산 마스터 데이터 초기화 완료")


def init_user_assets(db: Session):
    """Create a zero-valued UserAsset row for every Asset lacking one."""
    for asset in db.query(Asset).all():
        existing = (
            db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first()
        )
        if not existing:
            db.add(
                UserAsset(
                    asset_id=asset.id,
                    previous_close=0,
                    average_price=0,
                    quantity=0,
                )
            )

    db.commit()
    print("✅ 사용자 자산 데이터 초기화 완료")


def init_alert_settings(db: Session):
    """Insert default alert settings for keys that are not stored yet.

    Values are stored JSON-encoded in ``AlertSetting.setting_value``.
    """
    default_settings = {
        "급등락_감지": False,
        "급등락_임계값": 3.0,
        "목표수익률_감지": False,
        "목표수익률": 10.0,
        "특정가격_감지": False,
        "금_목표가격": 100000,
        "BTC_목표가격": 100000000,
    }

    for key, value in default_settings.items():
        existing = (
            db.query(AlertSetting)
            .filter(AlertSetting.setting_key == key)
            .first()
        )
        if not existing:
            db.add(
                AlertSetting(setting_key=key, setting_value=json.dumps(value))
            )

    db.commit()
    print("✅ 알림 설정 초기화 완료")


# ==================== Application startup ====================

@app.on_event("startup")
async def startup_event():
    """Seed the database, then start the background price collector."""
    db = next(get_db())
    try:
        init_assets(db)
        init_user_assets(db)
        init_alert_settings(db)
        print("🚀 Asset Pilot 서버 시작 완료")
    finally:
        db.close()

    # Keep a strong reference so the collector task cannot be
    # garbage-collected while it is still running.
    task = asyncio.create_task(background_fetch())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def background_fetch():
    """Refresh ``current_prices`` forever, pausing FETCH_INTERVAL seconds.

    The interval is read once from the ``FETCH_INTERVAL`` environment
    variable (default 5). Errors raised by a fetch are printed and the loop
    continues with the previous cache contents.
    """
    global current_prices
    interval = int(os.getenv('FETCH_INTERVAL', 5))
    loop = asyncio.get_running_loop()

    while True:
        try:
            print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 데이터 수집 시작...")
            # fetch_all is a synchronous call (presumably network-bound —
            # confirm against app.fetcher); running it in the default
            # executor keeps the event loop free to serve requests and SSE
            # streams in the meantime.
            current_prices = await loop.run_in_executor(
                None, fetcher.fetch_all
            )
        except Exception as e:
            print(f"❌ 데이터 수집 오류: {e}")

        await asyncio.sleep(interval)


# ==================== API endpoints ====================

@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the main page (``index.html``)."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/api/prices")
async def get_prices():
    """Return the cached prices as-is."""
    return current_prices


@app.get("/api/assets")
async def get_assets(db: Session = Depends(get_db)):
    """Return every asset joined with its user-entered holding data."""
    rows = (
        db.query(Asset, UserAsset)
        .join(UserAsset, Asset.id == UserAsset.asset_id)
        .all()
    )
    return [
        {
            "symbol": asset.symbol,
            "name": asset.name,
            "category": asset.category,
            "previous_close": float(user_asset.previous_close),
            "average_price": float(user_asset.average_price),
            "quantity": float(user_asset.quantity),
        }
        for asset, user_asset in rows
    ]


@app.post("/api/assets")
async def update_asset(data: UserAssetUpdate, db: Session = Depends(get_db)):
    """Update the holding data of the asset identified by ``data.symbol``.

    Raises:
        HTTPException: 404 when the symbol or its UserAsset row is missing.
    """
    asset = db.query(Asset).filter(Asset.symbol == data.symbol).first()
    if not asset:
        raise HTTPException(status_code=404, detail="자산을 찾을 수 없습니다")

    user_asset = (
        db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first()
    )
    if not user_asset:
        raise HTTPException(
            status_code=404, detail="사용자 자산 정보를 찾을 수 없습니다"
        )

    user_asset.previous_close = data.previous_close
    user_asset.average_price = data.average_price
    user_asset.quantity = data.quantity
    db.commit()
    return {"status": "success", "message": "업데이트 완료"}


def _holding(db: Session, symbol: str):
    """Return ``(average_price, quantity)`` for *symbol*, or ``(0, 0)``."""
    asset = db.query(Asset).filter(Asset.symbol == symbol).first()
    user_asset = (
        db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first()
        if asset
        else None
    )
    if not user_asset:
        return 0, 0
    return float(user_asset.average_price), float(user_asset.quantity)


def _cached_price(symbol: str):
    """Return the "가격" field cached for *symbol*, or None when absent."""
    # `or {}` also covers a symbol whose cached entry is None, which the
    # two-argument dict.get default alone would not.
    return (current_prices.get(symbol) or {}).get("가격")


@app.get("/api/pnl")
async def get_pnl(db: Session = Depends(get_db)):
    """Return the profit/loss computed by ``Calculator.calc_pnl``.

    Uses the stored KRX/GLD and BTC/KRW holdings together with their cached
    prices; a missing price is passed through as ``None``.
    """
    gold_buy_price, gold_quantity = _holding(db, "KRX/GLD")
    btc_buy_price, btc_quantity = _holding(db, "BTC/KRW")

    current_gold = _cached_price("KRX/GLD")
    current_btc = _cached_price("BTC/KRW")

    return Calculator.calc_pnl(
        gold_buy_price, gold_quantity,
        btc_buy_price, btc_quantity,
        current_gold, current_btc
    )


@app.get("/api/alerts/settings")
async def get_alert_settings(db: Session = Depends(get_db)):
    """Return all alert settings as a ``{key: value}`` mapping.

    Stored values are JSON-decoded; a value that is not valid JSON is
    returned unchanged.
    """
    result = {}
    for setting in db.query(AlertSetting).all():
        try:
            result[setting.setting_key] = json.loads(setting.setting_value)
        except (TypeError, ValueError):
            # json.JSONDecodeError subclasses ValueError; TypeError covers a
            # non-string value such as None.
            result[setting.setting_key] = setting.setting_value
    return result


@app.post("/api/alerts/settings")
async def update_alert_settings(
    data: AlertSettingUpdate, db: Session = Depends(get_db)
):
    """Upsert every key in ``data.settings`` (JSON-encoded), then commit."""
    for key, value in data.settings.items():
        setting = (
            db.query(AlertSetting)
            .filter(AlertSetting.setting_key == key)
            .first()
        )
        if setting:
            setting.setting_value = json.dumps(value)
        else:
            db.add(
                AlertSetting(setting_key=key, setting_value=json.dumps(value))
            )

    db.commit()
    return {"status": "success", "message": "알림 설정 업데이트 완료"}


@app.get("/api/stream")
async def stream_prices():
    """Stream the cached prices once per second as Server-Sent Events."""

    async def event_generator():
        while True:
            # Nothing is emitted until the cache holds data.
            if current_prices:
                data = json.dumps(current_prices, ensure_ascii=False)
                yield f"data: {data}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@app.get("/health")
async def health_check():
    """Report liveness, the current time, and whether prices are cached."""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "prices_loaded": bool(current_prices),
    }


# ==================== Script entry point ====================

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host=os.getenv("APP_HOST", "0.0.0.0"),
        port=int(os.getenv("APP_PORT", 8000)),
        reload=os.getenv("DEBUG", "False").lower() == "true"
    )