|
|
""" |
|
|
Comprehensive Scheduler for All Data Sources |
|
|
Schedules and runs data collection from all available sources with configurable intervals |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import json |
|
|
from datetime import datetime, timezone, timedelta |
|
|
from typing import Dict, List, Optional, Any |
|
|
from pathlib import Path |
|
|
from utils.logger import setup_logger |
|
|
from collectors.master_collector import DataSourceCollector |
|
|
|
|
|
# Module-level logger; tag matches this scheduler module's role.
logger = setup_logger("comprehensive_scheduler")
|
|
|
|
|
|
|
|
class ComprehensiveScheduler:
    """
    Comprehensive scheduler that manages data collection from all sources.

    Keeps a per-category schedule (interval in seconds + enabled flag),
    runs due categories concurrently with retry, and optionally persists
    each result set to a timestamped JSON file.
    """

    def __init__(self, config_file: Optional[str] = None):
        """
        Initialize the comprehensive scheduler.

        Args:
            config_file: Path to scheduler configuration file
                (defaults to "scheduler_config.json").
        """
        self.collector = DataSourceCollector()
        self.config_file = config_file or "scheduler_config.json"
        self.config = self._load_config()
        # category name -> UTC datetime of its last successful run
        self.last_run_times: Dict[str, datetime] = {}
        self.running = False
        logger.info("Comprehensive Scheduler initialized")

    def _load_config(self) -> Dict[str, Any]:
        """
        Load scheduler configuration, overlaying file-based settings on
        top of built-in defaults.

        Returns:
            Configuration dict.
        """
        default_config = {
            "schedules": {
                "market_data": {"interval_seconds": 60, "enabled": True},
                "blockchain": {"interval_seconds": 300, "enabled": True},
                "news": {"interval_seconds": 600, "enabled": True},
                "sentiment": {"interval_seconds": 1800, "enabled": True},
                "whale_tracking": {"interval_seconds": 300, "enabled": True},
                "full_collection": {"interval_seconds": 3600, "enabled": True},
            },
            "max_retries": 3,
            "retry_delay_seconds": 5,
            "persist_results": True,
            "results_directory": "data/collections",
        }

        config_path = Path(self.config_file)
        if config_path.exists():
            try:
                with open(config_path, 'r') as f:
                    loaded_config = json.load(f)

                # Merge "schedules" per category instead of replacing the
                # whole dict: a partial config file must not silently drop
                # the default schedules for categories it doesn't mention.
                loaded_schedules = loaded_config.pop("schedules", None)
                if isinstance(loaded_schedules, dict):
                    default_config["schedules"].update(loaded_schedules)
                default_config.update(loaded_config)
                logger.info(f"Loaded scheduler config from {config_path}")
            except Exception as e:
                logger.error(f"Error loading config file: {e}, using defaults")

        return default_config

    def save_config(self):
        """Save current configuration to file (best-effort; errors are logged)."""
        try:
            config_path = Path(self.config_file)
            config_path.parent.mkdir(parents=True, exist_ok=True)

            with open(config_path, 'w') as f:
                json.dump(self.config, f, indent=2)

            logger.info(f"Saved scheduler config to {config_path}")
        except Exception as e:
            logger.error(f"Error saving config: {e}")

    async def _save_results(self, category: str, results: Any):
        """
        Save collection results to a timestamped JSON file.

        No-op when "persist_results" is disabled in config. Errors are
        logged, never raised (persistence is best-effort).

        Args:
            category: Category name (used in the file name).
            results: Results to save (non-JSON values fall back to str()).
        """
        if not self.config.get("persist_results", True):
            return

        try:
            results_dir = Path(self.config.get("results_directory", "data/collections"))
            results_dir.mkdir(parents=True, exist_ok=True)

            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            filename = results_dir / f"{category}_{timestamp}.json"

            with open(filename, 'w') as f:
                json.dump(results, f, indent=2, default=str)

            # Fix: the original message logged the literal "(unknown)"
            # instead of the actual destination path.
            logger.info(f"Saved {category} results to {filename}")
        except Exception as e:
            logger.error(f"Error saving results: {e}")

    def should_run(self, category: str) -> bool:
        """
        Check if a category should run based on its schedule.

        A category with no recorded last run is always due (first run).

        Args:
            category: Category name.

        Returns:
            True if should run, False otherwise.
        """
        schedule = self.config.get("schedules", {}).get(category, {})

        if not schedule.get("enabled", True):
            return False

        interval = schedule.get("interval_seconds", 3600)
        last_run = self.last_run_times.get(category)

        if not last_run:
            return True

        elapsed = (datetime.now(timezone.utc) - last_run).total_seconds()
        return elapsed >= interval

    async def run_category_with_retry(self, category: str) -> Optional[Any]:
        """
        Run a category collection with retry logic.

        The last-run timestamp is only recorded on success, so a failed
        run stays "due" and is retried next cycle.

        Args:
            category: Category name ("full_collection" runs every source).

        Returns:
            Collection results, or None if all attempts failed.
        """
        max_retries = self.config.get("max_retries", 3)
        retry_delay = self.config.get("retry_delay_seconds", 5)

        for attempt in range(max_retries):
            try:
                logger.info(f"Running {category} collection (attempt {attempt + 1}/{max_retries})")

                if category == "full_collection":
                    results = await self.collector.collect_all_data()
                else:
                    results = await self.collector.collect_category(category)

                self.last_run_times[category] = datetime.now(timezone.utc)

                await self._save_results(category, results)

                return results

            except Exception as e:
                logger.error(f"Error in {category} collection (attempt {attempt + 1}): {e}")

                if attempt < max_retries - 1:
                    logger.info(f"Retrying in {retry_delay} seconds...")
                    await asyncio.sleep(retry_delay)
                else:
                    logger.error(f"Failed {category} collection after {max_retries} attempts")
                    return None

    async def run_cycle(self):
        """Run one scheduler cycle - check and run due categories concurrently."""
        logger.info("Running scheduler cycle...")

        categories = self.config.get("schedules", {}).keys()
        tasks = []

        for category in categories:
            if self.should_run(category):
                logger.info(f"Scheduling {category} collection")
                task = self.run_category_with_retry(category)
                tasks.append((category, task))

        if tasks:
            # return_exceptions=True keeps one failing category from
            # cancelling the others in the same cycle.
            results = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True)

            for (category, _), result in zip(tasks, results):
                if isinstance(result, Exception):
                    logger.error(f"{category} collection failed: {str(result)}")
                else:
                    if result:
                        stats = result.get("statistics", {}) if isinstance(result, dict) else None
                        if stats:
                            logger.info(
                                f"{category} collection complete: "
                                f"{stats.get('successful_sources', 'N/A')}/{stats.get('total_sources', 'N/A')} successful"
                            )
        else:
            logger.info("No collections due in this cycle")

    async def run_forever(self, cycle_interval: int = 30):
        """
        Run the scheduler forever with specified cycle interval.

        Args:
            cycle_interval: Seconds between scheduler cycles.
        """
        self.running = True
        logger.info(f"Starting comprehensive scheduler (cycle interval: {cycle_interval}s)")

        try:
            while self.running:
                await self.run_cycle()

                logger.info(f"Waiting {cycle_interval} seconds until next cycle...")
                await asyncio.sleep(cycle_interval)

        except KeyboardInterrupt:
            logger.info("Scheduler interrupted by user")
        except Exception as e:
            logger.error(f"Scheduler error: {e}")
        finally:
            self.running = False
            logger.info("Scheduler stopped")

    def stop(self):
        """Stop the scheduler (the run_forever loop exits after the current cycle)."""
        logger.info("Stopping scheduler...")
        self.running = False

    async def run_once(self, category: Optional[str] = None):
        """
        Run a single collection immediately.

        Args:
            category: Category to run, or None for full collection.

        Returns:
            Collection results, or None if the collection failed.
        """
        if category:
            logger.info(f"Running single {category} collection...")
            results = await self.run_category_with_retry(category)
        else:
            logger.info("Running single full collection...")
            results = await self.run_category_with_retry("full_collection")

        return results

    def get_status(self) -> Dict[str, Any]:
        """
        Get scheduler status.

        Returns:
            Dict with scheduler status information: running flag, current
            UTC time, and per-category schedule/last-run/next-run details.
        """
        now = datetime.now(timezone.utc)
        status = {
            "running": self.running,
            "current_time": now.isoformat(),
            "schedules": {}
        }

        for category, schedule in self.config.get("schedules", {}).items():
            last_run = self.last_run_times.get(category)
            interval = schedule.get("interval_seconds", 0)

            next_run = None
            if last_run:
                next_run = last_run + timedelta(seconds=interval)

            time_until_next = None
            if next_run:
                time_until_next = (next_run - now).total_seconds()

            status["schedules"][category] = {
                "enabled": schedule.get("enabled", True),
                "interval_seconds": interval,
                "last_run": last_run.isoformat() if last_run else None,
                "next_run": next_run.isoformat() if next_run else None,
                # Fix: explicit None check — a countdown of exactly 0.0 is
                # falsy and was wrongly reported as None by the old code.
                "seconds_until_next": round(time_until_next, 2) if time_until_next is not None else None,
                "should_run_now": self.should_run(category)
            }

        return status

    def update_schedule(self, category: str, interval_seconds: Optional[int] = None, enabled: Optional[bool] = None):
        """
        Update schedule for a category and persist the config to disk.

        Args:
            category: Category name (must already exist in config).
            interval_seconds: New interval in seconds (None = unchanged).
            enabled: Enable/disable the schedule (None = unchanged).
        """
        if category not in self.config.get("schedules", {}):
            logger.error(f"Unknown category: {category}")
            return

        if interval_seconds is not None:
            self.config["schedules"][category]["interval_seconds"] = interval_seconds
            logger.info(f"Updated {category} interval to {interval_seconds}s")

        if enabled is not None:
            self.config["schedules"][category]["enabled"] = enabled
            logger.info(f"{'Enabled' if enabled else 'Disabled'} {category} schedule")

        self.save_config()
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    async def main():
        """Demo entry point: print scheduler status, then run one market_data collection."""
        scheduler = ComprehensiveScheduler()
        divider = "=" * 80

        print("\n" + divider)
        print("COMPREHENSIVE SCHEDULER STATUS")
        print(divider)

        status = scheduler.get_status()
        print(f"Running: {status['running']}")
        print(f"Current Time: {status['current_time']}")
        print("\nSchedules:")
        print("-" * 80)

        for name, info in status['schedules'].items():
            marker = "✓" if info['enabled'] else "✗"
            print(
                f"{marker} {name:20} | Interval: {info['interval_seconds']:6}s "
                f"| Next in: {info.get('seconds_until_next', 'N/A')}"
            )

        print(divider)

        print("\nRunning market_data collection once as example...")
        results = await scheduler.run_once("market_data")

        if results:
            total = len(results)
            print(f"\nCollected {total} market data sources")
            ok_count = sum(1 for entry in results if entry.get('success', False))
            print(f"Successful: {ok_count}/{total}")

        print("\n" + divider)
        print("To run scheduler forever, use: scheduler.run_forever()")
        print(divider)

    asyncio.run(main())
|
|
|