|
|
""" |
|
|
Data Hub API Router |
|
|
Serves collected data from the database |
|
|
""" |
|
|
|
|
|
from fastapi import APIRouter, HTTPException, Query |
|
|
from typing import List, Dict, Any, Optional |
|
|
from datetime import datetime, timedelta, timezone |
|
|
from database.db_manager import DatabaseManager |
|
|
from database.models import MarketPrice, OHLC, SentimentMetric |
|
|
|
|
|
# Router that groups the Data Hub endpoints under the /api/hub URL prefix.
router = APIRouter(
    prefix="/api/hub",
    tags=["Data Hub"],
)

# Module-level database manager; the endpoints below open their sessions from it.
db_manager = DatabaseManager()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/prices/latest")
async def get_latest_prices(
    symbols: Optional[str] = Query(None, description="Comma-separated symbols (e.g., BTC,ETH)"),
    source: Optional[str] = Query(None, description="Filter by source (CoinGecko, Binance)"),
    limit: int = Query(100, ge=1, le=1000)
) -> Dict[str, Any]:
    """
    Get latest market prices from database

    Returns the most recent price row for each symbol. When ``source`` is
    given, "most recent" is computed among that source's rows only, so a
    symbol is still reported even if another source has a newer row for it.

    Args:
        symbols: Optional comma-separated list of symbols; each entry is
            stripped and upper-cased before matching.
        source: Optional exact-match filter on the row's source.
        limit: Maximum number of rows returned (1-1000).

    Returns:
        A dict with ``success``, ``count`` and ``data`` (one entry per row,
        ordered by symbol).

    Raises:
        HTTPException: 500 if anything fails while querying or serialising.
    """
    try:
        with db_manager.get_session() as session:
            from sqlalchemy import func

            # Latest timestamp per symbol. The source filter must be applied
            # here as well: otherwise the max is taken across all sources and
            # the outer source filter silently drops symbols whose newest row
            # came from a different source.
            latest_ts_query = session.query(
                MarketPrice.symbol,
                func.max(MarketPrice.timestamp).label('max_ts')
            )
            if source:
                latest_ts_query = latest_ts_query.filter(MarketPrice.source == source)
            subq = latest_ts_query.group_by(MarketPrice.symbol).subquery()

            # Join back to the full rows that carry those latest timestamps.
            query = session.query(MarketPrice).join(
                subq,
                (MarketPrice.symbol == subq.c.symbol) &
                (MarketPrice.timestamp == subq.c.max_ts)
            )

            if symbols:
                symbol_list = [s.strip().upper() for s in symbols.split(',')]
                query = query.filter(MarketPrice.symbol.in_(symbol_list))

            if source:
                # Still needed on the outer query: another source may have a
                # row with the very same symbol and timestamp.
                query = query.filter(MarketPrice.source == source)

            # Order before limiting so that truncation is deterministic.
            prices = query.order_by(MarketPrice.symbol).limit(limit).all()

            return {
                "success": True,
                "count": len(prices),
                "data": [
                    {
                        "symbol": p.symbol,
                        "price_usd": p.price_usd,
                        "market_cap": p.market_cap,
                        "volume_24h": p.volume_24h,
                        "price_change_24h": p.price_change_24h,
                        "source": p.source,
                        "timestamp": p.timestamp.isoformat()
                    }
                    for p in prices
                ]
            }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching prices: {str(e)}")
