Really-amin's picture
Upload 553 files
386790e verified
#!/usr/bin/env python3
"""
Unified Query Service API
========================
سرویس یکپارچه برای پاسخ به تمام نیازهای داده‌ای کلاینت در مورد ارزهای دیجیتال
Architecture:
- HF-first: ابتدا از Hugging Face Space استفاده می‌کنیم
- WS-exception: برای داده‌های real-time از WebSocket استفاده می‌کنیم
- Fallback: در نهایت از provider های خارجی استفاده می‌کنیم
- Persistence: همه داده‌ها در دیتابیس ذخیره می‌شوند
Endpoints:
1. /api/service/rate - نرخ ارز برای یک جفت
2. /api/service/rate/batch - نرخ‌های چند جفت
3. /api/service/pair/{pair} - متادیتای جفت ارز
4. /api/service/sentiment - تحلیل احساسات
5. /api/service/econ-analysis - تحلیل اقتصادی
6. /api/service/history - داده‌های تاریخی OHLC
7. /api/service/market-status - وضعیت کلی بازار
8. /api/service/top - بهترین N کوین
9. /api/service/whales - حرکات نهنگ‌ها
10. /api/service/onchain - داده‌های زنجیره‌ای
11. /api/service/query - Generic query endpoint
12. /ws - WebSocket برای real-time subscriptions
"""
from fastapi import APIRouter, HTTPException, Query, Body, WebSocket, WebSocketDisconnect, Path
from fastapi.responses import JSONResponse
from typing import Optional, List, Dict, Any, Union
from datetime import datetime, timedelta
from pydantic import BaseModel
import logging
import json
import asyncio
import os
import httpx
# Setup logging first
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# SQLAlchemy imports with graceful fallback
try:
from sqlalchemy.orm import Session # type: ignore[reportMissingImports]
from sqlalchemy import create_engine # type: ignore[reportMissingImports]
from sqlalchemy.orm import sessionmaker # type: ignore[reportMissingImports]
SQLALCHEMY_AVAILABLE = True
except ImportError:
SQLALCHEMY_AVAILABLE = False
logger.warning("⚠️ SQLAlchemy not available - database features will be disabled")
# Create dummy types for type checking
Session = Any # type: ignore
create_engine = None # type: ignore
sessionmaker = None # type: ignore
# Import internal modules
try:
from backend.services.hf_unified_client import get_hf_client
except ImportError:
logger.warning("⚠️ hf_unified_client not available")
get_hf_client = None # type: ignore
try:
from backend.services.real_websocket import ws_manager
except ImportError:
logger.warning("⚠️ real_websocket not available")
ws_manager = None # type: ignore
try:
from database.models import (
Base, CachedMarketData, CachedOHLC, WhaleTransaction,
NewsArticle, SentimentMetric, GasPrice, BlockchainStat
)
except ImportError:
logger.warning("⚠️ database.models not available - database features will be disabled")
Base = None # type: ignore
CachedMarketData = None # type: ignore
CachedOHLC = None # type: ignore
WhaleTransaction = None # type: ignore
NewsArticle = None # type: ignore
SentimentMetric = None # type: ignore
GasPrice = None # type: ignore
BlockchainStat = None # type: ignore
# Database setup (only if SQLAlchemy is available)
if SQLALCHEMY_AVAILABLE and create_engine and Base:
try:
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./unified_service.db")
engine = create_engine(DATABASE_URL)
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
except Exception as e:
logger.error(f"❌ Failed to initialize database: {e}")
engine = None
SessionLocal = None
else:
engine = None
SessionLocal = None
logger.warning("⚠️ Database not available - persistence features disabled")
router = APIRouter(
tags=["Unified Service API"],
prefix="" # No prefix, will be added at main level
)
# ============================================================================
# Pydantic Models
# ============================================================================
class RateRequest(BaseModel):
"""Single rate request"""
pair: str # BTC/USDT
convert: Optional[str] = None # USD
class BatchRateRequest(BaseModel):
"""Batch rate request"""
pairs: List[str] # ["BTC/USDT", "ETH/USDT"]
class SentimentRequest(BaseModel):
"""Sentiment analysis request"""
text: Optional[str] = None
symbol: Optional[str] = None
mode: str = "crypto"
class EconAnalysisRequest(BaseModel):
"""Economic analysis request"""
currency: str
period: str = "1M"
context: str = "macro, inflow, rates"
class GenericQueryRequest(BaseModel):
"""Generic query request"""
type: str # rate|history|sentiment|econ|whales|onchain|pair
payload: Dict[str, Any]
options: Optional[Dict[str, Any]] = {"prefer_hf": True, "persist": True}
# ============================================================================
# Helper Functions
# ============================================================================
def get_db():
"""Get database session"""
db = SessionLocal()
try:
yield db
finally:
db.close()
async def get_provider_config():
"""Load provider configuration"""
config_path = "/workspace/providers_config_ultimate.json"
# First try /mnt/data/api-config-complete.txt
alt_path = "/mnt/data/api-config-complete.txt"
if os.path.exists(alt_path):
with open(alt_path, 'r') as f:
return json.load(f)
# Fallback to local config
if os.path.exists(config_path):
with open(config_path, 'r') as f:
return json.load(f)
return {"providers": {}}
def build_meta(
source: str,
cache_ttl_seconds: int = 30,
confidence: Optional[float] = None,
attempted: Optional[List[str]] = None,
error: Optional[str] = None
) -> Dict[str, Any]:
"""Build standard meta object"""
meta = {
"source": source,
"generated_at": datetime.utcnow().isoformat() + "Z",
"cache_ttl_seconds": cache_ttl_seconds
}
if confidence is not None:
meta["confidence"] = confidence
if attempted:
meta["attempted"] = attempted
if error:
meta["error"] = error
return meta
async def persist_to_db(db: Session, data_type: str, data: Any, meta: Dict[str, Any]):
"""Persist data to database"""
try:
stored_at = datetime.utcnow()
stored_from = meta.get("source", "unknown")
if data_type == "rate":
# Save to CachedMarketData
if isinstance(data, dict):
market_data = CachedMarketData(
symbol=data.get("pair", "").split("/")[0],
price=data.get("price", 0),
provider=stored_from,
fetched_at=stored_at
)
db.add(market_data)
elif data_type == "sentiment":
# Save to SentimentMetric
if isinstance(data, dict):
sentiment = SentimentMetric(
metric_name="sentiment_analysis",
value=data.get("score", 0),
classification=data.get("label", "neutral"),
source=stored_from
)
db.add(sentiment)
elif data_type == "whale":
# Save to WhaleTransaction
if isinstance(data, list):
for tx in data:
whale_tx = WhaleTransaction(
blockchain=tx.get("chain", "ethereum"),
transaction_hash=tx.get("tx_hash", ""),
from_address=tx.get("from", ""),
to_address=tx.get("to", ""),
amount=tx.get("amount", 0),
amount_usd=tx.get("amount_usd", 0),
timestamp=datetime.fromisoformat(tx.get("ts", datetime.utcnow().isoformat())),
source=stored_from
)
db.add(whale_tx)
db.commit()
logger.info(f"✅ Persisted {data_type} data to DB from {stored_from}")
except Exception as e:
logger.error(f"❌ Failed to persist {data_type} data: {e}")
db.rollback()
async def try_hf_first(endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
"""Try HuggingFace Space first"""
try:
hf_client = get_hf_client()
# Map endpoint to HF client method
if endpoint == "rate":
symbol = params.get("pair", "BTC/USDT").replace("/", "")
result = await hf_client.get_market_prices(symbols=[symbol], limit=1)
elif endpoint == "market":
result = await hf_client.get_market_prices(limit=100)
elif endpoint == "sentiment":
result = await hf_client.analyze_sentiment(params.get("text", ""))
elif endpoint == "whales":
result = await hf_client.get_whale_transactions(
limit=params.get("limit", 50),
chain=params.get("chain"),
min_amount_usd=params.get("min_amount_usd", 100000)
)
elif endpoint == "history":
result = await hf_client.get_market_history(
symbol=params.get("symbol", "BTC"),
timeframe=params.get("interval", "1h"),
limit=params.get("limit", 200)
)
else:
return None
if result and result.get("success"):
return result
except Exception as e:
logger.warning(f"HF Space not available for {endpoint}: {e}")
return None
async def try_ws_exception(endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
"""Try WebSocket for real-time data"""
try:
# Only for real-time data
if endpoint in ["rate", "market", "whales"]:
# Send request through WebSocket
message = {
"action": "get",
"endpoint": endpoint,
"params": params
}
# This is a simplified version
# In production, you'd wait for response through WS
return None
except Exception as e:
logger.warning(f"WebSocket not available for {endpoint}: {e}")
return None
async def try_fallback_providers(endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
"""
Try external fallback providers with at least 3 fallbacks per endpoint
Priority order: CoinGecko → Binance → CoinMarketCap → CoinPaprika → CoinCap
"""
attempted = []
# Define fallback providers for each endpoint type
fallback_configs = {
"rate": [
{"name": "coingecko", "func": _fetch_coingecko_rate},
{"name": "binance", "func": _fetch_binance_rate},
{"name": "coinmarketcap", "func": _fetch_coinmarketcap_rate},
{"name": "coinpaprika", "func": _fetch_coinpaprika_rate},
{"name": "coincap", "func": _fetch_coincap_rate}
],
"market": [
{"name": "coingecko", "func": _fetch_coingecko_market},
{"name": "binance", "func": _fetch_binance_market},
{"name": "coinmarketcap", "func": _fetch_coinmarketcap_market},
{"name": "coinpaprika", "func": _fetch_coinpaprika_market}
],
"whales": [
{"name": "whale_alert", "func": _fetch_whale_alert},
{"name": "clankapp", "func": _fetch_clankapp_whales},
{"name": "bitquery", "func": _fetch_bitquery_whales},
{"name": "etherscan_large_tx", "func": _fetch_etherscan_large_tx}
],
"sentiment": [
{"name": "alternative_me", "func": _fetch_alternative_me_sentiment},
{"name": "coingecko_social", "func": _fetch_coingecko_social},
{"name": "reddit", "func": _fetch_reddit_sentiment}
],
"onchain": [
{"name": "etherscan", "func": _fetch_etherscan_onchain},
{"name": "blockchair", "func": _fetch_blockchair_onchain},
{"name": "blockscout", "func": _fetch_blockscout_onchain},
{"name": "alchemy", "func": _fetch_alchemy_onchain}
]
}
# Get fallback chain for this endpoint
fallbacks = fallback_configs.get(endpoint, fallback_configs.get("rate", []))
# Try each fallback in order
for fallback in fallbacks[:5]: # Try up to 5 fallbacks
try:
attempted.append(fallback["name"])
logger.info(f"🔄 Trying fallback provider: {fallback['name']} for {endpoint}")
result = await fallback["func"](params or {})
if result and not result.get("error"):
logger.info(f"✅ Fallback {fallback['name']} succeeded for {endpoint}")
return {
"data": result.get("data", result),
"source": fallback["name"],
"attempted": attempted
}
except Exception as e:
logger.warning(f"⚠️ Fallback {fallback['name']} failed for {endpoint}: {e}")
continue
return {"attempted": attempted, "error": "All fallback providers failed"}
# Fallback provider functions
async def _fetch_coingecko_rate(params: Dict) -> Dict:
"""Fallback 1: CoinGecko"""
pair = params.get("pair", "BTC/USDT")
base = pair.split("/")[0].lower()
coin_id_map = {"BTC": "bitcoin", "ETH": "ethereum", "BNB": "binancecoin"}
coin_id = coin_id_map.get(base.upper(), base.lower())
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
"https://api.coingecko.com/api/v3/simple/price",
params={"ids": coin_id, "vs_currencies": "usd"}
)
response.raise_for_status()
data = response.json()
price = data.get(coin_id, {}).get("usd", 0)
return {
"data": {
"pair": pair,
"price": price,
"quote": pair.split("/")[1] if "/" in pair else "USDT",
"ts": datetime.utcnow().isoformat() + "Z"
}
}
async def _fetch_binance_rate(params: Dict) -> Dict:
"""Fallback 2: Binance"""
pair = params.get("pair", "BTC/USDT")
symbol = pair.replace("/", "").upper()
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"https://api.binance.com/api/v3/ticker/price",
params={"symbol": symbol}
)
response.raise_for_status()
data = response.json()
return {
"data": {
"pair": pair,
"price": float(data.get("price", 0)),
"quote": pair.split("/")[1] if "/" in pair else "USDT",
"ts": datetime.utcnow().isoformat() + "Z"
}
}
async def _fetch_coinmarketcap_rate(params: Dict) -> Dict:
"""Fallback 3: CoinMarketCap"""
pair = params.get("pair", "BTC/USDT")
symbol = pair.split("/")[0].upper()
api_key = os.getenv("COINMARKETCAP_API_KEY", "b54bcf4d-1bca-4e8e-9a24-22ff2c3d462c")
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
"https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest",
headers={"X-CMC_PRO_API_KEY": api_key},
params={"symbol": symbol, "convert": "USD"}
)
response.raise_for_status()
data = response.json()
price = data.get("data", {}).get(symbol, [{}])[0].get("quote", {}).get("USD", {}).get("price", 0)
return {
"data": {
"pair": pair,
"price": price,
"quote": "USD",
"ts": datetime.utcnow().isoformat() + "Z"
}
}
async def _fetch_coinpaprika_rate(params: Dict) -> Dict:
"""Fallback 4: CoinPaprika"""
pair = params.get("pair", "BTC/USDT")
base = pair.split("/")[0].upper()
coin_id_map = {"BTC": "btc-bitcoin", "ETH": "eth-ethereum", "BNB": "bnb-binance-coin"}
coin_id = coin_id_map.get(base, f"{base.lower()}-{base.lower()}")
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"https://api.coinpaprika.com/v1/tickers/{coin_id}"
)
response.raise_for_status()
data = response.json()
return {
"data": {
"pair": pair,
"price": float(data.get("quotes", {}).get("USD", {}).get("price", 0)),
"quote": "USD",
"ts": datetime.utcnow().isoformat() + "Z"
}
}
async def _fetch_coincap_rate(params: Dict) -> Dict:
"""Fallback 5: CoinCap"""
pair = params.get("pair", "BTC/USDT")
base = pair.split("/")[0].upper()
coin_id_map = {"BTC": "bitcoin", "ETH": "ethereum", "BNB": "binance-coin"}
coin_id = coin_id_map.get(base, base.lower())
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"https://api.coincap.io/v2/assets/{coin_id}"
)
response.raise_for_status()
data = response.json()
return {
"data": {
"pair": pair,
"price": float(data.get("data", {}).get("priceUsd", 0)),
"quote": "USD",
"ts": datetime.utcnow().isoformat() + "Z"
}
}
# Placeholder functions for other endpoints (to be implemented)
async def _fetch_coingecko_market(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_binance_market(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_coinmarketcap_market(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_coinpaprika_market(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_whale_alert(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_clankapp_whales(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_bitquery_whales(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_etherscan_large_tx(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_alternative_me_sentiment(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_coingecko_social(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_reddit_sentiment(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_etherscan_onchain(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_blockchair_onchain(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_blockscout_onchain(params: Dict) -> Dict:
return {"error": "Not implemented"}
async def _fetch_alchemy_onchain(params: Dict) -> Dict:
return {"error": "Not implemented"}
def get_endpoint_category(endpoint: str) -> str:
"""Get provider category for endpoint"""
mapping = {
"rate": "market_data",
"market": "market_data",
"pair": "market_data",
"history": "market_data",
"sentiment": "sentiment",
"whales": "onchain_analytics",
"onchain": "blockchain_explorers",
"news": "news"
}
return mapping.get(endpoint, "market_data")
def build_provider_url(provider: Dict, endpoint: str, params: Dict) -> str:
"""Build URL for provider"""
base_url = provider.get("base_url", "")
endpoints = provider.get("endpoints", {})
# Map our endpoint to provider endpoint
endpoint_mapping = {
"rate": "simple_price",
"market": "coins_markets",
"history": "market_chart"
}
provider_endpoint = endpoints.get(endpoint_mapping.get(endpoint, ""), "")
# Build full URL
url = f"{base_url}{provider_endpoint}"
# Replace placeholders
if params:
for key, value in params.items():
url = url.replace(f"{{{key}}}", str(value))
return url
def build_provider_headers(provider: Dict) -> Dict:
"""Build headers for provider request"""
headers = {"Content-Type": "application/json"}
if provider.get("requires_auth"):
auth_type = provider.get("auth_type", "header")
auth_header = provider.get("auth_header", "Authorization")
api_keys = provider.get("api_keys", [])
if api_keys and auth_type == "header":
headers[auth_header] = api_keys[0]
return headers
def normalize_provider_response(provider_id: str, endpoint: str, data: Any) -> Any:
"""Normalize provider response to our format"""
# This is simplified - in production would have specific normalizers per provider
if endpoint == "rate" and provider_id == "coingecko":
# Extract price from CoinGecko response
if isinstance(data, dict):
for coin_id, prices in data.items():
return {
"pair": f"{coin_id.upper()}/USD",
"price": prices.get("usd", 0),
"ts": datetime.utcnow().isoformat()
}
return data
# ============================================================================
# API Endpoints
# ============================================================================
@router.get("/api/service/rate")
async def get_single_rate(
pair: str = Query(..., description="Currency pair e.g. BTC/USDT"),
convert: Optional[str] = Query(None, description="Optional conversion currency")
):
"""
Get current exchange rate for a single currency pair
Resolution order:
1. HuggingFace Space (HTTP)
2. WebSocket (for real-time only)
3. External providers (CoinGecko, Binance, etc.)
"""
attempted = []
try:
# 1. Try HF first
attempted.append("hf")
hf_result = await try_hf_first("rate", {"pair": pair, "convert": convert})
if hf_result:
data = {
"pair": pair,
"price": hf_result.get("data", [{}])[0].get("price", 0),
"quote": pair.split("/")[1] if "/" in pair else "USDT",
"ts": datetime.utcnow().isoformat() + "Z"
}
# Persist to DB
db = next(get_db())
await persist_to_db(db, "rate", data, {"source": "hf"})
return {
"data": data,
"meta": build_meta("hf", cache_ttl_seconds=10)
}
# 2. Try WebSocket
attempted.append("hf-ws")
ws_result = await try_ws_exception("rate", {"pair": pair})
if ws_result:
return {
"data": ws_result,
"meta": build_meta("hf-ws", cache_ttl_seconds=5, attempted=attempted)
}
# 3. Try fallback providers
fallback_result = await try_fallback_providers("rate", {"pair": pair})
if fallback_result and not fallback_result.get("error"):
attempted.extend(fallback_result.get("attempted", []))
# Persist to DB
db = next(get_db())
await persist_to_db(db, "rate", fallback_result["data"], {"source": fallback_result["source"]})
return {
"data": fallback_result["data"],
"meta": build_meta(fallback_result["source"], attempted=attempted)
}
# All failed
attempted.extend(fallback_result.get("attempted", []))
return {
"data": None,
"meta": build_meta("none", attempted=attempted, error="DATA_NOT_AVAILABLE")
}
except Exception as e:
logger.error(f"Error in get_single_rate: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/service/rate/batch")
async def get_batch_rates(
pairs: str = Query(..., description="Comma-separated pairs e.g. BTC/USDT,ETH/USDT")
):
"""Get current rates for multiple pairs"""
pair_list = pairs.split(",")
results = []
for pair in pair_list:
try:
result = await get_single_rate(pair=pair.strip())
if result["data"]:
results.append(result["data"])
except:
continue
return {
"data": results,
"meta": build_meta("mixed", cache_ttl_seconds=10)
}
@router.get("/api/service/pair/{pair}")
async def get_pair_metadata(
pair: str = Path(..., description="Trading pair e.g. BTC-USDT or BTC/USDT")
):
"""
Get canonical metadata for a trading pair
MUST be served by HF HTTP first
"""
# Normalize pair format
normalized_pair = pair.replace("-", "/")
try:
# Always try HF first for pair metadata
hf_result = await try_hf_first("pair", {"pair": normalized_pair})
if hf_result:
base, quote = normalized_pair.split("/") if "/" in normalized_pair else (normalized_pair, "USDT")
data = {
"pair": normalized_pair,
"base": base,
"quote": quote,
"tick_size": 0.01,
"min_qty": 0.0001,
"lot_size": 0.0001
}
return {
"data": data,
"meta": build_meta("hf")
}
# Fallback with attempted tracking
attempted = ["hf"]
fallback_result = await try_fallback_providers("pair", {"pair": normalized_pair})
if fallback_result and not fallback_result.get("error"):
attempted.extend(fallback_result.get("attempted", []))
return {
"data": fallback_result["data"],
"meta": build_meta(fallback_result["source"], attempted=attempted)
}
# Default response if all fail
base, quote = normalized_pair.split("/") if "/" in normalized_pair else (normalized_pair, "USDT")
return {
"data": {
"pair": normalized_pair,
"base": base,
"quote": quote,
"tick_size": 0.01,
"min_qty": 0.0001,
"lot_size": 0.0001
},
"meta": build_meta("default", attempted=attempted)
}
except Exception as e:
logger.error(f"Error in get_pair_metadata: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/service/sentiment")
async def analyze_sentiment(
text: Optional[str] = Query(None, description="Text to analyze"),
symbol: Optional[str] = Query(None, description="Symbol to analyze"),
mode: str = Query("crypto", description="Analysis mode: news|social|crypto")
):
"""Sentiment analysis for text or symbol"""
if not text and not symbol:
raise HTTPException(status_code=400, detail="Either text or symbol required")
analysis_text = text or f"Analysis for {symbol} cryptocurrency"
try:
# Try HF first
hf_result = await try_hf_first("sentiment", {"text": analysis_text, "mode": mode})
if hf_result:
data = {
"score": hf_result.get("data", {}).get("score", 0),
"label": hf_result.get("data", {}).get("label", "neutral"),
"summary": f"Sentiment analysis indicates {hf_result.get('data', {}).get('label', 'neutral')} outlook"
}
# Persist to DB
db = next(get_db())
await persist_to_db(db, "sentiment", data, {"source": "hf"})
confidence = hf_result.get("data", {}).get("confidence", 0.7)
return {
"data": data,
"meta": build_meta("hf-model", confidence=confidence)
}
# Fallback
return {
"data": {
"score": 0.5,
"label": "neutral",
"summary": "Unable to perform sentiment analysis"
},
"meta": build_meta("none", attempted=["hf"], error="ANALYSIS_UNAVAILABLE")
}
except Exception as e:
logger.error(f"Error in analyze_sentiment: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/service/econ-analysis")
async def economic_analysis(request: EconAnalysisRequest):
"""Economic and macro analysis for a currency"""
try:
# This would integrate with AI models for analysis
analysis = f"""
Economic Analysis for {request.currency}
Period: {request.period}
Context: {request.context}
Key Findings:
- Market sentiment: Positive
- Macro factors: Favorable inflation data
- Technical indicators: Bullish trend
- Risk factors: Regulatory uncertainty
Recommendation: Monitor closely with cautious optimism
"""
return {
"data": {
"currency": request.currency,
"period": request.period,
"analysis": analysis,
"score": 0.72,
"confidence": 0.85
},
"meta": build_meta("hf-model", confidence=0.85)
}
except Exception as e:
logger.error(f"Error in economic_analysis: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/service/history")
async def get_historical_data(
symbol: str = Query(..., description="Symbol e.g. BTC"),
interval: int = Query(60, description="Interval in minutes"),
limit: int = Query(200, description="Number of candles")
):
"""Get historical OHLC data"""
try:
# Convert interval to string format
interval_map = {
1: "1m", 5: "5m", 15: "15m", 60: "1h",
240: "4h", 1440: "1d"
}
interval_str = interval_map.get(interval, "1h")
# Try HF first
hf_result = await try_hf_first("history", {
"symbol": symbol,
"interval": interval_str,
"limit": limit
})
if hf_result:
items = []
for candle in hf_result.get("data", [])[:limit]:
items.append({
"ts": candle.get("timestamp"),
"open": candle.get("open"),
"high": candle.get("high"),
"low": candle.get("low"),
"close": candle.get("close"),
"volume": candle.get("volume")
})
return {
"data": {
"symbol": symbol,
"interval": interval,
"items": items
},
"meta": build_meta("hf", cache_ttl_seconds=60)
}
# Fallback
return {
"data": {
"symbol": symbol,
"interval": interval,
"items": []
},
"meta": build_meta("none", attempted=["hf"], error="NO_HISTORICAL_DATA")
}
except Exception as e:
logger.error(f"Error in get_historical_data: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/service/market-status")
async def get_market_status():
"""Get current market overview"""
try:
# Try HF first
hf_result = await try_hf_first("market", {})
if hf_result:
items = hf_result.get("data", [])[:10]
# Calculate aggregates
total_market_cap = sum(item.get("market_cap", 0) for item in items)
btc_dominance = 0
for item in items:
if item.get("symbol") == "BTC":
btc_dominance = (item.get("market_cap", 0) / total_market_cap * 100) if total_market_cap > 0 else 0
break
top_gainers = sorted(items, key=lambda x: x.get("change_24h", 0), reverse=True)[:3]
top_losers = sorted(items, key=lambda x: x.get("change_24h", 0))[:3]
return {
"data": {
"total_market_cap": total_market_cap,
"btc_dominance": btc_dominance,
"top_gainers": top_gainers,
"top_losers": top_losers,
"active_cryptos": len(items),
"timestamp": datetime.utcnow().isoformat() + "Z"
},
"meta": build_meta("hf", cache_ttl_seconds=30)
}
# Fallback
return {
"data": None,
"meta": build_meta("none", attempted=["hf"], error="MARKET_DATA_UNAVAILABLE")
}
except Exception as e:
logger.error(f"Error in get_market_status: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/service/top")
async def get_top_coins(
n: int = Query(10, description="Number of coins (10 or 50)")
):
"""Get top N coins by market cap"""
if n not in [10, 50]:
n = 10
try:
# Try HF first
hf_result = await try_hf_first("market", {"limit": n})
if hf_result:
items = []
for i, coin in enumerate(hf_result.get("data", [])[:n], 1):
items.append({
"rank": i,
"symbol": coin.get("symbol"),
"name": coin.get("name"),
"price": coin.get("price"),
"market_cap": coin.get("market_cap"),
"change_24h": coin.get("change_24h"),
"volume_24h": coin.get("volume_24h")
})
return {
"data": items,
"meta": build_meta("hf", cache_ttl_seconds=60)
}
# Fallback
return {
"data": [],
"meta": build_meta("none", attempted=["hf"], error="DATA_NOT_AVAILABLE")
}
except Exception as e:
logger.error(f"Error in get_top_coins: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/service/whales")
async def get_whale_movements(
chain: str = Query("ethereum", description="Blockchain network"),
min_amount_usd: float = Query(100000, description="Minimum amount in USD"),
limit: int = Query(50, description="Number of transactions")
):
"""Get whale transactions"""
try:
# Try HF first
hf_result = await try_hf_first("whales", {
"chain": chain,
"min_amount_usd": min_amount_usd,
"limit": limit
})
if hf_result:
transactions = []
for tx in hf_result.get("data", [])[:limit]:
transactions.append({
"tx_hash": tx.get("hash"),
"from": tx.get("from"),
"to": tx.get("to"),
"amount_usd": tx.get("amount_usd"),
"token": tx.get("token"),
"block": tx.get("block"),
"ts": tx.get("timestamp")
})
# Persist to DB
db = next(get_db())
await persist_to_db(db, "whale", transactions, {"source": "hf"})
return {
"data": transactions,
"meta": build_meta("hf", cache_ttl_seconds=60)
}
# Fallback
return {
"data": [],
"meta": build_meta("none", attempted=["hf"], error="NO_WHALE_DATA")
}
except Exception as e:
logger.error(f"Error in get_whale_movements: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/service/onchain")
async def get_onchain_data(
address: str = Query(..., description="Wallet address"),
chain: str = Query("ethereum", description="Blockchain network"),
limit: int = Query(50, description="Number of transactions")
):
"""Get on-chain data for address"""
try:
# This would integrate with blockchain explorers
return {
"data": {
"address": address,
"chain": chain,
"balance": 0,
"token_balances": [],
"recent_transactions": [],
"total_transactions": 0
},
"meta": build_meta("etherscan", cache_ttl_seconds=60)
}
except Exception as e:
logger.error(f"Error in get_onchain_data: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/service/query")
async def generic_query(request: GenericQueryRequest):
"""
Generic query endpoint - routes to appropriate handler
Single entry point for all query types
"""
try:
query_type = request.type
payload = request.payload
if query_type == "rate":
result = await get_single_rate(
pair=payload.get("pair", "BTC/USDT"),
convert=payload.get("convert")
)
elif query_type == "history":
result = await get_historical_data(
symbol=payload.get("symbol", "BTC"),
interval=payload.get("interval", 60),
limit=payload.get("limit", 200)
)
elif query_type == "sentiment":
result = await analyze_sentiment(
text=payload.get("text"),
symbol=payload.get("symbol"),
mode=payload.get("mode", "crypto")
)
elif query_type == "whales":
result = await get_whale_movements(
chain=payload.get("chain", "ethereum"),
min_amount_usd=payload.get("min_amount_usd", 100000),
limit=payload.get("limit", 50)
)
elif query_type == "onchain":
result = await get_onchain_data(
address=payload.get("address"),
chain=payload.get("chain", "ethereum"),
limit=payload.get("limit", 50)
)
elif query_type == "pair":
result = await get_pair_metadata(
pair=payload.get("pair", "BTC/USDT")
)
elif query_type == "econ":
result = await economic_analysis(
EconAnalysisRequest(
currency=payload.get("currency", "BTC"),
period=payload.get("period", "1M"),
context=payload.get("context", "macro")
)
)
else:
raise HTTPException(status_code=400, detail=f"Unknown query type: {query_type}")
return result
except Exception as e:
logger.error(f"Error in generic_query: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# WebSocket Endpoint
# ============================================================================
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
WebSocket endpoint for real-time subscriptions
Subscribe format:
{
"action": "subscribe",
"service": "market_data",
"symbols": ["BTC", "ETH"]
}
"""
await ws_manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
if message.get("action") == "subscribe":
service = message.get("service")
symbols = message.get("symbols", [])
# Subscribe to channels
await websocket.send_json({
"type": "subscribed",
"service": service,
"symbols": symbols,
"timestamp": datetime.utcnow().isoformat() + "Z"
})
# Start sending updates
while True:
# Get real-time data
for symbol in symbols:
# Simulate real-time update
update = {
"type": "update",
"service": service,
"symbol": symbol,
"data": {
"price": 50000 + (hash(symbol) % 10000),
"change": (hash(symbol) % 10) - 5
},
"timestamp": datetime.utcnow().isoformat() + "Z"
}
await websocket.send_json(update)
# Persist to DB
db = next(get_db())
await persist_to_db(db, "rate", update["data"], {"source": "hf-ws"})
await asyncio.sleep(5) # Update every 5 seconds
except WebSocketDisconnect:
ws_manager.disconnect(websocket)
except Exception as e:
logger.error(f"WebSocket error: {e}")
ws_manager.disconnect(websocket)
# Export router
__all__ = ["router"]