Datasourceforcryptocurrency-2 / backend /routers /frontend_compat_router.py
Really-amin's picture
Upload 946 files
942e436 verified
#!/usr/bin/env python3
"""
Frontend Compatibility Router
Provides missing endpoints expected by frontend with paths that match frontend requests
"""
import logging
import json
import time
from pathlib import Path
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import JSONResponse
from typing import Optional, List, Dict, Any
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
router = APIRouter(tags=["Frontend Compatibility"])
def _get_debug_log_path():
"""Get debug log path"""
workspace_root = Path("/app" if Path("/app").exists() else (Path("/workspace") if Path("/workspace").exists() else Path("."))).resolve()
debug_log_dir = workspace_root / ".cursor"
debug_log_dir.mkdir(parents=True, exist_ok=True)
return debug_log_dir / "debug.log"
def create_response(data: Any, source: str) -> Dict[str, Any]:
"""Create standardized response"""
return {
"data": data,
"meta": {
"source": source,
"generated_at": datetime.now(timezone.utc).isoformat(),
"cache_ttl": 60
}
}
# ============================================================================
# SENTIMENT ENDPOINTS
# ============================================================================
@router.get("/sentiment/fear-greed")
async def get_fear_greed_index():
"""
Get Fear & Greed Index - frontend compatible path
Maps to collector data
"""
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "D", "location": "frontend_compat_router.py:50", "message": "GET /sentiment/fear-greed called", "data": {}}) + "\n")
except: pass
# #endregion
try:
# Try to get from collectors endpoint
from collectors.sentiment.fear_greed import FearGreedCollector
collector = FearGreedCollector()
result = await collector.collect()
if result and result.get("status") == "success":
fear_greed_data = result.get("data", {})
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "F", "location": "frontend_compat_router.py:65", "message": "Fear & Greed collector success", "data": {"fear_greed_value": fear_greed_data.get("value")}}) + "\n")
except: pass
# #endregion
return create_response(fear_greed_data, "fear_greed_collector")
except Exception as e:
logger.warning(f"Fear & Greed collector failed: {e}")
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "F", "location": "frontend_compat_router.py:74", "message": "Fear & Greed collector failed", "data": {"error": str(e)}}) + "\n")
except: pass
# #endregion
# Fallback - return sample data
return create_response({
"value": 50,
"classification": "Neutral",
"timestamp": datetime.now(timezone.utc).isoformat(),
"status": "degraded",
"message": "Using fallback data - collector unavailable"
}, "fallback")
@router.get("/sentiment/global")
async def get_global_sentiment():
"""
Get global sentiment aggregated from multiple sources
"""
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "D", "location": "frontend_compat_router.py:95", "message": "GET /sentiment/global called", "data": {}}) + "\n")
except: pass
# #endregion
try:
# Try to get fear & greed as primary sentiment indicator
from collectors.sentiment.fear_greed import FearGreedCollector
collector = FearGreedCollector()
result = await collector.collect()
if result and result.get("status") == "success":
fear_greed_data = result.get("data", {})
return create_response({
"sentiment": fear_greed_data.get("classification", "Neutral"),
"fear_greed_index": fear_greed_data.get("value", 50),
"confidence": 0.85,
"sources": ["fear_greed"],
"timestamp": datetime.now(timezone.utc).isoformat()
}, "fear_greed_collector")
except Exception as e:
logger.warning(f"Global sentiment collection failed: {e}")
# Fallback
return create_response({
"sentiment": "Neutral",
"fear_greed_index": 50,
"confidence": 0.5,
"sources": [],
"timestamp": datetime.now(timezone.utc).isoformat(),
"status": "degraded"
}, "fallback")
# ============================================================================
# MARKET ENDPOINTS
# ============================================================================
@router.get("/market/prices")
async def get_market_prices(
limit: int = Query(10, description="Number of coins"),
symbols: Optional[str] = Query(None, description="Comma-separated symbols (BTC,ETH)")
):
"""
Get cryptocurrency prices - frontend compatible path
"""
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "D", "location": "frontend_compat_router.py:145", "message": "GET /market/prices called", "data": {"limit": limit, "symbols": symbols}}) + "\n")
except: pass
# #endregion
try:
# Try to get from CoinGecko collector
from collectors.market.coingecko import CoinGeckoCollector
collector = CoinGeckoCollector()
result = await collector.collect()
if result and result.get("status") == "success":
prices = result.get("data", {}).get("coins", [])
# Filter by symbols if provided
if symbols:
symbol_list = [s.strip().upper() for s in symbols.split(",")]
prices = [p for p in prices if p.get("symbol", "").upper() in symbol_list]
# Limit results
prices = prices[:limit]
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "F", "location": "frontend_compat_router.py:169", "message": "CoinGecko collector success", "data": {"count": len(prices)}}) + "\n")
except: pass
# #endregion
return create_response(prices, "coingecko_collector")
except Exception as e:
logger.warning(f"CoinGecko collector failed: {e}")
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "F", "location": "frontend_compat_router.py:179", "message": "CoinGecko collector failed", "data": {"error": str(e)}}) + "\n")
except: pass
# #endregion
# Fallback - return sample data
sample_prices = [
{"symbol": "BTC", "price": 50000.0, "change_24h": 2.5, "market_cap": 980000000000},
{"symbol": "ETH", "price": 3000.0, "change_24h": 1.8, "market_cap": 360000000000},
]
return create_response(sample_prices[:limit], "fallback")
@router.get("/market/top")
async def get_top_coins(
limit: int = Query(10, description="Number of top coins")
):
"""
Get top cryptocurrencies by market cap - frontend compatible path
"""
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "D", "location": "frontend_compat_router.py:202", "message": "GET /market/top called", "data": {"limit": limit}}) + "\n")
except: pass
# #endregion
try:
# Try to get from CoinGecko collector
from collectors.market.coingecko import CoinGeckoCollector
collector = CoinGeckoCollector()
result = await collector.collect()
if result and result.get("status") == "success":
coins = result.get("data", {}).get("coins", [])
# Sort by market cap
sorted_coins = sorted(coins, key=lambda x: x.get("market_cap", 0), reverse=True)
return create_response(sorted_coins[:limit], "coingecko_collector")
except Exception as e:
logger.warning(f"CoinGecko collector failed: {e}")
# Fallback
sample_top = [
{"symbol": "BTC", "name": "Bitcoin", "rank": 1, "market_cap": 980000000000},
{"symbol": "ETH", "name": "Ethereum", "rank": 2, "market_cap": 360000000000},
]
return create_response(sample_top[:limit], "fallback")
# ============================================================================
# NEWS ENDPOINTS
# ============================================================================
@router.get("/news/latest")
async def get_latest_news(
limit: int = Query(10, description="Number of articles"),
symbol: Optional[str] = Query(None, description="Filter by symbol")
):
"""
Get latest crypto news - frontend compatible path
"""
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "D", "location": "frontend_compat_router.py:243", "message": "GET /news/latest called", "data": {"limit": limit, "symbol": symbol}}) + "\n")
except: pass
# #endregion
# Forward to main news endpoint
try:
from backend.routers.news_router import get_latest_news as news_handler
return await news_handler(symbol=symbol or "BTC", limit=limit)
except Exception as e:
logger.warning(f"News router call failed: {e}")
# Return graceful fallback
return create_response([], "unavailable")
# ============================================================================
# WHALE TRACKING ENDPOINTS
# ============================================================================
@router.get("/whales")
async def get_whale_transactions(
limit: int = Query(50, description="Number of transactions"),
chain: Optional[str] = Query(None, description="Blockchain filter"),
min_amount_usd: Optional[float] = Query(100000, description="Minimum amount USD")
):
"""
Get whale transactions - frontend compatible path
"""
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "D", "location": "frontend_compat_router.py:274", "message": "GET /whales called", "data": {"limit": limit, "chain": chain}}) + "\n")
except: pass
# #endregion
# Forward to main whales endpoint
try:
from backend.routers.whales_router import get_whale_transactions as whales_handler
return await whales_handler(limit=limit, chain=chain, min_amount_usd=min_amount_usd)
except Exception as e:
logger.warning(f"Whales router call failed: {e}")
# Return graceful fallback
return create_response([], "unavailable")
# ============================================================================
# SYSTEM ENDPOINTS
# ============================================================================
@router.get("/system/status")
async def get_system_status():
"""
Get system status - frontend compatible path
"""
# #region agent log
try:
with open(_get_debug_log_path(), "a", encoding="utf-8") as f:
f.write(json.dumps({"timestamp": int(time.time() * 1000), "sessionId": "debug-session", "runId": "run1", "hypothesisId": "D", "location": "frontend_compat_router.py:300", "message": "GET /system/status called", "data": {}}) + "\n")
except: pass
# #endregion
# Forward to main status endpoint
try:
from backend.routers.system_router import get_status
return await get_status()
except Exception as e:
logger.warning(f"System router call failed: {e}")
# Return graceful fallback
return {
"status": "degraded",
"service": "Crypto Intelligence Hub",
"timestamp": datetime.now(timezone.utc).isoformat(),
"error": str(e)
}