|
|
|
|
|
|
|
|
@router.get("/prices/{symbol}")
async def get_symbol_price(
    symbol: str,
    hours: int = Query(24, ge=1, le=168, description="Hours of history")
) -> Dict[str, Any]:
    """
    Return the stored price rows for one symbol over a recent time window.

    The symbol is upper-cased before matching and rows come back newest first.

    Args:
        symbol: Symbol taken from the URL path.
        hours: Size of the look-back window in hours (1-168).

    Raises:
        HTTPException: 404 when no row falls inside the window, 500 on any
            other failure.
    """
    try:
        with db_manager.get_session() as session:
            # Oldest timestamp that is still inside the requested window.
            window_start = datetime.now(timezone.utc) - timedelta(hours=hours)

            history_query = session.query(MarketPrice).filter(
                MarketPrice.symbol == symbol.upper(),
                MarketPrice.timestamp >= window_start
            )
            rows = history_query.order_by(MarketPrice.timestamp.desc()).all()

            if not rows:
                raise HTTPException(status_code=404, detail=f"No data found for {symbol}")

            history = []
            for row in rows:
                history.append({
                    "price_usd": row.price_usd,
                    "market_cap": row.market_cap,
                    "volume_24h": row.volume_24h,
                    "price_change_24h": row.price_change_24h,
                    "source": row.source,
                    "timestamp": row.timestamp.isoformat()
                })

            return {
                "success": True,
                "symbol": symbol.upper(),
                "count": len(rows),
                "data": history
            }

    except HTTPException:
        # Let the deliberate 404 through instead of wrapping it as a 500.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error fetching price history: {str(exc)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _ohlc_epoch_seconds(ts: datetime) -> int:
    """Convert a candle timestamp to whole Unix seconds, reading naive values as UTC."""
    # datetime.timestamp() interprets a naive value in the server's local
    # timezone, which would shift every candle on a non-UTC host.
    # NOTE(review): assumes naive timestamps are stored as UTC - confirm
    # against the code that writes OHLC rows.
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return int(ts.timestamp())


@router.get("/ohlc/{symbol}")
async def get_ohlc_data(
    symbol: str,
    interval: str = Query("1h", description="Timeframe (1m, 5m, 15m, 1h, 4h, 1d)"),
    limit: int = Query(100, ge=1, le=1000)
) -> Dict[str, Any]:
    """
    Get OHLC candlestick data for charts

    Returns data in format ready for TradingView/Lightweight Charts: the
    newest ``limit`` candles for the symbol/interval, ordered oldest first,
    each with ``time`` expressed in whole Unix seconds.

    Args:
        symbol: Symbol taken from the URL path; upper-cased before matching.
        interval: Candle timeframe, matched exactly against the stored value.
        limit: Maximum number of candles returned (1-1000).

    Raises:
        HTTPException: 404 when no candle matches, 500 on any other failure.
    """
    try:
        with db_manager.get_session() as session:
            candles = (
                session.query(OHLC)
                .filter(
                    OHLC.symbol == symbol.upper(),
                    OHLC.interval == interval
                )
                .order_by(OHLC.ts.desc())
                .limit(limit)
                .all()
            )

            if not candles:
                raise HTTPException(
                    status_code=404,
                    detail=f"No OHLC data found for {symbol} with interval {interval}"
                )

            # Fetched newest first so the limit keeps the latest candles;
            # flip to chronological order for the response.
            candles = list(reversed(candles))

            return {
                "success": True,
                "symbol": symbol.upper(),
                "interval": interval,
                "count": len(candles),
                "data": [
                    {
                        "time": _ohlc_epoch_seconds(c.ts),
                        "open": c.open,
                        "high": c.high,
                        "low": c.low,
                        "close": c.close,
                        "volume": c.volume
                    }
                    for c in candles
                ]
            }

    except HTTPException:
        # Let the deliberate 404 through instead of wrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching OHLC data: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/sentiment/fear-greed")
async def get_fear_greed_index(
    hours: int = Query(24, ge=1, le=168)
) -> Dict[str, Any]:
    """
    Return the Fear & Greed Index readings recorded in a recent time window.

    The response carries the newest reading under ``latest`` and every
    reading in the window, newest first, under ``history``.

    Args:
        hours: Size of the look-back window in hours (1-168).

    Raises:
        HTTPException: 404 when the window holds no reading, 500 on any
            other failure.
    """
    def _as_point(metric) -> Dict[str, Any]:
        # Shape shared by the "latest" entry and each "history" entry.
        return {
            "value": metric.value,
            "classification": metric.classification,
            "timestamp": metric.timestamp.isoformat()
        }

    try:
        with db_manager.get_session() as session:
            window_start = datetime.now(timezone.utc) - timedelta(hours=hours)

            readings_query = session.query(SentimentMetric).filter(
                SentimentMetric.metric_name == "fear_greed_index",
                SentimentMetric.timestamp >= window_start
            )
            readings = readings_query.order_by(SentimentMetric.timestamp.desc()).all()

            if not readings:
                raise HTTPException(status_code=404, detail="No Fear & Greed data found")

            # Rows are sorted newest first, so the head of the list is the latest.
            newest = readings[0]

            return {
                "success": True,
                "latest": _as_point(newest),
                "history": [_as_point(reading) for reading in readings]
            }

    except HTTPException:
        # Let the deliberate 404 through instead of wrapping it as a 500.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error fetching sentiment data: {str(exc)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/status")
