Datasourceforcryptocurrency-2 / backend /services /hf_inference_api_client.py
Really-amin's picture
Upload 553 files
386790e verified
#!/usr/bin/env python3
"""
Hugging Face Inference API Client
استفاده از API به جای بارگذاری مستقیم مدل‌ها
"""
import aiohttp
import os
from typing import Dict, List, Optional, Any
import asyncio
import logging
from collections import Counter
logger = logging.getLogger(__name__)
class HFInferenceAPIClient:
"""
کلاینت برای Hugging Face Inference API
مزایا:
- نیازی به بارگذاری مدل در RAM نیست
- دسترسی به مدل‌های بزرگتر
- پردازش سریعتر (GPU در سرورهای HF)
- 30,000 درخواست رایگان در ماه
"""
def __init__(self, api_token: Optional[str] = None):
self.api_token = api_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
self.base_url = "https://api-inference.huggingface.co/models"
self.session = None
# مدل‌های تأیید شده که در HF API کار می‌کنند
self.verified_models = {
"crypto_sentiment": "kk08/CryptoBERT",
"social_sentiment": "ElKulako/cryptobert",
"financial_sentiment": "ProsusAI/finbert",
"twitter_sentiment": "cardiffnlp/twitter-roberta-base-sentiment-latest",
"fintwit_sentiment": "StephanAkkerman/FinTwitBERT-sentiment",
"crypto_gen": "OpenC/crypto-gpt-o3-mini",
"crypto_trader": "agarkovv/CryptoTrader-LM",
}
# Cache برای نتایج (برای کاهش تعداد درخواست‌ها)
self._cache = {}
self._cache_ttl = 300 # 5 دقیقه
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
def _get_cache_key(self, text: str, model_key: str) -> str:
"""ایجاد کلید cache"""
return f"{model_key}:{text[:100]}"
def _check_cache(self, cache_key: str) -> Optional[Dict[str, Any]]:
"""بررسی cache"""
if cache_key in self._cache:
cached_result, timestamp = self._cache[cache_key]
if asyncio.get_event_loop().time() - timestamp < self._cache_ttl:
return cached_result
else:
del self._cache[cache_key]
return None
def _set_cache(self, cache_key: str, result: Dict[str, Any]):
"""ذخیره در cache"""
self._cache[cache_key] = (result, asyncio.get_event_loop().time())
async def analyze_sentiment(
self,
text: str,
model_key: str = "crypto_sentiment",
use_cache: bool = True
) -> Dict[str, Any]:
"""
تحلیل sentiment با استفاده از HF Inference API
Args:
text: متن برای تحلیل
model_key: کلید مدل (crypto_sentiment, social_sentiment, ...)
use_cache: استفاده از cache
Returns:
Dict شامل label, confidence, و اطلاعات دیگر
"""
# بررسی cache
if use_cache:
cache_key = self._get_cache_key(text, model_key)
cached = self._check_cache(cache_key)
if cached:
cached["from_cache"] = True
return cached
model_id = self.verified_models.get(model_key)
if not model_id:
return {
"status": "error",
"error": f"Unknown model key: {model_key}. Available: {list(self.verified_models.keys())}"
}
url = f"{self.base_url}/{model_id}"
headers = {}
if self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"
payload = {"inputs": text[:512]} # محدودیت طول متن
try:
if not self.session:
self.session = aiohttp.ClientSession()
async with self.session.post(
url,
json=payload,
headers=headers,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 503:
# مدل در حال بارگذاری است
return {
"status": "loading",
"message": "Model is loading, please retry in 20 seconds",
"model": model_id
}
if response.status == 429:
# محدودیت rate limit
return {
"status": "rate_limited",
"error": "Rate limit exceeded. Please try again later.",
"model": model_id
}
if response.status == 401:
return {
"status": "error",
"error": "Authentication required. Please set HF_TOKEN environment variable.",
"model": model_id
}
if response.status == 200:
data = await response.json()
# استخراج نتیجه
if isinstance(data, list) and len(data) > 0:
if isinstance(data[0], list):
# برخی مدل‌ها لیستی از لیست‌ها برمی‌گردانند
result = data[0][0] if data[0] else {}
else:
result = data[0]
# استانداردسازی خروجی
label = result.get("label", "NEUTRAL").upper()
score = result.get("score", 0.5)
# تبدیل به فرمت استاندارد
mapped = self._map_label(label)
response_data = {
"status": "success",
"label": mapped,
"confidence": score,
"score": score,
"raw_label": label,
"model": model_id,
"model_key": model_key,
"engine": "hf_inference_api",
"available": True,
"from_cache": False
}
# ذخیره در cache
if use_cache:
cache_key = self._get_cache_key(text, model_key)
self._set_cache(cache_key, response_data)
return response_data
error_text = await response.text()
logger.warning(f"HF API error: HTTP {response.status}: {error_text[:200]}")
return {
"status": "error",
"error": f"HTTP {response.status}: {error_text[:200]}",
"model": model_id
}
except asyncio.TimeoutError:
logger.error(f"HF API timeout for model {model_id}")
return {
"status": "error",
"error": "Request timeout after 30 seconds",
"model": model_id
}
except Exception as e:
logger.error(f"HF API exception for model {model_id}: {e}")
return {
"status": "error",
"error": str(e)[:200],
"model": model_id
}
def _map_label(self, label: str) -> str:
"""تبدیل برچسب‌های مختلف به فرمت استاندارد"""
label_upper = label.upper()
# Positive/Bullish mapping
if any(x in label_upper for x in ["POSITIVE", "BULLISH", "LABEL_2", "BUY"]):
return "bullish"
# Negative/Bearish mapping
elif any(x in label_upper for x in ["NEGATIVE", "BEARISH", "LABEL_0", "SELL"]):
return "bearish"
# Neutral/Hold mapping
else:
return "neutral"
async def ensemble_sentiment(
self,
text: str,
models: Optional[List[str]] = None,
min_models: int = 2
) -> Dict[str, Any]:
"""
استفاده از چندین مدل به صورت همزمان (ensemble)
Args:
text: متن برای تحلیل
models: لیست کلیدهای مدل (None = استفاده از مدل‌های پیش‌فرض)
min_models: حداقل تعداد مدل‌های موفق برای نتیجه معتبر
Returns:
Dict شامل نتیجه ensemble
"""
if models is None:
# مدل‌های پیش‌فرض برای ensemble
models = ["crypto_sentiment", "social_sentiment", "financial_sentiment"]
# فراخوانی موازی مدل‌ها
tasks = [self.analyze_sentiment(text, model) for model in models]
results = await asyncio.gather(*tasks, return_exceptions=True)
# جمع‌آوری نتایج موفق
successful_results = []
failed_models = []
loading_models = []
for i, result in enumerate(results):
if isinstance(result, Exception):
failed_models.append({
"model": models[i],
"error": str(result)[:100]
})
continue
if isinstance(result, dict):
if result.get("status") == "success":
successful_results.append(result)
elif result.get("status") == "loading":
loading_models.append(result.get("model"))
else:
failed_models.append({
"model": models[i],
"error": result.get("error", "Unknown error")[:100]
})
# اگر همه مدل‌ها در حال بارگذاری هستند
if loading_models and not successful_results:
return {
"status": "loading",
"message": f"{len(loading_models)} model(s) are loading",
"loading_models": loading_models
}
# اگر تعداد مدل‌های موفق کمتر از حداقل باشد
if len(successful_results) < min_models:
return {
"status": "insufficient_models",
"error": f"Only {len(successful_results)} models succeeded (min: {min_models})",
"successful": len(successful_results),
"failed": len(failed_models),
"failed_models": failed_models[:3], # نمایش 3 خطای اول
"fallback": True
}
# رای‌گیری بین نتایج
labels = [r["label"] for r in successful_results]
confidences = [r["confidence"] for r in successful_results]
# شمارش آرا
label_counts = Counter(labels)
final_label = label_counts.most_common(1)[0][0]
# محاسبه اعتماد وزنی
# مدل‌هایی که با اکثریت موافق هستند، وزن بیشتری دارند
weighted_confidence = sum(
r["confidence"] for r in successful_results
if r["label"] == final_label
) / len([r for r in successful_results if r["label"] == final_label])
# میانگین کل
avg_confidence = sum(confidences) / len(confidences)
# آماره‌های تفصیلی
scores_breakdown = {
"bullish": 0.0,
"bearish": 0.0,
"neutral": 0.0
}
for result in successful_results:
label = result["label"]
confidence = result["confidence"]
scores_breakdown[label] += confidence
# نرمال‌سازی
total_score = sum(scores_breakdown.values())
if total_score > 0:
scores_breakdown = {
k: v / total_score
for k, v in scores_breakdown.items()
}
return {
"status": "success",
"label": final_label,
"confidence": weighted_confidence,
"avg_confidence": avg_confidence,
"score": weighted_confidence,
"scores": scores_breakdown,
"model_count": len(successful_results),
"votes": dict(label_counts),
"consensus": label_counts[final_label] / len(successful_results),
"models_used": [r["model"] for r in successful_results],
"engine": "hf_inference_api_ensemble",
"available": True,
"failed_count": len(failed_models),
"failed_models": failed_models[:3] if failed_models else []
}
async def analyze_with_fallback(
self,
text: str,
primary_model: str = "crypto_sentiment",
fallback_models: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
تحلیل با fallback خودکار
اگر مدل اصلی موفق نشد، از مدل‌های fallback استفاده می‌کند
"""
if fallback_models is None:
fallback_models = ["social_sentiment", "financial_sentiment", "twitter_sentiment"]
# تلاش با مدل اصلی
result = await self.analyze_sentiment(text, primary_model)
if result.get("status") == "success":
result["used_fallback"] = False
return result
# تلاش با مدل‌های fallback
for fallback_model in fallback_models:
result = await self.analyze_sentiment(text, fallback_model)
if result.get("status") == "success":
result["used_fallback"] = True
result["fallback_model"] = fallback_model
result["primary_model_failed"] = primary_model
return result
# همه مدل‌ها ناموفق بودند
return {
"status": "all_failed",
"error": "All models failed",
"primary_model": primary_model,
"fallback_models": fallback_models
}
def get_available_models(self) -> Dict[str, Any]:
"""
دریافت لیست مدل‌های موجود
"""
return {
"total": len(self.verified_models),
"models": [
{
"key": key,
"model_id": model_id,
"provider": "HuggingFace",
"type": "sentiment" if "sentiment" in key else ("generation" if "gen" in key else "trading")
}
for key, model_id in self.verified_models.items()
]
}
def get_cache_stats(self) -> Dict[str, Any]:
"""
آمار cache
"""
return {
"cache_size": len(self._cache),
"cache_ttl": self._cache_ttl
}
# ===== توابع کمکی برای استفاده آسان =====
async def analyze_crypto_sentiment_via_api(
text: str,
use_ensemble: bool = True
) -> Dict[str, Any]:
"""
تحلیل sentiment کریپتو با استفاده از HF Inference API
Args:
text: متن برای تحلیل
use_ensemble: استفاده از ensemble (چند مدل)
Returns:
Dict شامل نتیجه تحلیل
"""
async with HFInferenceAPIClient() as client:
if use_ensemble:
return await client.ensemble_sentiment(text)
else:
return await client.analyze_sentiment(text, "crypto_sentiment")
async def quick_sentiment(text: str) -> str:
"""
تحلیل سریع sentiment - فقط برچسب را برمی‌گرداند
Args:
text: متن برای تحلیل
Returns:
str: "bullish", "bearish", یا "neutral"
"""
result = await analyze_crypto_sentiment_via_api(text, use_ensemble=False)
return result.get("label", "neutral")
# ===== مثال استفاده =====
if __name__ == "__main__":
async def test_client():
"""تست کلاینت"""
print("🧪 Testing HF Inference API Client...")
test_texts = [
"Bitcoin is showing strong bullish momentum!",
"Major exchange hacked, prices crashing",
"Market consolidating, waiting for direction"
]
async with HFInferenceAPIClient() as client:
# تست تک مدل
print("\n1️⃣ Single Model Test:")
for text in test_texts:
result = await client.analyze_sentiment(text, "crypto_sentiment")
print(f" Text: {text[:50]}...")
print(f" Result: {result.get('label')} ({result.get('confidence', 0):.2%})")
# تست ensemble
print("\n2️⃣ Ensemble Test:")
text = "Bitcoin breaking new all-time highs!"
result = await client.ensemble_sentiment(text)
print(f" Text: {text}")
print(f" Result: {result.get('label')} ({result.get('confidence', 0):.2%})")
print(f" Votes: {result.get('votes')}")
print(f" Models: {result.get('model_count')}")
# تست fallback
print("\n3️⃣ Fallback Test:")
result = await client.analyze_with_fallback(text)
print(f" Used fallback: {result.get('used_fallback', False)}")
print(f" Result: {result.get('label')} ({result.get('confidence', 0):.2%})")
# لیست مدل‌ها
print("\n4️⃣ Available Models:")
models = client.get_available_models()
for model in models["models"][:5]:
print(f" - {model['key']}: {model['model_id']}")
print("\n✅ Testing complete!")
import asyncio
asyncio.run(test_client())