"""
Data Collection Worker - Background Service

Runs collectors on schedule and stores data to database.
"""

import asyncio
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional

# Add parent directory to path so the top-level packages imported below
# (database, collectors, utils) can be resolved.
sys.path.insert(0, str(Path(__file__).parent.parent))

from database.db_manager import DatabaseManager
from database.models import MarketPrice, NewsArticle, SentimentMetric, WhaleTransaction
from collectors.market_data import get_coingecko_simple_price
from utils.logger import setup_logger

logger = setup_logger("data_collection_worker")

# Seconds between collection cycles when COLLECTION_INTERVAL_SECONDS is unset
# or invalid.
DEFAULT_COLLECTION_INTERVAL_SECONDS = 60

# Pause after an unexpected error in the collection loop, so a persistent
# failure does not turn into a rapid retry loop.
ERROR_RETRY_DELAY_SECONDS = 10

# Maximum number of entries kept in collection_stats["errors"]. Older entries
# are discarded so a long-running worker does not accumulate them forever.
MAX_ERROR_HISTORY = 100


class DataCollectionWorker:
    """
    Background worker service that collects data from free APIs and stores
    it in the database.
    """

    # CoinGecko coin IDs -> ticker symbols. IDs not listed here fall back to
    # coin_id.upper().
    COINGECKO_SYMBOLS: Dict[str, str] = {
        "bitcoin": "BTC",
        "ethereum": "ETH",
        "binancecoin": "BNB",
    }

    def __init__(self, db_path: str = "data/api_monitor.db"):
        """
        Initialize the data collection worker.

        Creates the database manager and calls its init_database() before any
        collection runs.

        Args:
            db_path: Path to SQLite database
        """
        self.db_manager = DatabaseManager(db_path=db_path)
        self.db_manager.init_database()
        self.running = False
        self.collection_stats: Dict[str, Any] = {
            "total_runs": 0,
            "successful_runs": 0,
            "failed_runs": 0,
            "last_run": None,
            "errors": [],
        }
        logger.info("Data Collection Worker initialized")

    def _record_error(self, collector: str, error: str) -> None:
        """
        Append an error entry to collection_stats["errors"].

        Only the newest MAX_ERROR_HISTORY entries are kept.

        Args:
            collector: Identifier of the collector that failed
            error: Human-readable error description
        """
        errors: List[Dict[str, str]] = self.collection_stats["errors"]
        errors.append({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "collector": collector,
            "error": error,
        })
        # Trim in place so the list object held in collection_stats is reused.
        if len(errors) > MAX_ERROR_HISTORY:
            del errors[:-MAX_ERROR_HISTORY]

    async def collect_and_store_coingecko_prices(self) -> bool:
        """
        Collect prices from CoinGecko and store them in the database.

        Response entries that are not dicts, or that carry no "usd" price,
        are skipped rather than stored with a placeholder price of 0.

        Returns:
            True if at least one price record was stored. False on fetch
            failure, an empty response, nothing storable, or any exception
            (exceptions are logged and recorded in the error history).
        """
        try:
            logger.info("🔄 Collecting prices from CoinGecko...")

            # Fetch data from CoinGecko
            result = await get_coingecko_simple_price()

            if not result["success"]:
                error = result.get("error")
                logger.error(f"❌ CoinGecko fetch failed: {error}")
                self._record_error("coingecko_prices", f"fetch failed: {error}")
                return False

            # Extract price data
            data = result["data"]
            if not data:
                logger.warning("⚠️ No data returned from CoinGecko")
                return False

            # One timestamp for the whole batch so records from the same
            # fetch share the same time.
            fetched_at = datetime.now(timezone.utc)
            stored_count = 0

            with self.db_manager.get_session() as session:
                for coin_id, coin_data in data.items():
                    if not isinstance(coin_data, dict):
                        continue

                    price_usd = coin_data.get("usd")
                    if price_usd is None:
                        # Storing 0 here would record a fake price.
                        logger.warning(
                            f"⚠️ Skipping {coin_id}: no USD price in CoinGecko response"
                        )
                        continue

                    session.add(MarketPrice(
                        symbol=self.COINGECKO_SYMBOLS.get(coin_id, coin_id.upper()),
                        price_usd=price_usd,
                        market_cap=coin_data.get("usd_market_cap"),
                        volume_24h=coin_data.get("usd_24h_vol"),
                        price_change_24h=coin_data.get("usd_24h_change"),
                        timestamp=fetched_at,
                        source="CoinGecko",
                    ))
                    stored_count += 1

                session.commit()

            if stored_count == 0:
                logger.warning("⚠️ CoinGecko response contained no storable price records")
                return False

            logger.info(f"✅ Stored {stored_count} price records from CoinGecko")
            return True

        except Exception as e:
            logger.error(f"❌ Error collecting CoinGecko prices: {str(e)}", exc_info=True)
            self._record_error("coingecko_prices", str(e))
            return False

    async def run_collection_cycle(self) -> None:
        """
        Run one complete collection cycle for all enabled collectors.

        The cycle counts as successful only if every collector reports
        success. A collector that raises is logged and recorded, and the
        remaining collectors still run.
        """
        cycle_start = datetime.now(timezone.utc)
        logger.info(f"🚀 Starting collection cycle at {cycle_start.isoformat()}")

        self.collection_stats["total_runs"] += 1
        successful_collectors = 0
        failed_collectors = 0

        # Run collectors
        collectors = [
            ("CoinGecko Prices", self.collect_and_store_coingecko_prices),
            # Add more collectors here as we implement them
        ]

        for collector_name, collector_func in collectors:
            try:
                logger.info(f"📊 Running {collector_name}...")
                success = await collector_func()
            except Exception as e:
                failed_collectors += 1
                logger.error(f"❌ {collector_name} crashed: {str(e)}", exc_info=True)
                self._record_error(collector_name, str(e))
                continue

            if success:
                successful_collectors += 1
                logger.info(f"✅ {collector_name} completed successfully")
            else:
                failed_collectors += 1
                logger.warning(f"⚠️ {collector_name} completed with errors")

        # Update stats
        if failed_collectors == 0:
            self.collection_stats["successful_runs"] += 1
        else:
            self.collection_stats["failed_runs"] += 1

        cycle_end = datetime.now(timezone.utc)
        self.collection_stats["last_run"] = cycle_end.isoformat()
        cycle_duration = (cycle_end - cycle_start).total_seconds()

        logger.info(
            f"📈 Collection cycle completed in {cycle_duration:.2f}s | "
            f"Success: {successful_collectors} | Failed: {failed_collectors}"
        )

    async def start_continuous_collection(self, interval_seconds: int = 60) -> None:
        """
        Start continuous data collection on a schedule.

        Runs until stop() is called or the task is cancelled. Cancellation is
        logged and ends the loop normally. self.running is reset to False
        however the loop exits.

        Args:
            interval_seconds: Seconds between collection cycles (default: 60)

        Raises:
            ValueError: If interval_seconds is not positive (a zero or negative
                interval would poll the APIs in a tight loop).
        """
        if interval_seconds <= 0:
            raise ValueError(f"interval_seconds must be positive, got {interval_seconds}")

        self.running = True
        logger.info(f"🚀 Starting continuous collection (interval: {interval_seconds}s)")

        try:
            while self.running:
                try:
                    await self.run_collection_cycle()

                    # Wait for next cycle
                    if self.running:
                        logger.info(f"⏳ Waiting {interval_seconds}s until next cycle...")
                        await asyncio.sleep(interval_seconds)

                except asyncio.CancelledError:
                    logger.info("🛑 Collection cancelled")
                    break

                except Exception as e:
                    logger.error(
                        f"❌ Unexpected error in collection loop: {str(e)}", exc_info=True
                    )
                    # Wait a bit before retrying to avoid rapid failure loops
                    await asyncio.sleep(ERROR_RETRY_DELAY_SECONDS)
        finally:
            # Keep the flag truthful for get_stats() whether the loop ended via
            # stop(), cancellation, or an escaping exception.
            self.running = False

    def stop(self) -> None:
        """
        Stop the collection worker.

        Only clears the running flag. The loop exits once the current cycle
        or sleep finishes.
        """
        logger.info("🛑 Stopping data collection worker...")
        self.running = False

    def get_stats(self) -> Dict[str, Any]:
        """
        Get collection statistics.

        Returns:
            Dictionary with the collection counters, a copy of the error
            history, the running flag, and the database manager's stats.
        """
        return {
            **self.collection_stats,
            # Copy so callers cannot mutate the worker's internal error history.
            "errors": list(self.collection_stats["errors"]),
            "running": self.running,
            "database_stats": self.db_manager.get_database_stats(),
        }


