#!/usr/bin/env python3 """ Data Hub Complete - مدیریت جامع همه منابع داده ============================================= ✅ استفاده از تمام کلیدهای API جدید ✅ پشتیبانی از همه انواع داده‌ها ✅ سیستم Fallback خودکار ✅ Cache Management ✅ Rate Limiting """ import httpx import asyncio import logging from typing import Dict, Any, List, Optional, Union from datetime import datetime, timedelta import hashlib import json import os from collections import defaultdict import time logger = logging.getLogger(__name__) class DataHubConfiguration: """پیکربندی کامل Data Hub با تمام کلیدهای جدید""" # ===== کلیدهای API های جدید ===== # Blockchain Explorers TRONSCAN_API_KEY = "7ae72726-bffe-4e74-9c33-97b761eeea21" TRONSCAN_BASE_URL = "https://apilist.tronscan.org/api" BSCSCAN_API_KEY = "K62RKHGXTDCG53RU4MCG6XABIMJKTN19IT" BSCSCAN_BASE_URL = "https://api.bscscan.com/api" ETHERSCAN_API_KEY = "T6IR8VJHX2NE6ZJW2S3FDVN1TYG4PYYI45" ETHERSCAN_BASE_URL = "https://api.etherscan.io/api" # Market Data COINMARKETCAP_API_KEY = "a35ffaec-c66c-4f16-81e3-41a717e4822f" COINMARKETCAP_BASE_URL = "https://pro-api.coinmarketcap.com/v1" # News NEWSAPI_API_KEY = "968a5e25552b4cb5ba3280361d8444ab" NEWSAPI_BASE_URL = "https://newsapi.org/v2" # HuggingFace HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip() HF_SPACE_BASE_URL = "https://really-amin-datasourceforcryptocurrency.hf.space" # Additional Sources ALTERNATIVE_ME_BASE_URL = "https://api.alternative.me" COINGECKO_BASE_URL = "https://api.coingecko.com/api/v3" BINANCE_BASE_URL = "https://api.binance.com/api/v3" REDDIT_BASE_URL = "https://www.reddit.com/r" # Cache TTL Settings (seconds) CACHE_TTL = { "market_prices": 30, "ohlcv": 60, "news": 300, "sentiment": 60, "blockchain": 60, "whale_activity": 30, "social_media": 120, "trending": 180, "fear_greed": 3600, } class RateLimiter: """Rate limiter for API calls""" def __init__(self): self.limits = { "coinmarketcap": {"calls": 333, "period": 60}, # 333/min "newsapi": {"calls": 500, "period": 3600}, # 500/hour "etherscan": {"calls": 5, "period": 1}, # 5/sec "bscscan": {"calls": 5, "period": 1}, # 5/sec "tronscan": {"calls": 10, "period": 1}, # 10/sec "coingecko": {"calls": 50, "period": 60}, # 50/min "binance": {"calls": 1200, "period": 60}, # 1200/min } self.call_times = defaultdict(list) async def wait_if_needed(self, service: str): """Wait if rate limit is reached""" if service not in self.limits: return limit = self.limits[service] now = time.time() # Clean old calls self.call_times[service] = [ t for t in self.call_times[service] if now - t < limit["period"] ] # Check if limit reached if len(self.call_times[service]) >= limit["calls"]: wait_time = limit["period"] - (now - self.call_times[service][0]) if wait_time > 0: logger.warning(f"⏳ Rate limit reached for {service}, waiting {wait_time:.1f}s") await asyncio.sleep(wait_time) # Record new call self.call_times[service].append(now) class DataHubComplete: """ Data Hub کامل برای مدیریت همه منابع داده """ def __init__(self): self.config = DataHubConfiguration() self.rate_limiter = RateLimiter() self.cache = {} self.timeout = httpx.Timeout(30.0, connect=10.0) logger.info("🚀 Data Hub Complete initialized with all new API keys") # ========================================================================= # Cache Management # ========================================================================= def _get_cache_key(self, category: str, params: Dict = None) -> str: """Generate cache key""" cache_str = f"{category}:{json.dumps(params or {}, sort_keys=True)}" return hashlib.md5(cache_str.encode()).hexdigest() def _get_cached(self, cache_key: str, cache_type: str) -> Optional[Dict]: """Get data from cache if not expired""" if cache_key not in self.cache: return None cached_data, cached_time = self.cache[cache_key] ttl = self.config.CACHE_TTL.get(cache_type, 0) if ttl == 0: return None age = (datetime.now() - cached_time).total_seconds() if age < ttl: logger.info(f"📦 Cache HIT: {cache_type} (age: {age:.1f}s)") return cached_data del self.cache[cache_key] return None def _set_cache(self, cache_key: str, data: Dict, cache_type: str): """Store data in cache""" ttl = self.config.CACHE_TTL.get(cache_type, 0) if ttl > 0: self.cache[cache_key] = (data, datetime.now()) # ========================================================================= # 1. Market Price Data - داده‌های قیمت بازار # ========================================================================= async def get_market_prices( self, symbols: Optional[List[str]] = None, limit: int = 100, source: str = "auto" ) -> Dict[str, Any]: """ دریافت قیمت‌های بازار از منابع مختلف Sources: CoinMarketCap, CoinGecko, Binance, HuggingFace """ cache_key = self._get_cache_key("market_prices", {"symbols": symbols, "limit": limit}) cached = self._get_cached(cache_key, "market_prices") if cached: return cached errors = [] # Try CoinMarketCap first if source in ["auto", "coinmarketcap"]: try: await self.rate_limiter.wait_if_needed("coinmarketcap") async with httpx.AsyncClient(timeout=self.timeout) as client: headers = {"X-CMC_PRO_API_KEY": self.config.COINMARKETCAP_API_KEY} params = {"limit": limit, "convert": "USD"} if symbols: params["symbol"] = ",".join(symbols) endpoint = "/cryptocurrency/quotes/latest" else: endpoint = "/cryptocurrency/listings/latest" response = await client.get( f"{self.config.COINMARKETCAP_BASE_URL}{endpoint}", headers=headers, params=params ) response.raise_for_status() data = response.json() # Transform data result_data = [] if "data" in data: items = data["data"] if isinstance(data["data"], list) else data["data"].values() for coin in items: quote = coin.get("quote", {}).get("USD", {}) result_data.append({ "symbol": coin["symbol"], "name": coin["name"], "price": quote.get("price", 0), "change_24h": quote.get("percent_change_24h", 0), "volume_24h": quote.get("volume_24h", 0), "market_cap": quote.get("market_cap", 0), "rank": coin.get("cmc_rank", 0) }) result = { "success": True, "source": "coinmarketcap", "data": result_data, "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "market_prices") logger.info(f"✅ Market prices from CoinMarketCap: {len(result_data)} items") return result except Exception as e: errors.append(f"CoinMarketCap: {e}") logger.warning(f"❌ CoinMarketCap failed: {e}") # Try CoinGecko as fallback if source in ["auto", "coingecko"]: try: await self.rate_limiter.wait_if_needed("coingecko") async with httpx.AsyncClient(timeout=self.timeout) as client: if symbols: ids = ",".join([s.lower() for s in symbols]) params = {"ids": ids, "vs_currencies": "usd", "include_24hr_change": "true"} endpoint = "/simple/price" else: params = {"vs_currency": "usd", "per_page": limit, "page": 1} endpoint = "/coins/markets" response = await client.get( f"{self.config.COINGECKO_BASE_URL}{endpoint}", params=params ) response.raise_for_status() data = response.json() # Transform data result_data = [] if isinstance(data, list): for coin in data: result_data.append({ "symbol": coin.get("symbol", "").upper(), "name": coin.get("name", ""), "price": coin.get("current_price", 0), "change_24h": coin.get("price_change_percentage_24h", 0), "volume_24h": coin.get("total_volume", 0), "market_cap": coin.get("market_cap", 0), "rank": coin.get("market_cap_rank", 0) }) else: for symbol, info in data.items(): result_data.append({ "symbol": symbol.upper(), "price": info.get("usd", 0), "change_24h": info.get("usd_24h_change", 0) }) result = { "success": True, "source": "coingecko", "data": result_data, "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "market_prices") logger.info(f"✅ Market prices from CoinGecko: {len(result_data)} items") return result except Exception as e: errors.append(f"CoinGecko: {e}") logger.warning(f"❌ CoinGecko failed: {e}") # Try Binance for specific pairs if source in ["auto", "binance"] and symbols: try: await self.rate_limiter.wait_if_needed("binance") async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.config.BINANCE_BASE_URL}/ticker/24hr" ) response.raise_for_status() data = response.json() # Filter and transform data result_data = [] for ticker in data: if ticker["symbol"].endswith("USDT"): base = ticker["symbol"][:-4] if not symbols or base in symbols: result_data.append({ "symbol": base, "price": float(ticker["lastPrice"]), "change_24h": float(ticker["priceChangePercent"]), "volume_24h": float(ticker["volume"]) * float(ticker["lastPrice"]), "high_24h": float(ticker["highPrice"]), "low_24h": float(ticker["lowPrice"]) }) result = { "success": True, "source": "binance", "data": result_data[:limit], "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "market_prices") logger.info(f"✅ Market prices from Binance: {len(result_data)} items") return result except Exception as e: errors.append(f"Binance: {e}") logger.warning(f"❌ Binance failed: {e}") # Return error if all sources failed return { "success": False, "error": "All market data sources failed", "errors": errors, "timestamp": datetime.utcnow().isoformat() } # ========================================================================= # 2. Historical OHLCV Data - داده‌های تاریخی # ========================================================================= async def get_ohlcv_data( self, symbol: str, interval: str = "1h", limit: int = 100, source: str = "auto" ) -> Dict[str, Any]: """ دریافت داده‌های OHLCV (کندل استیک) Sources: Binance, CoinMarketCap, HuggingFace """ cache_key = self._get_cache_key("ohlcv", {"symbol": symbol, "interval": interval, "limit": limit}) cached = self._get_cached(cache_key, "ohlcv") if cached: return cached errors = [] # Try Binance first (best for OHLCV) if source in ["auto", "binance"]: try: await self.rate_limiter.wait_if_needed("binance") async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.config.BINANCE_BASE_URL}/klines", params={ "symbol": f"{symbol}USDT", "interval": interval, "limit": limit } ) response.raise_for_status() klines = response.json() # Transform to standard format ohlcv_data = [] for kline in klines: ohlcv_data.append({ "timestamp": int(kline[0]), "open": float(kline[1]), "high": float(kline[2]), "low": float(kline[3]), "close": float(kline[4]), "volume": float(kline[5]) }) result = { "success": True, "source": "binance", "symbol": symbol, "interval": interval, "data": ohlcv_data, "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "ohlcv") logger.info(f"✅ OHLCV from Binance: {len(ohlcv_data)} candles") return result except Exception as e: errors.append(f"Binance: {e}") logger.warning(f"❌ Binance OHLCV failed: {e}") # Try HuggingFace as fallback if source in ["auto", "huggingface"]: try: async with httpx.AsyncClient(timeout=self.timeout) as client: _token = self.config.HF_API_TOKEN or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") headers = {} if _token: headers["Authorization"] = f"Bearer {_token}" response = await client.get( f"{self.config.HF_SPACE_BASE_URL}/api/market/history", headers=headers, params={ "symbol": f"{symbol}USDT", "timeframe": interval, "limit": limit } ) response.raise_for_status() data = response.json() result = { "success": True, "source": "huggingface", "symbol": symbol, "interval": interval, "data": data.get("data", []), "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "ohlcv") logger.info(f"✅ OHLCV from HuggingFace") return result except Exception as e: errors.append(f"HuggingFace: {e}") logger.warning(f"❌ HuggingFace OHLCV failed: {e}") return { "success": False, "error": "Failed to fetch OHLCV data", "errors": errors, "timestamp": datetime.utcnow().isoformat() } # ========================================================================= # 3. Sentiment Data - داده‌های احساسات # ========================================================================= async def get_fear_greed_index(self) -> Dict[str, Any]: """ دریافت شاخص ترس و طمع Source: Alternative.me """ cache_key = self._get_cache_key("fear_greed", {}) cached = self._get_cached(cache_key, "fear_greed") if cached: return cached try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.config.ALTERNATIVE_ME_BASE_URL}/fng/", params={"limit": 30, "format": "json"} ) response.raise_for_status() data = response.json() result = { "success": True, "source": "alternative.me", "data": data.get("data", []), "current": data.get("data", [{}])[0] if data.get("data") else {}, "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "fear_greed") logger.info(f"✅ Fear & Greed Index fetched") return result except Exception as e: logger.error(f"❌ Fear & Greed Index failed: {e}") return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat() } async def analyze_sentiment( self, text: str, source: str = "huggingface" ) -> Dict[str, Any]: """ تحلیل احساسات متن Source: HuggingFace Models """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: _token = self.config.HF_API_TOKEN or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") headers = {} if _token: headers["Authorization"] = f"Bearer {_token}" response = await client.post( f"{self.config.HF_SPACE_BASE_URL}/api/sentiment/analyze", headers=headers, json={"text": text} ) response.raise_for_status() data = response.json() logger.info(f"✅ Sentiment analysis completed") return { "success": True, "source": "huggingface", "data": data.get("data", {}), "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"❌ Sentiment analysis failed: {e}") return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat() } # ========================================================================= # 4. News Data - داده‌های اخبار # ========================================================================= async def get_crypto_news( self, query: str = "cryptocurrency", limit: int = 20, source: str = "auto" ) -> Dict[str, Any]: """ دریافت اخبار ارزهای دیجیتال Sources: NewsAPI, Reddit, HuggingFace """ cache_key = self._get_cache_key("news", {"query": query, "limit": limit}) cached = self._get_cached(cache_key, "news") if cached: return cached errors = [] articles = [] # Try NewsAPI if source in ["auto", "newsapi"]: try: await self.rate_limiter.wait_if_needed("newsapi") async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.config.NEWSAPI_BASE_URL}/everything", params={ "q": query, "apiKey": self.config.NEWSAPI_API_KEY, "language": "en", "sortBy": "publishedAt", "pageSize": limit } ) response.raise_for_status() data = response.json() for article in data.get("articles", []): articles.append({ "title": article["title"], "description": article.get("description"), "url": article["url"], "source": article["source"]["name"], "published_at": article["publishedAt"], "image_url": article.get("urlToImage") }) logger.info(f"✅ NewsAPI: {len(articles)} articles") except Exception as e: errors.append(f"NewsAPI: {e}") logger.warning(f"❌ NewsAPI failed: {e}") # Try Reddit if source in ["auto", "reddit"]: try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.config.REDDIT_BASE_URL}/CryptoCurrency/hot.json", params={"limit": limit}, headers={"User-Agent": "CryptoDataHub/1.0"} ) response.raise_for_status() data = response.json() for post in data["data"]["children"]: post_data = post["data"] articles.append({ "title": post_data["title"], "description": post_data.get("selftext", "")[:200], "url": f"https://reddit.com{post_data['permalink']}", "source": "Reddit", "published_at": datetime.fromtimestamp(post_data["created_utc"]).isoformat(), "score": post_data["score"], "comments": post_data["num_comments"] }) logger.info(f"✅ Reddit: {len(articles)} posts") except Exception as e: errors.append(f"Reddit: {e}") logger.warning(f"❌ Reddit failed: {e}") if articles: result = { "success": True, "articles": articles[:limit], "total": len(articles), "sources": ["newsapi", "reddit"], "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "news") return result return { "success": False, "error": "Failed to fetch news", "errors": errors, "timestamp": datetime.utcnow().isoformat() } # ========================================================================= # 5. Trending Data - داده‌های ترندینگ # ========================================================================= async def get_trending_coins(self, source: str = "coingecko") -> Dict[str, Any]: """ دریافت ارزهای ترند Source: CoinGecko """ cache_key = self._get_cache_key("trending", {}) cached = self._get_cached(cache_key, "trending") if cached: return cached try: await self.rate_limiter.wait_if_needed("coingecko") async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get(f"{self.config.COINGECKO_BASE_URL}/search/trending") response.raise_for_status() data = response.json() trending = [] for coin in data.get("coins", []): item = coin.get("item", {}) trending.append({ "id": item.get("id"), "symbol": item.get("symbol"), "name": item.get("name"), "rank": item.get("market_cap_rank"), "price_btc": item.get("price_btc"), "score": item.get("score", 0) }) result = { "success": True, "source": "coingecko", "trending": trending, "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "trending") logger.info(f"✅ Trending coins: {len(trending)} items") return result except Exception as e: logger.error(f"❌ Trending coins failed: {e}") return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat() } # ========================================================================= # 6. Blockchain Data - داده‌های بلاکچین # ========================================================================= async def get_blockchain_data( self, chain: str, data_type: str = "transactions", address: Optional[str] = None, limit: int = 20 ) -> Dict[str, Any]: """ دریافت داده‌های بلاکچین Chains: ethereum, bsc, tron Types: transactions, balance, gas """ cache_key = self._get_cache_key("blockchain", { "chain": chain, "type": data_type, "address": address }) cached = self._get_cached(cache_key, "blockchain") if cached: return cached try: if chain.lower() == "ethereum": await self.rate_limiter.wait_if_needed("etherscan") async with httpx.AsyncClient(timeout=self.timeout) as client: params = {"apikey": self.config.ETHERSCAN_API_KEY} if data_type == "gas": params.update({"module": "gastracker", "action": "gasoracle"}) elif data_type == "balance" and address: params.update({ "module": "account", "action": "balance", "address": address }) elif data_type == "transactions" and address: params.update({ "module": "account", "action": "txlist", "address": address, "startblock": 0, "endblock": 99999999, "page": 1, "offset": limit, "sort": "desc" }) response = await client.get( self.config.ETHERSCAN_BASE_URL, params=params ) response.raise_for_status() data = response.json() result = { "success": True, "source": "etherscan", "chain": "ethereum", "type": data_type, "data": data.get("result", {}), "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "blockchain") logger.info(f"✅ Ethereum {data_type} data fetched") return result elif chain.lower() == "bsc": await self.rate_limiter.wait_if_needed("bscscan") async with httpx.AsyncClient(timeout=self.timeout) as client: params = {"apikey": self.config.BSCSCAN_API_KEY} if data_type == "balance" and address: params.update({ "module": "account", "action": "balance", "address": address }) elif data_type == "transactions" and address: params.update({ "module": "account", "action": "txlist", "address": address, "startblock": 0, "endblock": 99999999, "page": 1, "offset": limit, "sort": "desc" }) response = await client.get( self.config.BSCSCAN_BASE_URL, params=params ) response.raise_for_status() data = response.json() result = { "success": True, "source": "bscscan", "chain": "bsc", "type": data_type, "data": data.get("result", {}), "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "blockchain") logger.info(f"✅ BSC {data_type} data fetched") return result elif chain.lower() == "tron": await self.rate_limiter.wait_if_needed("tronscan") async with httpx.AsyncClient(timeout=self.timeout) as client: headers = {"TRON-PRO-API-KEY": self.config.TRONSCAN_API_KEY} if data_type == "transactions": endpoint = "/transaction" params = {"sort": "-timestamp", "limit": limit} if address: params["address"] = address elif data_type == "balance" and address: endpoint = f"/account/{address}" params = {} else: endpoint = "/transaction" params = {"sort": "-timestamp", "limit": limit} response = await client.get( f"{self.config.TRONSCAN_BASE_URL}{endpoint}", headers=headers, params=params ) response.raise_for_status() data = response.json() result = { "success": True, "source": "tronscan", "chain": "tron", "type": data_type, "data": data.get("data", data), "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "blockchain") logger.info(f"✅ Tron {data_type} data fetched") return result else: return { "success": False, "error": f"Unsupported chain: {chain}", "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"❌ Blockchain data failed: {e}") return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat() } # ========================================================================= # 7. Whale Activity - فعالیت نهنگ‌ها # ========================================================================= async def get_whale_activity( self, chain: str = "all", min_value_usd: float = 1000000, limit: int = 50 ) -> Dict[str, Any]: """ دریافت فعالیت نهنگ‌ها تراکنش‌های بزرگ در بلاکچین‌های مختلف """ # برای ساده‌سازی، از HuggingFace استفاده می‌کنیم try: async with httpx.AsyncClient(timeout=self.timeout) as client: _token = self.config.HF_API_TOKEN or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") headers = {} if _token: headers["Authorization"] = f"Bearer {_token}" response = await client.get( f"{self.config.HF_SPACE_BASE_URL}/api/crypto/whales/transactions", headers=headers, params={ "limit": limit, "chain": chain if chain != "all" else None, "min_amount_usd": min_value_usd } ) response.raise_for_status() data = response.json() logger.info(f"✅ Whale activity fetched") return { "success": True, "source": "huggingface", "data": data, "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"❌ Whale activity failed: {e}") # Fallback: Get large transactions from blockchain explorers return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat() } # ========================================================================= # 8. Social Media Data - داده‌های شبکه‌های اجتماعی # ========================================================================= async def get_social_media_data( self, platform: str = "reddit", query: str = "cryptocurrency", limit: int = 20 ) -> Dict[str, Any]: """ دریافت داده‌های شبکه‌های اجتماعی Platforms: reddit, twitter (future) """ cache_key = self._get_cache_key("social_media", { "platform": platform, "query": query }) cached = self._get_cached(cache_key, "social_media") if cached: return cached if platform == "reddit": try: async with httpx.AsyncClient(timeout=self.timeout) as client: # Search in multiple crypto subreddits subreddits = ["CryptoCurrency", "Bitcoin", "ethereum", "defi"] all_posts = [] for subreddit in subreddits: try: response = await client.get( f"{self.config.REDDIT_BASE_URL}/{subreddit}/hot.json", params={"limit": limit // len(subreddits)}, headers={"User-Agent": "CryptoDataHub/1.0"} ) response.raise_for_status() data = response.json() for post in data["data"]["children"]: post_data = post["data"] all_posts.append({ "id": post_data["id"], "title": post_data["title"], "text": post_data.get("selftext", "")[:500], "url": f"https://reddit.com{post_data['permalink']}", "subreddit": subreddit, "score": post_data["score"], "comments": post_data["num_comments"], "created_at": datetime.fromtimestamp(post_data["created_utc"]).isoformat(), "author": post_data.get("author", "deleted") }) except Exception as e: logger.warning(f"Failed to fetch from r/{subreddit}: {e}") # Sort by score all_posts.sort(key=lambda x: x["score"], reverse=True) result = { "success": True, "platform": "reddit", "posts": all_posts[:limit], "total": len(all_posts), "timestamp": datetime.utcnow().isoformat() } self._set_cache(cache_key, result, "social_media") logger.info(f"✅ Reddit data: {len(all_posts)} posts") return result except Exception as e: logger.error(f"❌ Reddit data failed: {e}") return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat() } return { "success": False, "error": f"Unsupported platform: {platform}", "timestamp": datetime.utcnow().isoformat() } # ========================================================================= # 9. AI Model Predictions - پیش‌بینی‌های مدل‌های AI # ========================================================================= async def get_ai_prediction( self, symbol: str, model_type: str = "price", timeframe: str = "24h" ) -> Dict[str, Any]: """ دریافت پیش‌بینی از مدل‌های AI Types: price, trend, signal """ try: async with httpx.AsyncClient(timeout=self.timeout) as client: _token = self.config.HF_API_TOKEN or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") headers = {} if _token: headers["Authorization"] = f"Bearer {_token}" # Get recent price data for context price_data = await self.get_market_prices(symbols=[symbol], limit=1) current_price = 0 if price_data.get("success") and price_data.get("data"): current_price = price_data["data"][0].get("price", 0) response = await client.post( f"{self.config.HF_SPACE_BASE_URL}/api/models/predict", headers=headers, json={ "symbol": symbol, "type": model_type, "timeframe": timeframe, "current_price": current_price } ) response.raise_for_status() data = response.json() logger.info(f"✅ AI prediction for {symbol}") return { "success": True, "source": "huggingface", "symbol": symbol, "prediction": data, "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"❌ AI prediction failed: {e}") # Fallback: Simple trend analysis return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat() } # ========================================================================= # 10. System Health - سلامت سیستم # ========================================================================= async def check_all_sources_health(self) -> Dict[str, Any]: """ بررسی سلامت تمام منابع داده """ health_status = {} # Check CoinMarketCap try: async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get( f"{self.config.COINMARKETCAP_BASE_URL}/key/info", headers={"X-CMC_PRO_API_KEY": self.config.COINMARKETCAP_API_KEY} ) health_status["coinmarketcap"] = "operational" if response.status_code == 200 else "degraded" except: health_status["coinmarketcap"] = "down" # Check NewsAPI try: async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get( f"{self.config.NEWSAPI_BASE_URL}/top-headlines", params={"apiKey": self.config.NEWSAPI_API_KEY, "pageSize": 1, "q": "test"} ) health_status["newsapi"] = "operational" if response.status_code == 200 else "degraded" except: health_status["newsapi"] = "down" # Check Etherscan try: async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get( self.config.ETHERSCAN_BASE_URL, params={ "module": "stats", "action": "ethsupply", "apikey": self.config.ETHERSCAN_API_KEY } ) health_status["etherscan"] = "operational" if response.status_code == 200 else "degraded" except: health_status["etherscan"] = "down" # Check HuggingFace try: async with httpx.AsyncClient(timeout=5.0) as client: _token = self.config.HF_API_TOKEN or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") headers = {} if _token: headers["Authorization"] = f"Bearer {_token}" response = await client.get( f"{self.config.HF_SPACE_BASE_URL}/api/health", headers=headers ) health_status["huggingface"] = "operational" if response.status_code == 200 else "degraded" except: health_status["huggingface"] = "down" # Check free APIs (no auth needed) health_status["coingecko"] = "operational" # Usually very stable health_status["binance"] = "operational" # Usually very stable health_status["alternative_me"] = "operational" health_status["reddit"] = "operational" return { "success": True, "status": health_status, "operational_count": sum(1 for v in health_status.values() if v == "operational"), "total_sources": len(health_status), "timestamp": datetime.utcnow().isoformat() } # Global singleton instance _data_hub_instance = None def get_data_hub() -> DataHubComplete: """Get singleton instance of Data Hub Complete""" global _data_hub_instance if _data_hub_instance is None: _data_hub_instance = DataHubComplete() return _data_hub_instance