Really-amin's picture
Upload 748 files
4418362 verified
#!/usr/bin/env python3
from __future__ import annotations
import asyncio
import os
import time
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from enum import Enum
try:
import httpx
HTTPX_AVAILABLE = True
except ImportError:
HTTPX_AVAILABLE = False
try:
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
logger = logging.getLogger(__name__)
class ProviderCategory(str, Enum):
    """Kind of data a provider is registered under.

    Members also subclass ``str``, so each one compares equal to its raw
    string value.
    """

    # Exchange / pricing data
    MARKET_DATA = "market_data"
    OHLCV = "ohlcv"
    ORDERBOOK = "orderbook"
    TRADES = "trades"

    # News and mood indicators
    NEWS = "news"
    SENTIMENT = "sentiment"
    FEAR_GREED = "fear_greed"

    # Chain-level data
    ONCHAIN = "onchain"
    BLOCKCHAIN = "blockchain"
    WHALES = "whales"

    # Everything else
    SOCIAL = "social"
    DEFI = "defi"
    NFT = "nft"
@dataclass
class ProviderConfig:
    """Static settings describing one upstream data provider."""

    # --- identity -----------------------------------------------------------
    id: str
    name: str
    category: ProviderCategory
    # Prefix that request endpoints are appended to.
    base_url: str

    # --- credentials --------------------------------------------------------
    # Literal key; takes precedence over the environment lookup.
    api_key: Optional[str] = None
    # Name of the environment variable consulted when ``api_key`` is unset.
    api_key_env: Optional[str] = None
    # Header name the key is sent under (checked before ``api_key_param``).
    api_key_header: Optional[str] = None
    # Query-parameter name the key is sent under.
    api_key_param: Optional[str] = None

    # --- request behaviour --------------------------------------------------
    rate_limit_per_minute: int = 60
    timeout_seconds: int = 15

    # --- descriptive flags (not read by the code visible in this file chunk;
    # presumably consumed by a manager/selector - TODO confirm) ---------------
    requires_auth: bool = False
    free_tier: bool = True
    priority: int = 1
    fallback_for: Optional[str] = None
@dataclass
class ProviderHealth:
    """Mutable health snapshot kept for a single provider."""

    name: str
    # Free-form state string; this file uses "unknown", "online" and "error".
    status: str

    # Result of the most recent request
    latency_ms: Optional[int] = None
    last_check: Optional[str] = None  # ISO-8601 timestamp string
    error_message: Optional[str] = None

    # Running counters
    success_rate: float = 1.0
    total_requests: int = 0
    failed_requests: int = 0
class RateLimiter:
    """Sliding-window limiter: at most N acquisitions per 60 seconds.

    ``requests`` holds the wall-clock timestamps of the acquisitions that are
    still inside the window, oldest first.
    """

    # Length of the sliding window, in seconds.
    WINDOW_SECONDS = 60

    def __init__(self, requests_per_minute: int):
        self.requests_per_minute = requests_per_minute
        self.requests: List[float] = []

    async def acquire(self):
        """Wait until a slot is free in the window, then claim it.

        Raises:
            ValueError: if ``requests_per_minute`` is not positive (no slot
                could ever become available).
        """
        if self.requests_per_minute <= 0:
            raise ValueError(
                f"requests_per_minute must be positive, got {self.requests_per_minute}"
            )

        # Re-check after every sleep: other coroutines may have claimed the
        # freed slot while this one was waiting, and timestamps that expired
        # during the sleep must be dropped before the new one is recorded.
        while True:
            now = time.time()
            self.requests = [
                stamp for stamp in self.requests
                if now - stamp < self.WINDOW_SECONDS
            ]
            if len(self.requests) < self.requests_per_minute:
                break
            # The oldest surviving timestamp decides when the next slot opens.
            wait_for = self.WINDOW_SECONDS - (now - self.requests[0])
            await asyncio.sleep(max(wait_for, 0))

        # No await between the check above and this append, so the slot
        # cannot be taken by another coroutine in between.
        self.requests.append(time.time())
class CircuitBreaker:
    """Consecutive-failure breaker with a timed automatic reset.

    The breaker opens once ``failure_threshold`` failures have accumulated and
    closes again on the first success, or when ``can_attempt`` is called at
    least ``recovery_timeout`` seconds after the most recent failure.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time: Optional[float] = None
        self.is_open = False

    def record_success(self):
        """Close the breaker and clear the failure count."""
        self.is_open = False
        self.failures = 0

    def record_failure(self):
        """Count one failure, opening the breaker at the threshold."""
        self.last_failure_time = time.time()
        self.failures += 1
        if self.failures < self.failure_threshold:
            return
        self.is_open = True

    def can_attempt(self) -> bool:
        """Return whether a request may be made right now.

        Has a side effect: an open breaker whose recovery timeout has elapsed
        is fully reset (closed, zero failures) before ``True`` is returned.
        """
        if not self.is_open:
            return True
        if not self.last_failure_time:
            return False

        elapsed = time.time() - self.last_failure_time
        if elapsed < self.recovery_timeout:
            return False

        self.failures = 0
        self.is_open = False
        return True
class BaseDataProvider(ABC):
    """Shared plumbing for every HTTP data provider.

    Holds the provider's configuration, a per-provider rate limiter and
    circuit breaker, a lazily created HTTP client (httpx when importable,
    otherwise aiohttp) and a running ``ProviderHealth`` record. Subclasses
    implement ``fetch_data`` and route their calls through ``_request``.
    """

    def __init__(self, config: ProviderConfig):
        self.config = config
        self.rate_limiter = RateLimiter(config.rate_limit_per_minute)
        self.circuit_breaker = CircuitBreaker()
        self.health = ProviderHealth(name=config.name, status="unknown")
        # Created on first use by _get_client().
        self._client: Optional[Any] = None

    def _get_api_key(self) -> Optional[str]:
        """Return the configured key, else the env-var value, else ``None``."""
        if self.config.api_key:
            return self.config.api_key
        if self.config.api_key_env:
            return os.getenv(self.config.api_key_env)
        return None

    async def _get_client(self):
        """Return the cached HTTP client, creating it on first call.

        Returns ``None`` when neither httpx nor aiohttp is importable.
        """
        if self._client is None:
            if HTTPX_AVAILABLE:
                self._client = httpx.AsyncClient(timeout=self.config.timeout_seconds)
            elif AIOHTTP_AVAILABLE:
                # Apply the configured timeout here too, so both backends
                # honour ``timeout_seconds``.
                self._client = aiohttp.ClientSession(
                    timeout=aiohttp.ClientTimeout(total=self.config.timeout_seconds)
                )
        return self._client

    async def close(self):
        """Close the HTTP client, if one was created, and forget it."""
        if self._client:
            if HTTPX_AVAILABLE and isinstance(self._client, httpx.AsyncClient):
                await self._client.aclose()
            elif AIOHTTP_AVAILABLE:
                await self._client.close()
            self._client = None

    def _refresh_success_rate(self) -> None:
        """Recompute ``health.success_rate`` from the running counters."""
        total = self.health.total_requests
        if total > 0:
            self.health.success_rate = 1 - (self.health.failed_requests / total)

    async def _request(
        self,
        endpoint: str,
        params: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        method: str = "GET",
        json_body: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Perform one HTTP call against ``base_url + endpoint``.

        Args:
            endpoint: Path appended verbatim to ``config.base_url``.
            params: Query parameters; not modified.
            headers: Request headers; not modified.
            method: ``"GET"`` issues a GET; any other value issues a POST.
            json_body: JSON payload sent with non-GET requests.

        Returns:
            ``{"success": True, "data": ..., "latency_ms": ...}`` on success,
            or ``{"success": False, "error": ..., "data": None}`` when the
            request or response decoding fails.

        Raises:
            Exception: if the circuit breaker is open. This is raised before
                the request is counted, unlike request failures, which are
                reported through the returned dict.
        """
        if not self.circuit_breaker.can_attempt():
            raise Exception(f"Circuit breaker open for {self.config.name}")

        await self.rate_limiter.acquire()

        url = f"{self.config.base_url}{endpoint}"
        # Work on copies: the API key is injected below and must not leak
        # into dictionaries owned by the caller.
        request_headers = dict(headers) if headers else {}
        request_params = dict(params) if params else {}

        api_key = self._get_api_key()
        if api_key:
            if self.config.api_key_header:
                request_headers[self.config.api_key_header] = api_key
            elif self.config.api_key_param:
                request_params[self.config.api_key_param] = api_key

        start_time = time.time()
        self.health.total_requests += 1
        try:
            client = await self._get_client()
            if HTTPX_AVAILABLE and isinstance(client, httpx.AsyncClient):
                if method == "GET":
                    response = await client.get(url, params=request_params, headers=request_headers)
                else:
                    response = await client.post(url, params=request_params, headers=request_headers, json=json_body)
                response.raise_for_status()
                data = response.json()
            elif AIOHTTP_AVAILABLE:
                if method == "GET":
                    async with client.get(url, params=request_params, headers=request_headers) as response:
                        response.raise_for_status()
                        data = await response.json()
                else:
                    async with client.post(url, params=request_params, headers=request_headers, json=json_body) as response:
                        response.raise_for_status()
                        data = await response.json()
            else:
                raise Exception("No HTTP client available")

            latency = int((time.time() - start_time) * 1000)
            self.health.latency_ms = latency
            self.health.last_check = datetime.now(timezone.utc).isoformat()
            self.health.status = "online"
            self.health.error_message = None
            self.circuit_breaker.record_success()
            # Keep the rate current on success as well, so it can recover
            # after earlier failures.
            self._refresh_success_rate()
            return {"success": True, "data": data, "latency_ms": latency}
        except Exception as e:
            # NOTE(review): str(e) may embed the request URL, which can carry
            # a query-string API key - confirm before exposing it to clients.
            self.health.failed_requests += 1
            self.health.status = "error"
            self.health.error_message = str(e)[:200]
            self.health.last_check = datetime.now(timezone.utc).isoformat()
            self.circuit_breaker.record_failure()
            self._refresh_success_rate()
            return {"success": False, "error": str(e), "data": None}

    @abstractmethod
    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Fetch this provider's default data set; implemented by subclasses."""
        pass

    def get_health(self) -> ProviderHealth:
        """Return the live (mutable) health record for this provider."""
        return self.health
