Really-amin's picture
Upload 553 files
386790e verified
#!/usr/bin/env python3
"""
Data Hub Complete API Router
=============================
✅ تمام endpoint های داده‌های کریپتو
✅ استفاده از کلیدهای API جدید
✅ سیستم Fallback خودکار
✅ WebSocket Support
"""
from fastapi import APIRouter, HTTPException, Query, Body, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
from typing import Optional, List, Dict, Any
from datetime import datetime
from pydantic import BaseModel
import logging
import json
import uuid
# Import Data Hub Complete
from backend.services.data_hub_complete import get_data_hub
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/v2/data-hub",
tags=["Data Hub Complete"]
)
# Get singleton Data Hub instance
data_hub = get_data_hub()
# ============================================================================
# Pydantic Models
# ============================================================================
class MarketRequest(BaseModel):
"""درخواست داده‌های بازار"""
symbols: Optional[List[str]] = None
limit: int = 100
source: str = "auto"
class OHLCVRequest(BaseModel):
"""درخواست داده‌های OHLCV"""
symbol: str
interval: str = "1h"
limit: int = 100
source: str = "auto"
class SentimentRequest(BaseModel):
"""درخواست تحلیل احساسات"""
text: str
source: str = "huggingface"
class NewsRequest(BaseModel):
"""درخواست اخبار"""
query: str = "cryptocurrency"
limit: int = 20
source: str = "auto"
class BlockchainRequest(BaseModel):
"""درخواست داده‌های بلاکچین"""
chain: str
data_type: str = "transactions"
address: Optional[str] = None
limit: int = 20
class WhaleRequest(BaseModel):
"""درخواست فعالیت نهنگ‌ها"""
chain: str = "all"
min_value_usd: float = 1000000
limit: int = 50
class SocialMediaRequest(BaseModel):
"""درخواست داده‌های شبکه‌های اجتماعی"""
platform: str = "reddit"
query: str = "cryptocurrency"
limit: int = 20
class AIRequest(BaseModel):
"""درخواست پیش‌بینی AI"""
symbol: str
model_type: str = "price"
timeframe: str = "24h"
# ============================================================================
# 1. Market Data Endpoints - داده‌های قیمت بازار
# ============================================================================
@router.get("/market/prices")
async def get_market_prices(
symbols: Optional[str] = Query(None, description="Comma-separated symbols (e.g., BTC,ETH)"),
limit: int = Query(100, description="Number of results"),
source: str = Query("auto", description="Data source: auto, coinmarketcap, coingecko, binance")
):
"""
دریافت قیمت‌های لحظه‌ای بازار
Sources:
- CoinMarketCap (with new API key)
- CoinGecko (free)
- Binance (free)
- HuggingFace
Returns: قیمت، تغییرات 24 ساعته، حجم معاملات، Market Cap
"""
try:
symbol_list = None
if symbols:
symbol_list = [s.strip().upper() for s in symbols.split(',')]
result = await data_hub.get_market_prices(
symbols=symbol_list,
limit=limit,
source=source
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch market data"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Market prices error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/market/prices")
async def post_market_prices(request: MarketRequest):
"""
دریافت قیمت‌های بازار (POST method)
"""
try:
result = await data_hub.get_market_prices(
symbols=request.symbols,
limit=request.limit,
source=request.source
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch market data"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Market prices error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/market/top")
async def get_top_coins(
limit: int = Query(10, description="Number of top coins")
):
"""
دریافت Top N ارزهای برتر بر اساس Market Cap
"""
try:
result = await data_hub.get_market_prices(limit=limit, source="auto")
if result.get("success") and result.get("data"):
# Sort by market cap
data = sorted(result["data"], key=lambda x: x.get("market_cap", 0), reverse=True)
result["data"] = data[:limit]
return result
except Exception as e:
logger.error(f"❌ Top coins error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# 2. OHLCV Data Endpoints - داده‌های تاریخی
# ============================================================================
@router.get("/market/ohlcv")
async def get_ohlcv_data(
symbol: str = Query(..., description="Symbol (e.g., BTC, ETH)"),
interval: str = Query("1h", description="Interval: 1m, 5m, 15m, 1h, 4h, 1d"),
limit: int = Query(100, description="Number of candles"),
source: str = Query("auto", description="Data source: auto, binance, huggingface")
):
"""
دریافت داده‌های OHLCV (کندل استیک)
Sources:
- Binance (best for OHLCV)
- HuggingFace
Returns: Open, High, Low, Close, Volume for each candle
"""
try:
result = await data_hub.get_ohlcv_data(
symbol=symbol.upper(),
interval=interval,
limit=limit,
source=source
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch OHLCV data"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ OHLCV error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/market/ohlcv")
async def post_ohlcv_data(request: OHLCVRequest):
"""
دریافت داده‌های OHLCV (POST method)
"""
try:
result = await data_hub.get_ohlcv_data(
symbol=request.symbol.upper(),
interval=request.interval,
limit=request.limit,
source=request.source
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch OHLCV data"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ OHLCV error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# 3. Sentiment Data Endpoints - داده‌های احساسات
# ============================================================================
@router.get("/sentiment/fear-greed")
async def get_fear_greed_index():
"""
دریافت شاخص ترس و طمع (Fear & Greed Index)
Source: Alternative.me
Returns:
- مقدار شاخص (0-100)
- طبقه‌بندی (Extreme Fear, Fear, Neutral, Greed, Extreme Greed)
- تاریخچه 30 روزه
"""
try:
result = await data_hub.get_fear_greed_index()
return result
except Exception as e:
logger.error(f"❌ Fear & Greed error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/sentiment/analyze")
async def analyze_sentiment(request: SentimentRequest):
"""
تحلیل احساسات متن با AI
Source: HuggingFace Models
Returns:
- Label: POSITIVE, NEGATIVE, NEUTRAL
- Score (0-1)
- Confidence
"""
try:
result = await data_hub.analyze_sentiment(
text=request.text,
source=request.source
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Sentiment analysis failed"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Sentiment analysis error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/sentiment/batch")
async def batch_sentiment_analysis(texts: List[str] = Body(...)):
"""
تحلیل احساسات دسته‌ای برای چندین متن
"""
try:
results = []
for text in texts[:50]: # Limit to 50 texts
result = await data_hub.analyze_sentiment(text=text)
results.append({
"text": text[:100], # First 100 chars
"sentiment": result.get("data", {}) if result.get("success") else None,
"error": result.get("error") if not result.get("success") else None
})
return {
"success": True,
"total": len(results),
"results": results,
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"❌ Batch sentiment error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# 4. News Endpoints - داده‌های اخبار
# ============================================================================
@router.get("/news")
async def get_crypto_news(
query: str = Query("cryptocurrency", description="Search query"),
limit: int = Query(20, description="Number of articles"),
source: str = Query("auto", description="Source: auto, newsapi, reddit")
):
"""
دریافت اخبار ارزهای دیجیتال
Sources:
- NewsAPI (with new API key)
- Reddit (r/CryptoCurrency, r/Bitcoin, etc.)
- HuggingFace
Returns: Title, Description, URL, Source, Published Date
"""
try:
result = await data_hub.get_crypto_news(
query=query,
limit=limit,
source=source
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch news"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ News error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/news")
async def post_crypto_news(request: NewsRequest):
"""
دریافت اخبار (POST method)
"""
try:
result = await data_hub.get_crypto_news(
query=request.query,
limit=request.limit,
source=request.source
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch news"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ News error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/news/latest/{symbol}")
async def get_latest_news_for_symbol(
symbol: str,
limit: int = Query(10, description="Number of articles")
):
"""
دریافت آخرین اخبار برای یک سمبل خاص
"""
try:
query = f"{symbol} cryptocurrency"
result = await data_hub.get_crypto_news(query=query, limit=limit)
if result.get("success"):
result["symbol"] = symbol.upper()
return result
except Exception as e:
logger.error(f"❌ Symbol news error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# 5. Trending Data Endpoints - داده‌های ترندینگ
# ============================================================================
@router.get("/trending")
async def get_trending_coins():
"""
دریافت ارزهای ترند روز
Source: CoinGecko
Returns: لیست ارزهای ترند با رتبه و امتیاز
"""
try:
result = await data_hub.get_trending_coins()
return result
except Exception as e:
logger.error(f"❌ Trending error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/trending/search")
async def search_trending(
query: str = Query(..., description="Search query")
):
"""
جستجو در ارزهای ترند
"""
try:
result = await data_hub.get_trending_coins()
if result.get("success") and result.get("trending"):
# Filter by query
filtered = [
coin for coin in result["trending"]
if query.lower() in coin.get("name", "").lower() or
query.lower() in coin.get("symbol", "").lower()
]
result["trending"] = filtered
result["filtered_by"] = query
return result
except Exception as e:
logger.error(f"❌ Trending search error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# 6. Blockchain Data Endpoints - داده‌های بلاکچین
# ============================================================================
@router.get("/blockchain/{chain}")
async def get_blockchain_data(
chain: str,
data_type: str = Query("transactions", description="Type: transactions, balance, gas"),
address: Optional[str] = Query(None, description="Wallet address"),
limit: int = Query(20, description="Number of results")
):
"""
دریافت داده‌های بلاکچین
Chains: ethereum, bsc, tron
Sources:
- Etherscan (with new API key)
- BSCScan (with new API key)
- TronScan (with new API key)
Types:
- transactions: لیست تراکنش‌ها
- balance: موجودی آدرس
- gas: قیمت گس
"""
try:
result = await data_hub.get_blockchain_data(
chain=chain.lower(),
data_type=data_type,
address=address,
limit=limit
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch blockchain data"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Blockchain data error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/blockchain")
async def post_blockchain_data(request: BlockchainRequest):
"""
دریافت داده‌های بلاکچین (POST method)
"""
try:
result = await data_hub.get_blockchain_data(
chain=request.chain.lower(),
data_type=request.data_type,
address=request.address,
limit=request.limit
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch blockchain data"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Blockchain data error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/blockchain/{chain}/gas")
async def get_gas_prices(chain: str):
"""
دریافت قیمت گس برای بلاکچین مشخص
"""
try:
result = await data_hub.get_blockchain_data(
chain=chain.lower(),
data_type="gas"
)
return result
except Exception as e:
logger.error(f"❌ Gas prices error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# 7. Whale Activity Endpoints - فعالیت نهنگ‌ها
# ============================================================================
@router.get("/whales")
async def get_whale_activity(
chain: str = Query("all", description="Blockchain: all, ethereum, bsc, tron"),
min_value_usd: float = Query(1000000, description="Minimum transaction value in USD"),
limit: int = Query(50, description="Number of transactions")
):
"""
دریافت فعالیت نهنگ‌ها (تراکنش‌های بزرگ)
Returns:
- تراکنش‌های بالای $1M
- جهت حرکت (IN/OUT از صرافی‌ها)
- آدرس‌های مبدا و مقصد
"""
try:
result = await data_hub.get_whale_activity(
chain=chain,
min_value_usd=min_value_usd,
limit=limit
)
return result
except Exception as e:
logger.error(f"❌ Whale activity error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/whales")
async def post_whale_activity(request: WhaleRequest):
"""
دریافت فعالیت نهنگ‌ها (POST method)
"""
try:
result = await data_hub.get_whale_activity(
chain=request.chain,
min_value_usd=request.min_value_usd,
limit=request.limit
)
return result
except Exception as e:
logger.error(f"❌ Whale activity error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# 8. Social Media Endpoints - داده‌های شبکه‌های اجتماعی
# ============================================================================
@router.get("/social/{platform}")
async def get_social_media_data(
platform: str,
query: str = Query("cryptocurrency", description="Search query"),
limit: int = Query(20, description="Number of posts")
):
"""
دریافت داده‌های شبکه‌های اجتماعی
Platforms: reddit
Returns:
- پست‌های Reddit از subreddit های کریپتو
- امتیاز، تعداد کامنت، تاریخ
"""
try:
result = await data_hub.get_social_media_data(
platform=platform.lower(),
query=query,
limit=limit
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch social data"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Social media error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/social")
async def post_social_media_data(request: SocialMediaRequest):
"""
دریافت داده‌های شبکه‌های اجتماعی (POST method)
"""
try:
result = await data_hub.get_social_media_data(
platform=request.platform.lower(),
query=request.query,
limit=request.limit
)
if not result.get("success"):
raise HTTPException(status_code=503, detail=result.get("error", "Failed to fetch social data"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Social media error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# 9. AI Predictions Endpoints - پیش‌بینی‌های AI
# ============================================================================
@router.get("/ai/predict/{symbol}")
async def get_ai_prediction(
symbol: str,
model_type: str = Query("price", description="Type: price, trend, signal"),
timeframe: str = Query("24h", description="Timeframe: 1h, 4h, 24h, 7d")
):
"""
دریافت پیش‌بینی از مدل‌های AI
Source: HuggingFace Models
Types:
- price: پیش‌بینی قیمت
- trend: پیش‌بینی روند
- signal: سیگنال خرید/فروش
"""
try:
result = await data_hub.get_ai_prediction(
symbol=symbol.upper(),
model_type=model_type,
timeframe=timeframe
)
return result
except Exception as e:
logger.error(f"❌ AI prediction error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/predict")
async def post_ai_prediction(request: AIRequest):
"""
دریافت پیش‌بینی AI (POST method)
"""
try:
result = await data_hub.get_ai_prediction(
symbol=request.symbol.upper(),
model_type=request.model_type,
timeframe=request.timeframe
)
return result
except Exception as e:
logger.error(f"❌ AI prediction error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# 10. Combined Data Endpoints - داده‌های ترکیبی
# ============================================================================
@router.get("/overview/{symbol}")
async def get_symbol_overview(symbol: str):
"""
دریافت نمای کلی یک سمبل (ترکیبی از همه داده‌ها)
Returns:
- قیمت و آمار بازار
- آخرین اخبار
- تحلیل احساسات
- پیش‌بینی AI
"""
try:
overview = {}
# Get market data
market = await data_hub.get_market_prices(symbols=[symbol.upper()], limit=1)
if market.get("success") and market.get("data"):
overview["market"] = market["data"][0] if market["data"] else None
# Get latest news
news = await data_hub.get_crypto_news(query=f"{symbol} cryptocurrency", limit=5)
if news.get("success"):
overview["news"] = news.get("articles", [])
# Get AI prediction
prediction = await data_hub.get_ai_prediction(symbol=symbol.upper())
if prediction.get("success"):
overview["prediction"] = prediction.get("prediction")
# Get OHLCV data for chart
ohlcv = await data_hub.get_ohlcv_data(symbol=symbol.upper(), interval="1h", limit=24)
if ohlcv.get("success"):
overview["chart_data"] = ohlcv.get("data", [])
return {
"success": True,
"symbol": symbol.upper(),
"overview": overview,
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"❌ Symbol overview error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/dashboard")
async def get_dashboard_data():
"""
دریافت داده‌های داشبورد کامل
Returns:
- Top 10 coins
- Fear & Greed Index
- Latest news
- Trending coins
- Whale activities
"""
try:
dashboard = {}
# Get top coins
market = await data_hub.get_market_prices(limit=10)
if market.get("success"):
dashboard["top_coins"] = market.get("data", [])
# Get Fear & Greed
fg = await data_hub.get_fear_greed_index()
if fg.get("success"):
dashboard["fear_greed"] = fg.get("current", {})
# Get latest news
news = await data_hub.get_crypto_news(limit=10)
if news.get("success"):
dashboard["latest_news"] = news.get("articles", [])
# Get trending
trending = await data_hub.get_trending_coins()
if trending.get("success"):
dashboard["trending"] = trending.get("trending", [])[:5]
# Get whale activity
whales = await data_hub.get_whale_activity(limit=10)
if whales.get("success"):
dashboard["whale_activity"] = whales.get("data", {})
return {
"success": True,
"dashboard": dashboard,
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"❌ Dashboard error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# System Health Endpoints - سلامت سیستم
# ============================================================================
@router.get("/health")
async def health_check():
"""
بررسی سلامت Data Hub
"""
try:
health = await data_hub.check_all_sources_health()
return health
except Exception as e:
logger.error(f"❌ Health check error: {e}")
return {
"success": False,
"error": str(e),
"timestamp": datetime.utcnow().isoformat()
}
@router.get("/status")
async def get_status():
"""
دریافت وضعیت کامل سیستم
"""
try:
health = await data_hub.check_all_sources_health()
return {
"success": True,
"status": "operational" if health.get("operational_count", 0) > 5 else "degraded",
"sources": health.get("status", {}),
"statistics": {
"operational": health.get("operational_count", 0),
"total": health.get("total_sources", 0),
"uptime_percentage": (health.get("operational_count", 0) / health.get("total_sources", 1)) * 100
},
"api_keys": {
"coinmarketcap": "✅ Configured",
"newsapi": "✅ Configured",
"etherscan": "✅ Configured",
"bscscan": "✅ Configured",
"tronscan": "✅ Configured",
"huggingface": "✅ Configured"
},
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"❌ Status error: {e}")
return {
"success": False,
"status": "error",
"error": str(e),
"timestamp": datetime.utcnow().isoformat()
}
@router.get("/sources")
async def get_data_sources():
"""
لیست منابع داده و قابلیت‌های آنها
"""
sources = {
"market_data": [
{"name": "CoinMarketCap", "capabilities": ["prices", "market_cap", "volume"], "status": "active"},
{"name": "CoinGecko", "capabilities": ["prices", "trending"], "status": "active"},
{"name": "Binance", "capabilities": ["prices", "ohlcv", "24hr_tickers"], "status": "active"}
],
"blockchain": [
{"name": "Etherscan", "capabilities": ["eth_transactions", "gas_prices", "balances"], "status": "active"},
{"name": "BSCScan", "capabilities": ["bsc_transactions", "token_info"], "status": "active"},
{"name": "TronScan", "capabilities": ["tron_transactions", "tron_blocks"], "status": "active"}
],
"news": [
{"name": "NewsAPI", "capabilities": ["crypto_news", "headlines"], "status": "active"},
{"name": "Reddit", "capabilities": ["posts", "sentiment"], "status": "active"}
],
"sentiment": [
{"name": "Alternative.me", "capabilities": ["fear_greed_index"], "status": "active"},
{"name": "HuggingFace", "capabilities": ["text_sentiment", "ai_analysis"], "status": "active"}
],
"ai": [
{"name": "HuggingFace", "capabilities": ["price_prediction", "trend_analysis", "signals"], "status": "active"}
]
}
return {
"success": True,
"sources": sources,
"total_sources": sum(len(v) for v in sources.values()),
"timestamp": datetime.utcnow().isoformat()
}
# ============================================================================
# WebSocket Endpoint - Real-time Updates
# ============================================================================
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.subscriptions: Dict[str, List[str]] = {}
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.active_connections[client_id] = websocket
self.subscriptions[client_id] = []
logger.info(f"✅ WebSocket connected: {client_id}")
async def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
if client_id in self.subscriptions:
del self.subscriptions[client_id]
logger.info(f"❌ WebSocket disconnected: {client_id}")
async def send_message(self, client_id: str, message: dict):
if client_id in self.active_connections:
websocket = self.active_connections[client_id]
await websocket.send_json(message)
async def broadcast(self, message: dict, channel: str = None):
for client_id, websocket in self.active_connections.items():
if channel is None or channel in self.subscriptions.get(client_id, []):
try:
await websocket.send_json(message)
except:
await self.disconnect(client_id)
manager = ConnectionManager()
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
WebSocket برای دریافت داده‌های Real-time
Channels:
- prices: قیمت‌های لحظه‌ای
- news: اخبار جدید
- whales: فعالیت نهنگ‌ها
- sentiment: تحلیل احساسات
"""
client_id = str(uuid.uuid4())
try:
await manager.connect(websocket, client_id)
# Send welcome message
await manager.send_message(client_id, {
"type": "connected",
"client_id": client_id,
"timestamp": datetime.utcnow().isoformat()
})
while True:
# Receive message from client
data = await websocket.receive_text()
message = json.loads(data)
action = message.get("action")
if action == "subscribe":
channels = message.get("channels", [])
manager.subscriptions[client_id] = channels
await manager.send_message(client_id, {
"type": "subscribed",
"channels": channels,
"timestamp": datetime.utcnow().isoformat()
})
# Start sending data for subscribed channels
if "prices" in channels:
# Send initial price data
prices = await data_hub.get_market_prices(limit=10)
await manager.send_message(client_id, {
"type": "price_update",
"data": prices,
"timestamp": datetime.utcnow().isoformat()
})
elif action == "unsubscribe":
manager.subscriptions[client_id] = []
await manager.send_message(client_id, {
"type": "unsubscribed",
"timestamp": datetime.utcnow().isoformat()
})
elif action == "ping":
await manager.send_message(client_id, {
"type": "pong",
"timestamp": datetime.utcnow().isoformat()
})
except WebSocketDisconnect:
await manager.disconnect(client_id)
logger.info(f"WebSocket client {client_id} disconnected")
except Exception as e:
logger.error(f"WebSocket error: {e}")
await manager.disconnect(client_id)
# Export router
__all__ = ["router"]