Really-amin's picture
Upload 553 files
386790e verified
#!/usr/bin/env python3
"""
🔌 Fallback Integrator - اتصال سیستم fallback نهایی به پروژه موجود
Integration of Ultimate Fallback System with existing project
"""
import logging
from typing import Optional, Dict, Any, List
from datetime import datetime
try:
import httpx
HTTPX_AVAILABLE = True
except ImportError:
HTTPX_AVAILABLE = False
try:
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
from backend.services.ultimate_fallback_system import (
ultimate_fallback,
fetch_with_fallback,
Resource
)
logger = logging.getLogger(__name__)
class FallbackIntegrator:
"""
کلاس ادغام‌کننده سیستم fallback با collectors موجود
Integrator class for fallback system with existing collectors
"""
def __init__(self):
self.http_client = None
if HTTPX_AVAILABLE:
import httpx
self.http_client = httpx.AsyncClient(timeout=30.0)
elif AIOHTTP_AVAILABLE:
import aiohttp
self.session = None # will be created on first use
self.stats = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'sources_used': {}
}
logger.info(f"🔌 FallbackIntegrator initialized (httpx={HTTPX_AVAILABLE}, aiohttp={AIOHTTP_AVAILABLE})")
async def fetch_market_data(
self,
symbol: str,
vs_currency: str = 'usd',
max_attempts: int = 10
) -> Optional[Dict]:
"""
دریافت داده‌های بازار با fallback خودکار
Args:
symbol: نماد ارز (bitcoin, ethereum, etc.)
vs_currency: ارز مبنا
max_attempts: حداکثر تلاش
Returns:
داده‌های بازار یا None
"""
self.stats['total_requests'] += 1
# دریافت زنجیره fallback
resources = ultimate_fallback.get_fallback_chain('market_data', count=max_attempts)
for resource in resources:
if not resource.is_available():
continue
try:
logger.info(f"🔄 Trying {resource.name} for {symbol}")
# ساخت URL براساس منبع
if 'coingecko' in resource.base_url:
url = f"{resource.base_url}/simple/price"
params = {'ids': symbol, 'vs_currencies': vs_currency}
elif 'binance' in resource.base_url:
# تبدیل symbol به format Binance (BTC → BTCUSDT)
symbol_upper = symbol.upper()
if symbol_upper == 'BITCOIN':
symbol_upper = 'BTC'
elif symbol_upper == 'ETHEREUM':
symbol_upper = 'ETH'
url = f"{resource.base_url}/ticker/price"
params = {'symbol': f"{symbol_upper}USDT"}
elif 'coinpaprika' in resource.base_url:
url = f"{resource.base_url}/tickers/{symbol}-{symbol}"
params = {}
elif 'coincap' in resource.base_url:
url = f"{resource.base_url}/assets/{symbol}"
params = {}
else:
# Default endpoint
url = f"{resource.base_url}/price"
params = {'symbol': symbol, 'currency': vs_currency}
# افزودن کلید API اگر نیاز باشد
headers = {}
if resource.auth_type == "apiKeyHeader":
api_key = resource.get_api_key()
if api_key and resource.header_name:
headers[resource.header_name] = api_key
elif resource.auth_type == "apiKeyQuery":
api_key = resource.get_api_key()
if api_key and resource.param_name:
params[resource.param_name] = api_key
# ارسال درخواست
response = await self.http_client.get(url, params=params, headers=headers)
response.raise_for_status()
data = response.json()
# Normalize data format
normalized = self._normalize_market_data(data, symbol, resource)
# ثبت موفقیت
ultimate_fallback.mark_result(resource.id, 'market_data', True)
self.stats['successful_requests'] += 1
self.stats['sources_used'][resource.name] = \
self.stats['sources_used'].get(resource.name, 0) + 1
logger.info(f"✅ Success from {resource.name}: ${normalized.get('price', 'N/A')}")
return normalized
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
logger.warning(f"⏳ {resource.name} rate limited")
ultimate_fallback.mark_result(resource.id, 'market_data', False, 'rate_limit')
else:
logger.warning(f"❌ {resource.name} HTTP error: {e.response.status_code}")
ultimate_fallback.mark_result(resource.id, 'market_data', False)
except Exception as e:
logger.warning(f"❌ {resource.name} failed: {e}")
ultimate_fallback.mark_result(resource.id, 'market_data', False)
continue
# همه منابع شکست خوردند
self.stats['failed_requests'] += 1
logger.error(f"❌ All {max_attempts} sources failed for {symbol}")
return None
async def fetch_news(
self,
query: str = 'cryptocurrency',
limit: int = 10,
max_attempts: int = 10
) -> List[Dict]:
"""
دریافت اخبار با fallback خودکار
Args:
query: کلمه کلیدی جستجو
limit: تعداد اخبار
max_attempts: حداکثر تلاش
Returns:
لیست اخبار
"""
self.stats['total_requests'] += 1
resources = ultimate_fallback.get_fallback_chain('news', count=max_attempts)
for resource in resources:
if not resource.is_available():
continue
try:
logger.info(f"🔄 Trying {resource.name} for news")
# ساخت URL براساس منبع
if 'cryptopanic' in resource.base_url:
url = f"{resource.base_url}/posts"
params = {'filter': 'hot'}
elif 'newsapi' in resource.base_url:
url = f"{resource.base_url}/everything"
params = {'q': query, 'pageSize': limit}
elif 'rss' in resource.name.lower():
# RSS feed
url = resource.base_url
params = {}
else:
url = f"{resource.base_url}/news"
params = {'limit': limit}
# کلید API
headers = {}
if resource.auth_type in ["apiKeyHeader", "apiKeyHeaderOptional"]:
api_key = resource.get_api_key()
if api_key and resource.header_name:
headers[resource.header_name] = api_key
elif resource.auth_type in ["apiKeyQuery", "apiKeyQueryOptional"]:
api_key = resource.get_api_key()
if api_key and resource.param_name:
params[resource.param_name] = api_key
response = await self.http_client.get(url, params=params, headers=headers)
response.raise_for_status()
# Parse response
if 'rss' in resource.name.lower() or 'xml' in response.headers.get('content-type', ''):
news_items = self._parse_rss_feed(response.text)
else:
data = response.json()
news_items = self._normalize_news_data(data, resource)
# ثبت موفقیت
ultimate_fallback.mark_result(resource.id, 'news', True)
self.stats['successful_requests'] += 1
self.stats['sources_used'][resource.name] = \
self.stats['sources_used'].get(resource.name, 0) + 1
logger.info(f"✅ Got {len(news_items)} news from {resource.name}")
return news_items[:limit]
except Exception as e:
logger.warning(f"❌ {resource.name} failed: {e}")
ultimate_fallback.mark_result(resource.id, 'news', False)
continue
self.stats['failed_requests'] += 1
logger.error(f"❌ All news sources failed")
return []
async def fetch_sentiment(
self,
max_attempts: int = 10
) -> Optional[Dict]:
"""
دریافت شاخص احساسات با fallback خودکار
Args:
max_attempts: حداکثر تلاش
Returns:
داده‌های احساسات یا None
"""
self.stats['total_requests'] += 1
resources = ultimate_fallback.get_fallback_chain('sentiment', count=max_attempts)
for resource in resources:
if not resource.is_available():
continue
try:
logger.info(f"🔄 Trying {resource.name} for sentiment")
# ساخت URL
if 'alternative.me' in resource.base_url:
url = f"{resource.base_url}/fng/"
params = {'limit': 1, 'format': 'json'}
elif 'cfgi' in resource.base_url:
url = f"{resource.base_url}/v1/fear-greed"
params = {}
else:
url = resource.base_url
params = {}
response = await self.http_client.get(url, params=params)
response.raise_for_status()
data = response.json()
normalized = self._normalize_sentiment_data(data, resource)
ultimate_fallback.mark_result(resource.id, 'sentiment', True)
self.stats['successful_requests'] += 1
logger.info(f"✅ Sentiment from {resource.name}: {normalized.get('value', 'N/A')}")
return normalized
except Exception as e:
logger.warning(f"❌ {resource.name} failed: {e}")
ultimate_fallback.mark_result(resource.id, 'sentiment', False)
continue
self.stats['failed_requests'] += 1
return None
async def analyze_with_hf_models(
self,
text: str,
task: str = 'sentiment',
max_models: int = 5
) -> Dict:
"""
آنالیز متن با چند مدل HuggingFace
Args:
text: متن برای آنالیز
task: نوع task (sentiment, generation, summarization)
max_models: حداکثر تعداد مدل
Returns:
نتیجه آنالیز
"""
models = ultimate_fallback.get_fallback_chain('hf_models', count=max_models)
results = []
for model in models:
if not model.is_available():
continue
# فیلتر براساس task
if task == 'sentiment' and 'sentiment' not in model.name.lower():
continue
if task == 'generation' and 'gpt' not in model.name.lower():
continue
if task == 'summarization' and 'summar' not in model.name.lower():
continue
try:
logger.info(f"🔄 Analyzing with {model.name}")
headers = {}
api_key = model.get_api_key()
if api_key:
headers['Authorization'] = f'Bearer {api_key}'
payload = {'inputs': text}
response = await self.http_client.post(
model.base_url,
json=payload,
headers=headers,
timeout=60.0
)
response.raise_for_status()
result = response.json()
results.append({
'model': model.name,
'result': result
})
ultimate_fallback.mark_result(model.id, 'hf_models', True)
# اگر 3 مدل موفق شدند، کافی است
if len(results) >= 3:
break
except Exception as e:
logger.warning(f"❌ {model.name} failed: {e}")
ultimate_fallback.mark_result(model.id, 'hf_models', False)
continue
# Ensemble results
if results:
return self._ensemble_results(results, task)
else:
return {'status': 'error', 'message': 'All models failed'}
def _normalize_market_data(self, data: Dict, symbol: str, resource: Resource) -> Dict:
"""Normalize market data format"""
try:
# CoinGecko format
if symbol in data:
return {
'symbol': symbol,
'price': data[symbol].get('usd', 0),
'source': resource.name,
'timestamp': datetime.now().isoformat()
}
# Binance format
if 'price' in data:
return {
'symbol': symbol,
'price': float(data['price']),
'source': resource.name,
'timestamp': datetime.now().isoformat()
}
# CoinPaprika format
if 'quotes' in data:
return {
'symbol': symbol,
'price': data['quotes'].get('USD', {}).get('price', 0),
'source': resource.name,
'timestamp': datetime.now().isoformat()
}
# Generic format
return {
'symbol': symbol,
'price': data.get('price', data.get('last', 0)),
'source': resource.name,
'timestamp': datetime.now().isoformat(),
'raw_data': data
}
except Exception as e:
logger.error(f"Error normalizing market data: {e}")
return {'symbol': symbol, 'price': 0, 'error': str(e)}
def _normalize_news_data(self, data: Dict, resource: Resource) -> List[Dict]:
"""Normalize news data format"""
try:
news_items = []
# CryptoPanic format
if 'results' in data:
for item in data['results'][:10]:
news_items.append({
'title': item.get('title'),
'url': item.get('url'),
'source': item.get('source', {}).get('title', resource.name),
'published': item.get('published_at')
})
# NewsAPI format
elif 'articles' in data:
for item in data['articles'][:10]:
news_items.append({
'title': item.get('title'),
'url': item.get('url'),
'source': item.get('source', {}).get('name', resource.name),
'published': item.get('publishedAt')
})
# Generic format
elif isinstance(data, list):
for item in data[:10]:
news_items.append({
'title': item.get('title', item.get('headline')),
'url': item.get('url', item.get('link')),
'source': resource.name,
'published': item.get('published', item.get('date'))
})
return news_items
except Exception as e:
logger.error(f"Error normalizing news data: {e}")
return []
def _normalize_sentiment_data(self, data: Dict, resource: Resource) -> Dict:
"""Normalize sentiment data format"""
try:
# Alternative.me format
if 'data' in data and isinstance(data['data'], list):
item = data['data'][0]
return {
'value': int(item.get('value', 50)),
'classification': item.get('value_classification', 'neutral'),
'source': resource.name,
'timestamp': item.get('timestamp')
}
# Generic format
return {
'value': data.get('value', data.get('score', 50)),
'classification': data.get('classification', 'neutral'),
'source': resource.name,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error normalizing sentiment data: {e}")
return {'value': 50, 'classification': 'neutral', 'error': str(e)}
def _parse_rss_feed(self, xml_content: str) -> List[Dict]:
"""Parse RSS feed (basic implementation)"""
# TODO: استفاده از feedparser برای parse کامل
return []
def _ensemble_results(self, results: List[Dict], task: str) -> Dict:
"""Combine results from multiple models"""
if not results:
return {'status': 'error', 'message': 'No results'}
if task == 'sentiment':
# میانگین‌گیری
sentiments = []
for r in results:
model_result = r['result']
if isinstance(model_result, list) and len(model_result) > 0:
# استخراج label
label = model_result[0].get('label', 'neutral')
sentiments.append(label)
# اکثریت vote
if sentiments:
most_common = max(set(sentiments), key=sentiments.count)
return {
'sentiment': most_common,
'models_used': len(results),
'confidence': sentiments.count(most_common) / len(sentiments),
'details': results
}
return {
'status': 'success',
'models_used': len(results),
'results': results
}
def get_stats(self) -> Dict:
"""دریافت آمار استفاده"""
success_rate = 0
if self.stats['total_requests'] > 0:
success_rate = (self.stats['successful_requests'] / self.stats['total_requests']) * 100
return {
'total_requests': self.stats['total_requests'],
'successful_requests': self.stats['successful_requests'],
'failed_requests': self.stats['failed_requests'],
'success_rate': round(success_rate, 2),
'sources_used': self.stats['sources_used']
}
async def close(self):
"""بستن http client"""
if self.http_client and HTTPX_AVAILABLE:
await self.http_client.aclose()
elif AIOHTTP_AVAILABLE and hasattr(self, 'session') and self.session:
await self.session.close()
# ═══════════════════════════════════════════════════════════════
# Global Instance
# ═══════════════════════════════════════════════════════════════
fallback_integrator = FallbackIntegrator()
# ═══════════════════════════════════════════════════════════════
# Test
# ═══════════════════════════════════════════════════════════════
async def test_integrator():
"""تست integrator"""
print("=" * 80)
print("🧪 Testing Fallback Integrator")
print("=" * 80)
print()
# Test 1: Market Data
print("📊 Test 1: Market Data")
data = await fallback_integrator.fetch_market_data('bitcoin')
if data:
print(f"✅ Price: ${data.get('price', 'N/A')} from {data.get('source')}")
else:
print("❌ Failed to fetch market data")
print()
# Test 2: News
print("📰 Test 2: News")
news = await fallback_integrator.fetch_news('bitcoin', limit=5)
print(f"✅ Got {len(news)} news articles")
if news:
print(f" First: {news[0].get('title', 'N/A')}")
print()
# Test 3: Sentiment
print("💭 Test 3: Sentiment")
sentiment = await fallback_integrator.fetch_sentiment()
if sentiment:
print(f"✅ Sentiment: {sentiment.get('classification', 'N/A')} ({sentiment.get('value', 'N/A')})")
else:
print("❌ Failed to fetch sentiment")
print()
# Stats
print("=" * 80)
print("📊 Statistics")
print("=" * 80)
stats = fallback_integrator.get_stats()
print(f"Total Requests: {stats['total_requests']}")
print(f"Successful: {stats['successful_requests']}")
print(f"Failed: {stats['failed_requests']}")
print(f"Success Rate: {stats['success_rate']}%")
print()
print("Sources Used:")
for source, count in stats['sources_used'].items():
print(f" - {source}: {count}")
await fallback_integrator.close()
if __name__ == "__main__":
import asyncio
asyncio.run(test_integrator())