import asyncio
import hashlib
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List

from backend.orchestration.provider_manager import provider_manager
from backend.services.ws_service_manager import ws_manager, ServiceType
from utils.logger import setup_logger

logger = setup_logger("ws_data_broadcaster")

# Coin ids requested from the "market" provider on every price refresh.
MARKET_COIN_IDS = "bitcoin,ethereum,tron,solana,binancecoin,ripple"

# Maximum number of articles included in a single news broadcast.
MAX_NEWS_ARTICLES = 5


def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    The output has the same shape as ``datetime.utcnow().isoformat()`` (no
    UTC offset suffix), so consumers of the broadcast payloads see an
    unchanged format, while the deprecated ``utcnow()`` call is avoided.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _source_label(post: Dict[str, Any], key: str) -> Any:
    """Return ``post["source"][key]``, or ``"Unknown"`` when unavailable.

    A missing, ``None`` or non-dict ``source`` yields ``"Unknown"`` instead
    of raising, so one malformed article cannot drop the whole batch.
    """
    source = post.get("source")
    if isinstance(source, dict):
        return source.get(key, "Unknown")
    return "Unknown"


def _stable_id(value: Any) -> str:
    """Derive a deterministic identifier from *value*.

    The built-in ``hash()`` is salted per process for strings, so ids built
    from it change on every restart; a truncated SHA-256 digest is stable.
    """
    return hashlib.sha256(str(value).encode("utf-8")).hexdigest()[:16]


