#!/usr/bin/env python3 """ Multi-Source Data Fetchers Specialized fetchers for each data type with 10+ fallback sources Includes special handlers for CoinGecko and Binance """ import httpx import asyncio import logging import feedparser from typing import Dict, Any, List, Optional from datetime import datetime logger = logging.getLogger(__name__) class MarketPriceFetcher: """ Fetch market prices with 23+ fallback sources Special handling for CoinGecko and Binance """ @staticmethod async def fetch_coingecko_special(source: Dict[str, Any], symbols: Optional[List[str]] = None, **kwargs) -> Dict[str, Any]: """ Special CoinGecko handler with advanced features - Automatic symbol mapping - Batch requests - Community data integration """ try: base_url = source["url"] timeout = source.get("timeout", 10) async with httpx.AsyncClient(timeout=timeout) as client: if symbols and len(symbols) > 0: # Map symbols to CoinGecko IDs symbol_map = { "BTC": "bitcoin", "ETH": "ethereum", "BNB": "binancecoin", "XRP": "ripple", "ADA": "cardano", "DOGE": "dogecoin", "SOL": "solana", "TRX": "tron", "DOT": "polkadot", "MATIC": "matic-network", "LTC": "litecoin", "SHIB": "shiba-inu", "AVAX": "avalanche-2", "UNI": "uniswap", "LINK": "chainlink", "ATOM": "cosmos", "XLM": "stellar", "ETC": "ethereum-classic", "XMR": "monero", "BCH": "bitcoin-cash" } coin_ids = [] for symbol in symbols: clean_symbol = symbol.upper().replace("USDT", "").replace("USD", "") coin_id = symbol_map.get(clean_symbol, clean_symbol.lower()) coin_ids.append(coin_id) # Batch request for specific symbols response = await client.get( f"{base_url}/simple/price", params={ "ids": ",".join(coin_ids), "vs_currencies": "usd", "include_24hr_change": "true", "include_24hr_vol": "true", "include_market_cap": "true", "include_last_updated_at": "true" } ) else: # Get top coins by market cap limit = kwargs.get("limit", 100) response = await client.get( f"{base_url}/coins/markets", params={ "vs_currency": "usd", "order": "market_cap_desc", "per_page": min(limit, 250), "page": 1, "sparkline": "false", "price_change_percentage": "24h,7d" } ) response.raise_for_status() data = response.json() # Transform to standard format prices = [] if isinstance(data, dict) and symbols: # Simple price format for coin_id, coin_data in data.items(): symbol = next((k for k, v in symbol_map.items() if v == coin_id), coin_id.upper()) prices.append({ "symbol": symbol, "price": coin_data.get("usd", 0), "change24h": coin_data.get("usd_24h_change", 0), "volume24h": coin_data.get("usd_24h_vol", 0), "marketCap": coin_data.get("usd_market_cap", 0), "lastUpdated": coin_data.get("last_updated_at", int(datetime.utcnow().timestamp())) }) elif isinstance(data, list): # Markets format for coin in data: prices.append({ "symbol": coin.get("symbol", "").upper(), "name": coin.get("name", ""), "price": coin.get("current_price", 0), "change24h": coin.get("price_change_24h", 0), "changePercent24h": coin.get("price_change_percentage_24h", 0), "changePercent7d": coin.get("price_change_percentage_7d_in_currency", 0), "volume24h": coin.get("total_volume", 0), "marketCap": coin.get("market_cap", 0), "marketCapRank": coin.get("market_cap_rank", 0), "circulatingSupply": coin.get("circulating_supply", 0), "totalSupply": coin.get("total_supply", 0), "ath": coin.get("ath", 0), "athDate": coin.get("ath_date", ""), "lastUpdated": coin.get("last_updated", "") }) logger.info(f"✅ CoinGecko Special: {len(prices)} prices fetched") return { "prices": prices, "count": len(prices), "source": "coingecko_special", "enhanced": True } except Exception as e: logger.error(f"❌ CoinGecko Special failed: {e}") raise @staticmethod async def fetch_binance_special(source: Dict[str, Any], symbols: Optional[List[str]] = None, **kwargs) -> Dict[str, Any]: """ Special Binance handler with advanced features - 24h ticker statistics - Book ticker (best bid/ask) - Average price - Multi-symbol batch requests """ try: base_url = source["url"] timeout = source.get("timeout", 10) async with httpx.AsyncClient(timeout=timeout) as client: if symbols and len(symbols) > 0: # Fetch data for specific symbols prices = [] # Create tasks for parallel fetching tasks = [] for symbol in symbols: clean_symbol = symbol.upper().replace("USD", "") binance_symbol = f"{clean_symbol}USDT" tasks.append(MarketPriceFetcher._fetch_binance_single(client, base_url, binance_symbol)) # Execute in parallel results = await asyncio.gather(*tasks, return_exceptions=True) for result in results: if isinstance(result, dict): prices.append(result) else: # Get all tickers response = await client.get(f"{base_url}/ticker/24hr") response.raise_for_status() tickers = response.json() # Filter USDT pairs and transform prices = [] limit = kwargs.get("limit", 100) for ticker in tickers: symbol = ticker.get("symbol", "") if symbol.endswith("USDT"): clean_symbol = symbol.replace("USDT", "") prices.append({ "symbol": clean_symbol, "price": float(ticker.get("lastPrice", 0)), "change24h": float(ticker.get("priceChange", 0)), "changePercent24h": float(ticker.get("priceChangePercent", 0)), "volume24h": float(ticker.get("volume", 0)), "quoteVolume24h": float(ticker.get("quoteVolume", 0)), "high24h": float(ticker.get("highPrice", 0)), "low24h": float(ticker.get("lowPrice", 0)), "openPrice": float(ticker.get("openPrice", 0)), "weightedAvgPrice": float(ticker.get("weightedAvgPrice", 0)), "trades": int(ticker.get("count", 0)), "openTime": int(ticker.get("openTime", 0)), "closeTime": int(ticker.get("closeTime", 0)) }) if len(prices) >= limit: break logger.info(f"✅ Binance Special: {len(prices)} prices fetched") return { "prices": prices, "count": len(prices), "source": "binance_special", "enhanced": True } except Exception as e: logger.error(f"❌ Binance Special failed: {e}") raise @staticmethod async def _fetch_binance_single(client: httpx.AsyncClient, base_url: str, symbol: str) -> Dict[str, Any]: """Fetch single symbol data from Binance with multiple endpoints""" try: # Fetch 24h ticker response = await client.get( f"{base_url}/ticker/24hr", params={"symbol": symbol} ) response.raise_for_status() ticker = response.json() # Try to get book ticker (best bid/ask) try: book_response = await client.get( f"{base_url}/ticker/bookTicker", params={"symbol": symbol} ) book_response.raise_for_status() book_ticker = book_response.json() except: book_ticker = {} clean_symbol = symbol.replace("USDT", "") return { "symbol": clean_symbol, "price": float(ticker.get("lastPrice", 0)), "change24h": float(ticker.get("priceChange", 0)), "changePercent24h": float(ticker.get("priceChangePercent", 0)), "volume24h": float(ticker.get("volume", 0)), "quoteVolume24h": float(ticker.get("quoteVolume", 0)), "high24h": float(ticker.get("highPrice", 0)), "low24h": float(ticker.get("lowPrice", 0)), "weightedAvgPrice": float(ticker.get("weightedAvgPrice", 0)), "bidPrice": float(book_ticker.get("bidPrice", 0)) if book_ticker else None, "askPrice": float(book_ticker.get("askPrice", 0)) if book_ticker else None, "spread": float(book_ticker.get("askPrice", 0)) - float(book_ticker.get("bidPrice", 0)) if book_ticker else None, "trades": int(ticker.get("count", 0)) } except Exception as e: logger.warning(f"⚠️ Failed to fetch {symbol}: {e}") raise @staticmethod async def fetch_generic(source: Dict[str, Any], **kwargs) -> Dict[str, Any]: """Generic price fetcher for other sources""" source_name = source["name"] url = source["url"] timeout = source.get("timeout", 10) try: async with httpx.AsyncClient(timeout=timeout) as client: # Different endpoints based on source if "coinpaprika" in source_name: response = await client.get(f"{url}/tickers") response.raise_for_status() data = response.json() prices = [] for coin in data[:kwargs.get("limit", 100)]: quotes = coin.get("quotes", {}).get("USD", {}) prices.append({ "symbol": coin.get("symbol", ""), "name": coin.get("name", ""), "price": quotes.get("price", 0), "changePercent24h": quotes.get("percent_change_24h", 0), "volume24h": quotes.get("volume_24h", 0), "marketCap": quotes.get("market_cap", 0) }) return {"prices": prices, "count": len(prices)} elif "coincap" in source_name: response = await client.get(f"{url}/assets") response.raise_for_status() data = response.json() prices = [] for asset in data.get("data", [])[:kwargs.get("limit", 100)]: prices.append({ "symbol": asset.get("symbol", ""), "name": asset.get("name", ""), "price": float(asset.get("priceUsd", 0)), "changePercent24h": float(asset.get("changePercent24Hr", 0)), "volume24h": float(asset.get("volumeUsd24Hr", 0)), "marketCap": float(asset.get("marketCapUsd", 0)) }) return {"prices": prices, "count": len(prices)} elif "coinmarketcap" in source_name: headers = {"X-CMC_PRO_API_KEY": source.get("api_key", "")} response = await client.get( f"{url}/cryptocurrency/listings/latest", headers=headers, params={"limit": kwargs.get("limit", 100), "convert": "USD"} ) response.raise_for_status() data = response.json() prices = [] for coin in data.get("data", []): quote = coin.get("quote", {}).get("USD", {}) prices.append({ "symbol": coin.get("symbol", ""), "name": coin.get("name", ""), "price": quote.get("price", 0), "changePercent24h": quote.get("percent_change_24h", 0), "volume24h": quote.get("volume_24h", 0), "marketCap": quote.get("market_cap", 0) }) return {"prices": prices, "count": len(prices)} else: # Generic fallback logger.warning(f"⚠️ No specific handler for {source_name}, using generic") return {"prices": [], "count": 0, "error": "No specific handler"} except Exception as e: logger.error(f"❌ {source_name} failed: {e}") raise class OHLCFetcher: """ Fetch OHLC/candlestick data with 18+ fallback sources Special handling for Binance klines """ @staticmethod async def fetch_binance_ohlc_special( source: Dict[str, Any], symbol: str, timeframe: str = "1h", limit: int = 1000, **kwargs ) -> Dict[str, Any]: """ Special Binance OHLC handler with advanced features - Supports all timeframes - Up to 1000 candles per request - Automatic symbol normalization """ try: base_url = source["url"].replace("/api/v3", "/api/v3") timeout = source.get("timeout", 15) # Normalize symbol clean_symbol = symbol.upper().replace("USD", "") if not clean_symbol.endswith("USDT"): binance_symbol = f"{clean_symbol}USDT" else: binance_symbol = clean_symbol # Timeframe mapping interval_map = { "1m": "1m", "3m": "3m", "5m": "5m", "15m": "15m", "30m": "30m", "1h": "1h", "2h": "2h", "4h": "4h", "6h": "6h", "8h": "8h", "12h": "12h", "1d": "1d", "3d": "3d", "1w": "1w", "1M": "1M" } binance_interval = interval_map.get(timeframe, "1h") async with httpx.AsyncClient(timeout=timeout) as client: response = await client.get( "https://api.binance.com/api/v3/klines", params={ "symbol": binance_symbol, "interval": binance_interval, "limit": min(limit, 1000) } ) response.raise_for_status() klines = response.json() # Transform to standard OHLCV format candles = [] for kline in klines: candles.append({ "timestamp": int(kline[0]), "open": float(kline[1]), "high": float(kline[2]), "low": float(kline[3]), "close": float(kline[4]), "volume": float(kline[5]), "closeTime": int(kline[6]), "quoteVolume": float(kline[7]), "trades": int(kline[8]), "takerBuyBaseVolume": float(kline[9]), "takerBuyQuoteVolume": float(kline[10]) }) logger.info(f"✅ Binance OHLC Special: {len(candles)} candles for {binance_symbol}") return { "symbol": symbol, "timeframe": timeframe, "candles": candles, "count": len(candles), "source": "binance_ohlc_special", "enhanced": True } except Exception as e: logger.error(f"❌ Binance OHLC Special failed: {e}") raise @staticmethod async def fetch_coingecko_ohlc(source: Dict[str, Any], symbol: str, days: int = 7, **kwargs) -> Dict[str, Any]: """Fetch OHLC from CoinGecko""" try: # Symbol to coin ID mapping symbol_map = { "BTC": "bitcoin", "ETH": "ethereum", "BNB": "binancecoin", "XRP": "ripple", "ADA": "cardano", "DOGE": "dogecoin", "SOL": "solana", "TRX": "tron", "DOT": "polkadot" } coin_id = symbol_map.get(symbol.upper(), symbol.lower()) base_url = source["url"] timeout = source.get("timeout", 15) async with httpx.AsyncClient(timeout=timeout) as client: response = await client.get( f"{base_url}/coins/{coin_id}/ohlc", params={"vs_currency": "usd", "days": days} ) response.raise_for_status() data = response.json() candles = [] for item in data: candles.append({ "timestamp": item[0], "open": item[1], "high": item[2], "low": item[3], "close": item[4], "volume": 0 # CoinGecko OHLC doesn't include volume }) return {"symbol": symbol, "candles": candles, "count": len(candles)} except Exception as e: logger.error(f"❌ CoinGecko OHLC failed: {e}") raise @staticmethod async def fetch_generic_exchange(source: Dict[str, Any], symbol: str, timeframe: str = "1h", limit: int = 100, **kwargs) -> Dict[str, Any]: """Generic OHLC fetcher for exchanges (KuCoin, Bybit, OKX, etc.)""" source_name = source["name"] url = source["url"] try: # Add specific logic for each exchange if "kucoin" in source_name: # KuCoin specific implementation pass elif "bybit" in source_name: # Bybit specific implementation pass elif "okx" in source_name: # OKX specific implementation pass # Placeholder return {"symbol": symbol, "candles": [], "count": 0} except Exception as e: logger.error(f"❌ {source_name} OHLC failed: {e}") raise class NewsFetcher: """Fetch news from 15+ sources""" @staticmethod async def fetch_news_api(source: Dict[str, Any], query: str = "cryptocurrency", limit: int = 20, **kwargs) -> Dict[str, Any]: """Fetch from news API sources""" try: url = source["url"] api_key = source.get("api_key") timeout = source.get("timeout", 10) async with httpx.AsyncClient(timeout=timeout) as client: if "newsapi.org" in url: response = await client.get( f"{url}/everything", params={ "q": query, "apiKey": api_key, "language": "en", "sortBy": "publishedAt", "pageSize": limit } ) response.raise_for_status() data = response.json() articles = [] for article in data.get("articles", []): articles.append({ "title": article.get("title", ""), "description": article.get("description", ""), "url": article.get("url", ""), "source": article.get("source", {}).get("name", ""), "publishedAt": article.get("publishedAt", ""), "author": article.get("author", "") }) return {"articles": articles, "count": len(articles)} else: return {"articles": [], "count": 0} except Exception as e: logger.error(f"❌ News API failed: {e}") raise @staticmethod async def fetch_rss_feed(source: Dict[str, Any], limit: int = 20, **kwargs) -> Dict[str, Any]: """Fetch from RSS feeds""" try: feed_url = source["url"] # Parse RSS feed (using feedparser - sync operation) feed = await asyncio.to_thread(feedparser.parse, feed_url) articles = [] for entry in feed.entries[:limit]: try: published = entry.get("published_parsed") if published: dt = datetime(*published[:6]) timestamp = dt.isoformat() else: timestamp = datetime.utcnow().isoformat() except: timestamp = datetime.utcnow().isoformat() articles.append({ "title": entry.get("title", ""), "description": entry.get("summary", ""), "url": entry.get("link", ""), "source": source["name"], "publishedAt": timestamp }) logger.info(f"✅ RSS {source['name']}: {len(articles)} articles") return {"articles": articles, "count": len(articles)} except Exception as e: logger.error(f"❌ RSS feed failed: {e}") raise class SentimentFetcher: """Fetch sentiment data from 12+ sources""" @staticmethod async def fetch_fear_greed(source: Dict[str, Any], **kwargs) -> Dict[str, Any]: """Fetch Fear & Greed Index""" try: url = source["url"] timeout = source.get("timeout", 10) async with httpx.AsyncClient(timeout=timeout) as client: response = await client.get(url, params={"limit": 1}) response.raise_for_status() data = response.json() if "data" in data and len(data["data"]) > 0: fng = data["data"][0] return { "value": int(fng.get("value", 50)), "classification": fng.get("value_classification", "neutral"), "timestamp": int(fng.get("timestamp", 0)) } return {"value": 50, "classification": "neutral", "timestamp": int(datetime.utcnow().timestamp())} except Exception as e: logger.error(f"❌ Fear & Greed failed: {e}") raise __all__ = [ "MarketPriceFetcher", "OHLCFetcher", "NewsFetcher", "SentimentFetcher" ]