#!/usr/bin/env python3 from __future__ import annotations import asyncio import os import time import logging from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple from enum import Enum try: import httpx HTTPX_AVAILABLE = True except ImportError: HTTPX_AVAILABLE = False try: import aiohttp AIOHTTP_AVAILABLE = True except ImportError: AIOHTTP_AVAILABLE = False logger = logging.getLogger(__name__) class ProviderCategory(str, Enum): MARKET_DATA = "market_data" OHLCV = "ohlcv" ORDERBOOK = "orderbook" TRADES = "trades" NEWS = "news" SENTIMENT = "sentiment" FEAR_GREED = "fear_greed" ONCHAIN = "onchain" BLOCKCHAIN = "blockchain" WHALES = "whales" SOCIAL = "social" DEFI = "defi" NFT = "nft" @dataclass class ProviderConfig: id: str name: str category: ProviderCategory base_url: str api_key: Optional[str] = None api_key_env: Optional[str] = None api_key_header: Optional[str] = None api_key_param: Optional[str] = None rate_limit_per_minute: int = 60 timeout_seconds: int = 15 requires_auth: bool = False free_tier: bool = True priority: int = 1 fallback_for: Optional[str] = None @dataclass class ProviderHealth: name: str status: str latency_ms: Optional[int] = None last_check: Optional[str] = None error_message: Optional[str] = None success_rate: float = 1.0 total_requests: int = 0 failed_requests: int = 0 class RateLimiter: def __init__(self, requests_per_minute: int): self.requests_per_minute = requests_per_minute self.requests: List[float] = [] async def acquire(self): now = time.time() self.requests = [t for t in self.requests if now - t < 60] if len(self.requests) >= self.requests_per_minute: sleep_time = 60 - (now - self.requests[0]) if sleep_time > 0: await asyncio.sleep(sleep_time) self.requests.append(time.time()) class CircuitBreaker: def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.failures = 0 self.last_failure_time: Optional[float] = None self.is_open = False def record_success(self): self.failures = 0 self.is_open = False def record_failure(self): self.failures += 1 self.last_failure_time = time.time() if self.failures >= self.failure_threshold: self.is_open = True def can_attempt(self) -> bool: if not self.is_open: return True if self.last_failure_time: if time.time() - self.last_failure_time >= self.recovery_timeout: self.is_open = False self.failures = 0 return True return False class BaseDataProvider(ABC): def __init__(self, config: ProviderConfig): self.config = config self.rate_limiter = RateLimiter(config.rate_limit_per_minute) self.circuit_breaker = CircuitBreaker() self.health = ProviderHealth(name=config.name, status="unknown") self._client: Optional[Any] = None def _get_api_key(self) -> Optional[str]: if self.config.api_key: return self.config.api_key if self.config.api_key_env: return os.getenv(self.config.api_key_env) return None async def _get_client(self): if self._client is None: if HTTPX_AVAILABLE: self._client = httpx.AsyncClient(timeout=self.config.timeout_seconds) elif AIOHTTP_AVAILABLE: self._client = aiohttp.ClientSession() return self._client async def close(self): if self._client: if HTTPX_AVAILABLE and isinstance(self._client, httpx.AsyncClient): await self._client.aclose() elif AIOHTTP_AVAILABLE: await self._client.close() self._client = None async def _request( self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None, method: str = "GET", json_body: Optional[Dict] = None ) -> Dict[str, Any]: if not self.circuit_breaker.can_attempt(): raise Exception(f"Circuit breaker open for {self.config.name}") await self.rate_limiter.acquire() url = f"{self.config.base_url}{endpoint}" request_headers = headers or {} request_params = params or {} api_key = self._get_api_key() if api_key: if self.config.api_key_header: request_headers[self.config.api_key_header] = api_key elif self.config.api_key_param: request_params[self.config.api_key_param] = api_key start_time = time.time() self.health.total_requests += 1 try: client = await self._get_client() if HTTPX_AVAILABLE and isinstance(client, httpx.AsyncClient): if method == "GET": response = await client.get(url, params=request_params, headers=request_headers) else: response = await client.post(url, params=request_params, headers=request_headers, json=json_body) response.raise_for_status() data = response.json() elif AIOHTTP_AVAILABLE: if method == "GET": async with client.get(url, params=request_params, headers=request_headers) as response: response.raise_for_status() data = await response.json() else: async with client.post(url, params=request_params, headers=request_headers, json=json_body) as response: response.raise_for_status() data = await response.json() else: raise Exception("No HTTP client available") latency = int((time.time() - start_time) * 1000) self.health.latency_ms = latency self.health.last_check = datetime.now(timezone.utc).isoformat() self.health.status = "online" self.health.error_message = None self.circuit_breaker.record_success() return {"success": True, "data": data, "latency_ms": latency} except Exception as e: self.health.failed_requests += 1 self.health.status = "error" self.health.error_message = str(e)[:200] self.health.last_check = datetime.now(timezone.utc).isoformat() self.circuit_breaker.record_failure() if self.health.total_requests > 0: self.health.success_rate = 1 - (self.health.failed_requests / self.health.total_requests) return {"success": False, "error": str(e), "data": None} @abstractmethod async def fetch_data(self, **kwargs) -> Dict[str, Any]: pass def get_health(self) -> ProviderHealth: return self.health class CoinGeckoProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="coingecko", name="CoinGecko", category=ProviderCategory.MARKET_DATA, base_url="https://api.coingecko.com/api/v3", rate_limit_per_minute=30, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: endpoint = kwargs.get("endpoint", "/simple/price") params = kwargs.get("params", {"ids": "bitcoin,ethereum", "vs_currencies": "usd"}) return await self._request(endpoint, params) async def get_prices(self, symbols: List[str], vs_currencies: str = "usd") -> Dict[str, Any]: ids = ",".join(symbols) return await self._request("/simple/price", { "ids": ids, "vs_currencies": vs_currencies, "include_24hr_change": "true", "include_market_cap": "true" }) async def get_trending(self) -> Dict[str, Any]: return await self._request("/search/trending") async def get_global(self) -> Dict[str, Any]: return await self._request("/global") async def get_coin_market_chart(self, coin_id: str, days: int = 7) -> Dict[str, Any]: return await self._request(f"/coins/{coin_id}/market_chart", {"vs_currency": "usd", "days": days}) class CoinMarketCapProvider(BaseDataProvider): CMC_KEYS = [ "a35ffaec-c66c-4f16-81e3-41a717e4822f", "04cf4b5b-9868-465c-8ba0-9f2e78c92eb1", "b54bcf4d-1bca-4e8e-9a24-22ff2c3d462c" ] def __init__(self, key_index: int = 0): super().__init__(ProviderConfig( id=f"coinmarketcap_{key_index}" if key_index > 0 else "coinmarketcap", name=f"CoinMarketCap {'Primary' if key_index == 0 else f'Key {key_index + 1}'}", category=ProviderCategory.MARKET_DATA, base_url="https://pro-api.coinmarketcap.com/v1", api_key_header="X-CMC_PRO_API_KEY", rate_limit_per_minute=30, requires_auth=True, free_tier=True, priority=2 + key_index )) self.config.api_key = os.getenv("COINMARKETCAP_API_KEY") or os.getenv("CMC_API_KEY") or self.CMC_KEYS[key_index % len(self.CMC_KEYS)] async def fetch_data(self, **kwargs) -> Dict[str, Any]: endpoint = kwargs.get("endpoint", "/cryptocurrency/listings/latest") params = kwargs.get("params", {"limit": 100}) return await self._request(endpoint, params) async def get_quotes(self, symbols: List[str]) -> Dict[str, Any]: return await self._request("/cryptocurrency/quotes/latest", {"symbol": ",".join(symbols)}) async def get_listings(self, limit: int = 100) -> Dict[str, Any]: return await self._request("/cryptocurrency/listings/latest", {"limit": limit}) async def get_global_metrics(self) -> Dict[str, Any]: return await self._request("/global-metrics/quotes/latest") async def get_trending(self) -> Dict[str, Any]: return await self._request("/cryptocurrency/trending/latest") async def get_latest_quotes(self, symbols: List[str], convert: str = "USD") -> Dict[str, Any]: return await self._request("/cryptocurrency/quotes/latest", { "symbol": ",".join(symbols), "convert": convert }) class CryptoCompareProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="cryptocompare", name="CryptoCompare", category=ProviderCategory.OHLCV, base_url="https://min-api.cryptocompare.com/data", api_key_env="CRYPTOCOMPARE_KEY", api_key_param="api_key", rate_limit_per_minute=100, free_tier=True, priority=1 )) if not self._get_api_key(): self.config.api_key = "e79c8e6d4c5b4a3f2e1d0c9b8a7f6e5d4c3b2a1f" async def fetch_data(self, **kwargs) -> Dict[str, Any]: endpoint = kwargs.get("endpoint", "/v2/histohour") params = kwargs.get("params", {"fsym": "BTC", "tsym": "USD", "limit": 24}) return await self._request(endpoint, params) async def get_ohlcv_hourly(self, symbol: str, limit: int = 24) -> Dict[str, Any]: return await self._request("/v2/histohour", {"fsym": symbol, "tsym": "USD", "limit": limit}) async def get_ohlcv_daily(self, symbol: str, limit: int = 30) -> Dict[str, Any]: return await self._request("/v2/histoday", {"fsym": symbol, "tsym": "USD", "limit": limit}) async def get_multi_price(self, symbols: List[str]) -> Dict[str, Any]: return await self._request("/pricemulti", {"fsyms": ",".join(symbols), "tsyms": "USD,EUR,BTC"}) class BinanceProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="binance", name="Binance", category=ProviderCategory.OHLCV, base_url="https://api.binance.com/api/v3", rate_limit_per_minute=1200, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: endpoint = kwargs.get("endpoint", "/ticker/price") params = kwargs.get("params", {}) return await self._request(endpoint, params) async def get_price(self, symbol: str = None) -> Dict[str, Any]: params = {"symbol": symbol} if symbol else {} return await self._request("/ticker/price", params) async def get_klines(self, symbol: str, interval: str = "1h", limit: int = 100) -> Dict[str, Any]: return await self._request("/klines", {"symbol": symbol, "interval": interval, "limit": limit}) async def get_orderbook(self, symbol: str, limit: int = 100) -> Dict[str, Any]: return await self._request("/depth", {"symbol": symbol, "limit": limit}) async def get_24h_ticker(self, symbol: str = None) -> Dict[str, Any]: params = {"symbol": symbol} if symbol else {} return await self._request("/ticker/24hr", params) class CoinCapProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="coincap", name="CoinCap", category=ProviderCategory.MARKET_DATA, base_url="https://api.coincap.io/v2", rate_limit_per_minute=200, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: endpoint = kwargs.get("endpoint", "/assets") params = kwargs.get("params", {"limit": 100}) return await self._request(endpoint, params) async def get_assets(self, limit: int = 100) -> Dict[str, Any]: return await self._request("/assets", {"limit": limit}) async def get_asset(self, asset_id: str) -> Dict[str, Any]: return await self._request(f"/assets/{asset_id}") async def get_asset_history(self, asset_id: str, interval: str = "h1") -> Dict[str, Any]: return await self._request(f"/assets/{asset_id}/history", {"interval": interval}) class CoinPaprikaProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="coinpaprika", name="CoinPaprika", category=ProviderCategory.MARKET_DATA, base_url="https://api.coinpaprika.com/v1", rate_limit_per_minute=100, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: endpoint = kwargs.get("endpoint", "/tickers") return await self._request(endpoint) async def get_tickers(self) -> Dict[str, Any]: return await self._request("/tickers") async def get_ticker(self, coin_id: str) -> Dict[str, Any]: return await self._request(f"/tickers/{coin_id}") async def get_global(self) -> Dict[str, Any]: return await self._request("/global") class AlternativeMeProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="alternative_me", name="Alternative.me", category=ProviderCategory.FEAR_GREED, base_url="https://api.alternative.me", rate_limit_per_minute=60, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: limit = kwargs.get("limit", 1) return await self._request("/fng/", {"limit": limit}) async def get_fear_greed_current(self) -> Dict[str, Any]: return await self._request("/fng/", {"limit": 1}) async def get_fear_greed_history(self, days: int = 30) -> Dict[str, Any]: return await self._request("/fng/", {"limit": days}) class CryptoPanicProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="cryptopanic", name="CryptoPanic", category=ProviderCategory.NEWS, base_url="https://cryptopanic.com/api/v1", rate_limit_per_minute=30, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: return await self._request("/posts/", {"public": "true"}) async def get_news(self, currencies: str = None, filter_type: str = None) -> Dict[str, Any]: params = {"public": "true"} if currencies: params["currencies"] = currencies if filter_type: params["filter"] = filter_type return await self._request("/posts/", params) class NewsAPIProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="newsapi", name="NewsAPI", category=ProviderCategory.NEWS, base_url="https://newsapi.org/v2", api_key=os.getenv("NEWSAPI_API_KEY") or "968a5e25552b4cb5ba3280361d8444ab", api_key_param="apiKey", rate_limit_per_minute=100, requires_auth=True, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: query = kwargs.get("query", "cryptocurrency OR bitcoin OR ethereum") return await self._request("/everything", {"q": query, "sortBy": "publishedAt", "language": "en"}) async def get_crypto_news(self, query: str = None) -> Dict[str, Any]: q = query or "cryptocurrency OR bitcoin OR ethereum OR crypto" return await self._request("/everything", { "q": q, "sortBy": "publishedAt", "language": "en", "pageSize": 50 }) async def get_top_headlines(self, category: str = "business") -> Dict[str, Any]: return await self._request("/top-headlines", { "category": category, "language": "en", "pageSize": 50 }) async def search_news(self, query: str, from_date: str = None, to_date: str = None) -> Dict[str, Any]: params = {"q": query, "sortBy": "relevancy", "language": "en"} if from_date: params["from"] = from_date if to_date: params["to"] = to_date return await self._request("/everything", params) class SentimentAPIProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="sentiment_api", name="Sentiment API", category=ProviderCategory.SENTIMENT, base_url="https://api.sentiment.io/v1", api_key=os.getenv("SENTIMENT_API_KEY") or "vltdvdho63uqnjgf_fq75qbks72e3wfmx", api_key_header="Authorization", rate_limit_per_minute=60, requires_auth=True, free_tier=True, priority=1 )) def _get_api_key(self) -> Optional[str]: key = super()._get_api_key() if key and not key.startswith("Bearer "): return f"Bearer {key}" return key async def fetch_data(self, **kwargs) -> Dict[str, Any]: symbol = kwargs.get("symbol", "BTC") return await self._request("/sentiment", {"symbol": symbol}) async def get_market_sentiment(self, symbol: str = "BTC") -> Dict[str, Any]: return await self._request("/sentiment", {"symbol": symbol}) async def get_social_sentiment(self, symbol: str = "BTC") -> Dict[str, Any]: return await self._request("/social", {"symbol": symbol}) class EtherscanProvider(BaseDataProvider): def __init__(self, key_index: int = 0): keys = [ "SZHYFZK2RR8H9TIMJBVW54V4H81K2Z2KR2", "T6IR8VJHX2NE6ZJW2S3FDVN1TYG4PYYI45" ] super().__init__(ProviderConfig( id=f"etherscan_{key_index}", name=f"Etherscan {'Primary' if key_index == 0 else 'Secondary'}", category=ProviderCategory.BLOCKCHAIN, base_url="https://api.etherscan.io/api", api_key=os.getenv("ETHERSCAN_API_KEY") or keys[key_index % len(keys)], api_key_param="apikey", rate_limit_per_minute=5, requires_auth=True, free_tier=True, priority=1 + key_index )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: module = kwargs.get("module", "account") action = kwargs.get("action", "balance") address = kwargs.get("address", "") return await self._request("", {"module": module, "action": action, "address": address, "tag": "latest"}) async def get_balance(self, address: str) -> Dict[str, Any]: return await self._request("", {"module": "account", "action": "balance", "address": address, "tag": "latest"}) async def get_transactions(self, address: str, start_block: int = 0) -> Dict[str, Any]: return await self._request("", { "module": "account", "action": "txlist", "address": address, "startblock": start_block, "endblock": 99999999, "sort": "desc" }) async def get_gas_price(self) -> Dict[str, Any]: return await self._request("", {"module": "gastracker", "action": "gasoracle"}) class BscScanProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="bscscan", name="BscScan", category=ProviderCategory.BLOCKCHAIN, base_url="https://api.bscscan.com/api", api_key=os.getenv("BSCSCAN_API_KEY") or "K62RKHGXTDCG53RU4MCG6XABIMJKTN19IT", api_key_param="apikey", rate_limit_per_minute=5, requires_auth=True, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: module = kwargs.get("module", "account") action = kwargs.get("action", "balance") address = kwargs.get("address", "") return await self._request("", {"module": module, "action": action, "address": address}) async def get_balance(self, address: str) -> Dict[str, Any]: return await self._request("", {"module": "account", "action": "balance", "address": address}) async def get_transactions(self, address: str) -> Dict[str, Any]: return await self._request("", {"module": "account", "action": "txlist", "address": address}) class TronScanProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="tronscan", name="TronScan", category=ProviderCategory.BLOCKCHAIN, base_url="https://apilist.tronscanapi.com/api", api_key=os.getenv("TRONSCAN_API_KEY") or "7ae72726-bffe-4e74-9c33-97b761eeea21", api_key_param="apiKey", rate_limit_per_minute=60, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: address = kwargs.get("address", "") return await self._request("/account", {"address": address}) async def get_account(self, address: str) -> Dict[str, Any]: return await self._request("/account", {"address": address}) async def get_transactions(self, address: str, limit: int = 20) -> Dict[str, Any]: return await self._request("/transaction", {"address": address, "limit": limit}) class BlockchairProvider(BaseDataProvider): def __init__(self, chain: str = "ethereum"): super().__init__(ProviderConfig( id=f"blockchair_{chain}", name=f"Blockchair {chain.title()}", category=ProviderCategory.BLOCKCHAIN, base_url=f"https://api.blockchair.com/{chain}", rate_limit_per_minute=30, free_tier=True, priority=3 )) self.chain = chain async def fetch_data(self, **kwargs) -> Dict[str, Any]: address = kwargs.get("address", "") return await self._request(f"/dashboards/address/{address}") async def get_address_dashboard(self, address: str) -> Dict[str, Any]: return await self._request(f"/dashboards/address/{address}") class DefiLlamaProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="defillama", name="DefiLlama", category=ProviderCategory.DEFI, base_url="https://api.llama.fi", rate_limit_per_minute=60, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: return await self._request("/protocols") async def get_protocols(self) -> Dict[str, Any]: return await self._request("/protocols") async def get_tvl_history(self) -> Dict[str, Any]: return await self._request("/v2/historicalChainTvl") async def get_chains(self) -> Dict[str, Any]: return await self._request("/v2/chains") class DefiLlamaPricesProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="defillama_prices", name="DefiLlama Prices", category=ProviderCategory.MARKET_DATA, base_url="https://coins.llama.fi", rate_limit_per_minute=60, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: coins = kwargs.get("coins", "coingecko:bitcoin,coingecko:ethereum") return await self._request(f"/prices/current/{coins}") async def get_current_prices(self, coins: str) -> Dict[str, Any]: return await self._request(f"/prices/current/{coins}") class GlassnodeProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="glassnode", name="Glassnode", category=ProviderCategory.ONCHAIN, base_url="https://api.glassnode.com/v1", api_key_env="GLASSNODE_API_KEY", api_key_param="api_key", rate_limit_per_minute=10, requires_auth=True, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: metric = kwargs.get("metric", "indicators/sopr") return await self._request(f"/metrics/{metric}", {"a": "BTC"}) class LunarCrushProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="lunarcrush", name="LunarCrush", category=ProviderCategory.SOCIAL, base_url="https://api.lunarcrush.com/v2", api_key_env="LUNARCRUSH_API_KEY", api_key_param="key", rate_limit_per_minute=30, requires_auth=True, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: return await self._request("", {"data": "assets", "symbol": "BTC"}) async def get_assets(self, symbol: str = None) -> Dict[str, Any]: params = {"data": "assets"} if symbol: params["symbol"] = symbol return await self._request("", params) class MessariProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="messari", name="Messari", category=ProviderCategory.MARKET_DATA, base_url="https://data.messari.io/api/v1", rate_limit_per_minute=30, free_tier=True, priority=3 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: asset = kwargs.get("asset", "bitcoin") return await self._request(f"/assets/{asset}/metrics") async def get_asset_metrics(self, asset: str) -> Dict[str, Any]: return await self._request(f"/assets/{asset}/metrics") async def get_all_assets(self) -> Dict[str, Any]: return await self._request("/assets") class SantimentProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="santiment", name="Santiment", category=ProviderCategory.SENTIMENT, base_url="https://api.santiment.net/graphql", api_key_env="SANTIMENT_API_KEY", api_key_header="Authorization", rate_limit_per_minute=20, requires_auth=True, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: query = kwargs.get("query", '{ projects(slug: "bitcoin") { slug name } }') return await self._request("", method="POST", json_body={"query": query}) class RedditProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="reddit", name="Reddit", category=ProviderCategory.SOCIAL, base_url="https://www.reddit.com", rate_limit_per_minute=60, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: subreddit = kwargs.get("subreddit", "CryptoCurrency") sort = kwargs.get("sort", "hot") limit = kwargs.get("limit", 25) return await self._request(f"/r/{subreddit}/{sort}.json", {"limit": limit}) async def get_subreddit_posts(self, subreddit: str, sort: str = "hot", limit: int = 25) -> Dict[str, Any]: return await self._request(f"/r/{subreddit}/{sort}.json", {"limit": limit}) class CoinStatsProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="coinstats", name="CoinStats", category=ProviderCategory.MARKET_DATA, base_url="https://api.coinstats.app/public/v1", rate_limit_per_minute=50, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: return await self._request("/coins", {"limit": 100}) async def get_coins(self, limit: int = 100) -> Dict[str, Any]: return await self._request("/coins", {"limit": limit}) async def get_news(self) -> Dict[str, Any]: return await self._request("/news") class CoinLoreProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="coinlore", name="CoinLore", category=ProviderCategory.MARKET_DATA, base_url="https://api.coinlore.net/api", rate_limit_per_minute=100, free_tier=True, priority=3 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: return await self._request("/tickers/", {"start": 0, "limit": 100}) async def get_tickers(self, start: int = 0, limit: int = 100) -> Dict[str, Any]: return await self._request("/tickers/", {"start": start, "limit": limit}) async def get_global(self) -> Dict[str, Any]: return await self._request("/global/") class MobulaProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="mobula", name="Mobula", category=ProviderCategory.MARKET_DATA, base_url="https://api.mobula.io/api/1", rate_limit_per_minute=50, free_tier=True, priority=3 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: asset = kwargs.get("asset", "bitcoin") return await self._request("/market/data", {"asset": asset}) class KrakenProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="kraken", name="Kraken", category=ProviderCategory.OHLCV, base_url="https://api.kraken.com/0/public", rate_limit_per_minute=60, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: pair = kwargs.get("pair", "XBTUSD") return await self._request("/Ticker", {"pair": pair}) async def get_ticker(self, pair: str) -> Dict[str, Any]: return await self._request("/Ticker", {"pair": pair}) async def get_ohlc(self, pair: str, interval: int = 60) -> Dict[str, Any]: return await self._request("/OHLC", {"pair": pair, "interval": interval}) class BybitProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="bybit", name="Bybit", category=ProviderCategory.OHLCV, base_url="https://api.bybit.com/v5", rate_limit_per_minute=120, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: symbol = kwargs.get("symbol", "BTCUSDT") category = kwargs.get("category", "spot") return await self._request("/market/tickers", {"category": category, "symbol": symbol}) async def get_ticker(self, symbol: str, category: str = "spot") -> Dict[str, Any]: return await self._request("/market/tickers", {"category": category, "symbol": symbol}) class OKXProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="okx", name="OKX", category=ProviderCategory.OHLCV, base_url="https://www.okx.com/api/v5", rate_limit_per_minute=60, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: inst_id = kwargs.get("instId", "BTC-USDT") return await self._request("/market/ticker", {"instId": inst_id}) async def get_ticker(self, inst_id: str) -> Dict[str, Any]: return await self._request("/market/ticker", {"instId": inst_id}) async def get_candles(self, inst_id: str, bar: str = "1H") -> Dict[str, Any]: return await self._request("/market/candles", {"instId": inst_id, "bar": bar}) class HTXProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="htx", name="HTX (Huobi)", category=ProviderCategory.OHLCV, base_url="https://api.huobi.pro", rate_limit_per_minute=100, free_tier=True, priority=3 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: symbol = kwargs.get("symbol", "btcusdt") return await self._request("/market/detail/merged", {"symbol": symbol}) class GateIOProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="gateio", name="Gate.io", category=ProviderCategory.OHLCV, base_url="https://api.gateio.ws/api/v4", rate_limit_per_minute=100, free_tier=True, priority=3 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: return await self._request("/spot/tickers") class KuCoinProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="kucoin", name="KuCoin", category=ProviderCategory.OHLCV, base_url="https://api.kucoin.com/api/v1", rate_limit_per_minute=100, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: symbol = kwargs.get("symbol", "BTC-USDT") return await self._request("/market/orderbook/level1", {"symbol": symbol}) class DexScreenerProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="dexscreener", name="DexScreener", category=ProviderCategory.DEFI, base_url="https://api.dexscreener.com", rate_limit_per_minute=300, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: chain = kwargs.get("chain", "ethereum") return await self._request(f"/latest/dex/tokens/{chain}") async def get_token_pairs(self, token_address: str) -> Dict[str, Any]: return await self._request(f"/latest/dex/tokens/{token_address}") async def get_pair(self, chain: str, pair_address: str) -> Dict[str, Any]: return await self._request(f"/latest/dex/pairs/{chain}/{pair_address}") class TheGraphProvider(BaseDataProvider): def __init__(self, subgraph: str = "uniswap/uniswap-v3"): super().__init__(ProviderConfig( id=f"thegraph_{subgraph.replace('/', '_')}", name=f"TheGraph {subgraph}", category=ProviderCategory.DEFI, base_url=f"https://api.thegraph.com/subgraphs/name/{subgraph}", rate_limit_per_minute=30, free_tier=True, priority=1 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: query = kwargs.get("query", "{ pools(first: 5) { id token0 { symbol } token1 { symbol } } }") return await self._request("", method="POST", json_body={"query": query}) class CovalentProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="covalent", name="Covalent", category=ProviderCategory.BLOCKCHAIN, base_url="https://api.covalenthq.com/v1", api_key_env="COVALENT_API_KEY", api_key_param="key", rate_limit_per_minute=5, requires_auth=True, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: chain_id = kwargs.get("chain_id", 1) address = kwargs.get("address", "") return await self._request(f"/{chain_id}/address/{address}/balances_v2/") class MoralisProvider(BaseDataProvider): def __init__(self): super().__init__(ProviderConfig( id="moralis", name="Moralis", category=ProviderCategory.BLOCKCHAIN, base_url="https://deep-index.moralis.io/api/v2", api_key_env="MORALIS_API_KEY", api_key_header="X-API-Key", rate_limit_per_minute=25, requires_auth=True, free_tier=True, priority=2 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: address = kwargs.get("address", "") chain = kwargs.get("chain", "eth") return await self._request(f"/{address}/balance", {"chain": chain}) class BlockscoutProvider(BaseDataProvider): def __init__(self, chain: str = "eth"): chains = { "eth": "https://eth.blockscout.com/api", "polygon": "https://polygon.blockscout.com/api", "arbitrum": "https://arbitrum.blockscout.com/api", "optimism": "https://optimism.blockscout.com/api" } super().__init__(ProviderConfig( id=f"blockscout_{chain}", name=f"Blockscout {chain.upper()}", category=ProviderCategory.BLOCKCHAIN, base_url=chains.get(chain, chains["eth"]), rate_limit_per_minute=60, free_tier=True, priority=3 )) async def fetch_data(self, **kwargs) -> Dict[str, Any]: address = kwargs.get("address", "") return await self._request("", {"module": "account", "action": "balance", "address": address}) PROVIDER_REGISTRY: Dict[str, type] = { "coingecko": CoinGeckoProvider, "coinmarketcap": CoinMarketCapProvider, "cryptocompare": CryptoCompareProvider, "binance": BinanceProvider, "coincap": CoinCapProvider, "coinpaprika": CoinPaprikaProvider, "alternative_me": AlternativeMeProvider, "cryptopanic": CryptoPanicProvider, "newsapi": NewsAPIProvider, "sentiment_api": SentimentAPIProvider, "etherscan": EtherscanProvider, "bscscan": BscScanProvider, "tronscan": TronScanProvider, "blockchair": BlockchairProvider, "defillama": DefiLlamaProvider, "defillama_prices": DefiLlamaPricesProvider, "glassnode": GlassnodeProvider, "lunarcrush": LunarCrushProvider, "messari": MessariProvider, "santiment": SantimentProvider, "reddit": RedditProvider, "coinstats": CoinStatsProvider, "coinlore": CoinLoreProvider, "mobula": MobulaProvider, "kraken": KrakenProvider, "bybit": BybitProvider, "okx": OKXProvider, "htx": HTXProvider, "gateio": GateIOProvider, "kucoin": KuCoinProvider, "dexscreener": DexScreenerProvider, "thegraph": TheGraphProvider, "covalent": CovalentProvider, "moralis": MoralisProvider, "blockscout": BlockscoutProvider, } class ExpandedProviderManager: def __init__(self): self.providers: Dict[str, BaseDataProvider] = {} self._initialized = False def initialize(self): if self._initialized: return self.providers["coingecko"] = CoinGeckoProvider() self.providers["coinmarketcap"] = CoinMarketCapProvider(0) self.providers["coinmarketcap_1"] = CoinMarketCapProvider(1) self.providers["coinmarketcap_2"] = CoinMarketCapProvider(2) self.providers["cryptocompare"] = CryptoCompareProvider() self.providers["binance"] = BinanceProvider() self.providers["coincap"] = CoinCapProvider() self.providers["coinpaprika"] = CoinPaprikaProvider() self.providers["alternative_me"] = AlternativeMeProvider() self.providers["cryptopanic"] = CryptoPanicProvider() self.providers["newsapi"] = NewsAPIProvider() self.providers["sentiment_api"] = SentimentAPIProvider() self.providers["etherscan_0"] = EtherscanProvider(0) self.providers["etherscan_1"] = EtherscanProvider(1) self.providers["bscscan"] = BscScanProvider() self.providers["tronscan"] = TronScanProvider() self.providers["blockchair_ethereum"] = BlockchairProvider("ethereum") self.providers["blockchair_bsc"] = BlockchairProvider("binance-smart-chain") self.providers["blockchair_tron"] = BlockchairProvider("tron") self.providers["defillama"] = DefiLlamaProvider() self.providers["defillama_prices"] = DefiLlamaPricesProvider() self.providers["messari"] = MessariProvider() self.providers["reddit"] = RedditProvider() self.providers["coinstats"] = CoinStatsProvider() self.providers["coinlore"] = CoinLoreProvider() self.providers["kraken"] = KrakenProvider() self.providers["bybit"] = BybitProvider() self.providers["okx"] = OKXProvider() self.providers["htx"] = HTXProvider() self.providers["gateio"] = GateIOProvider() self.providers["kucoin"] = KuCoinProvider() self.providers["dexscreener"] = DexScreenerProvider() self.providers["blockscout_eth"] = BlockscoutProvider("eth") self._initialized = True logger.info(f"Initialized {len(self.providers)} expanded providers") def get_provider(self, provider_id: str) -> Optional[BaseDataProvider]: if not self._initialized: self.initialize() return self.providers.get(provider_id) def get_providers_by_category(self, category: ProviderCategory) -> List[BaseDataProvider]: if not self._initialized: self.initialize() return [p for p in self.providers.values() if p.config.category == category] def get_all_providers(self) -> Dict[str, BaseDataProvider]: if not self._initialized: self.initialize() return self.providers def get_all_health(self) -> List[Dict[str, Any]]: if not self._initialized: self.initialize() return [ { "id": pid, "name": p.config.name, "category": p.config.category.value, "status": p.health.status, "latency_ms": p.health.latency_ms, "last_check": p.health.last_check, "error_message": p.health.error_message, "success_rate": p.health.success_rate, "total_requests": p.health.total_requests, "failed_requests": p.health.failed_requests, "free_tier": p.config.free_tier, "requires_auth": p.config.requires_auth, } for pid, p in self.providers.items() ] async def fetch_with_fallback( self, category: ProviderCategory, **kwargs ) -> Dict[str, Any]: providers = sorted( self.get_providers_by_category(category), key=lambda p: p.config.priority ) for provider in providers: if not provider.circuit_breaker.can_attempt(): continue try: result = await provider.fetch_data(**kwargs) if result.get("success"): return { "success": True, "provider": provider.config.name, "data": result.get("data"), "latency_ms": result.get("latency_ms") } except Exception as e: logger.warning(f"Provider {provider.config.name} failed: {e}") continue return { "success": False, "error": f"All providers for {category.value} failed", "data": None } async def close_all(self): for provider in self.providers.values(): await provider.close() _manager = ExpandedProviderManager() def get_provider_manager() -> ExpandedProviderManager: return _manager def initialize_providers(): _manager.initialize() return { "status": "initialized", "provider_count": len(_manager.providers), "providers": list(_manager.providers.keys()) } async def fetch_market_data(symbols: List[str] = None) -> Dict[str, Any]: _manager.initialize() return await _manager.fetch_with_fallback( ProviderCategory.MARKET_DATA, symbols=symbols or ["bitcoin", "ethereum"] ) async def fetch_fear_greed() -> Dict[str, Any]: _manager.initialize() return await _manager.fetch_with_fallback(ProviderCategory.FEAR_GREED) async def fetch_news() -> Dict[str, Any]: _manager.initialize() return await _manager.fetch_with_fallback(ProviderCategory.NEWS) async def fetch_ohlcv(symbol: str, interval: str = "1h") -> Dict[str, Any]: _manager.initialize() return await _manager.fetch_with_fallback( ProviderCategory.OHLCV, symbol=symbol, interval=interval ) def get_all_provider_health() -> List[Dict[str, Any]]: _manager.initialize() return _manager.get_all_health()