""" ═══════════════════════════════════════════════════════════════════ DATA HUB SERVICE Central service for accessing ALL collected crypto data ═══════════════════════════════════════════════════════════════════ This service provides a unified interface to access data from: - Market prices (from CoinGecko, CoinCap, Binance, etc.) - OHLCV candlestick data (for charts) - News articles (from CryptoPanic, NewsAPI, RSS feeds) - Sentiment indicators (Fear & Greed, social metrics) - Whale transactions (large on-chain movements) - On-chain metrics (network stats, DeFi data) - Provider health status All data is served from the local SQLite database (crypto_hub.db) which is continuously updated by the collector system. @version 1.0.0 @author Crypto Intelligence Hub """ from typing import Dict, Any, List, Optional from datetime import datetime, timedelta from sqlalchemy import create_engine, desc, func from sqlalchemy.orm import sessionmaker, Session from contextlib import contextmanager from database.models_hub import ( MarketPrice, OHLCVData, NewsArticle, SentimentData, WhaleTransaction, OnChainMetric, ProviderHealth, DataCollectionLog ) from utils.logger import setup_logger class DataHubService: """ Central service for accessing all collected crypto data This service provides high-level methods to query the database and return formatted data for API endpoints. """ def __init__(self, db_path: str = "data/crypto_hub.db", log_level: str = "INFO"): """ Initialize the Data Hub Service Args: db_path: Path to the SQLite database log_level: Logging level """ self.db_path = db_path self.logger = setup_logger("DataHubService", level=log_level) # Create database engine and session factory db_url = f"sqlite:///{self.db_path}" self.engine = create_engine( db_url, echo=False, connect_args={"check_same_thread": False} ) self.SessionLocal = sessionmaker( autocommit=False, autoflush=False, bind=self.engine ) self.logger.info("Data Hub Service initialized") @contextmanager def get_session(self) -> Session: """ Context manager for database sessions Yields: SQLAlchemy session """ session = self.SessionLocal() try: yield session except Exception as e: self.logger.error(f"Session error: {str(e)}") raise finally: session.close() # ═══════════════════════════════════════════════════════════════ # MARKET DATA METHODS # ═══════════════════════════════════════════════════════════════ def get_latest_price(self, symbol: str) -> Optional[Dict[str, Any]]: """ Get latest price for a specific symbol Args: symbol: Cryptocurrency symbol (e.g., 'BTC', 'ETH') Returns: Dictionary with price data or None if not found """ try: with self.get_session() as session: price = session.query(MarketPrice)\ .filter(MarketPrice.symbol == symbol.upper())\ .order_by(desc(MarketPrice.collected_at))\ .first() if not price: return None return self._format_market_price(price) except Exception as e: self.logger.error(f"Error getting price for {symbol}: {str(e)}") return None def get_top_coins(self, limit: int = 100) -> List[Dict[str, Any]]: """ Get top cryptocurrencies by market cap Args: limit: Maximum number of coins to return Returns: List of coin data dictionaries """ try: with self.get_session() as session: # Get latest prices for each symbol, ordered by market cap subquery = session.query( MarketPrice.symbol, func.max(MarketPrice.collected_at).label('latest') ).group_by(MarketPrice.symbol).subquery() prices = session.query(MarketPrice)\ .join(subquery, (MarketPrice.symbol == subquery.c.symbol) & (MarketPrice.collected_at == subquery.c.latest))\ 

    def get_top_coins(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get top cryptocurrencies by market cap

        Args:
            limit: Maximum number of coins to return

        Returns:
            List of coin data dictionaries
        """
        try:
            with self.get_session() as session:
                # Get latest prices for each symbol, ordered by market cap
                subquery = session.query(
                    MarketPrice.symbol,
                    func.max(MarketPrice.collected_at).label('latest')
                ).group_by(MarketPrice.symbol).subquery()

                prices = session.query(MarketPrice)\
                    .join(subquery,
                          (MarketPrice.symbol == subquery.c.symbol) &
                          (MarketPrice.collected_at == subquery.c.latest))\
                    .filter(MarketPrice.market_cap.isnot(None))\
                    .order_by(desc(MarketPrice.market_cap))\
                    .limit(limit)\
                    .all()

                return [self._format_market_price(p) for p in prices]
        except Exception as e:
            self.logger.error(f"Error getting top coins: {str(e)}")
            return []

    def get_prices_bulk(self, symbols: List[str]) -> Dict[str, Dict[str, Any]]:
        """
        Get latest prices for multiple symbols

        Args:
            symbols: List of cryptocurrency symbols

        Returns:
            Dictionary mapping symbols to price data
        """
        try:
            with self.get_session() as session:
                symbols_upper = [s.upper() for s in symbols]

                # Get latest price for each symbol
                subquery = session.query(
                    MarketPrice.symbol,
                    func.max(MarketPrice.collected_at).label('latest')
                ).filter(MarketPrice.symbol.in_(symbols_upper))\
                 .group_by(MarketPrice.symbol).subquery()

                prices = session.query(MarketPrice)\
                    .join(subquery,
                          (MarketPrice.symbol == subquery.c.symbol) &
                          (MarketPrice.collected_at == subquery.c.latest))\
                    .all()

                return {
                    p.symbol: self._format_market_price(p)
                    for p in prices
                }
        except Exception as e:
            self.logger.error(f"Error getting bulk prices: {str(e)}")
            return {}

    def _format_market_price(self, price: MarketPrice) -> Dict[str, Any]:
        """Format MarketPrice model to dictionary"""
        return {
            'symbol': price.symbol,
            'price_usd': price.price_usd,
            'change_1h': price.change_1h,
            'change_24h': price.change_24h,
            'change_7d': price.change_7d,
            'volume_24h': price.volume_24h,
            'market_cap': price.market_cap,
            'circulating_supply': price.circulating_supply,
            'total_supply': price.total_supply,
            'sources_count': price.sources_count,
            'sources': json.loads(price.sources_list) if price.sources_list else [],
            'collected_at': price.collected_at.isoformat()
        }

    # ═══════════════════════════════════════════════════════════════
    # OHLCV DATA METHODS (for charts)
    # ═══════════════════════════════════════════════════════════════

    def get_ohlcv(
        self,
        symbol: str,
        timeframe: str = '1h',
        limit: int = 100,
        source: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Get OHLCV candlestick data for charting

        Args:
            symbol: Cryptocurrency symbol
            timeframe: Timeframe (1m, 5m, 15m, 1h, 4h, 1d, 1w)
            limit: Number of candles to return
            source: Specific source (optional)

        Returns:
            List of OHLCV candles
        """
        try:
            with self.get_session() as session:
                query = session.query(OHLCVData)\
                    .filter(OHLCVData.symbol == symbol.upper())\
                    .filter(OHLCVData.timeframe == timeframe)

                if source:
                    query = query.filter(OHLCVData.source == source)

                candles = query\
                    .order_by(desc(OHLCVData.timestamp))\
                    .limit(limit)\
                    .all()

                # Return in chronological order (oldest first)
                return [self._format_ohlcv(c) for c in reversed(candles)]
        except Exception as e:
            self.logger.error(f"Error getting OHLCV data: {str(e)}")
            return []

    def _format_ohlcv(self, candle: OHLCVData) -> Dict[str, Any]:
        """Format OHLCV model to dictionary"""
        return {
            'timestamp': candle.timestamp.isoformat(),
            'time': int(candle.timestamp.timestamp()),  # Unix timestamp for charts
            'open': candle.open,
            'high': candle.high,
            'low': candle.low,
            'close': candle.close,
            'volume': candle.volume
        }

    # ═══════════════════════════════════════════════════════════════
    # SENTIMENT DATA METHODS
    # ═══════════════════════════════════════════════════════════════

    def get_fear_greed(self) -> Optional[Dict[str, Any]]:
        """
        Get latest Fear & Greed Index

        Returns:
            Dictionary with Fear & Greed data or None
        """
        try:
            with self.get_session() as session:
                sentiment = session.query(SentimentData)\
                    .filter(SentimentData.indicator == 'fear_greed')\
                    .order_by(desc(SentimentData.collected_at))\
                    .first()

                if not sentiment:
                    return None

                return {
                    'value': sentiment.value,
                    'classification': sentiment.classification,
                    'source': sentiment.source,
                    'collected_at': sentiment.collected_at.isoformat()
                }
        except Exception as e:
            self.logger.error(f"Error getting Fear & Greed: {str(e)}")
            return None

    def get_sentiment_history(
        self,
        indicator: str = 'fear_greed',
        days: int = 30
    ) -> List[Dict[str, Any]]:
        """
        Get sentiment history for charting

        Args:
            indicator: Sentiment indicator name
            days: Number of days of history

        Returns:
            List of sentiment data points
        """
        try:
            with self.get_session() as session:
                cutoff = datetime.utcnow() - timedelta(days=days)

                sentiments = session.query(SentimentData)\
                    .filter(SentimentData.indicator == indicator)\
                    .filter(SentimentData.collected_at >= cutoff)\
                    .order_by(SentimentData.collected_at)\
                    .all()

                return [
                    {
                        'timestamp': s.collected_at.isoformat(),
                        'value': s.value,
                        'classification': s.classification
                    }
                    for s in sentiments
                ]
        except Exception as e:
            self.logger.error(f"Error getting sentiment history: {str(e)}")
            return []

    # ═══════════════════════════════════════════════════════════════
    # NEWS METHODS
    # ═══════════════════════════════════════════════════════════════

    def get_news(
        self,
        limit: int = 50,
        source: Optional[str] = None,
        symbol: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Get latest news articles

        Args:
            limit: Maximum number of articles
            source: Filter by source (optional)
            symbol: Filter by related symbol (optional)

        Returns:
            List of news articles
        """
        try:
            with self.get_session() as session:
                query = session.query(NewsArticle)

                if source:
                    query = query.filter(NewsArticle.source == source)

                if symbol:
                    # Search for symbol in related_symbols JSON
                    query = query.filter(
                        NewsArticle.related_symbols.like(f'%{symbol.upper()}%')
                    )

                articles = query\
                    .order_by(desc(NewsArticle.published_at))\
                    .limit(limit)\
                    .all()

                return [self._format_news_article(a) for a in articles]
        except Exception as e:
            self.logger.error(f"Error getting news: {str(e)}")
            return []

    def _format_news_article(self, article: NewsArticle) -> Dict[str, Any]:
        """Format NewsArticle model to dictionary"""
        return {
            'title': article.title,
            'url': article.url,
            'content': article.content,
            'summary': article.summary,
            'source': article.source,
            'author': article.author,
            'published_at': article.published_at.isoformat(),
            'sentiment': article.sentiment,
            'sentiment_score': article.sentiment_score,
            'related_symbols': json.loads(article.related_symbols) if article.related_symbols else [],
            'collected_at': article.collected_at.isoformat()
        }

    # ═══════════════════════════════════════════════════════════════
    # WHALE TRANSACTION METHODS
    # ═══════════════════════════════════════════════════════════════

    def get_whale_alerts(
        self,
        min_usd: float = 1000000,
        limit: int = 50,
        blockchain: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Get recent large transactions

        Args:
            min_usd: Minimum USD value
            limit: Maximum number of transactions
            blockchain: Filter by blockchain (optional)

        Returns:
            List of whale transactions
        """
        try:
            with self.get_session() as session:
                query = session.query(WhaleTransaction)\
                    .filter(WhaleTransaction.usd_value >= min_usd)

                if blockchain:
                    query = query.filter(WhaleTransaction.blockchain == blockchain)

                transactions = query\
                    .order_by(desc(WhaleTransaction.tx_time))\
                    .limit(limit)\
                    .all()

                return [self._format_whale_transaction(t) for t in transactions]
        except Exception as e:
            self.logger.error(f"Error getting whale alerts: {str(e)}")
            return []

    def _format_whale_transaction(self, tx: WhaleTransaction) -> Dict[str, Any]:
        """Format WhaleTransaction model to dictionary"""
        return {
            'tx_hash': tx.tx_hash,
            'blockchain': tx.blockchain,
            'from_address': tx.from_address,
            'to_address': tx.to_address,
            'amount': tx.amount,
            'symbol': tx.symbol,
            'usd_value': tx.usd_value,
            'tx_time': tx.tx_time.isoformat(),
            'block_number': tx.block_number,
            'source': tx.source,
            'collected_at': tx.collected_at.isoformat()
        }

    # ═══════════════════════════════════════════════════════════════
    # PROVIDER HEALTH & STATISTICS
    # ═══════════════════════════════════════════════════════════════

    def get_provider_status(self) -> Dict[str, Any]:
        """
        Get health status of all data providers

        Returns:
            Dictionary with provider health information
        """
        try:
            with self.get_session() as session:
                # Get latest health check for each provider
                subquery = session.query(
                    ProviderHealth.provider_id,
                    func.max(ProviderHealth.checked_at).label('latest')
                ).group_by(ProviderHealth.provider_id).subquery()

                providers = session.query(ProviderHealth)\
                    .join(subquery,
                          (ProviderHealth.provider_id == subquery.c.provider_id) &
                          (ProviderHealth.checked_at == subquery.c.latest))\
                    .all()

                return {
                    'total_providers': len(providers),
                    'healthy': sum(1 for p in providers if p.status == 'healthy'),
                    'degraded': sum(1 for p in providers if p.status == 'degraded'),
                    'down': sum(1 for p in providers if p.status == 'down'),
                    'providers': [
                        {
                            'id': p.provider_id,
                            'name': p.provider_name,
                            'category': p.category,
                            'status': p.status,
                            'response_time_ms': p.response_time_ms,
                            'last_success': p.last_success.isoformat() if p.last_success else None,
                            'last_failure': p.last_failure.isoformat() if p.last_failure else None,
                            'error_count': p.error_count,
                            'error_message': p.error_message,
                            'checked_at': p.checked_at.isoformat()
                        }
                        for p in providers
                    ]
                }
        except Exception as e:
            self.logger.error(f"Error getting provider status: {str(e)}")
            return {'total_providers': 0, 'healthy': 0, 'degraded': 0, 'down': 0, 'providers': []}

    def get_stats(self) -> Dict[str, Any]:
        """
        Get Data Hub statistics

        Returns:
            Dictionary with database statistics
        """
        try:
            with self.get_session() as session:
                stats = {
                    'market_prices': session.query(MarketPrice).count(),
                    'ohlcv_candles': session.query(OHLCVData).count(),
                    'news_articles': session.query(NewsArticle).count(),
                    'sentiment_records': session.query(SentimentData).count(),
                    'whale_transactions': session.query(WhaleTransaction).count(),
                    'onchain_metrics': session.query(OnChainMetric).count(),
                    'collection_logs': session.query(DataCollectionLog).count()
                }

                # Get latest collection time
                latest_log = session.query(DataCollectionLog)\
                    .order_by(desc(DataCollectionLog.collected_at))\
                    .first()
                stats['last_collection'] = latest_log.collected_at.isoformat() if latest_log else None

                # Get total records
                stats['total_records'] = sum([
                    stats['market_prices'],
                    stats['ohlcv_candles'],
                    stats['news_articles'],
                    stats['sentiment_records'],
                    stats['whale_transactions'],
                    stats['onchain_metrics']
                ])

                return stats
        except Exception as e:
            self.logger.error(f"Error getting stats: {str(e)}")
            return {}


# Export service class
__all__ = ['DataHubService']
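

# ═══════════════════════════════════════════════════════════════════
# Usage sketch (illustrative only, not part of the service API):
# a minimal example of how a caller might exercise DataHubService.
# The database path and the symbols below are assumptions made for
# demonstration; adjust them to match your deployment.
# ═══════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    hub = DataHubService(db_path="data/crypto_hub.db", log_level="INFO")

    # Latest consolidated BTC price (None if nothing has been collected yet)
    print("BTC:", hub.get_latest_price("BTC"))

    # Last 24 hourly candles for ETH, oldest first, ready for charting
    candles = hub.get_ohlcv("ETH", timeframe="1h", limit=24)
    print(f"ETH 1h candles: {len(candles)}")

    # Current Fear & Greed reading and overall database statistics
    print("Fear & Greed:", hub.get_fear_greed())
    print("Stats:", hub.get_stats())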