181 lines
8.0 KiB
Python
181 lines
8.0 KiB
Python
import httpx
|
|
import asyncio
|
|
import re
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, Optional
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import update
|
|
|
|
# [Important] To avoid a circular import, only Asset is imported at module top level.
|
|
try:
|
|
from .models import Asset
|
|
except ImportError:
|
|
from models import Asset
|
|
|
|
class DataFetcher:
    """Collect market prices from several public sources and store them.

    Each ``fetch_*`` coroutine is best-effort: it returns the parsed price as
    a ``float`` or ``None`` when the request or the parsing fails, and never
    raises for ordinary errors.  Task cancellation is *not* swallowed.

    ``update_realtime_prices`` runs all fetchers concurrently, derives the
    XAU/KRW price, and writes the results to the ``Asset`` table.
    """

    # Divisor used to turn the XAU/USD quote (per troy ounce) into a
    # per-gram price before applying the USD/KRW rate.
    _GRAMS_PER_TROY_OUNCE = 31.1034768

    # Internal asset code -> Google Finance quote symbol.
    _GOOGLE_SYMBOLS = {
        "USD/KRW": "USD-KRW",
        "USD/DXY": "INDEXDXY:CURRENCY",
    }
    _GOOGLE_PRICE_RE = re.compile(r'data-last-price="([\d,.]+)"')

    # Investing.com markup variants, tried in order; the first match wins.
    _INVESTING_PRICE_RES = tuple(
        re.compile(pattern)
        for pattern in (
            r'data-test="instrument-price-last">([\d,.]+)<',
            r'last_last">([\d,.]+)<',
            r'instrument-price-last">([\d,.]+)<',
            r'class="[^"]*text-2xl[^"]*">([\d,.]+)<',
        )
    )

    _KRX_GOLD_PRICE_RE = re.compile(r'\"closePrice\":\"([\d,.]+)\"')

    def __init__(self):
        # Browser-like headers so that external sites such as Investing.com
        # do not block the requests.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7',
            'Cache-Control': 'no-cache',
            'Pragma': 'no-cache'
        }
        # symbol -> baseline price used to derive the "up"/"down"/"stable" state.
        self.daily_closing_prices: Dict[str, float] = {}

    @staticmethod
    def _parse_number(text: str) -> float:
        """Convert a number with thousands separators (``"1,234.5"``) to float."""
        return float(text.replace(",", ""))

    def _price_state(self, symbol: str, price: float) -> str:
        """Compare ``price`` with the stored baseline for ``symbol``.

        Returns ``"up"``, ``"down"`` or ``"stable"``; ``"stable"`` is also
        returned when no baseline has been recorded for the symbol.
        """
        if symbol in self.daily_closing_prices:
            closing = self.daily_closing_prices[symbol]
            if price > closing:
                return "up"
            if price < closing:
                return "down"
        return "stable"

    async def fetch_google_finance(self, client: "httpx.AsyncClient", asset_code: str) -> Optional[float]:
        """Fetch an FX rate / index value from Google Finance.

        Only ``"USD/KRW"`` and ``"USD/DXY"`` are supported; any other
        ``asset_code`` returns ``None`` without issuing a request.  ``None``
        is also returned on a non-200 response, when the price attribute is
        not found in the page, or on any request/parse error.
        """
        symbol = self._GOOGLE_SYMBOLS.get(asset_code)
        if symbol is None:
            return None

        try:
            res = await client.get(f"https://www.google.com/finance/quote/{symbol}", timeout=5)
            if res.status_code != 200:
                return None

            found = self._GOOGLE_PRICE_RE.search(res.text)
            if found:
                return self._parse_number(found.group(1))
        except Exception as e:
            print(f"⚠️ Google Finance 에러 ({asset_code}): {e}")
        return None

    async def fetch_investing_com(self, client: "httpx.AsyncClient", asset_code: str) -> Optional[float]:
        """Scrape the last price for ``asset_code`` from Investing.com.

        ``"USD/DXY"`` maps to the US dollar index page; every other code is
        turned into a currency-pair URL (``"XAU/USD"`` -> ``xau-usd``).
        Several known markup patterns are tried in order.  Returns ``None``
        on a non-200 response, when no pattern matches, or on any error.
        """
        try:
            if asset_code == "USD/DXY":
                url = "https://www.investing.com/indices/usdollar"
            else:
                url = f"https://www.investing.com/currencies/{asset_code.lower().replace('/', '-')}"

            response = await client.get(url, timeout=15, follow_redirects=True)
            if response.status_code != 200:
                print(f"⚠️ Investing 응답 에러 ({asset_code}): {response.status_code}")
                return None

            html = response.text
            for price_re in self._INVESTING_PRICE_RES:
                found = price_re.search(html)
                if found:
                    return self._parse_number(found.group(1))

            print(f"⚠️ Investing 패턴 매칭 실패 ({asset_code})")
        except Exception as e:
            print(f"⚠️ Investing 시스템 에러 ({asset_code}): {e}")
        return None

    async def fetch_binance(self, client: "httpx.AsyncClient") -> Optional[float]:
        """Fetch the BTC/USD price from the Binance ticker endpoint.

        Returns ``None`` on any request or parsing error.
        """
        # NOTE(review): Binance spot pairs are normally quoted against a
        # stablecoin (e.g. BTCUSDT). Confirm that `BTCUSD` is a valid symbol
        # on api.binance.com -- an error payload has no "price" key, which
        # would make this method return None on every call.
        url = "https://api.binance.com/api/v3/ticker/price?symbol=BTCUSD"
        try:
            res = await client.get(url, timeout=5)
            return float(res.json()["price"])
        except Exception as e:
            # `except Exception` (not a bare `except`) so that task
            # cancellation and interpreter shutdown still propagate.
            print(f"⚠️ Binance 에러: {e}")
            return None

    async def fetch_upbit(self, client: "httpx.AsyncClient") -> Optional[float]:
        """Fetch the BTC/KRW trade price from the Upbit ticker endpoint.

        Returns ``None`` on any request or parsing error.
        """
        url = "https://api.upbit.com/v1/ticker?markets=KRW-BTC"
        try:
            res = await client.get(url, timeout=5)
            return float(res.json()[0]["trade_price"])
        except Exception as e:
            # Narrowed from a bare `except` so cancellation is not swallowed.
            print(f"⚠️ Upbit 에러: {e}")
            return None

    async def fetch_krx_gold(self, client: "httpx.AsyncClient") -> Optional[float]:
        """Fetch the domestic gold price from Naver's mobile market-index page.

        The value is taken from the ``"closePrice"`` field embedded in the
        page.  Returns ``None`` when the field is missing or on any error.
        """
        url = "https://m.stock.naver.com/marketindex/metals/M04020000"
        try:
            res = await client.get(url, timeout=5)
            found = self._KRX_GOLD_PRICE_RE.search(res.text)
            if found:
                return self._parse_number(found.group(1))
        except Exception as e:
            # Narrowed from a bare `except` so cancellation is not swallowed.
            print(f"⚠️ KRX Gold 에러: {e}")
        return None

    async def update_closing_prices(self, db: "Session") -> None:
        """Refresh the in-memory baseline snapshot from a fresh collection run.

        Runs ``update_realtime_prices`` and stores every truthy price as the
        new baseline for its symbol.  Per the original note this is meant to
        be triggered once a day at a fixed time.
        """
        results = await self.update_realtime_prices(db)
        for symbol, price in results.items():
            if price:
                self.daily_closing_prices[symbol] = price
        print(f"📌 [기준가 업데이트] 완료: {self.daily_closing_prices}")

    async def update_realtime_prices(self, db: "Session") -> Dict:
        """Fetch all prices concurrently, then write them to the database.

        Steps:
          1. Run every fetcher in parallel; anything that is not a number
             (``None`` or an exception object) is recorded as ``None``.
          2. Derive ``XAU/KRW`` from ``XAU/USD`` and ``USD/KRW``.
          3. Update ``Asset`` rows (price, state, timestamp) and, when the
             KRX gold price is available, set ``previous_close`` of the
             ``UserAsset`` rows that point at the ``XAU/KRW`` asset.

        Database errors are rolled back and reported, not raised.

        Returns:
            Mapping of symbol -> price (``None`` where collection failed).
        """
        started = time.perf_counter()

        # 1. Parallel collection
        async with httpx.AsyncClient(headers=self.headers, follow_redirects=True) as client:
            tasks = {
                "XAU/USD": self.fetch_investing_com(client, "XAU/USD"),
                "XAU/CNY": self.fetch_investing_com(client, "XAU/CNY"),
                "XAU/GBP": self.fetch_investing_com(client, "XAU/GBP"),
                "USD/DXY": self.fetch_investing_com(client, "USD/DXY"),
                "USD/KRW": self.fetch_google_finance(client, "USD/KRW"),
                "BTC/USD": self.fetch_binance(client),
                "BTC/KRW": self.fetch_upbit(client),
                "KRX/GLD": self.fetch_krx_gold(client),
            }
            values = await asyncio.gather(*tasks.values(), return_exceptions=True)

        raw_results = {
            symbol: (value if isinstance(value, (int, float)) else None)
            for symbol, value in zip(tasks, values)
        }

        # 2. Real-time XAU/KRW: international gold price converted to a
        #    per-gram value, multiplied by the exchange rate.
        xau_usd = raw_results.get("XAU/USD")
        usd_krw = raw_results.get("USD/KRW")
        if xau_usd and usd_krw:
            raw_results["XAU/KRW"] = round((xau_usd / self._GRAMS_PER_TROY_OUNCE) * usd_krw, 0)
        else:
            raw_results["XAU/KRW"] = None

        # 3. Database update.
        # NOTE: the Session calls below are not awaited, so they execute
        # synchronously inside this coroutine.
        try:
            # [A] Base Asset table update
            for symbol, price in raw_results.items():
                if price is None:
                    continue
                db.execute(
                    update(Asset)
                    .where(Asset.symbol == symbol)
                    .values(
                        current_price=price,
                        price_state=self._price_state(symbol, price),
                        last_updated=datetime.now(),
                    )
                )

            # [B] Special case for the XAU/KRW premium: the KRX price is
            #     stored as the previous close.
            krx_gold = raw_results.get("KRX/GLD")
            new_xau_krw_baseline = None
            if krx_gold:
                # UserAsset is imported locally to avoid a circular import.
                try:
                    from .models import UserAsset
                except ImportError:
                    from models import UserAsset

                target_asset = db.query(Asset).filter(Asset.symbol == "XAU/KRW").first()
                if target_asset:
                    db.execute(
                        update(UserAsset)
                        .where(UserAsset.asset_id == target_asset.id)
                        .values(previous_close=krx_gold)
                    )
                    new_xau_krw_baseline = krx_gold

            db.commit()
        except Exception as e:
            db.rollback()
            print(f"❌ DB 저장 중 오류 발생: {e}")
        else:
            # Refresh the in-memory baseline only after the commit succeeded,
            # so memory and database stay in step (keeps the UI arrow state).
            if new_xau_krw_baseline is not None:
                self.daily_closing_prices["XAU/KRW"] = new_xau_krw_baseline
            elapsed = time.perf_counter() - started
            print(f"✅ [{datetime.now().strftime('%H:%M:%S')}] 수집 및 DB 저장 완료 ({elapsed:.2f}s)")

        return raw_results
|
|
|
|
# Singleton instance shared by importers of this module.
fetcher = DataFetcher()