Really-amin's picture
Upload 663 files
25809d7 verified
#!/usr/bin/env python3
"""
Market Data Router - Complete market endpoints
GET /api/market, /api/market/top10, /api/market/top50, /api/market/pairs,
/api/market/ohlc/{symbol}, /api/market/depth/{symbol}, /api/market/history/{symbol}
"""
from fastapi import APIRouter, HTTPException, Query, Path
from fastapi.responses import JSONResponse
from typing import Optional, List, Dict, Any
from datetime import datetime, timezone
import logging
import json
from pathlib import Path as PathLib
from backend.services.provider_fallback_manager import fallback_manager
from backend.services.data_resolver import get_data_resolver
from backend.services.persistence_service import PersistenceService
from database.db_manager import db_manager
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/market",
tags=["Market Data"]
)
persistence_service = PersistenceService()
WORKSPACE_ROOT = PathLib("/app" if PathLib("/app").exists() else PathLib("."))
RESOURCES_JSON = WORKSPACE_ROOT / "api-resources" / "crypto_resources_unified_2025-11-11.json"
ALL_APIS_JSON = WORKSPACE_ROOT / "all_apis_merged_2025.json"
def create_response(data: Any, source: str, attempted: List[str] = None) -> Dict[str, Any]:
"""Create standardized response"""
return {
"data": data,
"meta": {
"source": source,
"generated_at": datetime.now(timezone.utc).isoformat(),
"cache_ttl": 30,
"attempted": attempted or [source]
}
}
@router.get("")
async def get_market():
"""Get market snapshot"""
attempted = []
# Try HF Space first
try:
resolver = await get_data_resolver()
hf_data = await resolver.resolve_market_data()
if hf_data:
attempted.append("hf")
return create_response(hf_data, "hf", attempted)
except Exception as e:
logger.warning(f"HF Space unavailable: {e}")
attempted.append("hf")
# Try local resources
try:
if RESOURCES_JSON.exists():
with open(RESOURCES_JSON, 'r', encoding='utf-8') as f:
resources = json.load(f)
# Extract market data from resources
market_data = resources.get("registry", {}).get("market_data", [])
if market_data:
attempted.append("local_resources")
return create_response(market_data[:50], "local_resources", attempted)
except Exception as e:
logger.warning(f"Local resources failed: {e}")
attempted.append("local_resources")
# Fallback to external APIs
try:
result = await fallback_manager.fetch_with_fallback(
endpoint="/ticker/price",
params={"symbol": "BTCUSDT"},
transform_func=lambda x: x
)
attempted.extend(result.attempted)
if result.success:
return create_response(result.data, result.source, attempted)
except Exception as e:
logger.error(f"External APIs failed: {e}")
attempted.append("external_apis")
# Return error with attempted sources
raise HTTPException(
status_code=503,
detail={
"error": "DATA_NOT_AVAILABLE",
"attempted": attempted,
"message": "All data sources failed"
}
)
@router.get("/top10")
async def get_top10():
"""Get top 10 cryptocurrencies"""
market_data = await get_market()
if market_data.get("data"):
top10 = sorted(
market_data["data"],
key=lambda x: x.get("market_cap", 0) or x.get("rank", 999),
reverse=False
)[:10]
return create_response(top10, market_data["meta"]["source"])
raise HTTPException(status_code=404, detail="Market data not available")
@router.get("/top50")
async def get_top50():
"""Get top 50 cryptocurrencies"""
market_data = await get_market()
if market_data.get("data"):
top50 = sorted(
market_data["data"],
key=lambda x: x.get("market_cap", 0) or x.get("rank", 999),
reverse=False
)[:50]
return create_response(top50, market_data["meta"]["source"])
raise HTTPException(status_code=404, detail="Market data not available")
@router.get("/pairs")
async def get_pairs():
"""Get trading pairs"""
attempted = []
# Try HF Space first
try:
resolver = await get_data_resolver()
pairs = await resolver.resolve_trading_pairs()
if pairs:
attempted.append("hf")
return create_response(pairs, "hf", attempted)
except Exception as e:
logger.warning(f"HF Space unavailable: {e}")
attempted.append("hf")
# Try local trading_pairs.txt
try:
pairs_file = WORKSPACE_ROOT / "trading_pairs.txt"
if pairs_file.exists():
pairs = []
for line in pairs_file.read_text(encoding='utf-8').strip().split('\n'):
if line.strip():
pairs.append({"pair": line.strip(), "base": line.strip()[:-4], "quote": "USDT"})
if pairs:
attempted.append("local_file")
return create_response(pairs, "local_file", attempted)
except Exception as e:
logger.warning(f"Local file failed: {e}")
attempted.append("local_file")
# Fallback to external
try:
result = await fallback_manager.fetch_with_fallback(
endpoint="/exchangeInfo",
params={},
transform_func=lambda x: x.get("symbols", []) if isinstance(x, dict) else []
)
attempted.extend(result.attempted)
if result.success and result.data:
pairs = [{"pair": s.get("symbol", ""), "base": s.get("baseAsset", ""), "quote": s.get("quoteAsset", "")}
for s in result.data[:100]]
return create_response(pairs, result.source, attempted)
except Exception as e:
logger.error(f"External APIs failed: {e}")
attempted.append("external_apis")
# Default pairs
default_pairs = [
{"pair": "BTCUSDT", "base": "BTC", "quote": "USDT"},
{"pair": "ETHUSDT", "base": "ETH", "quote": "USDT"},
{"pair": "BNBUSDT", "base": "BNB", "quote": "USDT"},
]
return create_response(default_pairs, "default", attempted)
@router.get("/ohlc/{symbol}")
async def get_ohlc(
symbol: str = Path(..., description="Trading symbol"),
interval: str = Query("1h", description="Interval"),
limit: int = Query(100, description="Number of candles")
):
"""Get OHLC candlestick data"""
attempted = []
# Try HF Space
try:
resolver = await get_data_resolver()
ohlc = await resolver.resolve_ohlc(symbol, interval, limit)
if ohlc:
attempted.append("hf")
return create_response(ohlc, "hf", attempted)
except Exception as e:
logger.warning(f"HF Space unavailable: {e}")
attempted.append("hf")
# Fallback to external
try:
result = await fallback_manager.fetch_with_fallback(
endpoint="/klines",
params={"symbol": symbol.upper() + "USDT", "interval": interval, "limit": limit},
transform_func=lambda x: x if isinstance(x, list) else []
)
attempted.extend(result.attempted)
if result.success:
return create_response(result.data, result.source, attempted)
except Exception as e:
logger.error(f"External APIs failed: {e}")
attempted.append("external_apis")
raise HTTPException(
status_code=503,
detail={"error": "OHLC_DATA_NOT_AVAILABLE", "attempted": attempted}
)
@router.get("/depth/{symbol}")
async def get_depth(
symbol: str = Path(..., description="Trading symbol"),
limit: int = Query(20, description="Depth limit")
):
"""Get order book depth"""
attempted = []
# Try HF Space
try:
resolver = await get_data_resolver()
depth = await resolver.resolve_orderbook(symbol, limit)
if depth:
attempted.append("hf")
return create_response(depth, "hf", attempted)
except Exception as e:
logger.warning(f"HF Space unavailable: {e}")
attempted.append("hf")
# Fallback to external
try:
result = await fallback_manager.fetch_with_fallback(
endpoint="/depth",
params={"symbol": symbol.upper() + "USDT", "limit": limit},
transform_func=lambda x: x if isinstance(x, dict) else {"bids": [], "asks": []}
)
attempted.extend(result.attempted)
if result.success:
return create_response(result.data, result.source, attempted)
except Exception as e:
logger.error(f"External APIs failed: {e}")
attempted.append("external_apis")
raise HTTPException(
status_code=503,
detail={"error": "DEPTH_DATA_NOT_AVAILABLE", "attempted": attempted}
)
@router.get("/history/{symbol}")
async def get_history(
symbol: str = Path(..., description="Trading symbol"),
days: int = Query(30, description="Number of days")
):
"""Get historical price data"""
attempted = []
# Try database first
try:
historical = await persistence_service.get_historical_prices(symbol, days)
if historical:
attempted.append("database")
return create_response(historical, "database", attempted)
except Exception as e:
logger.warning(f"Database query failed: {e}")
attempted.append("database")
# Try HF Space
try:
resolver = await get_data_resolver()
history = await resolver.resolve_historical(symbol, days)
if history:
attempted.append("hf")
# Save to database
await persistence_service.save_historical_prices(symbol, history)
return create_response(history, "hf", attempted)
except Exception as e:
logger.warning(f"HF Space unavailable: {e}")
attempted.append("hf")
# Fallback to external
try:
result = await fallback_manager.fetch_with_fallback(
endpoint="/historical",
params={"symbol": symbol.upper(), "days": days},
transform_func=lambda x: x if isinstance(x, list) else []
)
attempted.extend(result.attempted)
if result.success:
# Save to database
await persistence_service.save_historical_prices(symbol, result.data)
return create_response(result.data, result.source, attempted)
except Exception as e:
logger.error(f"External APIs failed: {e}")
attempted.append("external_apis")
raise HTTPException(
status_code=503,
detail={"error": "HISTORICAL_DATA_NOT_AVAILABLE", "attempted": attempted}
)