Spaces:
Restarting
Restarting
Your Name
feat: UI improvements and error suppression - Enhanced dashboard and market pages with improved header buttons, logo, and currency symbol display - Stopped animated ticker - Removed pie chart legends - Added error suppressor for external service errors (SSE, Permissions-Policy warnings) - Improved header button prominence and icon appearance - Enhanced logo with glow effects and better design - Fixed currency symbol visibility in market tables
8b7b267
| #!/usr/bin/env python3 | |
| """ | |
| Master Resource Orchestrator | |
| Orchestrates ALL 86+ resources hierarchically - NO IDLE RESOURCES | |
| مدیریت سلسلهمراتبی همه 86+ منبع - هیچ منبعی بیکار نمیماند | |
| """ | |
| import httpx | |
| import logging | |
| import asyncio | |
| from typing import Dict, Any, List, Optional, Tuple | |
| from datetime import datetime | |
| from enum import Enum | |
| from backend.services.hierarchical_fallback_config import ( | |
| hierarchical_config, | |
| Priority, | |
| ResourceConfig | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class ResourceStatus(Enum): | |
| """Status of resource attempt""" | |
| SUCCESS = "success" | |
| FAILED = "failed" | |
| SKIPPED = "skipped" | |
| TIMEOUT = "timeout" | |
| class MasterResourceOrchestrator: | |
| """ | |
| Master orchestrator for ALL resources | |
| تمام 86+ منبع را به صورت سلسلهمراتبی مدیریت میکند | |
| """ | |
| def __init__(self): | |
| self.config = hierarchical_config | |
| self.timeout = 10.0 | |
| # Statistics tracking | |
| self.usage_stats = { | |
| "total_requests": 0, | |
| "successful_requests": 0, | |
| "failed_requests": 0, | |
| "resource_usage": {}, # Track usage per resource | |
| "priority_distribution": { # Track which priority level succeeded | |
| Priority.CRITICAL: 0, | |
| Priority.HIGH: 0, | |
| Priority.MEDIUM: 0, | |
| Priority.LOW: 0, | |
| Priority.EMERGENCY: 0 | |
| } | |
| } | |
| async def fetch_with_hierarchy( | |
| self, | |
| resource_list: List[ResourceConfig], | |
| fetch_function: callable, | |
| max_concurrent: int = 3 | |
| ) -> Tuple[Any, Dict[str, Any]]: | |
| """ | |
| Fetch data using hierarchical fallback | |
| دریافت داده با فالبک سلسلهمراتبی | |
| Args: | |
| resource_list: List of resources in priority order | |
| fetch_function: Async function to fetch data from a resource | |
| max_concurrent: Max concurrent attempts within same priority | |
| Returns: | |
| (data, metadata) - Data and information about which resource succeeded | |
| """ | |
| self.usage_stats["total_requests"] += 1 | |
| # Group resources by priority | |
| priority_groups = self._group_by_priority(resource_list) | |
| # Try each priority level | |
| for priority in [Priority.CRITICAL, Priority.HIGH, Priority.MEDIUM, Priority.LOW, Priority.EMERGENCY]: | |
| resources_in_priority = priority_groups.get(priority, []) | |
| if not resources_in_priority: | |
| continue | |
| logger.info(f"🔄 Trying {len(resources_in_priority)} resources at {priority.name} priority") | |
| # Try resources in this priority level | |
| # If max_concurrent > 1, try multiple resources in parallel | |
| if max_concurrent > 1 and len(resources_in_priority) > 1: | |
| result = await self._try_concurrent( | |
| resources_in_priority[:max_concurrent], | |
| fetch_function, | |
| priority | |
| ) | |
| else: | |
| result = await self._try_sequential( | |
| resources_in_priority, | |
| fetch_function, | |
| priority | |
| ) | |
| if result: | |
| data, metadata = result | |
| self.usage_stats["successful_requests"] += 1 | |
| self.usage_stats["priority_distribution"][priority] += 1 | |
| logger.info(f"✅ SUCCESS at {priority.name} priority: {metadata['resource_name']}") | |
| return data, metadata | |
| # All resources failed | |
| self.usage_stats["failed_requests"] += 1 | |
| logger.error(f"❌ ALL {len(resource_list)} resources failed") | |
| raise Exception(f"All {len(resource_list)} resources failed across all priority levels") | |
| def _group_by_priority( | |
| self, | |
| resources: List[ResourceConfig] | |
| ) -> Dict[Priority, List[ResourceConfig]]: | |
| """Group resources by priority level""" | |
| groups = { | |
| Priority.CRITICAL: [], | |
| Priority.HIGH: [], | |
| Priority.MEDIUM: [], | |
| Priority.LOW: [], | |
| Priority.EMERGENCY: [] | |
| } | |
| for resource in resources: | |
| groups[resource.priority].append(resource) | |
| return groups | |
| async def _try_sequential( | |
| self, | |
| resources: List[ResourceConfig], | |
| fetch_function: callable, | |
| priority: Priority | |
| ) -> Optional[Tuple[Any, Dict[str, Any]]]: | |
| """Try resources sequentially""" | |
| for idx, resource in enumerate(resources, 1): | |
| try: | |
| logger.info(f" 📡 [{idx}/{len(resources)}] Trying {resource.name}...") | |
| # Track usage | |
| if resource.name not in self.usage_stats["resource_usage"]: | |
| self.usage_stats["resource_usage"][resource.name] = { | |
| "attempts": 0, | |
| "successes": 0, | |
| "failures": 0 | |
| } | |
| self.usage_stats["resource_usage"][resource.name]["attempts"] += 1 | |
| # Attempt to fetch data | |
| start_time = datetime.utcnow() | |
| data = await fetch_function(resource) | |
| end_time = datetime.utcnow() | |
| if data: | |
| self.usage_stats["resource_usage"][resource.name]["successes"] += 1 | |
| metadata = { | |
| "resource_name": resource.name, | |
| "priority": priority.name, | |
| "base_url": resource.base_url, | |
| "response_time_ms": int((end_time - start_time).total_seconds() * 1000), | |
| "timestamp": int(end_time.timestamp() * 1000) | |
| } | |
| logger.info(f" ✅ {resource.name} succeeded in {metadata['response_time_ms']}ms") | |
| return data, metadata | |
| logger.warning(f" ⚠️ {resource.name} returned no data") | |
| self.usage_stats["resource_usage"][resource.name]["failures"] += 1 | |
| except asyncio.TimeoutError: | |
| logger.warning(f" ⏱️ {resource.name} timeout") | |
| self.usage_stats["resource_usage"][resource.name]["failures"] += 1 | |
| continue | |
| except Exception as e: | |
| logger.warning(f" ❌ {resource.name} failed: {e}") | |
| self.usage_stats["resource_usage"][resource.name]["failures"] += 1 | |
| continue | |
| return None | |
| async def _try_concurrent( | |
| self, | |
| resources: List[ResourceConfig], | |
| fetch_function: callable, | |
| priority: Priority | |
| ) -> Optional[Tuple[Any, Dict[str, Any]]]: | |
| """Try multiple resources concurrently (race condition - first success wins)""" | |
| logger.info(f" 🏁 Racing {len(resources)} resources in parallel...") | |
| tasks = [] | |
| for resource in resources: | |
| task = self._try_single_resource(resource, fetch_function, priority) | |
| tasks.append(task) | |
| # Wait for first success or all failures | |
| for completed_task in asyncio.as_completed(tasks): | |
| try: | |
| result = await completed_task | |
| if result: | |
| # Cancel remaining tasks | |
| for task in tasks: | |
| if not task.done(): | |
| task.cancel() | |
| return result | |
| except Exception: | |
| continue | |
| return None | |
| async def _try_single_resource( | |
| self, | |
| resource: ResourceConfig, | |
| fetch_function: callable, | |
| priority: Priority | |
| ) -> Optional[Tuple[Any, Dict[str, Any]]]: | |
| """Try a single resource (used in concurrent mode)""" | |
| try: | |
| # Track usage | |
| if resource.name not in self.usage_stats["resource_usage"]: | |
| self.usage_stats["resource_usage"][resource.name] = { | |
| "attempts": 0, | |
| "successes": 0, | |
| "failures": 0 | |
| } | |
| self.usage_stats["resource_usage"][resource.name]["attempts"] += 1 | |
| start_time = datetime.utcnow() | |
| data = await fetch_function(resource) | |
| end_time = datetime.utcnow() | |
| if data: | |
| self.usage_stats["resource_usage"][resource.name]["successes"] += 1 | |
| metadata = { | |
| "resource_name": resource.name, | |
| "priority": priority.name, | |
| "base_url": resource.base_url, | |
| "response_time_ms": int((end_time - start_time).total_seconds() * 1000), | |
| "timestamp": int(end_time.timestamp() * 1000) | |
| } | |
| logger.info(f" 🏆 {resource.name} won the race! ({metadata['response_time_ms']}ms)") | |
| return data, metadata | |
| self.usage_stats["resource_usage"][resource.name]["failures"] += 1 | |
| return None | |
| except Exception as e: | |
| logger.warning(f" ❌ {resource.name} failed: {e}") | |
| self.usage_stats["resource_usage"][resource.name]["failures"] += 1 | |
| return None | |
| def get_usage_statistics(self) -> Dict[str, Any]: | |
| """ | |
| Get comprehensive usage statistics | |
| آمار کامل استفاده از منابع | |
| """ | |
| total_resources = len(self.usage_stats["resource_usage"]) | |
| used_resources = sum( | |
| 1 for stats in self.usage_stats["resource_usage"].values() | |
| if stats["attempts"] > 0 | |
| ) | |
| successful_resources = sum( | |
| 1 for stats in self.usage_stats["resource_usage"].values() | |
| if stats["successes"] > 0 | |
| ) | |
| # Calculate success rate per priority | |
| priority_success_rates = {} | |
| total_priority_requests = sum(self.usage_stats["priority_distribution"].values()) | |
| if total_priority_requests > 0: | |
| for priority, count in self.usage_stats["priority_distribution"].items(): | |
| priority_success_rates[priority.name] = { | |
| "count": count, | |
| "percentage": round((count / total_priority_requests) * 100, 2) | |
| } | |
| # Find most used resources | |
| most_used = sorted( | |
| self.usage_stats["resource_usage"].items(), | |
| key=lambda x: x[1]["attempts"], | |
| reverse=True | |
| )[:10] | |
| # Find most successful resources | |
| most_successful = sorted( | |
| self.usage_stats["resource_usage"].items(), | |
| key=lambda x: x[1]["successes"], | |
| reverse=True | |
| )[:10] | |
| return { | |
| "overview": { | |
| "total_requests": self.usage_stats["total_requests"], | |
| "successful_requests": self.usage_stats["successful_requests"], | |
| "failed_requests": self.usage_stats["failed_requests"], | |
| "success_rate": round( | |
| (self.usage_stats["successful_requests"] / self.usage_stats["total_requests"] * 100) | |
| if self.usage_stats["total_requests"] > 0 else 0, | |
| 2 | |
| ) | |
| }, | |
| "resource_utilization": { | |
| "total_resources_in_system": total_resources, | |
| "resources_used": used_resources, | |
| "resources_successful": successful_resources, | |
| "utilization_rate": round((used_resources / total_resources * 100) if total_resources > 0 else 0, 2) | |
| }, | |
| "priority_distribution": priority_success_rates, | |
| "top_10_most_used": [ | |
| { | |
| "resource": name, | |
| "attempts": stats["attempts"], | |
| "successes": stats["successes"], | |
| "failures": stats["failures"], | |
| "success_rate": round((stats["successes"] / stats["attempts"] * 100) if stats["attempts"] > 0 else 0, 2) | |
| } | |
| for name, stats in most_used | |
| ], | |
| "top_10_most_successful": [ | |
| { | |
| "resource": name, | |
| "successes": stats["successes"], | |
| "attempts": stats["attempts"], | |
| "success_rate": round((stats["successes"] / stats["attempts"] * 100) if stats["attempts"] > 0 else 0, 2) | |
| } | |
| for name, stats in most_successful | |
| ] | |
| } | |
| def get_resource_health_report(self) -> Dict[str, Any]: | |
| """ | |
| Get health report for all resources | |
| گزارش سلامت همه منابع | |
| """ | |
| healthy_resources = [] | |
| degraded_resources = [] | |
| failed_resources = [] | |
| unused_resources = [] | |
| for resource_name, stats in self.usage_stats["resource_usage"].items(): | |
| if stats["attempts"] == 0: | |
| unused_resources.append(resource_name) | |
| elif stats["successes"] == 0: | |
| failed_resources.append({ | |
| "name": resource_name, | |
| "attempts": stats["attempts"], | |
| "failures": stats["failures"] | |
| }) | |
| else: | |
| success_rate = (stats["successes"] / stats["attempts"]) * 100 | |
| if success_rate >= 80: | |
| healthy_resources.append({ | |
| "name": resource_name, | |
| "success_rate": round(success_rate, 2), | |
| "attempts": stats["attempts"] | |
| }) | |
| else: | |
| degraded_resources.append({ | |
| "name": resource_name, | |
| "success_rate": round(success_rate, 2), | |
| "attempts": stats["attempts"], | |
| "failures": stats["failures"] | |
| }) | |
| return { | |
| "healthy_resources": { | |
| "count": len(healthy_resources), | |
| "resources": healthy_resources | |
| }, | |
| "degraded_resources": { | |
| "count": len(degraded_resources), | |
| "resources": degraded_resources | |
| }, | |
| "failed_resources": { | |
| "count": len(failed_resources), | |
| "resources": failed_resources | |
| }, | |
| "unused_resources": { | |
| "count": len(unused_resources), | |
| "resources": unused_resources | |
| }, | |
| "overall_health": "Healthy" if len(healthy_resources) > len(failed_resources) else "Degraded" | |
| } | |
| # Global instance | |
| master_orchestrator = MasterResourceOrchestrator() | |
| __all__ = ["MasterResourceOrchestrator", "master_orchestrator", "ResourceStatus"] | |