def _normalize_articles(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert a raw news response into the broadcast article format.

    Two response layouts are recognised: one with a ``results`` list
    (labelled CryptoPanic in the original code) and one with an ``articles``
    list (labelled NewsAPI). At most ``MAX_NEWS_ARTICLES`` items are kept.

    Returns:
        A list of dicts with ``id``, ``title``, ``source``, ``url`` and
        ``published_at`` keys; empty when neither layout matches.
    """
    # Used only when an article carries no publication time of its own.
    fallback_time = _utc_timestamp()

    if "results" in data:  # CryptoPanic
        posts = data.get("results") or []
        return [
            {
                "id": str(post.get("id")),
                "title": post.get("title", ""),
                "source": _source_label(post, "title"),
                "url": post.get("url", ""),
                "published_at": post.get("published_at", fallback_time),
            }
            for post in posts[:MAX_NEWS_ARTICLES]
        ]

    if "articles" in data:  # NewsAPI
        posts = data.get("articles") or []
        return [
            {
                # This layout provides no id field, so derive one from the URL.
                "id": _stable_id(post.get("url", "")),
                "title": post.get("title", ""),
                "source": _source_label(post, "name"),
                "url": post.get("url", ""),
                "published_at": post.get("publishedAt", fallback_time),
            }
            for post in posts[:MAX_NEWS_ARTICLES]
        ]

    return []


class DataBroadcaster:
    """
    Broadcasts cryptocurrency data updates to WebSocket clients
    using the Provider Orchestrator for data fetching.

    Each ``broadcast_*`` coroutine is an independent polling loop that runs
    while ``is_running`` is true: it fetches through ``provider_manager``,
    builds a payload and pushes it via ``ws_manager.broadcast_to_service``.
    """

    def __init__(self):
        """Initialize the broadcaster with its default polling intervals."""
        # Not read or written by any method in this file; kept because
        # external code may rely on the attribute existing.
        self.last_broadcast = {}
        # Seconds slept between iterations of each polling loop.
        self.broadcast_interval = 5  # market price updates
        self.news_interval = 60
        self.sentiment_interval = 60
        self.gas_interval = 30
        self.is_running = False
        logger.info("DataBroadcaster initialized")

    async def start_broadcasting(self):
        """Run all broadcast loops concurrently until they finish.

        Sets ``is_running`` to true, awaits every loop, and resets the flag
        on exit. Exceptions that terminate a loop are logged rather than
        silently discarded.
        """
        logger.info("Starting WebSocket data broadcaster...")
        self.is_running = True

        tasks = [
            self.broadcast_market_data(),
            self.broadcast_news(),
            self.broadcast_sentiment(),
            self.broadcast_gas_prices(),
        ]

        try:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # With return_exceptions=True failures come back as results, so
            # they have to be inspected explicitly to be noticed at all.
            for result in results:
                if isinstance(result, Exception):
                    logger.error(
                        f"Error in broadcasting tasks: {result}",
                        exc_info=result,
                    )
        except Exception as e:
            logger.error(f"Error in broadcasting tasks: {e}", exc_info=True)
        finally:
            self.is_running = False

    async def stop_broadcasting(self):
        """Request that all broadcast loops stop.

        Only clears ``is_running``; each loop notices the flag after its
        current sleep completes.
        """
        logger.info("Stopping WebSocket data broadcaster...")
        self.is_running = False

    async def broadcast_market_data(self):
        """Broadcast market price updates every ``broadcast_interval`` seconds."""
        logger.info("Starting market data broadcast...")

        while self.is_running:
            try:
                # Fetch through the orchestrator with caching enabled and a
                # short TTL (10) so frequent broadcasts do not spam providers.
                response = await provider_manager.fetch_data(
                    "market",
                    params={"ids": MARKET_COIN_IDS, "vs_currency": "usd"},
                    use_cache=True,
                    ttl=10,
                )

                if response["success"] and response["data"]:
                    coins = response["data"]

                    # Re-key the per-coin fields by upper-cased ticker symbol.
                    prices = {}
                    price_changes = {}
                    volumes = {}
                    market_caps = {}

                    for coin in coins:
                        # `or ""` also covers an explicit null symbol.
                        symbol = (coin.get("symbol") or "").upper()
                        prices[symbol] = coin.get("current_price")
                        price_changes[symbol] = coin.get(
                            "price_change_percentage_24h"
                        )
                        volumes[symbol] = coin.get("total_volume")
                        market_caps[symbol] = coin.get("market_cap")

                    data = {
                        "type": "market_data",
                        "data": {
                            "prices": prices,
                            "volumes": volumes,
                            "market_caps": market_caps,
                            "price_changes": price_changes,
                        },
                        "count": len(coins),
                        "timestamp": _utc_timestamp(),
                        "source": response["source"],
                    }

                    # Diff check could be here (optimization)

                    await ws_manager.broadcast_to_service(
                        ServiceType.MARKET_DATA, data
                    )
                    logger.debug(
                        f"Broadcasted {len(coins)} price updates "
                        f"from {response['source']}"
                    )

            except Exception as e:
                # Loop boundary: log and keep polling rather than dying.
                logger.error(f"Error broadcasting market data: {e}", exc_info=True)

            await asyncio.sleep(self.broadcast_interval)

    async def broadcast_news(self):
        """Broadcast normalized news articles every ``news_interval`` seconds."""
        logger.info("Starting news broadcast...")

        while self.is_running:
            try:
                response = await provider_manager.fetch_data(
                    "news",
                    params={"filter": "hot"},
                    use_cache=True,
                    ttl=300,
                )

                if response["success"] and response["data"]:
                    articles = _normalize_articles(response["data"])

                    # Nothing is sent when no supported layout matched.
                    if articles:
                        payload = {
                            "type": "news",
                            "data": {"articles": articles},
                            "count": len(articles),
                            "timestamp": _utc_timestamp(),
                            "source": response["source"],
                        }

                        await ws_manager.broadcast_to_service(
                            ServiceType.NEWS, payload
                        )
                        logger.info(
                            f"Broadcasted {len(articles)} news articles "
                            f"from {response['source']}"
                        )

            except Exception as e:
                logger.error(f"Error broadcasting news: {e}", exc_info=True)

            await asyncio.sleep(self.news_interval)

    async def broadcast_sentiment(self):
        """Broadcast the fear & greed index every ``sentiment_interval`` seconds."""
        logger.info("Starting sentiment broadcast...")

        while self.is_running:
            try:
                response = await provider_manager.fetch_data(
                    "sentiment",
                    params={"limit": 1},
                    use_cache=True,
                    ttl=3600,
                )

                if response["success"] and response["data"]:
                    data = response["data"]

                    # Defaults apply when the response has no usable entries.
                    fng_value = 50
                    classification = "Neutral"

                    if data.get("data"):
                        item = data["data"][0]
                        fng_value = int(item.get("value", 50))
                        classification = item.get(
                            "value_classification", "Neutral"
                        )

                    timestamp = _utc_timestamp()
                    payload = {
                        "type": "sentiment",
                        "data": {
                            "fear_greed_index": fng_value,
                            "classification": classification,
                            "timestamp": timestamp,
                        },
                        "timestamp": timestamp,
                        "source": response["source"],
                    }

                    await ws_manager.broadcast_to_service(
                        ServiceType.SENTIMENT, payload
                    )
                    logger.info(
                        f"Broadcasted sentiment: {fng_value} "
                        f"from {response['source']}"
                    )

            except Exception as e:
                logger.error(f"Error broadcasting sentiment: {e}", exc_info=True)

            await asyncio.sleep(self.sentiment_interval)

    async def broadcast_gas_prices(self):
        """Broadcast gas price updates every ``gas_interval`` seconds."""
        logger.info("Starting gas price broadcast...")

        while self.is_running:
            try:
                response = await provider_manager.fetch_data(
                    "onchain",
                    params={},
                    use_cache=True,
                    ttl=15,
                )

                if response["success"] and response["data"]:
                    data = response["data"]
                    result = data.get("result", {})

                    if isinstance(result, dict):
                        if result:
                            payload = {
                                "type": "gas_prices",
                                "data": {
                                    "fast": result.get("FastGasPrice"),
                                    "standard": result.get("ProposeGasPrice"),
                                    "slow": result.get("SafeGasPrice"),
                                },
                                "timestamp": _utc_timestamp(),
                                "source": response["source"],
                            }

                            # Broadcast to RPC_NODES service type
                            # (gas prices are blockchain-related)
                            await ws_manager.broadcast_to_service(
                                ServiceType.RPC_NODES, payload
                            )
                            logger.debug(
                                f"Broadcasted gas prices from {response['source']}"
                            )
                    elif result:
                        # NOTE(review): presumably a provider error message
                        # delivered in place of the price mapping - confirm.
                        logger.warning(
                            f"Unexpected gas price result "
                            f"from {response['source']}: {result!r}"
                        )

            except Exception as e:
                logger.error(f"Error broadcasting gas prices: {e}", exc_info=True)

            await asyncio.sleep(self.gas_interval)


# Global broadcaster instance
broadcaster = DataBroadcaster()