async def get_hub_status() -> Dict[str, Any]:
    """
    Report the data hub's status, row counts and most recent update times.

    Counts and database size are read from ``db_manager.get_database_stats()``;
    the ``latest_updates`` values are the newest timestamps found in the
    price, OHLC and sentiment tables (``None`` for an empty table).

    Returns:
        The status payload. On any failure this endpoint does not raise; it
        returns a payload with ``success`` False, ``status`` "error" and the
        error text.
    """
    try:
        stats = db_manager.get_database_stats()

        with db_manager.get_session() as session:
            def _newest(model, column):
                # Most recent row of a table according to the given column.
                return session.query(model).order_by(column.desc()).first()

            newest_price = _newest(MarketPrice, MarketPrice.timestamp)
            newest_candle = _newest(OHLC, OHLC.ts)
            newest_sentiment = _newest(SentimentMetric, SentimentMetric.timestamp)

            price_update = newest_price.timestamp.isoformat() if newest_price else None
            candle_update = newest_candle.ts.isoformat() if newest_candle else None
            sentiment_update = (
                newest_sentiment.timestamp.isoformat() if newest_sentiment else None
            )

            database_info = {
                "size_mb": stats.get("database_size_mb", 0),
                "providers": stats.get("providers", 0)
            }
            row_counts = {
                "market_prices": stats.get("market_prices", 0),
                "ohlc_candles": stats.get("ohlc", 0),
                "sentiment_metrics": stats.get("sentiment_metrics", 0)
            }

            return {
                "success": True,
                "status": "operational",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "database": database_info,
                "data_counts": row_counts,
                "latest_updates": {
                    "prices": price_update,
                    "ohlc": candle_update,
                    "sentiment": sentiment_update
                }
            }

    except Exception as exc:
        # Best-effort endpoint: report the failure in the body rather than raising.
        return {
            "success": False,
            "status": "error",
            "error": str(exc),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/stats")
async def get_hub_stats() -> Dict[str, Any]:
    """
    Get comprehensive hub statistics

    Returns:
        A dict with the number of distinct symbols, the row count per price
        source, a freshness summary for the newest price row ("fresh" when
        younger than 120 seconds, "stale" otherwise, "no_data" when the table
        is empty) and the output of ``db_manager.get_database_stats()``.

    Raises:
        HTTPException: 500 if anything fails while querying or serialising.
    """
    try:
        with db_manager.get_session() as session:
            from sqlalchemy import func, distinct

            # Number of distinct symbols that have at least one price row.
            unique_symbols = session.query(func.count(distinct(MarketPrice.symbol))).scalar()

            # Row count per price source.
            price_sources = (
                session.query(MarketPrice.source, func.count(MarketPrice.id))
                .group_by(MarketPrice.source)
                .all()
            )

            # Newest price row, used to judge data freshness.
            latest_price = (
                session.query(MarketPrice)
                .order_by(MarketPrice.timestamp.desc())
                .first()
            )

            if latest_price:
                last_ts = latest_price.timestamp
                # Subtracting a naive datetime from an aware one raises
                # TypeError, so attach UTC when the stored value has no tzinfo.
                # NOTE(review): assumes naive timestamps are stored as UTC -
                # confirm against the code that writes MarketPrice rows.
                if last_ts.tzinfo is None:
                    last_ts = last_ts.replace(tzinfo=timezone.utc)
                age_seconds = (datetime.now(timezone.utc) - last_ts).total_seconds()
                freshness = "fresh" if age_seconds < 120 else "stale"
            else:
                age_seconds = None
                freshness = "no_data"

            return {
                "success": True,
                "symbols_tracked": unique_symbols,
                "data_sources": {
                    source: count for source, count in price_sources
                },
                "data_freshness": {
                    "status": freshness,
                    "age_seconds": age_seconds,
                    "last_update": latest_price.timestamp.isoformat() if latest_price else None
                },
                "database": db_manager.get_database_stats()
            }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching stats: {str(e)}")
|
|
|