#!/usr/bin/env python3 """ Data Sources Database Model مدل دیتابیس برای مدیریت منابع داده این مدل برای ذخیره و مدیریت منابع داده استفاده می‌شود. شامل اطلاعات منبع، وضعیت فعال/غیرفعال، و آمار استفاده. """ from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, Text, Enum, Index from sqlalchemy.orm import relationship from datetime import datetime import enum from typing import Dict, Any, List, Optional import json # Use the existing Base from models.py try: from database.models import Base except ImportError: from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class DataSourceType(enum.Enum): """نوع منبع داده""" MARKET = "market" NEWS = "news" SENTIMENT = "sentiment" SOCIAL = "social" ONCHAIN = "onchain" DEFI = "defi" HISTORICAL = "historical" TECHNICAL = "technical" AGGREGATED = "aggregated" class DataSourceStatus(enum.Enum): """وضعیت منبع داده""" ACTIVE = "active" INACTIVE = "inactive" RATE_LIMITED = "rate_limited" ERROR = "error" MAINTENANCE = "maintenance" class CollectionInterval(enum.Enum): """بازه جمع‌آوری داده""" REALTIME = "realtime" # On-demand from client MINUTES_1 = "1m" MINUTES_5 = "5m" MINUTES_15 = "15m" MINUTES_30 = "30m" HOURLY = "1h" HOURS_4 = "4h" DAILY = "1d" class DataSource(Base): """ Data Source Model - منبع داده ذخیره اطلاعات و وضعیت منابع داده در دیتابیس """ __tablename__ = 'data_sources' id = Column(Integer, primary_key=True, autoincrement=True) # Basic Info source_id = Column(String(100), nullable=False, unique=True, index=True) name = Column(String(255), nullable=False) source_type = Column(String(50), nullable=False, index=True) description = Column(Text, nullable=True) # Connection Info base_url = Column(String(500), nullable=False) api_version = Column(String(20), nullable=True) # Authentication requires_api_key = Column(Boolean, default=False) api_key_env_var = Column(String(100), nullable=True) has_api_key_configured = Column(Boolean, default=False) # Rate Limiting rate_limit_description = Column(String(100), nullable=True) rate_limit_per_minute = Column(Integer, nullable=True) rate_limit_per_hour = Column(Integer, nullable=True) rate_limit_per_day = Column(Integer, nullable=True) # Collection Settings collection_interval = Column(String(20), default="30m") # Default: 30 minutes for bulk supports_realtime = Column(Boolean, default=False) # Can fetch on-demand # Supported Features supported_timeframes = Column(Text, nullable=True) # JSON array categories = Column(Text, nullable=True) # JSON array features = Column(Text, nullable=True) # JSON array # Status is_active = Column(Boolean, default=True, index=True) status = Column(String(50), default="active", index=True) status_message = Column(Text, nullable=True) # Priority & Weight priority = Column(Integer, default=5) # 1-10, lower is higher priority weight = Column(Integer, default=1) # For load balancing # Verification is_verified = Column(Boolean, default=False) is_free_tier = Column(Boolean, default=True) # Statistics total_requests = Column(Integer, default=0) successful_requests = Column(Integer, default=0) failed_requests = Column(Integer, default=0) avg_response_time_ms = Column(Float, default=0.0) last_success_at = Column(DateTime, nullable=True) last_failure_at = Column(DateTime, nullable=True) last_error_message = Column(Text, nullable=True) # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) last_checked_at = Column(DateTime, nullable=True) # Indexes for common queries __table_args__ = ( Index('idx_source_type_active', 'source_type', 'is_active'), Index('idx_status_priority', 'status', 'priority'), ) def __repr__(self): return f"" def to_dict(self) -> Dict[str, Any]: """تبدیل به دیکشنری""" return { "id": self.id, "source_id": self.source_id, "name": self.name, "source_type": self.source_type, "description": self.description, "base_url": self.base_url, "api_version": self.api_version, "requires_api_key": self.requires_api_key, "api_key_env_var": self.api_key_env_var, "has_api_key_configured": self.has_api_key_configured, "rate_limit_description": self.rate_limit_description, "collection_interval": self.collection_interval, "supports_realtime": self.supports_realtime, "supported_timeframes": json.loads(self.supported_timeframes) if self.supported_timeframes else [], "categories": json.loads(self.categories) if self.categories else [], "features": json.loads(self.features) if self.features else [], "is_active": self.is_active, "status": self.status, "status_message": self.status_message, "priority": self.priority, "weight": self.weight, "is_verified": self.is_verified, "is_free_tier": self.is_free_tier, "total_requests": self.total_requests, "successful_requests": self.successful_requests, "failed_requests": self.failed_requests, "success_rate": (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0, "avg_response_time_ms": self.avg_response_time_ms, "last_success_at": self.last_success_at.isoformat() if self.last_success_at else None, "last_failure_at": self.last_failure_at.isoformat() if self.last_failure_at else None, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, "last_checked_at": self.last_checked_at.isoformat() if self.last_checked_at else None } class DataCollectionLog(Base): """ Data Collection Log - لاگ جمع‌آوری داده ثبت تاریخچه جمع‌آوری داده از منابع """ __tablename__ = 'data_collection_logs' id = Column(Integer, primary_key=True, autoincrement=True) source_id = Column(String(100), nullable=False, index=True) # Collection Info collection_type = Column(String(50), nullable=False) # scheduled, on_demand, bulk interval_used = Column(String(20), nullable=True) # Timing started_at = Column(DateTime, nullable=False, default=datetime.utcnow) completed_at = Column(DateTime, nullable=True) duration_ms = Column(Integer, nullable=True) # Results success = Column(Boolean, default=False) records_fetched = Column(Integer, default=0) records_stored = Column(Integer, default=0) # Error Info error_type = Column(String(100), nullable=True) error_message = Column(Text, nullable=True) # HTTP Info http_status_code = Column(Integer, nullable=True) response_size_bytes = Column(Integer, nullable=True) # Indexes __table_args__ = ( Index('idx_collection_source_time', 'source_id', 'started_at'), Index('idx_collection_success', 'success', 'started_at'), ) def to_dict(self) -> Dict[str, Any]: """تبدیل به دیکشنری""" return { "id": self.id, "source_id": self.source_id, "collection_type": self.collection_type, "interval_used": self.interval_used, "started_at": self.started_at.isoformat() if self.started_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, "duration_ms": self.duration_ms, "success": self.success, "records_fetched": self.records_fetched, "records_stored": self.records_stored, "error_type": self.error_type, "error_message": self.error_message, "http_status_code": self.http_status_code, "response_size_bytes": self.response_size_bytes } class CollectionSchedule(Base): """ Collection Schedule - زمان‌بندی جمع‌آوری تنظیم بازه‌های جمع‌آوری داده برای هر منبع """ __tablename__ = 'collection_schedules' id = Column(Integer, primary_key=True, autoincrement=True) source_id = Column(String(100), nullable=False, unique=True, index=True) # Schedule Settings collection_interval = Column(String(20), nullable=False, default="30m") is_enabled = Column(Boolean, default=True) # Execution Times last_run_at = Column(DateTime, nullable=True) next_run_at = Column(DateTime, nullable=True) # Statistics consecutive_failures = Column(Integer, default=0) total_runs = Column(Integer, default=0) successful_runs = Column(Integer, default=0) # Backoff Settings backoff_until = Column(DateTime, nullable=True) # If in backoff state backoff_multiplier = Column(Float, default=1.0) # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def to_dict(self) -> Dict[str, Any]: """تبدیل به دیکشنری""" return { "id": self.id, "source_id": self.source_id, "collection_interval": self.collection_interval, "is_enabled": self.is_enabled, "last_run_at": self.last_run_at.isoformat() if self.last_run_at else None, "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None, "consecutive_failures": self.consecutive_failures, "total_runs": self.total_runs, "successful_runs": self.successful_runs, "success_rate": (self.successful_runs / self.total_runs * 100) if self.total_runs > 0 else 0, "backoff_until": self.backoff_until.isoformat() if self.backoff_until else None, "backoff_multiplier": self.backoff_multiplier, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } # ===== DATA SOURCE MANAGER ===== class DataSourceManager: """ مدیریت منابع داده در دیتابیس Data Source Manager for database operations """ def __init__(self, session): self.session = session def create_source(self, source_data: Dict[str, Any]) -> Optional[DataSource]: """ایجاد منبع جدید""" try: source = DataSource( source_id=source_data["source_id"], name=source_data["name"], source_type=source_data.get("source_type", "market"), description=source_data.get("description"), base_url=source_data["base_url"], api_version=source_data.get("api_version"), requires_api_key=source_data.get("requires_api_key", False), api_key_env_var=source_data.get("api_key_env_var"), rate_limit_description=source_data.get("rate_limit_description"), collection_interval=source_data.get("collection_interval", "30m"), supports_realtime=source_data.get("supports_realtime", False), supported_timeframes=json.dumps(source_data.get("supported_timeframes", [])), categories=json.dumps(source_data.get("categories", [])), features=json.dumps(source_data.get("features", [])), is_active=source_data.get("is_active", True), status=source_data.get("status", "active"), priority=source_data.get("priority", 5), weight=source_data.get("weight", 1), is_verified=source_data.get("is_verified", False), is_free_tier=source_data.get("is_free_tier", True) ) self.session.add(source) self.session.commit() return source except Exception as e: self.session.rollback() print(f"Error creating source: {e}") return None def get_source(self, source_id: str) -> Optional[DataSource]: """دریافت منبع با شناسه""" return self.session.query(DataSource).filter_by(source_id=source_id).first() def get_all_sources(self) -> List[DataSource]: """دریافت همه منابع""" return self.session.query(DataSource).all() def get_active_sources(self) -> List[DataSource]: """دریافت منابع فعال""" return self.session.query(DataSource).filter_by(is_active=True).all() def get_sources_by_type(self, source_type: str) -> List[DataSource]: """دریافت منابع بر اساس نوع""" return self.session.query(DataSource).filter_by(source_type=source_type, is_active=True).all() def update_source_status(self, source_id: str, is_active: bool, status: str = None, status_message: str = None) -> bool: """به‌روزرسانی وضعیت منبع""" try: source = self.get_source(source_id) if source: source.is_active = is_active if status: source.status = status if status_message: source.status_message = status_message source.updated_at = datetime.utcnow() self.session.commit() return True return False except Exception as e: self.session.rollback() print(f"Error updating source status: {e}") return False def record_request(self, source_id: str, success: bool, response_time_ms: float, error_message: str = None) -> bool: """ثبت درخواست""" try: source = self.get_source(source_id) if source: source.total_requests += 1 if success: source.successful_requests += 1 source.last_success_at = datetime.utcnow() else: source.failed_requests += 1 source.last_failure_at = datetime.utcnow() if error_message: source.last_error_message = error_message # Update average response time if source.avg_response_time_ms > 0: source.avg_response_time_ms = (source.avg_response_time_ms + response_time_ms) / 2 else: source.avg_response_time_ms = response_time_ms source.last_checked_at = datetime.utcnow() self.session.commit() return True return False except Exception as e: self.session.rollback() print(f"Error recording request: {e}") return False def get_sources_for_collection(self, interval: str) -> List[DataSource]: """دریافت منابع برای جمع‌آوری بر اساس بازه""" return self.session.query(DataSource).filter( DataSource.is_active == True, DataSource.collection_interval == interval, DataSource.status != "error" ).order_by(DataSource.priority).all() def get_statistics(self) -> Dict[str, Any]: """آمار منابع""" all_sources = self.get_all_sources() active_sources = [s for s in all_sources if s.is_active] total_requests = sum(s.total_requests for s in all_sources) successful_requests = sum(s.successful_requests for s in all_sources) return { "total_sources": len(all_sources), "active_sources": len(active_sources), "by_type": {}, # Would need to count by type "total_requests": total_requests, "successful_requests": successful_requests, "success_rate": (successful_requests / total_requests * 100) if total_requests > 0 else 0, "sources_with_errors": len([s for s in all_sources if s.status == "error"]) } # ===== INITIALIZATION HELPER ===== def init_data_sources_from_registry(session, registry): """ Initialize data sources in database from registry پر کردن جدول منابع از رجیستری """ manager = DataSourceManager(session) for source_id, source_info in registry.to_dict().items(): existing = manager.get_source(source_id) if not existing: source_data = { "source_id": source_id, "name": source_info["name"], "source_type": source_info["source_type"], "description": source_info.get("description"), "base_url": source_info["url"], "requires_api_key": source_info.get("requires_api_key", False), "api_key_env_var": source_info.get("api_key_env"), "rate_limit_description": source_info.get("rate_limit"), "collection_interval": "30m", # Default to 30 minutes "supports_realtime": "realtime" in source_info.get("supported_timeframes", []), "supported_timeframes": source_info.get("supported_timeframes", []), "categories": source_info.get("categories", []), "features": source_info.get("features", []), "is_active": source_info.get("is_active", True), "priority": source_info.get("priority", 5), "is_verified": source_info.get("verified", False), "is_free_tier": source_info.get("free_tier", True) } manager.create_source(source_data) print(f"Created data source: {source_id}") else: print(f"Data source already exists: {source_id}") return manager # ===== COLLECTION INTERVALS CONFIGURATION ===== # Recommended collection intervals for different data types COLLECTION_INTERVALS = { # Bulk data - collect every 15-30 minutes "market": "15m", "historical": "30m", "onchain": "30m", "defi": "15m", # News - collect every 15-30 minutes "news": "15m", "social": "30m", # Sentiment - collect every 15 minutes "sentiment": "15m", # Technical - collect every 15 minutes "technical": "15m", # Aggregated - collect every 15 minutes "aggregated": "15m" } # Real-time sources - fetch on-demand from client REALTIME_SOURCES = [ "binance_historical", "coingecko_historical", "coincap_realtime", "fear_greed_index" ]