Coverage for collectors \ aggregator.py: 0.00%

207 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-25 15:37 +0330

1"""Async collectors that power the FastAPI endpoints.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import json 

7import logging 

8import time 

9from dataclasses import dataclass 

10from datetime import datetime, timezone 

11from pathlib import Path 

12from typing import Any, Dict, List, Optional 

13 

14import httpx 

15 

16from config import CACHE_TTL, COIN_SYMBOL_MAPPING, USER_AGENT, get_settings 

17 

# Module-wide logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Settings object obtained once at import time and shared by the classes below.
settings = get_settings()

20 

21 

class CollectorError(RuntimeError):
    """Error raised when a data provider does not deliver usable data.

    Attributes:
        provider: Identifier of the provider involved, when known.
        status_code: HTTP status code of the failed response, when known.
    """

    def __init__(
        self,
        message: str,
        provider: Optional[str] = None,
        status_code: Optional[int] = None,
    ):
        # Only the message is handed to RuntimeError; the extra context is
        # exposed as plain attributes for callers that want to inspect it.
        super().__init__(message)
        self.status_code = status_code
        self.provider = provider

29 

30 

@dataclass
class CacheEntry:
    """A cached value paired with the moment at which it expires."""

    # Payload held by the cache; any object is accepted.
    value: Any
    # Clock reading (float) after which the entry is considered stale.
    expires_at: float

35 

36 

class TTLCache:
    """Simple in-memory TTL cache safe for async usage.

    Every access to the underlying dict happens while holding an
    ``asyncio.Lock``. Expiry is measured with ``time.monotonic()`` so that
    adjustments of the system clock can neither expire entries early nor keep
    them alive past their TTL.
    """

    def __init__(self, ttl: int = CACHE_TTL) -> None:
        """Create an empty cache.

        Args:
            ttl: Lifetime of an entry in seconds. A falsy value (``0`` or
                ``None``) falls back to ``CACHE_TTL``.
        """
        self.ttl = ttl or CACHE_TTL
        self._store: Dict[str, CacheEntry] = {}
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> Any:
        """Return the value stored under *key*, or ``None`` if absent or expired.

        An expired entry is removed from the store as a side effect.
        """
        async with self._lock:
            entry = self._store.get(key)
            if entry is None:
                return None
            if entry.expires_at < time.monotonic():
                self._store.pop(key, None)
                return None
            return entry.value

    async def set(self, key: str, value: Any) -> None:
        """Store *value* under *key* for ``self.ttl`` seconds.

        Expired entries are purged first: keys that are written once and never
        read again would otherwise stay in memory forever.
        """
        async with self._lock:
            now = time.monotonic()
            expired = [name for name, entry in self._store.items() if entry.expires_at < now]
            for name in expired:
                del self._store[name]
            self._store[key] = CacheEntry(value=value, expires_at=now + self.ttl)

58 

59 

class ProvidersRegistry:
    """Utility that loads provider definitions from disk.

    The config file is read as JSON; the object found under its top-level
    ``"providers"`` key becomes the registry content. A missing or unusable
    file is logged and results in an empty registry instead of an exception.
    """

    def __init__(self, path: Optional[Path] = None) -> None:
        """Load the registry.

        Args:
            path: Location of the JSON config. When omitted (or falsy),
                ``settings.providers_config_path`` is used.
        """
        self.path = Path(path or settings.providers_config_path)
        self._providers: Dict[str, Any] = {}
        self._load()

    def _load(self) -> None:
        """Populate ``self._providers`` from ``self.path``."""
        self._providers = {}
        try:
            # Open directly instead of checking exists() first: the file could
            # disappear between the check and the open.
            with self.path.open("r", encoding="utf-8") as handle:
                data = json.load(handle)
        except FileNotFoundError:
            logger.warning("Providers config not found at %s", self.path)
            return
        except json.JSONDecodeError as exc:
            logger.error("Providers config at %s is not valid JSON: %s", self.path, exc)
            return

        if not isinstance(data, dict):
            logger.error("Providers config at %s must be a JSON object", self.path)
            return

        providers = data.get("providers", {})
        if not isinstance(providers, dict):
            # Consumers look providers up with .get(), so anything but a
            # mapping would only fail later and less clearly.
            logger.error("'providers' in %s must be a JSON object", self.path)
            return
        self._providers = providers

    @property
    def providers(self) -> Dict[str, Any]:
        """Mapping of provider id to its definition (empty when nothing was loaded)."""
        return self._providers

80 

81 

class MarketDataCollector:
    """Fetch market data from public providers with caching and fallbacks.

    Results of the public methods are kept in a per-instance ``TTLCache``.
    Cache hits are tested with plain truthiness, so an empty result is never
    served from the cache and is fetched again on the next call.
    """

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        """Set up the collector.

        Args:
            registry: Provider definitions to use; a fresh ``ProvidersRegistry``
                is created when omitted.
        """
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        # COIN_SYMBOL_MAPPING maps coin id -> symbol; invert it so lookups can
        # go from a lowercase symbol to the coin id.
        self._symbol_map = {symbol.lower(): coin_id for coin_id, symbol in COIN_SYMBOL_MAPPING.items()}
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0

    async def _request(self, provider_key: str, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """GET ``<provider base_url><path>`` and return the decoded JSON body.

        Raises:
            CollectorError: if the provider is not configured, has no
                ``base_url``, or answers with a status other than 200.
                Exceptions raised by httpx itself propagate unchanged.
        """
        provider = self.registry.providers.get(provider_key)
        if not provider:
            raise CollectorError(f"Provider {provider_key} not configured", provider=provider_key)

        base_url = provider.get("base_url")
        if not base_url:
            # Report a broken definition as a collector problem rather than a
            # bare KeyError.
            raise CollectorError(f"Provider {provider_key} has no base_url", provider=provider_key)

        url = base_url.rstrip("/") + path
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params=params)
            if response.status_code != 200:
                raise CollectorError(
                    f"{provider_key} request failed with HTTP {response.status_code}",
                    provider=provider_key,
                    status_code=response.status_code,
                )
            return response.json()

    async def _top_coins_from_coingecko(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch the top *limit* coins by market cap from CoinGecko."""
        data = await self._request(
            "coingecko",
            "/coins/markets",
            {
                "vs_currency": "usd",
                "order": "market_cap_desc",
                "per_page": limit,
                "page": 1,
                "sparkline": "false",
                "price_change_percentage": "24h",
            },
        )
        return [
            {
                "name": item.get("name"),
                "symbol": item.get("symbol", "").upper(),
                "price": item.get("current_price"),
                "change_24h": item.get("price_change_percentage_24h"),
                "market_cap": item.get("market_cap"),
                "volume_24h": item.get("total_volume"),
                "rank": item.get("market_cap_rank"),
                "last_updated": item.get("last_updated"),
            }
            for item in data
        ]

    async def _top_coins_from_coincap(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch the top *limit* assets from CoinCap in the CoinGecko row shape."""
        data = await self._request("coincap", "/assets", {"limit": limit})
        return [
            {
                "name": item.get("name"),
                "symbol": item.get("symbol", "").upper(),
                # `or 0` covers both a missing key and an explicit null, which
                # float()/int() would reject with a TypeError.
                "price": float(item.get("priceUsd") or 0),
                "change_24h": float(item.get("changePercent24Hr") or 0),
                "market_cap": float(item.get("marketCapUsd") or 0),
                "volume_24h": float(item.get("volumeUsd24Hr") or 0),
                "rank": int(item.get("rank") or 0),
                # No per-coin update time is read from this provider; the key
                # is present so both providers yield the same row shape.
                "last_updated": None,
            }
            for item in data.get("data", [])
        ]

    async def get_top_coins(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the top *limit* coins, trying CoinGecko first and CoinCap second.

        Raises:
            CollectorError: if every provider fails. ``provider`` names the
                last provider tried, ``status_code`` is copied from the last
                error when it has one, and the last error is chained as
                ``__cause__``.
        """
        cache_key = f"top_coins:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        fetchers = (
            ("coingecko", self._top_coins_from_coingecko),
            ("coincap", self._top_coins_from_coincap),
        )
        last_error: Optional[Exception] = None
        failed_provider: Optional[str] = None
        for provider, fetch in fetchers:
            try:
                coins = await fetch(limit)
                await self.cache.set(cache_key, coins)
                return coins
            except Exception as exc:  # pragma: no cover - network heavy
                last_error = exc
                failed_provider = provider
                logger.warning("Provider %s failed: %s", provider, exc)

        raise CollectorError(
            "Unable to fetch top coins",
            provider=failed_provider,
            status_code=getattr(last_error, "status_code", None),
        ) from last_error

    async def _coin_id(self, symbol: str) -> str:
        """Translate a ticker symbol (case-insensitive) into a CoinGecko coin id.

        The static mapping built in ``__init__`` is consulted first; otherwise
        the symbol table from CoinGecko's ``/coins/list`` is used (and cached).
        When several coins share a symbol, the one listed last wins.

        Raises:
            CollectorError: if the symbol is unknown.
        """
        symbol_lower = symbol.lower()
        if symbol_lower in self._symbol_map:
            return self._symbol_map[symbol_lower]

        cache_key = "coingecko:symbols"
        mapping = await self.cache.get(cache_key)
        if not mapping:
            data = await self._request("coingecko", "/coins/list")
            mapping = {item["symbol"].lower(): item["id"] for item in data}
            await self.cache.set(cache_key, mapping)

        if symbol_lower not in mapping:
            raise CollectorError(f"Unknown symbol: {symbol}")

        return mapping[symbol_lower]

    async def get_coin_details(self, symbol: str) -> Dict[str, Any]:
        """Return descriptive and market details for *symbol* from CoinGecko.

        Absent or null sections of the payload produce ``None`` fields instead
        of raising.
        """
        coin_id = await self._coin_id(symbol)
        cache_key = f"coin:{coin_id}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}",
            {"localization": "false", "tickers": "false", "market_data": "true"},
        )
        market_data = data.get("market_data") or {}

        def usd(field: str) -> Any:
            # Per-currency fields are nested objects; tolerate null/missing.
            return (market_data.get(field) or {}).get("usd")

        # An empty homepage list would make a bare [0] raise IndexError.
        homepages = (data.get("links") or {}).get("homepage") or []
        coin = {
            "id": coin_id,
            "name": data.get("name"),
            "symbol": data.get("symbol", "").upper(),
            "description": (data.get("description") or {}).get("en"),
            "homepage": homepages[0] if homepages else None,
            "price": usd("current_price"),
            "market_cap": usd("market_cap"),
            "volume_24h": usd("total_volume"),
            "change_24h": market_data.get("price_change_percentage_24h"),
            "high_24h": usd("high_24h"),
            "low_24h": usd("low_24h"),
            "circulating_supply": market_data.get("circulating_supply"),
            "total_supply": market_data.get("total_supply"),
            "ath": usd("ath"),
            "atl": usd("atl"),
            "last_updated": data.get("last_updated"),
        }
        await self.cache.set(cache_key, coin)
        return coin

    async def get_market_stats(self) -> Dict[str, Any]:
        """Return global market figures from CoinGecko's ``/global`` endpoint."""
        cache_key = "market:stats"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        global_data = await self._request("coingecko", "/global")
        stats = global_data.get("data") or {}
        dominance = stats.get("market_cap_percentage") or {}
        market = {
            "total_market_cap": (stats.get("total_market_cap") or {}).get("usd"),
            "total_volume_24h": (stats.get("total_volume") or {}).get("usd"),
            "market_cap_change_percentage_24h": stats.get("market_cap_change_percentage_24h_usd"),
            "btc_dominance": dominance.get("btc"),
            "eth_dominance": dominance.get("eth"),
            "active_cryptocurrencies": stats.get("active_cryptocurrencies"),
            "markets": stats.get("markets"),
            "updated_at": stats.get("updated_at"),
        }
        await self.cache.set(cache_key, market)
        return market

    async def get_price_history(self, symbol: str, timeframe: str = "7d") -> List[Dict[str, Any]]:
        """Return USD price points for *symbol* from CoinGecko.

        Args:
            symbol: Ticker symbol, resolved through ``_coin_id``.
            timeframe: One of ``"1d"``, ``"7d"``, ``"30d"``, ``"90d"``; any
                other value falls back to 7 days.

        Returns:
            A list of ``{"timestamp": <ISO-8601 UTC>, "price": <rounded to 4 dp>}``.
        """
        coin_id = await self._coin_id(symbol)
        mapping = {"1d": 1, "7d": 7, "30d": 30, "90d": 90}
        days = mapping.get(timeframe, 7)
        cache_key = f"history:{coin_id}:{days}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}/market_chart",
            {"vs_currency": "usd", "days": days},
        )
        prices = [
            {
                # The first element is divided by 1000 before conversion,
                # i.e. it is treated as a millisecond timestamp.
                "timestamp": datetime.fromtimestamp(point[0] / 1000, tz=timezone.utc).isoformat(),
                "price": round(point[1], 4),
            }
            for point in data.get("prices", [])
        ]
        await self.cache.set(cache_key, prices)
        return prices

    async def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[Dict[str, Any]]:
        """Return OHLCV data from Binance with caching and validation.

        Args:
            symbol: Trading pair symbol; upper-cased before use.
            interval: Candle interval passed through to the provider.
            limit: Number of candles, clamped to the range 1..1000.

        Raises:
            CollectorError: if no row of the response could be parsed.
        """
        pair = symbol.upper()
        # Clamp before building the cache key so every out-of-range limit
        # shares one entry with the request it actually results in.
        bounded_limit = min(max(limit, 1), 1000)
        cache_key = f"ohlcv:{pair}:{interval}:{bounded_limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        params = {"symbol": pair, "interval": interval, "limit": bounded_limit}
        data = await self._request("binance", "/klines", params)

        candles: List[Dict[str, Any]] = []
        for item in data:
            try:
                candles.append(
                    {
                        "timestamp": datetime.fromtimestamp(item[0] / 1000, tz=timezone.utc).isoformat(),
                        "open": float(item[1]),
                        "high": float(item[2]),
                        "low": float(item[3]),
                        "close": float(item[4]),
                        "volume": float(item[5]),
                    }
                )
            except (TypeError, ValueError, IndexError):  # pragma: no cover - defensive
                # Skip malformed rows, including ones that are too short.
                continue

        if not candles:
            raise CollectorError(f"No OHLCV data returned for {symbol}", provider="binance")

        await self.cache.set(cache_key, candles)
        return candles