class CoinGeckoProvider(BaseDataProvider):
    """CoinGecko market-data client; configured without an API key."""

    def __init__(self):
        config = ProviderConfig(
            id="coingecko",
            name="CoinGecko",
            category=ProviderCategory.MARKET_DATA,
            base_url="https://api.coingecko.com/api/v3",
            rate_limit_per_minute=30,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``endpoint`` with ``params`` (both optional keyword args).

        Defaults to a BTC/ETH USD price lookup on ``/simple/price``.
        """
        endpoint = kwargs["endpoint"] if "endpoint" in kwargs else "/simple/price"
        if "params" in kwargs:
            params = kwargs["params"]
        else:
            params = {"ids": "bitcoin,ethereum", "vs_currencies": "usd"}
        return await self._request(endpoint, params)

    async def get_prices(self, symbols: List[str], vs_currencies: str = "usd") -> Dict[str, Any]:
        """Request prices, 24h change and market cap for the given coin ids."""
        query = {
            "ids": ",".join(symbols),
            "vs_currencies": vs_currencies,
            "include_24hr_change": "true",
            "include_market_cap": "true",
        }
        return await self._request("/simple/price", query)

    async def get_trending(self) -> Dict[str, Any]:
        """Request ``/search/trending``."""
        result = await self._request("/search/trending")
        return result

    async def get_global(self) -> Dict[str, Any]:
        """Request ``/global``."""
        result = await self._request("/global")
        return result

    async def get_coin_market_chart(self, coin_id: str, days: int = 7) -> Dict[str, Any]:
        """Request the USD market chart of ``coin_id`` over ``days`` days."""
        path = f"/coins/{coin_id}/market_chart"
        query = {"vs_currency": "usd", "days": days}
        return await self._request(path, query)
class CoinMarketCapProvider(BaseDataProvider):
    """CoinMarketCap client; ``key_index`` selects id, label and fallback key."""

    # NOTE(review): credentials are embedded in source. If these are live
    # keys they should be rotated and supplied via the environment instead.
    CMC_KEYS = [
        "a35ffaec-c66c-4f16-81e3-41a717e4822f",
        "04cf4b5b-9868-465c-8ba0-9f2e78c92eb1",
        "b54bcf4d-1bca-4e8e-9a24-22ff2c3d462c"
    ]

    def __init__(self, key_index: int = 0):
        provider_id = f"coinmarketcap_{key_index}" if key_index > 0 else "coinmarketcap"
        label = "Primary" if key_index == 0 else f"Key {key_index + 1}"
        config = ProviderConfig(
            id=provider_id,
            name=f"CoinMarketCap {label}",
            category=ProviderCategory.MARKET_DATA,
            base_url="https://pro-api.coinmarketcap.com/v1",
            api_key_header="X-CMC_PRO_API_KEY",
            rate_limit_per_minute=30,
            requires_auth=True,
            free_tier=True,
            priority=2 + key_index,
        )
        super().__init__(config)

        # Environment variables win; the embedded list is only a fallback.
        env_key = os.getenv("COINMARKETCAP_API_KEY") or os.getenv("CMC_API_KEY")
        self.config.api_key = env_key or self.CMC_KEYS[key_index % len(self.CMC_KEYS)]

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``endpoint`` with ``params``; defaults to the top-100 listings."""
        if "endpoint" in kwargs:
            endpoint = kwargs["endpoint"]
        else:
            endpoint = "/cryptocurrency/listings/latest"
        params = kwargs["params"] if "params" in kwargs else {"limit": 100}
        return await self._request(endpoint, params)

    async def get_quotes(self, symbols: List[str]) -> Dict[str, Any]:
        """Request latest quotes for the comma-joined ``symbols``."""
        query = {"symbol": ",".join(symbols)}
        return await self._request("/cryptocurrency/quotes/latest", query)

    async def get_listings(self, limit: int = 100) -> Dict[str, Any]:
        """Request the latest listings, capped at ``limit``."""
        query = {"limit": limit}
        return await self._request("/cryptocurrency/listings/latest", query)

    async def get_global_metrics(self) -> Dict[str, Any]:
        """Request ``/global-metrics/quotes/latest``."""
        result = await self._request("/global-metrics/quotes/latest")
        return result

    async def get_trending(self) -> Dict[str, Any]:
        """Request ``/cryptocurrency/trending/latest``."""
        result = await self._request("/cryptocurrency/trending/latest")
        return result

    async def get_latest_quotes(self, symbols: List[str], convert: str = "USD") -> Dict[str, Any]:
        """Request latest quotes for ``symbols`` converted to ``convert``."""
        query = {
            "symbol": ",".join(symbols),
            "convert": convert,
        }
        return await self._request("/cryptocurrency/quotes/latest", query)
class CryptoCompareProvider(BaseDataProvider):
    """CryptoCompare OHLCV client; key sent as the ``api_key`` query param."""

    def __init__(self):
        config = ProviderConfig(
            id="cryptocompare",
            name="CryptoCompare",
            category=ProviderCategory.OHLCV,
            base_url="https://min-api.cryptocompare.com/data",
            api_key_env="CRYPTOCOMPARE_KEY",
            api_key_param="api_key",
            rate_limit_per_minute=100,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

        if self._get_api_key():
            return
        # NOTE(review): embedded fallback credential, used only when the
        # CRYPTOCOMPARE_KEY environment variable yields nothing.
        self.config.api_key = "e79c8e6d4c5b4a3f2e1d0c9b8a7f6e5d4c3b2a1f"

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``endpoint`` with ``params``; defaults to 24 hourly BTC/USD bars."""
        endpoint = kwargs["endpoint"] if "endpoint" in kwargs else "/v2/histohour"
        if "params" in kwargs:
            params = kwargs["params"]
        else:
            params = {"fsym": "BTC", "tsym": "USD", "limit": 24}
        return await self._request(endpoint, params)

    async def get_ohlcv_hourly(self, symbol: str, limit: int = 24) -> Dict[str, Any]:
        """Request ``/v2/histohour`` for ``symbol`` against USD."""
        query = {"fsym": symbol, "tsym": "USD", "limit": limit}
        return await self._request("/v2/histohour", query)

    async def get_ohlcv_daily(self, symbol: str, limit: int = 30) -> Dict[str, Any]:
        """Request ``/v2/histoday`` for ``symbol`` against USD."""
        query = {"fsym": symbol, "tsym": "USD", "limit": limit}
        return await self._request("/v2/histoday", query)

    async def get_multi_price(self, symbols: List[str]) -> Dict[str, Any]:
        """Request ``/pricemulti`` for ``symbols`` in USD, EUR and BTC."""
        query = {"fsyms": ",".join(symbols), "tsyms": "USD,EUR,BTC"}
        return await self._request("/pricemulti", query)
class BinanceProvider(BaseDataProvider):
    """Binance ``/api/v3`` client; configured without an API key."""

    def __init__(self):
        config = ProviderConfig(
            id="binance",
            name="Binance",
            category=ProviderCategory.OHLCV,
            base_url="https://api.binance.com/api/v3",
            rate_limit_per_minute=1200,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``endpoint`` (default ``/ticker/price``) with ``params``."""
        endpoint = kwargs["endpoint"] if "endpoint" in kwargs else "/ticker/price"
        params = kwargs["params"] if "params" in kwargs else {}
        return await self._request(endpoint, params)

    async def get_price(self, symbol: str = None) -> Dict[str, Any]:
        """Request ``/ticker/price``; the symbol filter is sent only if truthy."""
        query = {}
        if symbol:
            query = {"symbol": symbol}
        return await self._request("/ticker/price", query)

    async def get_klines(self, symbol: str, interval: str = "1h", limit: int = 100) -> Dict[str, Any]:
        """Request ``/klines`` for ``symbol``."""
        query = {"symbol": symbol, "interval": interval, "limit": limit}
        return await self._request("/klines", query)

    async def get_orderbook(self, symbol: str, limit: int = 100) -> Dict[str, Any]:
        """Request ``/depth`` for ``symbol``."""
        query = {"symbol": symbol, "limit": limit}
        return await self._request("/depth", query)

    async def get_24h_ticker(self, symbol: str = None) -> Dict[str, Any]:
        """Request ``/ticker/24hr``; the symbol filter is sent only if truthy."""
        query = {}
        if symbol:
            query = {"symbol": symbol}
        return await self._request("/ticker/24hr", query)
class CoinCapProvider(BaseDataProvider):
    """CoinCap v2 client; configured without an API key."""

    def __init__(self):
        config = ProviderConfig(
            id="coincap",
            name="CoinCap",
            category=ProviderCategory.MARKET_DATA,
            base_url="https://api.coincap.io/v2",
            rate_limit_per_minute=200,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``endpoint`` (default ``/assets``) with ``params``."""
        endpoint = kwargs["endpoint"] if "endpoint" in kwargs else "/assets"
        params = kwargs["params"] if "params" in kwargs else {"limit": 100}
        return await self._request(endpoint, params)

    async def get_assets(self, limit: int = 100) -> Dict[str, Any]:
        """Request ``/assets`` capped at ``limit``."""
        query = {"limit": limit}
        return await self._request("/assets", query)

    async def get_asset(self, asset_id: str) -> Dict[str, Any]:
        """Request ``/assets/<asset_id>``."""
        path = f"/assets/{asset_id}"
        return await self._request(path)

    async def get_asset_history(self, asset_id: str, interval: str = "h1") -> Dict[str, Any]:
        """Request ``/assets/<asset_id>/history`` at the given interval."""
        path = f"/assets/{asset_id}/history"
        return await self._request(path, {"interval": interval})
class CoinPaprikaProvider(BaseDataProvider):
    """CoinPaprika v1 client; configured without an API key."""

    def __init__(self):
        config = ProviderConfig(
            id="coinpaprika",
            name="CoinPaprika",
            category=ProviderCategory.MARKET_DATA,
            base_url="https://api.coinpaprika.com/v1",
            rate_limit_per_minute=100,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``endpoint`` (default ``/tickers``) without parameters."""
        endpoint = kwargs["endpoint"] if "endpoint" in kwargs else "/tickers"
        return await self._request(endpoint)

    async def get_tickers(self) -> Dict[str, Any]:
        """Request ``/tickers``."""
        result = await self._request("/tickers")
        return result

    async def get_ticker(self, coin_id: str) -> Dict[str, Any]:
        """Request ``/tickers/<coin_id>``."""
        path = f"/tickers/{coin_id}"
        return await self._request(path)

    async def get_global(self) -> Dict[str, Any]:
        """Request ``/global``."""
        result = await self._request("/global")
        return result
class AlternativeMeProvider(BaseDataProvider):
    """Alternative.me client for the ``/fng/`` (fear & greed) endpoint."""

    def __init__(self):
        config = ProviderConfig(
            id="alternative_me",
            name="Alternative.me",
            category=ProviderCategory.FEAR_GREED,
            base_url="https://api.alternative.me",
            rate_limit_per_minute=60,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/fng/`` with the ``limit`` keyword (default 1)."""
        limit = kwargs["limit"] if "limit" in kwargs else 1
        query = {"limit": limit}
        return await self._request("/fng/", query)

    async def get_fear_greed_current(self) -> Dict[str, Any]:
        """Request ``/fng/`` with ``limit=1``."""
        query = {"limit": 1}
        return await self._request("/fng/", query)

    async def get_fear_greed_history(self, days: int = 30) -> Dict[str, Any]:
        """Request ``/fng/`` with ``limit=days``."""
        query = {"limit": days}
        return await self._request("/fng/", query)
class CryptoPanicProvider(BaseDataProvider):
    """CryptoPanic news client using the ``public=true`` posts feed."""

    def __init__(self):
        config = ProviderConfig(
            id="cryptopanic",
            name="CryptoPanic",
            category=ProviderCategory.NEWS,
            base_url="https://cryptopanic.com/api/v1",
            rate_limit_per_minute=30,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/posts/`` with ``public=true``; keyword args are ignored."""
        query = {"public": "true"}
        return await self._request("/posts/", query)

    async def get_news(self, currencies: str = None, filter_type: str = None) -> Dict[str, Any]:
        """Request ``/posts/``, adding each optional filter only when truthy."""
        query = {"public": "true"}
        optional = (("currencies", currencies), ("filter", filter_type))
        for key, value in optional:
            if value:
                query[key] = value
        return await self._request("/posts/", query)
class NewsAPIProvider(BaseDataProvider):
    """NewsAPI v2 client; key sent as the ``apiKey`` query parameter."""

    def __init__(self):
        # NOTE(review): embedded fallback credential, used when the
        # NEWSAPI_API_KEY environment variable is unset or empty.
        resolved_key = os.getenv("NEWSAPI_API_KEY") or "968a5e25552b4cb5ba3280361d8444ab"
        config = ProviderConfig(
            id="newsapi",
            name="NewsAPI",
            category=ProviderCategory.NEWS,
            base_url="https://newsapi.org/v2",
            api_key=resolved_key,
            api_key_param="apiKey",
            rate_limit_per_minute=100,
            requires_auth=True,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/everything`` for the ``query`` keyword, newest first."""
        if "query" in kwargs:
            query = kwargs["query"]
        else:
            query = "cryptocurrency OR bitcoin OR ethereum"
        search = {"q": query, "sortBy": "publishedAt", "language": "en"}
        return await self._request("/everything", search)

    async def get_crypto_news(self, query: str = None) -> Dict[str, Any]:
        """Request up to 50 English articles; a falsy ``query`` uses the default."""
        q = query if query else "cryptocurrency OR bitcoin OR ethereum OR crypto"
        search = {
            "q": q,
            "sortBy": "publishedAt",
            "language": "en",
            "pageSize": 50,
        }
        return await self._request("/everything", search)

    async def get_top_headlines(self, category: str = "business") -> Dict[str, Any]:
        """Request up to 50 English top headlines in ``category``."""
        search = {
            "category": category,
            "language": "en",
            "pageSize": 50,
        }
        return await self._request("/top-headlines", search)

    async def search_news(self, query: str, from_date: str = None, to_date: str = None) -> Dict[str, Any]:
        """Request ``/everything`` by relevancy, with optional date bounds."""
        search = {"q": query, "sortBy": "relevancy", "language": "en"}
        bounds = (("from", from_date), ("to", to_date))
        for key, value in bounds:
            if value:
                search[key] = value
        return await self._request("/everything", search)
class SentimentAPIProvider(BaseDataProvider):
    """Sentiment API client; key sent as a Bearer ``Authorization`` header."""

    def __init__(self):
        # NOTE(review): embedded fallback credential, used when the
        # SENTIMENT_API_KEY environment variable is unset or empty.
        resolved_key = os.getenv("SENTIMENT_API_KEY") or "vltdvdho63uqnjgf_fq75qbks72e3wfmx"
        config = ProviderConfig(
            id="sentiment_api",
            name="Sentiment API",
            category=ProviderCategory.SENTIMENT,
            base_url="https://api.sentiment.io/v1",
            api_key=resolved_key,
            api_key_header="Authorization",
            rate_limit_per_minute=60,
            requires_auth=True,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    def _get_api_key(self) -> Optional[str]:
        """Return the base key with a ``Bearer `` prefix added when missing."""
        key = super()._get_api_key()
        if not key or key.startswith("Bearer "):
            return key
        return f"Bearer {key}"

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/sentiment`` for the ``symbol`` keyword (default BTC)."""
        symbol = kwargs["symbol"] if "symbol" in kwargs else "BTC"
        return await self._request("/sentiment", {"symbol": symbol})

    async def get_market_sentiment(self, symbol: str = "BTC") -> Dict[str, Any]:
        """Request ``/sentiment`` for ``symbol``."""
        query = {"symbol": symbol}
        return await self._request("/sentiment", query)

    async def get_social_sentiment(self, symbol: str = "BTC") -> Dict[str, Any]:
        """Request ``/social`` for ``symbol``."""
        query = {"symbol": symbol}
        return await self._request("/social", query)
class EtherscanProvider(BaseDataProvider):
    """Etherscan client; ``key_index`` selects id, label and fallback key."""

    def __init__(self, key_index: int = 0):
        # NOTE(review): embedded fallback credentials, used only when the
        # ETHERSCAN_API_KEY environment variable is unset or empty.
        keys = [
            "SZHYFZK2RR8H9TIMJBVW54V4H81K2Z2KR2",
            "T6IR8VJHX2NE6ZJW2S3FDVN1TYG4PYYI45"
        ]
        label = "Primary" if key_index == 0 else "Secondary"
        resolved_key = os.getenv("ETHERSCAN_API_KEY") or keys[key_index % len(keys)]
        config = ProviderConfig(
            id=f"etherscan_{key_index}",
            name=f"Etherscan {label}",
            category=ProviderCategory.BLOCKCHAIN,
            base_url="https://api.etherscan.io/api",
            api_key=resolved_key,
            api_key_param="apikey",
            rate_limit_per_minute=5,
            requires_auth=True,
            free_tier=True,
            priority=1 + key_index,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Issue a module/action query; defaults to an account balance lookup."""
        query = {
            "module": kwargs.get("module", "account"),
            "action": kwargs.get("action", "balance"),
            "address": kwargs.get("address", ""),
            "tag": "latest",
        }
        return await self._request("", query)

    async def get_balance(self, address: str) -> Dict[str, Any]:
        """Query ``account/balance`` for ``address`` at the latest tag."""
        query = {"module": "account", "action": "balance", "address": address, "tag": "latest"}
        return await self._request("", query)

    async def get_transactions(self, address: str, start_block: int = 0) -> Dict[str, Any]:
        """Query ``account/txlist`` for ``address``, newest first."""
        query = {
            "module": "account",
            "action": "txlist",
            "address": address,
            "startblock": start_block,
            "endblock": 99999999,
            "sort": "desc",
        }
        return await self._request("", query)

    async def get_gas_price(self) -> Dict[str, Any]:
        """Query ``gastracker/gasoracle``."""
        query = {"module": "gastracker", "action": "gasoracle"}
        return await self._request("", query)
class BscScanProvider(BaseDataProvider):
    """BscScan client; key sent as the ``apikey`` query parameter."""

    def __init__(self):
        # NOTE(review): embedded fallback credential, used when the
        # BSCSCAN_API_KEY environment variable is unset or empty.
        resolved_key = os.getenv("BSCSCAN_API_KEY") or "K62RKHGXTDCG53RU4MCG6XABIMJKTN19IT"
        config = ProviderConfig(
            id="bscscan",
            name="BscScan",
            category=ProviderCategory.BLOCKCHAIN,
            base_url="https://api.bscscan.com/api",
            api_key=resolved_key,
            api_key_param="apikey",
            rate_limit_per_minute=5,
            requires_auth=True,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Issue a module/action query; defaults to an account balance lookup."""
        query = {
            "module": kwargs.get("module", "account"),
            "action": kwargs.get("action", "balance"),
            "address": kwargs.get("address", ""),
        }
        return await self._request("", query)

    async def get_balance(self, address: str) -> Dict[str, Any]:
        """Query ``account/balance`` for ``address``."""
        query = {"module": "account", "action": "balance", "address": address}
        return await self._request("", query)

    async def get_transactions(self, address: str) -> Dict[str, Any]:
        """Query ``account/txlist`` for ``address``."""
        query = {"module": "account", "action": "txlist", "address": address}
        return await self._request("", query)
class TronScanProvider(BaseDataProvider):
    """TronScan client; key sent as the ``apiKey`` query parameter."""

    def __init__(self):
        # NOTE(review): embedded fallback credential, used when the
        # TRONSCAN_API_KEY environment variable is unset or empty.
        resolved_key = os.getenv("TRONSCAN_API_KEY") or "7ae72726-bffe-4e74-9c33-97b761eeea21"
        config = ProviderConfig(
            id="tronscan",
            name="TronScan",
            category=ProviderCategory.BLOCKCHAIN,
            base_url="https://apilist.tronscanapi.com/api",
            api_key=resolved_key,
            api_key_param="apiKey",
            rate_limit_per_minute=60,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/account`` for the ``address`` keyword (default empty)."""
        address = kwargs["address"] if "address" in kwargs else ""
        return await self._request("/account", {"address": address})

    async def get_account(self, address: str) -> Dict[str, Any]:
        """Request ``/account`` for ``address``."""
        query = {"address": address}
        return await self._request("/account", query)

    async def get_transactions(self, address: str, limit: int = 20) -> Dict[str, Any]:
        """Request ``/transaction`` for ``address``, capped at ``limit``."""
        query = {"address": address, "limit": limit}
        return await self._request("/transaction", query)
class BlockchairProvider(BaseDataProvider):
    """Blockchair client bound to one chain, which is part of the base URL."""

    def __init__(self, chain: str = "ethereum"):
        config = ProviderConfig(
            id=f"blockchair_{chain}",
            name=f"Blockchair {chain.title()}",
            category=ProviderCategory.BLOCKCHAIN,
            base_url=f"https://api.blockchair.com/{chain}",
            rate_limit_per_minute=30,
            free_tier=True,
            priority=3,
        )
        super().__init__(config)
        self.chain = chain

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request the address dashboard for the ``address`` keyword."""
        address = kwargs["address"] if "address" in kwargs else ""
        path = f"/dashboards/address/{address}"
        return await self._request(path)

    async def get_address_dashboard(self, address: str) -> Dict[str, Any]:
        """Request ``/dashboards/address/<address>``."""
        path = f"/dashboards/address/{address}"
        return await self._request(path)
class DefiLlamaProvider(BaseDataProvider):
    """DefiLlama client for protocol and chain TVL endpoints."""

    def __init__(self):
        config = ProviderConfig(
            id="defillama",
            name="DefiLlama",
            category=ProviderCategory.DEFI,
            base_url="https://api.llama.fi",
            rate_limit_per_minute=60,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/protocols``; keyword args are ignored."""
        result = await self._request("/protocols")
        return result

    async def get_protocols(self) -> Dict[str, Any]:
        """Request ``/protocols``."""
        result = await self._request("/protocols")
        return result

    async def get_tvl_history(self) -> Dict[str, Any]:
        """Request ``/v2/historicalChainTvl``."""
        result = await self._request("/v2/historicalChainTvl")
        return result

    async def get_chains(self) -> Dict[str, Any]:
        """Request ``/v2/chains``."""
        result = await self._request("/v2/chains")
        return result
class DefiLlamaPricesProvider(BaseDataProvider):
    """DefiLlama coins client for ``/prices/current``."""

    def __init__(self):
        config = ProviderConfig(
            id="defillama_prices",
            name="DefiLlama Prices",
            category=ProviderCategory.MARKET_DATA,
            base_url="https://coins.llama.fi",
            rate_limit_per_minute=60,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request current prices for the ``coins`` keyword (default BTC, ETH)."""
        if "coins" in kwargs:
            coins = kwargs["coins"]
        else:
            coins = "coingecko:bitcoin,coingecko:ethereum"
        path = f"/prices/current/{coins}"
        return await self._request(path)

    async def get_current_prices(self, coins: str) -> Dict[str, Any]:
        """Request ``/prices/current/<coins>``."""
        path = f"/prices/current/{coins}"
        return await self._request(path)
class GlassnodeProvider(BaseDataProvider):
    """Glassnode client; key read from ``GLASSNODE_API_KEY``."""

    def __init__(self):
        config = ProviderConfig(
            id="glassnode",
            name="Glassnode",
            category=ProviderCategory.ONCHAIN,
            base_url="https://api.glassnode.com/v1",
            api_key_env="GLASSNODE_API_KEY",
            api_key_param="api_key",
            rate_limit_per_minute=10,
            requires_auth=True,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/metrics/<metric>`` with ``a=BTC`` (default metric: SOPR)."""
        metric = kwargs["metric"] if "metric" in kwargs else "indicators/sopr"
        path = f"/metrics/{metric}"
        return await self._request(path, {"a": "BTC"})
class LunarCrushProvider(BaseDataProvider):
    """LunarCrush v2 client; key read from ``LUNARCRUSH_API_KEY``."""

    def __init__(self):
        config = ProviderConfig(
            id="lunarcrush",
            name="LunarCrush",
            category=ProviderCategory.SOCIAL,
            base_url="https://api.lunarcrush.com/v2",
            api_key_env="LUNARCRUSH_API_KEY",
            api_key_param="key",
            rate_limit_per_minute=30,
            requires_auth=True,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request the ``assets`` data set for BTC; keyword args are ignored."""
        query = {"data": "assets", "symbol": "BTC"}
        return await self._request("", query)

    async def get_assets(self, symbol: str = None) -> Dict[str, Any]:
        """Request the ``assets`` data set, filtered by ``symbol`` when truthy."""
        query = {"data": "assets"}
        if symbol:
            query["symbol"] = symbol
        return await self._request("", query)
class MessariProvider(BaseDataProvider):
    """Messari v1 client; configured without an API key."""

    def __init__(self):
        config = ProviderConfig(
            id="messari",
            name="Messari",
            category=ProviderCategory.MARKET_DATA,
            base_url="https://data.messari.io/api/v1",
            rate_limit_per_minute=30,
            free_tier=True,
            priority=3,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request metrics for the ``asset`` keyword (default ``bitcoin``)."""
        asset = kwargs["asset"] if "asset" in kwargs else "bitcoin"
        path = f"/assets/{asset}/metrics"
        return await self._request(path)

    async def get_asset_metrics(self, asset: str) -> Dict[str, Any]:
        """Request ``/assets/<asset>/metrics``."""
        path = f"/assets/{asset}/metrics"
        return await self._request(path)

    async def get_all_assets(self) -> Dict[str, Any]:
        """Request ``/assets``."""
        result = await self._request("/assets")
        return result
class SantimentProvider(BaseDataProvider):
    """Santiment GraphQL client; key read from ``SANTIMENT_API_KEY``."""

    def __init__(self):
        # NOTE(review): the key is sent verbatim as the Authorization header
        # value; confirm whether the API expects a scheme prefix.
        config = ProviderConfig(
            id="santiment",
            name="Santiment",
            category=ProviderCategory.SENTIMENT,
            base_url="https://api.santiment.net/graphql",
            api_key_env="SANTIMENT_API_KEY",
            api_key_header="Authorization",
            rate_limit_per_minute=20,
            requires_auth=True,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """POST the GraphQL ``query`` keyword (default: bitcoin project lookup)."""
        if "query" in kwargs:
            query = kwargs["query"]
        else:
            query = '{ projects(slug: "bitcoin") { slug name } }'
        payload = {"query": query}
        return await self._request("", method="POST", json_body=payload)
class RedditProvider(BaseDataProvider):
    """Reddit client for subreddit ``.json`` listings."""

    def __init__(self):
        config = ProviderConfig(
            id="reddit",
            name="Reddit",
            category=ProviderCategory.SOCIAL,
            base_url="https://www.reddit.com",
            rate_limit_per_minute=60,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request a subreddit listing from ``subreddit``/``sort``/``limit`` keywords.

        Defaults: ``CryptoCurrency``, ``hot``, 25.
        """
        subreddit = kwargs["subreddit"] if "subreddit" in kwargs else "CryptoCurrency"
        sort = kwargs["sort"] if "sort" in kwargs else "hot"
        limit = kwargs["limit"] if "limit" in kwargs else 25
        path = f"/r/{subreddit}/{sort}.json"
        return await self._request(path, {"limit": limit})

    async def get_subreddit_posts(self, subreddit: str, sort: str = "hot", limit: int = 25) -> Dict[str, Any]:
        """Request ``/r/<subreddit>/<sort>.json`` capped at ``limit``."""
        path = f"/r/{subreddit}/{sort}.json"
        return await self._request(path, {"limit": limit})
class CoinStatsProvider(BaseDataProvider):
    """CoinStats public v1 client; configured without an API key."""

    def __init__(self):
        config = ProviderConfig(
            id="coinstats",
            name="CoinStats",
            category=ProviderCategory.MARKET_DATA,
            base_url="https://api.coinstats.app/public/v1",
            rate_limit_per_minute=50,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/coins`` with ``limit=100``; keyword args are ignored."""
        query = {"limit": 100}
        return await self._request("/coins", query)

    async def get_coins(self, limit: int = 100) -> Dict[str, Any]:
        """Request ``/coins`` capped at ``limit``."""
        query = {"limit": limit}
        return await self._request("/coins", query)

    async def get_news(self) -> Dict[str, Any]:
        """Request ``/news``."""
        result = await self._request("/news")
        return result
class CoinLoreProvider(BaseDataProvider):
    """CoinLore client; configured without an API key."""

    def __init__(self):
        config = ProviderConfig(
            id="coinlore",
            name="CoinLore",
            category=ProviderCategory.MARKET_DATA,
            base_url="https://api.coinlore.net/api",
            rate_limit_per_minute=100,
            free_tier=True,
            priority=3,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request the first 100 tickers; keyword args are ignored."""
        query = {"start": 0, "limit": 100}
        return await self._request("/tickers/", query)

    async def get_tickers(self, start: int = 0, limit: int = 100) -> Dict[str, Any]:
        """Request ``/tickers/`` for the ``start``/``limit`` page."""
        query = {"start": start, "limit": limit}
        return await self._request("/tickers/", query)

    async def get_global(self) -> Dict[str, Any]:
        """Request ``/global/``."""
        result = await self._request("/global/")
        return result
class MobulaProvider(BaseDataProvider):
    """Mobula client for ``/market/data``."""

    def __init__(self):
        config = ProviderConfig(
            id="mobula",
            name="Mobula",
            category=ProviderCategory.MARKET_DATA,
            base_url="https://api.mobula.io/api/1",
            rate_limit_per_minute=50,
            free_tier=True,
            priority=3,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request market data for the ``asset`` keyword (default ``bitcoin``)."""
        asset = kwargs["asset"] if "asset" in kwargs else "bitcoin"
        query = {"asset": asset}
        return await self._request("/market/data", query)
class KrakenProvider(BaseDataProvider):
    """Kraken public REST client."""

    def __init__(self):
        config = ProviderConfig(
            id="kraken",
            name="Kraken",
            category=ProviderCategory.OHLCV,
            base_url="https://api.kraken.com/0/public",
            rate_limit_per_minute=60,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/Ticker`` for the ``pair`` keyword (default ``XBTUSD``)."""
        pair = kwargs["pair"] if "pair" in kwargs else "XBTUSD"
        return await self._request("/Ticker", {"pair": pair})

    async def get_ticker(self, pair: str) -> Dict[str, Any]:
        """Request ``/Ticker`` for ``pair``."""
        query = {"pair": pair}
        return await self._request("/Ticker", query)

    async def get_ohlc(self, pair: str, interval: int = 60) -> Dict[str, Any]:
        """Request ``/OHLC`` for ``pair`` at ``interval``."""
        query = {"pair": pair, "interval": interval}
        return await self._request("/OHLC", query)
class BybitProvider(BaseDataProvider):
    """Bybit v5 client for ``/market/tickers``."""

    def __init__(self):
        config = ProviderConfig(
            id="bybit",
            name="Bybit",
            category=ProviderCategory.OHLCV,
            base_url="https://api.bybit.com/v5",
            rate_limit_per_minute=120,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request tickers from ``symbol``/``category`` keywords.

        Defaults: ``BTCUSDT`` in the ``spot`` category.
        """
        symbol = kwargs["symbol"] if "symbol" in kwargs else "BTCUSDT"
        category = kwargs["category"] if "category" in kwargs else "spot"
        query = {"category": category, "symbol": symbol}
        return await self._request("/market/tickers", query)

    async def get_ticker(self, symbol: str, category: str = "spot") -> Dict[str, Any]:
        """Request ``/market/tickers`` for ``symbol`` in ``category``."""
        query = {"category": category, "symbol": symbol}
        return await self._request("/market/tickers", query)
class OKXProvider(BaseDataProvider):
    """OKX v5 market-data client."""

    def __init__(self):
        config = ProviderConfig(
            id="okx",
            name="OKX",
            category=ProviderCategory.OHLCV,
            base_url="https://www.okx.com/api/v5",
            rate_limit_per_minute=60,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/market/ticker`` for the ``instId`` keyword (default BTC-USDT)."""
        inst_id = kwargs["instId"] if "instId" in kwargs else "BTC-USDT"
        return await self._request("/market/ticker", {"instId": inst_id})

    async def get_ticker(self, inst_id: str) -> Dict[str, Any]:
        """Request ``/market/ticker`` for ``inst_id``."""
        query = {"instId": inst_id}
        return await self._request("/market/ticker", query)

    async def get_candles(self, inst_id: str, bar: str = "1H") -> Dict[str, Any]:
        """Request ``/market/candles`` for ``inst_id`` at bar size ``bar``."""
        query = {"instId": inst_id, "bar": bar}
        return await self._request("/market/candles", query)
class HTXProvider(BaseDataProvider):
    """HTX (Huobi) client for ``/market/detail/merged``."""

    def __init__(self):
        config = ProviderConfig(
            id="htx",
            name="HTX (Huobi)",
            category=ProviderCategory.OHLCV,
            base_url="https://api.huobi.pro",
            rate_limit_per_minute=100,
            free_tier=True,
            priority=3,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request the merged detail for the ``symbol`` keyword (default ``btcusdt``)."""
        symbol = kwargs["symbol"] if "symbol" in kwargs else "btcusdt"
        query = {"symbol": symbol}
        return await self._request("/market/detail/merged", query)
class GateIOProvider(BaseDataProvider):
    """Gate.io v4 client for ``/spot/tickers``."""

    def __init__(self):
        config = ProviderConfig(
            id="gateio",
            name="Gate.io",
            category=ProviderCategory.OHLCV,
            base_url="https://api.gateio.ws/api/v4",
            rate_limit_per_minute=100,
            free_tier=True,
            priority=3,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/spot/tickers``; keyword args are ignored."""
        result = await self._request("/spot/tickers")
        return result
class KuCoinProvider(BaseDataProvider):
    """KuCoin v1 client for ``/market/orderbook/level1``."""

    def __init__(self):
        config = ProviderConfig(
            id="kucoin",
            name="KuCoin",
            category=ProviderCategory.OHLCV,
            base_url="https://api.kucoin.com/api/v1",
            rate_limit_per_minute=100,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request level-1 data for the ``symbol`` keyword (default ``BTC-USDT``)."""
        symbol = kwargs["symbol"] if "symbol" in kwargs else "BTC-USDT"
        query = {"symbol": symbol}
        return await self._request("/market/orderbook/level1", query)
class DexScreenerProvider(BaseDataProvider):
    """DexScreener client for ``/latest/dex`` token and pair lookups."""

    def __init__(self):
        config = ProviderConfig(
            id="dexscreener",
            name="DexScreener",
            category=ProviderCategory.DEFI,
            base_url="https://api.dexscreener.com",
            rate_limit_per_minute=300,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/latest/dex/tokens/<chain>`` (default ``ethereum``).

        NOTE(review): the ``chain`` value is placed where ``get_token_pairs``
        puts a token address - confirm this is the intended path segment.
        """
        chain = kwargs["chain"] if "chain" in kwargs else "ethereum"
        path = f"/latest/dex/tokens/{chain}"
        return await self._request(path)

    async def get_token_pairs(self, token_address: str) -> Dict[str, Any]:
        """Request ``/latest/dex/tokens/<token_address>``."""
        path = f"/latest/dex/tokens/{token_address}"
        return await self._request(path)

    async def get_pair(self, chain: str, pair_address: str) -> Dict[str, Any]:
        """Request ``/latest/dex/pairs/<chain>/<pair_address>``."""
        path = f"/latest/dex/pairs/{chain}/{pair_address}"
        return await self._request(path)
class TheGraphProvider(BaseDataProvider):
    """GraphQL client bound to one hosted subgraph of The Graph."""

    def __init__(self, subgraph: str = "uniswap/uniswap-v3"):
        # Slashes are replaced so the subgraph name can be part of the id.
        slug = subgraph.replace('/', '_')
        config = ProviderConfig(
            id=f"thegraph_{slug}",
            name=f"TheGraph {subgraph}",
            category=ProviderCategory.DEFI,
            base_url=f"https://api.thegraph.com/subgraphs/name/{subgraph}",
            rate_limit_per_minute=30,
            free_tier=True,
            priority=1,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """POST the GraphQL ``query`` keyword (default: first five pools)."""
        if "query" in kwargs:
            query = kwargs["query"]
        else:
            query = "{ pools(first: 5) { id token0 { symbol } token1 { symbol } } }"
        payload = {"query": query}
        return await self._request("", method="POST", json_body=payload)
class CovalentProvider(BaseDataProvider):
    """Covalent v1 client; key read from ``COVALENT_API_KEY``."""

    def __init__(self):
        config = ProviderConfig(
            id="covalent",
            name="Covalent",
            category=ProviderCategory.BLOCKCHAIN,
            base_url="https://api.covalenthq.com/v1",
            api_key_env="COVALENT_API_KEY",
            api_key_param="key",
            rate_limit_per_minute=5,
            requires_auth=True,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``balances_v2`` from ``chain_id``/``address`` keywords.

        Defaults: chain id 1 and an empty address.
        """
        chain_id = kwargs["chain_id"] if "chain_id" in kwargs else 1
        address = kwargs["address"] if "address" in kwargs else ""
        path = f"/{chain_id}/address/{address}/balances_v2/"
        return await self._request(path)
class MoralisProvider(BaseDataProvider):
    """Moralis v2 client; key sent in the ``X-API-Key`` header."""

    def __init__(self):
        config = ProviderConfig(
            id="moralis",
            name="Moralis",
            category=ProviderCategory.BLOCKCHAIN,
            base_url="https://deep-index.moralis.io/api/v2",
            api_key_env="MORALIS_API_KEY",
            api_key_header="X-API-Key",
            rate_limit_per_minute=25,
            requires_auth=True,
            free_tier=True,
            priority=2,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Request ``/<address>/balance`` from ``address``/``chain`` keywords.

        Defaults: an empty address on chain ``eth``.
        """
        address = kwargs["address"] if "address" in kwargs else ""
        chain = kwargs["chain"] if "chain" in kwargs else "eth"
        path = f"/{address}/balance"
        return await self._request(path, {"chain": chain})
class BlockscoutProvider(BaseDataProvider):
    """Blockscout client bound to one chain's hosted instance."""

    def __init__(self, chain: str = "eth"):
        chains = {
            "eth": "https://eth.blockscout.com/api",
            "polygon": "https://polygon.blockscout.com/api",
            "arbitrum": "https://arbitrum.blockscout.com/api",
            "optimism": "https://optimism.blockscout.com/api"
        }
        # An unrecognised chain uses the "eth" URL, while the id and name
        # below still carry the requested chain string.
        resolved_url = chains.get(chain, chains["eth"])
        config = ProviderConfig(
            id=f"blockscout_{chain}",
            name=f"Blockscout {chain.upper()}",
            category=ProviderCategory.BLOCKCHAIN,
            base_url=resolved_url,
            rate_limit_per_minute=60,
            free_tier=True,
            priority=3,
        )
        super().__init__(config)

    async def fetch_data(self, **kwargs) -> Dict[str, Any]:
        """Query ``account/balance`` for the ``address`` keyword (default empty)."""
        address = kwargs["address"] if "address" in kwargs else ""
        query = {"module": "account", "action": "balance", "address": address}
        return await self._request("", query)
# Lookup table from base provider id to provider class (classes, not instances).
# NOTE: ExpandedProviderManager.initialize() does not read this table. It
# constructs providers directly, registers some of them under suffixed keys
# (e.g. "etherscan_0", "blockchair_ethereum", "blockscout_eth"), passes
# constructor arguments to a few, and leaves several classes listed here
# (e.g. "glassnode", "thegraph", "covalent", "moralis") unregistered.
PROVIDER_REGISTRY: Dict[str, type] = {
    "coingecko": CoinGeckoProvider,
    "coinmarketcap": CoinMarketCapProvider,
    "cryptocompare": CryptoCompareProvider,
    "binance": BinanceProvider,
    "coincap": CoinCapProvider,
    "coinpaprika": CoinPaprikaProvider,
    "alternative_me": AlternativeMeProvider,
    "cryptopanic": CryptoPanicProvider,
    "newsapi": NewsAPIProvider,
    "sentiment_api": SentimentAPIProvider,
    "etherscan": EtherscanProvider,
    "bscscan": BscScanProvider,
    "tronscan": TronScanProvider,
    "blockchair": BlockchairProvider,
    "defillama": DefiLlamaProvider,
    "defillama_prices": DefiLlamaPricesProvider,
    "glassnode": GlassnodeProvider,
    "lunarcrush": LunarCrushProvider,
    "messari": MessariProvider,
    "santiment": SantimentProvider,
    "reddit": RedditProvider,
    "coinstats": CoinStatsProvider,
    "coinlore": CoinLoreProvider,
    "mobula": MobulaProvider,
    "kraken": KrakenProvider,
    "bybit": BybitProvider,
    "okx": OKXProvider,
    "htx": HTXProvider,
    "gateio": GateIOProvider,
    "kucoin": KuCoinProvider,
    "dexscreener": DexScreenerProvider,
    "thegraph": TheGraphProvider,
    "covalent": CovalentProvider,
    "moralis": MoralisProvider,
    "blockscout": BlockscoutProvider,
}
class ExpandedProviderManager:
    """Holds provider instances and routes requests with priority fallback.

    Providers are created lazily: every accessor calls :meth:`initialize`,
    which is a no-op once it has completed.
    """

    def __init__(self):
        # Manager-level id -> provider instance; populated by initialize().
        self.providers: Dict[str, BaseDataProvider] = {}
        self._initialized = False

    def initialize(self):
        """Instantiate and register the default provider set.

        Safe to call repeatedly; only the first call does any work.
        """
        if self._initialized:
            return
        registry = self.providers
        registry["coingecko"] = CoinGeckoProvider()
        # NOTE(review): the integer argument presumably selects one of several
        # configured API keys - confirm against the provider constructors.
        registry["coinmarketcap"] = CoinMarketCapProvider(0)
        registry["coinmarketcap_1"] = CoinMarketCapProvider(1)
        registry["coinmarketcap_2"] = CoinMarketCapProvider(2)
        registry["cryptocompare"] = CryptoCompareProvider()
        registry["binance"] = BinanceProvider()
        registry["coincap"] = CoinCapProvider()
        registry["coinpaprika"] = CoinPaprikaProvider()
        registry["alternative_me"] = AlternativeMeProvider()
        registry["cryptopanic"] = CryptoPanicProvider()
        registry["newsapi"] = NewsAPIProvider()
        registry["sentiment_api"] = SentimentAPIProvider()
        registry["etherscan_0"] = EtherscanProvider(0)
        registry["etherscan_1"] = EtherscanProvider(1)
        registry["bscscan"] = BscScanProvider()
        registry["tronscan"] = TronScanProvider()
        registry["blockchair_ethereum"] = BlockchairProvider("ethereum")
        registry["blockchair_bsc"] = BlockchairProvider("binance-smart-chain")
        registry["blockchair_tron"] = BlockchairProvider("tron")
        registry["defillama"] = DefiLlamaProvider()
        registry["defillama_prices"] = DefiLlamaPricesProvider()
        registry["messari"] = MessariProvider()
        registry["reddit"] = RedditProvider()
        registry["coinstats"] = CoinStatsProvider()
        registry["coinlore"] = CoinLoreProvider()
        registry["kraken"] = KrakenProvider()
        registry["bybit"] = BybitProvider()
        registry["okx"] = OKXProvider()
        registry["htx"] = HTXProvider()
        registry["gateio"] = GateIOProvider()
        registry["kucoin"] = KuCoinProvider()
        registry["dexscreener"] = DexScreenerProvider()
        registry["blockscout_eth"] = BlockscoutProvider("eth")
        self._initialized = True
        # Lazy %-formatting: the message is only built if INFO is enabled.
        logger.info("Initialized %d expanded providers", len(registry))

    def get_provider(self, provider_id: str) -> Optional[BaseDataProvider]:
        """Return the provider registered under ``provider_id``, or ``None``."""
        self.initialize()
        return self.providers.get(provider_id)

    def get_providers_by_category(self, category: ProviderCategory) -> List[BaseDataProvider]:
        """Return every registered provider whose config category equals ``category``."""
        self.initialize()
        return [p for p in self.providers.values() if p.config.category == category]

    def get_all_providers(self) -> Dict[str, BaseDataProvider]:
        """Return the live id -> provider mapping (not a copy)."""
        self.initialize()
        return self.providers

    def get_all_health(self) -> List[Dict[str, Any]]:
        """Return one dict per registered provider combining config and health fields."""
        self.initialize()
        return [
            {
                "id": pid,
                "name": p.config.name,
                "category": p.config.category.value,
                "status": p.health.status,
                "latency_ms": p.health.latency_ms,
                "last_check": p.health.last_check,
                "error_message": p.health.error_message,
                "success_rate": p.health.success_rate,
                "total_requests": p.health.total_requests,
                "failed_requests": p.health.failed_requests,
                "free_tier": p.config.free_tier,
                "requires_auth": p.config.requires_auth,
            }
            for pid, p in self.providers.items()
        ]

    async def fetch_with_fallback(
        self,
        category: ProviderCategory,
        **kwargs
    ) -> Dict[str, Any]:
        """Try each provider of ``category`` in ascending priority order.

        Args:
            category: Category whose providers should be tried.
            **kwargs: Forwarded unchanged to each provider's ``fetch_data``.

        Returns:
            On the first result whose ``"success"`` is truthy: a dict with
            ``success=True``, the provider name, ``data`` and ``latency_ms``.
            Otherwise a dict with ``success=False``, the unchanged ``error``
            message, ``data=None`` and ``provider_errors``, a list of
            ``"<provider name>: <reason>"`` strings for every provider that was
            skipped or failed (empty when the category has no providers).
        """
        candidates = sorted(
            self.get_providers_by_category(category),
            key=lambda p: p.config.priority
        )
        provider_errors: List[str] = []
        for provider in candidates:
            name = provider.config.name
            if not provider.circuit_breaker.can_attempt():
                provider_errors.append(f"{name}: skipped by circuit breaker")
                continue
            try:
                result = await provider.fetch_data(**kwargs)
                if result.get("success"):
                    return {
                        "success": True,
                        "provider": name,
                        "data": result.get("data"),
                        "latency_ms": result.get("latency_ms")
                    }
                # Previously dropped silently. The "error" key is optional -
                # providers are not guaranteed to set it.
                reason = result.get("error") or "unsuccessful response"
                logger.warning("Provider %s returned no usable data: %s", name, reason)
            except Exception as e:
                # Also covers a malformed (non-dict) result, as before.
                reason = e
                logger.warning("Provider %s failed: %s", name, e)
            provider_errors.append(f"{name}: {reason}")
        return {
            "success": False,
            "error": f"All providers for {category.value} failed",
            "data": None,
            "provider_errors": provider_errors,
        }

    async def close_all(self):
        """Close every registered provider, even if some of them fail.

        Raises:
            Exception: The first error raised by a provider's ``close()``,
                re-raised only after all providers have been attempted so one
                bad provider cannot leave the others open.
        """
        first_error: Optional[Exception] = None
        # Snapshot: the mapping may be mutated elsewhere while we await.
        for pid, provider in list(self.providers.items()):
            try:
                await provider.close()
            except Exception as exc:
                logger.warning("Failed to close provider %s: %s", pid, exc)
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
# Module-level shared manager used by the convenience functions below.
# Constructing it creates no providers; they are built on the first
# initialize() call.
_manager = ExpandedProviderManager()
def get_provider_manager() -> ExpandedProviderManager:
    """Return the module-level shared ``ExpandedProviderManager`` instance."""
    return _manager
def initialize_providers():
    """Initialize the shared manager and summarize what is registered.

    Returns:
        Dict with a fixed ``"status"`` of ``"initialized"``, the number of
        registered providers, and the list of their manager-level ids.
    """
    _manager.initialize()
    registered = _manager.providers
    summary = {"status": "initialized"}
    summary["provider_count"] = len(registered)
    summary["providers"] = list(registered)
    return summary
async def fetch_market_data(symbols: Optional[List[str]] = None) -> Dict[str, Any]:
    """Fetch market data through the shared manager's fallback chain.

    Args:
        symbols: Symbols forwarded to the providers as ``symbols``. ``None``
            or an empty list falls back to ``["bitcoin", "ethereum"]``.

    Returns:
        The result dict produced by ``fetch_with_fallback`` for the
        ``MARKET_DATA`` category.
    """
    _manager.initialize()
    return await _manager.fetch_with_fallback(
        ProviderCategory.MARKET_DATA,
        symbols=symbols or ["bitcoin", "ethereum"],
    )
async def fetch_fear_greed() -> Dict[str, Any]:
    """Fetch data for the ``FEAR_GREED`` category via the shared manager.

    Returns:
        The result dict produced by ``fetch_with_fallback``.
    """
    _manager.initialize()
    category = ProviderCategory.FEAR_GREED
    outcome = await _manager.fetch_with_fallback(category)
    return outcome
async def fetch_news() -> Dict[str, Any]:
    """Fetch data for the ``NEWS`` category via the shared manager.

    Returns:
        The result dict produced by ``fetch_with_fallback``.
    """
    _manager.initialize()
    category = ProviderCategory.NEWS
    outcome = await _manager.fetch_with_fallback(category)
    return outcome
async def fetch_ohlcv(symbol: str, interval: str = "1h") -> Dict[str, Any]:
    """Fetch data for the ``OHLCV`` category via the shared manager.

    Args:
        symbol: Forwarded to the providers as ``symbol``.
        interval: Forwarded to the providers as ``interval``.

    Returns:
        The result dict produced by ``fetch_with_fallback``.
    """
    _manager.initialize()
    request_args = {"symbol": symbol, "interval": interval}
    return await _manager.fetch_with_fallback(ProviderCategory.OHLCV, **request_args)
def get_all_provider_health() -> List[Dict[str, Any]]:
    """Return the shared manager's health report, initializing it first.

    Returns:
        The list produced by the manager's ``get_all_health``.
    """
    _manager.initialize()
    health_report = _manager.get_all_health()
    return health_report