AssetPilot OrangePi 5 Pluse Server-First Commit

This commit is contained in:
Wind
2026-02-10 12:23:22 +09:00
commit bf87175f51
45 changed files with 4060 additions and 0 deletions

268
asset_pilot_docker/main.py Normal file
View File

@@ -0,0 +1,268 @@
import os
import json
import asyncio
from datetime import datetime
from typing import Dict
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from pydantic import BaseModel
from dotenv import load_dotenv
from app.database import get_db, engine
from app.models import Base, Asset, UserAsset, AlertSetting
from app.fetcher import fetcher
from app.calculator import Calculator
load_dotenv()
# 테이블 생성
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Asset Pilot - Orange Pi Edition", version="1.0.0")
# 정적 파일 및 템플릿 설정
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# 전역 변수: 현재 가격 캐시
current_prices: Dict = {}
# ==================== Pydantic 모델 ====================
class UserAssetUpdate(BaseModel):
symbol: str
previous_close: float
average_price: float
quantity: float
class AlertSettingUpdate(BaseModel):
settings: Dict
# ==================== 데이터베이스 초기화 ====================
def init_assets(db: Session):
"""자산 마스터 데이터 초기화"""
assets_data = [
("XAU/USD", "금/달러", "귀금속"),
("XAU/CNY", "금/위안", "귀금속"),
("XAU/GBP", "금/파운드", "귀금속"),
("USD/DXY", "달러인덱스", "환율"),
("USD/KRW", "달러/원", "환율"),
("BTC/USD", "비트코인/달러", "암호화폐"),
("BTC/KRW", "비트코인/원", "암호화폐"),
("KRX/GLD", "금 현물", "귀금속"),
("XAU/KRW", "금/원", "귀금속"),
]
for symbol, name, category in assets_data:
existing = db.query(Asset).filter(Asset.symbol == symbol).first()
if not existing:
asset = Asset(symbol=symbol, name=name, category=category)
db.add(asset)
db.commit()
print("✅ 자산 마스터 데이터 초기화 완료")
def init_user_assets(db: Session):
"""사용자 자산 초기화 (기본값 0)"""
assets = db.query(Asset).all()
for asset in assets:
existing = db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first()
if not existing:
user_asset = UserAsset(
asset_id=asset.id,
previous_close=0,
average_price=0,
quantity=0
)
db.add(user_asset)
db.commit()
print("✅ 사용자 자산 데이터 초기화 완료")
def init_alert_settings(db: Session):
"""알림 설정 초기화"""
default_settings = {
"급등락_감지": False,
"급등락_임계값": 3.0,
"목표수익률_감지": False,
"목표수익률": 10.0,
"특정가격_감지": False,
"금_목표가격": 100000,
"BTC_목표가격": 100000000,
}
for key, value in default_settings.items():
existing = db.query(AlertSetting).filter(AlertSetting.setting_key == key).first()
if not existing:
setting = AlertSetting(setting_key=key, setting_value=json.dumps(value))
db.add(setting)
db.commit()
print("✅ 알림 설정 초기화 완료")
# ==================== 앱 시작 이벤트 ====================
@app.on_event("startup")
async def startup_event():
"""앱 시작 시 초기화 및 백그라운드 작업 시작"""
db = next(get_db())
try:
init_assets(db)
init_user_assets(db)
init_alert_settings(db)
print("🚀 Asset Pilot 서버 시작 완료")
finally:
db.close()
# 백그라운드 데이터 수집 시작
asyncio.create_task(background_fetch())
async def background_fetch():
"""백그라운드에서 주기적으로 가격 수집"""
global current_prices
interval = int(os.getenv('FETCH_INTERVAL', 5))
while True:
try:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 데이터 수집 시작...")
current_prices = fetcher.fetch_all()
except Exception as e:
print(f"❌ 데이터 수집 오류: {e}")
await asyncio.sleep(interval)
# ==================== API 엔드포인트 ====================
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
"""메인 페이지"""
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/prices")
async def get_prices():
"""현재 가격 조회"""
return current_prices
@app.get("/api/assets")
async def get_assets(db: Session = Depends(get_db)):
"""사용자 자산 조회"""
assets = db.query(Asset, UserAsset).join(
UserAsset, Asset.id == UserAsset.asset_id
).all()
result = []
for asset, user_asset in assets:
result.append({
"symbol": asset.symbol,
"name": asset.name,
"category": asset.category,
"previous_close": float(user_asset.previous_close),
"average_price": float(user_asset.average_price),
"quantity": float(user_asset.quantity),
})
return result
@app.post("/api/assets")
async def update_asset(data: UserAssetUpdate, db: Session = Depends(get_db)):
"""자산 정보 업데이트"""
asset = db.query(Asset).filter(Asset.symbol == data.symbol).first()
if not asset:
raise HTTPException(status_code=404, detail="자산을 찾을 수 없습니다")
user_asset = db.query(UserAsset).filter(UserAsset.asset_id == asset.id).first()
if user_asset:
user_asset.previous_close = data.previous_close
user_asset.average_price = data.average_price
user_asset.quantity = data.quantity
db.commit()
return {"status": "success", "message": "업데이트 완료"}
raise HTTPException(status_code=404, detail="사용자 자산 정보를 찾을 수 없습니다")
@app.get("/api/pnl")
async def get_pnl(db: Session = Depends(get_db)):
"""손익 계산"""
# KRX/GLD 자산 정보
krx_asset = db.query(Asset).filter(Asset.symbol == "KRX/GLD").first()
krx_user = db.query(UserAsset).filter(UserAsset.asset_id == krx_asset.id).first() if krx_asset else None
# BTC/KRW 자산 정보
btc_asset = db.query(Asset).filter(Asset.symbol == "BTC/KRW").first()
btc_user = db.query(UserAsset).filter(UserAsset.asset_id == btc_asset.id).first() if btc_asset else None
gold_buy_price = float(krx_user.average_price) if krx_user else 0
gold_quantity = float(krx_user.quantity) if krx_user else 0
btc_buy_price = float(btc_user.average_price) if btc_user else 0
btc_quantity = float(btc_user.quantity) if btc_user else 0
current_gold = current_prices.get("KRX/GLD", {}).get("가격")
current_btc = current_prices.get("BTC/KRW", {}).get("가격")
pnl = Calculator.calc_pnl(
gold_buy_price, gold_quantity,
btc_buy_price, btc_quantity,
current_gold, current_btc
)
return pnl
@app.get("/api/alerts/settings")
async def get_alert_settings(db: Session = Depends(get_db)):
"""알림 설정 조회"""
settings = db.query(AlertSetting).all()
result = {}
for setting in settings:
try:
result[setting.setting_key] = json.loads(setting.setting_value)
except:
result[setting.setting_key] = setting.setting_value
return result
@app.post("/api/alerts/settings")
async def update_alert_settings(data: AlertSettingUpdate, db: Session = Depends(get_db)):
"""알림 설정 업데이트"""
for key, value in data.settings.items():
setting = db.query(AlertSetting).filter(AlertSetting.setting_key == key).first()
if setting:
setting.setting_value = json.dumps(value)
else:
new_setting = AlertSetting(setting_key=key, setting_value=json.dumps(value))
db.add(new_setting)
db.commit()
return {"status": "success", "message": "알림 설정 업데이트 완료"}
@app.get("/api/stream")
async def stream_prices():
"""Server-Sent Events로 실시간 가격 스트리밍"""
async def event_generator():
while True:
if current_prices:
data = json.dumps(current_prices, ensure_ascii=False)
yield f"data: {data}\n\n"
await asyncio.sleep(1)
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/health")
async def health_check():
"""헬스 체크"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"prices_loaded": len(current_prices) > 0
}
# ==================== 메인 실행 ====================
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host=os.getenv("APP_HOST", "0.0.0.0"),
port=int(os.getenv("APP_PORT", 8000)),
reload=os.getenv("DEBUG", "False").lower() == "true"
)