299 

300 

class NewsCollector:
    """Fetch latest crypto news.

    News is requested from a fixed CryptoCompare URL; the registry passed to
    the constructor is stored but not consulted by ``get_latest_news``.
    """

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        """Set up the collector; a fresh ``ProvidersRegistry`` is created when none is given."""
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0

    async def get_latest_news(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to *limit* news items, cached per limit value.

        Args:
            limit: Maximum number of items; negative values yield no items.

        Raises:
            CollectorError: if the provider answers with a non-200 status or
                with a body whose ``"Data"`` member is not a list.
        """
        cache_key = f"news:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        url = "https://min-api.cryptocompare.com/data/v2/news/"
        params = {"lang": "EN"}
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params=params)
            if response.status_code != 200:
                raise CollectorError(
                    f"News provider error: HTTP {response.status_code}",
                    provider="cryptocompare",
                    status_code=response.status_code,
                )

        payload = response.json()
        entries = payload.get("Data") if isinstance(payload, dict) else None
        if not isinstance(entries, list):
            # A 200 response can still carry something other than a list of
            # entries; slicing it would fail with an unrelated exception.
            detail = payload.get("Message") if isinstance(payload, dict) else None
            raise CollectorError(
                f"News provider error: {detail or 'unexpected payload'}",
                provider="cryptocompare",
            )

        items = []
        # max(..., 0) keeps a negative limit from slicing off the list's tail.
        for entry in entries[: max(limit, 0)]:
            # `or 0` also covers an explicit null, which fromtimestamp rejects.
            published = datetime.fromtimestamp(entry.get("published_on") or 0, tz=timezone.utc)
            items.append(
                {
                    "id": entry.get("id"),
                    "title": entry.get("title"),
                    "body": entry.get("body"),
                    "url": entry.get("url"),
                    "source": entry.get("source"),
                    "categories": entry.get("categories"),
                    "published_at": published.isoformat(),
                }
            )

        await self.cache.set(cache_key, items)
        return items

341 

342 

class ProviderStatusCollector:
    """Perform lightweight health checks against configured providers."""

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        """Set up the collector; a fresh ``ProvidersRegistry`` is created when none is given."""
        self.registry = registry or ProvidersRegistry()
        # Status results are kept for at least 600 seconds, longer if the
        # configured cache TTL is larger.
        self.cache = TTLCache(max(settings.cache_ttl, 600))
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 8.0

    async def _check_provider(self, client: httpx.AsyncClient, provider_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Probe one provider and describe the outcome.

        The ``health_check`` URL is preferred, ``base_url`` is the fallback.
        A response below HTTP 400 counts as ``"online"``, anything else as
        ``"degraded"``; any exception (or a missing URL) yields ``"offline"``
        together with an ``"error"`` text. This method never raises for
        ordinary exceptions.
        """
        report: Dict[str, Any] = {
            "provider_id": provider_id,
            "name": data.get("name", provider_id),
            "category": data.get("category"),
        }
        offline = {"status": "offline", "status_code": None, "latency_ms": None}

        url = data.get("health_check") or data.get("base_url")
        if not url:
            # Report the misconfiguration explicitly instead of relying on
            # the HTTP client to reject a None URL.
            logger.warning("Provider %s health check failed: %s", provider_id, "no URL configured")
            return {**report, **offline, "error": "No health_check or base_url configured"}

        start = time.perf_counter()
        try:
            response = await client.get(url, timeout=self.timeout)
            latency = round((time.perf_counter() - start) * 1000, 2)
            status = "online" if response.status_code < 400 else "degraded"
            return {
                **report,
                "status": status,
                "status_code": response.status_code,
                "latency_ms": latency,
            }
        except Exception as exc:  # pragma: no cover - network heavy
            logger.warning("Provider %s health check failed: %s", provider_id, exc)
            return {**report, **offline, "error": str(exc)}

    async def get_providers_status(self) -> List[Dict[str, Any]]:
        """Return one status report per configured provider.

        All providers are checked concurrently over a shared HTTP client.
        Reports are returned in registry order, independent of which check
        finishes first. An empty registry yields ``[]`` and is not cached.
        """
        cached = await self.cache.get("providers_status")
        if cached:
            return cached

        providers = self.registry.providers
        if not providers:
            return []

        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            # gather() keeps the order of its arguments, so the output is
            # stable between calls.
            results: List[Dict[str, Any]] = list(
                await asyncio.gather(
                    *(self._check_provider(client, pid, data) for pid, data in providers.items())
                )
            )

        await self.cache.set("providers_status", results)
        return results

396 

397 

# Names exported by `from <this module> import *`.
__all__ = ["CollectorError", "MarketDataCollector", "NewsCollector", "ProviderStatusCollector"]