def _read_interval_from_env(
    name: str = "COLLECTION_INTERVAL_SECONDS",
    default: int = DEFAULT_COLLECTION_INTERVAL_SECONDS,
) -> int:
    """
    Read a positive integer interval (seconds) from the environment.

    Args:
        name: Environment variable to read
        default: Value used when the variable is unset, not an integer, or
            not positive

    Returns:
        The parsed interval, or `default`.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        logger.warning(f"⚠️ Invalid {name}={raw!r}; using default {default}s")
        return default
    if value <= 0:
        logger.warning(f"⚠️ Non-positive {name}={value}; using default {default}s")
        return default
    return value


def _log_final_stats(worker: DataCollectionWorker) -> None:
    """
    Log the worker's final statistics.

    A failure while reading the stats (e.g. from the database) is logged
    rather than raised, so it cannot mask the reason the worker stopped.
    """
    try:
        stats = worker.get_stats()
    except Exception as e:
        logger.error(f"❌ Could not read final statistics: {str(e)}", exc_info=True)
        return

    logger.info("\n📊 Final Statistics:")
    logger.info(f"   Total Runs: {stats['total_runs']}")
    logger.info(f"   Successful: {stats['successful_runs']}")
    logger.info(f"   Failed: {stats['failed_runs']}")
    logger.info(f"   Last Run: {stats['last_run']}")
    logger.info(f"   Database Records: {stats['database_stats']}")


async def main():
    """
    Main entry point for the worker.

    Reads the interval from COLLECTION_INTERVAL_SECONDS (default 60), runs
    collection until stopped, and always logs final statistics on exit.
    """
    logger.info("=" * 80)
    logger.info("🚀 CRYPTO DATA COLLECTION WORKER")
    logger.info("=" * 80)

    # Create worker
    worker = DataCollectionWorker()

    # Get collection interval from environment (default: 60 seconds)
    interval = _read_interval_from_env()

    try:
        # Start continuous collection
        await worker.start_continuous_collection(interval_seconds=interval)
    except KeyboardInterrupt:
        # NOTE: depending on the Python version, Ctrl+C may instead arrive as
        # task cancellation or be raised out of asyncio.run; this is a fallback.
        logger.info("\n🛑 Received shutdown signal")
        worker.stop()
    except Exception as e:
        logger.error(f"❌ Worker crashed: {str(e)}", exc_info=True)
        worker.stop()
    finally:
        # Report statistics however the worker exits, including cancellation.
        _log_final_stats(worker)
        logger.info("=" * 80)
        logger.info("👋 Worker stopped")


if __name__ == "__main__":
    # Run the worker; exit quietly on Ctrl+C instead of printing a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass