Your Name
feat: UI improvements and error suppression - Enhanced dashboard and market pages with improved header buttons, logo, and currency symbol display - Stopped animated ticker - Removed pie chart legends - Added error suppressor for external service errors (SSE, Permissions-Policy warnings) - Improved header button prominence and icon appearance - Enhanced logo with glow effects and better design - Fixed currency symbol visibility in market tables
8b7b267
#!/usr/bin/env python3
"""
AI Models Monitor & Database Manager
سیستم نظارت و مدیریت دیتابیس مدل‌های AI
Features:
- شناسایی تمام مدل‌های AI از Hugging Face
- تست عملکرد هر مدل
- جمع‌آوری metrics (latency, success rate, etc.)
- ذخیره در دیتابیس
- Agent خودکار برای بررسی هر 5 دقیقه
"""
import asyncio
import json
import logging
import sqlite3
import time
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx
logger = logging.getLogger(__name__)
class AIModelsDatabase:
    """SQLite-backed storage for AI model records, per-check metrics and stats.

    Three tables are maintained:

    * ``ai_models``    - one row per registered model.
    * ``model_metrics`` - one row per individual check of a model.
    * ``model_stats``   - one aggregate row per model (counts, success rate,
      average response time of successful checks).

    Every public method opens its own short-lived connection and always
    closes it, even when a statement raises.
    """

    def __init__(self, db_path: str = "data/ai_models.db"):
        """Remember *db_path*, create its parent directory and the tables."""
        self.db_path = db_path
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self.init_database()

    def init_database(self):
        """Create the database tables if they do not exist yet."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Models table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS ai_models (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    model_id TEXT UNIQUE NOT NULL,
                    model_key TEXT,
                    task TEXT,
                    category TEXT,
                    provider TEXT DEFAULT 'huggingface',
                    requires_auth BOOLEAN DEFAULT 0,
                    is_active BOOLEAN DEFAULT 1,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Metrics table (one row per model check)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS model_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    model_id TEXT NOT NULL,
                    status TEXT, -- 'available', 'loading', 'failed', 'auth_required'
                    response_time_ms REAL,
                    success BOOLEAN,
                    error_message TEXT,
                    test_input TEXT,
                    test_output TEXT,
                    confidence REAL,
                    checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (model_id) REFERENCES ai_models(model_id)
                )
            ''')

            # Aggregate statistics table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS model_stats (
                    model_id TEXT PRIMARY KEY,
                    total_checks INTEGER DEFAULT 0,
                    successful_checks INTEGER DEFAULT 0,
                    failed_checks INTEGER DEFAULT 0,
                    avg_response_time_ms REAL,
                    last_success_at TIMESTAMP,
                    last_failure_at TIMESTAMP,
                    success_rate REAL,
                    FOREIGN KEY (model_id) REFERENCES ai_models(model_id)
                )
            ''')

            conn.commit()

        logger.info(f"✅ Database initialized: {self.db_path}")

    def add_model(self, model_info: Dict[str, Any]):
        """Insert a model, or update it in place when it already exists.

        Args:
            model_info: Must contain ``model_id``. Optional keys are
                ``model_key``, ``task``, ``category``, ``provider``
                (default ``'huggingface'``) and ``requires_auth``
                (default ``False``).

        Raises:
            KeyError: If ``model_id`` is missing.

        An existing row keeps its ``id``, ``created_at`` and ``is_active``
        values. ``INSERT OR REPLACE`` is deliberately avoided: it deletes the
        conflicting row and inserts a fresh one, which would reset those
        columns on every re-registration.
        """
        params = (
            model_info.get('model_key'),
            model_info.get('task'),
            model_info.get('category'),
            model_info.get('provider', 'huggingface'),
            model_info.get('requires_auth', False),
            model_info['model_id'],
        )

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE ai_models SET
                    model_key = ?,
                    task = ?,
                    category = ?,
                    provider = ?,
                    requires_auth = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE model_id = ?
            ''', params)

            # No row was touched -> the model is new, so insert it.
            if cursor.rowcount == 0:
                cursor.execute('''
                    INSERT INTO ai_models
                    (model_key, task, category, provider, requires_auth,
                     model_id, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                ''', params)

            conn.commit()

    def save_metric(self, metric: Dict[str, Any]):
        """Store the result of one model check and refresh the model's stats.

        Args:
            metric: Must contain ``model_id``. Optional keys are ``status``,
                ``response_time_ms``, ``success`` (default ``False``),
                ``error_message``, ``test_input``, ``test_output`` (stored as
                JSON text) and ``confidence``.

        Raises:
            KeyError: If ``model_id`` is missing.
        """
        model_id = metric['model_id']
        success = metric.get('success', False)

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO model_metrics
                (model_id, status, response_time_ms, success, error_message,
                 test_input, test_output, confidence)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                model_id,
                metric.get('status'),
                metric.get('response_time_ms'),
                success,
                metric.get('error_message'),
                metric.get('test_input'),
                json.dumps(metric.get('test_output')),
                metric.get('confidence'),
            ))

            # Refresh the aggregate row inside the same transaction, so the
            # metric and its stats are committed together.
            self._update_model_stats(cursor, model_id, success)
            conn.commit()

    def _update_model_stats(self, cursor, model_id: str, success: bool):
        """Recompute the aggregate ``model_stats`` row for *model_id*.

        Must be called after the new metric row has been inserted on the same
        *cursor*: the average response time is computed from the successful
        rows of ``model_metrics`` and is expected to include the newest check.
        The caller is responsible for committing.
        """
        success = bool(success)

        cursor.execute('''
            SELECT total_checks, successful_checks, failed_checks
            FROM model_stats WHERE model_id = ?
        ''', (model_id,))
        row = cursor.fetchone()
        total, successful, failed = row if row is not None else (0, 0, 0)

        total += 1
        if success:
            successful += 1
        else:
            failed += 1

        # Average over successful checks only; 0 when there are none yet.
        cursor.execute('''
            SELECT AVG(response_time_ms) FROM model_metrics
            WHERE model_id = ? AND success = 1
        ''', (model_id,))
        avg_time = cursor.fetchone()[0] or 0
        success_rate = successful / total * 100

        if row is not None:
            cursor.execute('''
                UPDATE model_stats SET
                    total_checks = ?,
                    successful_checks = ?,
                    failed_checks = ?,
                    avg_response_time_ms = ?,
                    success_rate = ?,
                    last_success_at = CASE WHEN ? THEN CURRENT_TIMESTAMP ELSE last_success_at END,
                    last_failure_at = CASE WHEN ? THEN CURRENT_TIMESTAMP ELSE last_failure_at END
                WHERE model_id = ?
            ''', (total, successful, failed, avg_time, success_rate,
                  success, not success, model_id))
        else:
            # First check of this model: create the aggregate row, including
            # the average response time (previously left NULL here).
            cursor.execute('''
                INSERT INTO model_stats
                (model_id, total_checks, successful_checks, failed_checks,
                 avg_response_time_ms, success_rate,
                 last_success_at, last_failure_at)
                VALUES (?, ?, ?, ?, ?, ?,
                        CASE WHEN ? THEN CURRENT_TIMESTAMP END,
                        CASE WHEN ? THEN CURRENT_TIMESTAMP END)
            ''', (model_id, total, successful, failed, avg_time, success_rate,
                  success, not success))

    def get_all_models(self) -> List[Dict[str, Any]]:
        """Return every active model together with its aggregate stats.

        Each dict holds all ``ai_models`` columns plus ``total_checks``,
        ``successful_checks``, ``success_rate`` and ``avg_response_time_ms``;
        the stats values are ``None`` for a model that was never checked.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT m.*, s.total_checks, s.successful_checks, s.success_rate, s.avg_response_time_ms
                FROM ai_models m
                LEFT JOIN model_stats s ON m.model_id = s.model_id
                WHERE m.is_active = 1
            ''')
            return [dict(row) for row in cursor.fetchall()]

    def get_model_history(self, model_id: str, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to *limit* metric rows of *model_id*, newest first."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT * FROM model_metrics
                WHERE model_id = ?
                ORDER BY checked_at DESC
                LIMIT ?
            ''', (model_id, limit))
            return [dict(row) for row in cursor.fetchall()]
class AIModelsMonitor:
"""
مانیتور مدل‌های AI
شناسایی، تست، و نظارت بر همه مدل‌ها
"""
def __init__(self, db: AIModelsDatabase):
self.db = db
import os
self.hf_api_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
# استفاده از router endpoint جدید
self.base_url = "https://api-inference.huggingface.co/models"
# مدل‌های شناخته شده (از کدهای موجود)
self.known_models = self._load_known_models()
def _load_known_models(self) -> List[Dict[str, Any]]:
"""بارگذاری مدل‌های شناخته شده"""
models = []
# از real_ai_models.py
sentiment_models = [
{"model_id": "ElKulako/cryptobert", "task": "sentiment-analysis", "category": "crypto", "requires_auth": True},
{"model_id": "kk08/CryptoBERT", "task": "sentiment-analysis", "category": "crypto"},
{"model_id": "ProsusAI/finbert", "task": "sentiment-analysis", "category": "financial"},
{"model_id": "cardiffnlp/twitter-roberta-base-sentiment-latest", "task": "sentiment-analysis", "category": "twitter"},
{"model_id": "StephanAkkerman/FinTwitBERT-sentiment", "task": "sentiment-analysis", "category": "financial"},
{"model_id": "finiteautomata/bertweet-base-sentiment-analysis", "task": "sentiment-analysis", "category": "twitter"},
{"model_id": "yiyanghkust/finbert-tone", "task": "sentiment-analysis", "category": "financial"},
{"model_id": "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis", "task": "sentiment-analysis", "category": "news"},
{"model_id": "distilbert-base-uncased-finetuned-sst-2-english", "task": "sentiment-analysis", "category": "general"},
{"model_id": "nlptown/bert-base-multilingual-uncased-sentiment", "task": "sentiment-analysis", "category": "general"},
{"model_id": "mayurjadhav/crypto-sentiment-model", "task": "sentiment-analysis", "category": "crypto"},
{"model_id": "mathugo/crypto_news_bert", "task": "sentiment-analysis", "category": "crypto_news"},
{"model_id": "burakutf/finetuned-finbert-crypto", "task": "sentiment-analysis", "category": "crypto"},
]
generation_models = [
{"model_id": "OpenC/crypto-gpt-o3-mini", "task": "text-generation", "category": "crypto"},
{"model_id": "agarkovv/CryptoTrader-LM", "task": "text-generation", "category": "trading"},
{"model_id": "gpt2", "task": "text-generation", "category": "general"},
{"model_id": "distilgpt2", "task": "text-generation", "category": "general"},
]
summarization_models = [
{"model_id": "facebook/bart-large-cnn", "task": "summarization", "category": "news"},
{"model_id": "sshleifer/distilbart-cnn-12-6", "task": "summarization", "category": "news"},
{"model_id": "FurkanGozukara/Crypto-Financial-News-Summarizer", "task": "summarization", "category": "crypto_news"},
]
zero_shot_models = [
{"model_id": "facebook/bart-large-mnli", "task": "zero-shot-classification", "category": "general"},
]
models.extend(sentiment_models)
models.extend(generation_models)
models.extend(summarization_models)
models.extend(zero_shot_models)
return models
async def test_model(self, model_info: Dict[str, Any]) -> Dict[str, Any]:
"""
تست یک مدل
Returns:
Dict با اطلاعات کامل نتیجه تست
"""
model_id = model_info['model_id']
task = model_info.get('task', 'sentiment-analysis')
# متن تست بر اساس task
test_inputs = {
'sentiment-analysis': "Bitcoin is showing strong bullish momentum!",
'text-generation': "The future of cryptocurrency is",
'summarization': "Bitcoin reached new all-time highs today as institutional investors continue to show strong interest in cryptocurrency markets. Analysts predict further growth in the coming months.",
'zero-shot-classification': "Bitcoin price surging",
}
test_input = test_inputs.get(task, "Test input")
url = f"{self.base_url}/{model_id}"
headers = {"Content-Type": "application/json"}
if self.hf_api_token:
headers["Authorization"] = f"Bearer {self.hf_api_token}"
# Payload بر اساس task
if task == 'zero-shot-classification':
payload = {
"inputs": test_input,
"parameters": {"candidate_labels": ["bullish", "bearish", "neutral"]}
}
else:
payload = {"inputs": test_input}
start_time = datetime.now()
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(url, headers=headers, json=payload)
end_time = datetime.now()
response_time = (end_time - start_time).total_seconds() * 1000 # ms
result = {
'model_id': model_id,
'task': task,
'category': model_info.get('category'),
'test_input': test_input,
'response_time_ms': response_time,
'http_status': response.status_code
}
if response.status_code == 200:
data = response.json()
result['status'] = 'available'
result['success'] = True
result['test_output'] = data
# استخراج confidence
if isinstance(data, list) and len(data) > 0:
if isinstance(data[0], dict):
result['confidence'] = data[0].get('score', 0.0)
elif isinstance(data[0], list) and len(data[0]) > 0:
result['confidence'] = data[0][0].get('score', 0.0)
logger.info(f"✅ {model_id}: {response_time:.0f}ms")
elif response.status_code == 503:
result['status'] = 'loading'
result['success'] = False
result['error_message'] = "Model is loading"
logger.warning(f"⏳ {model_id}: Loading...")
elif response.status_code == 401:
result['status'] = 'auth_required'
result['success'] = False
result['error_message'] = "Authentication required"
logger.warning(f"🔐 {model_id}: Auth required")
elif response.status_code == 404:
result['status'] = 'not_found'
result['success'] = False
result['error_message'] = "Model not found"
logger.error(f"❌ {model_id}: Not found")
else:
result['status'] = 'failed'
result['success'] = False
result['error_message'] = f"HTTP {response.status_code}"
logger.error(f"❌ {model_id}: HTTP {response.status_code}")
return result
except asyncio.TimeoutError:
return {
'model_id': model_id,
'task': task,
'category': model_info.get('category'),
'status': 'timeout',
'success': False,
'error_message': "Request timeout (30s)",
'test_input': test_input
}
except Exception as e:
return {
'model_id': model_id,
'task': task,
'category': model_info.get('category'),
'status': 'error',
'success': False,
'error_message': str(e)[:200],
'test_input': test_input
}
async def scan_all_models(self) -> Dict[str, Any]:
"""
اسکن همه مدل‌ها
"""
logger.info(f"🔍 Starting scan of {len(self.known_models)} models...")
# اضافه کردن مدل‌ها به دیتابیس
for model_info in self.known_models:
self.db.add_model(model_info)
# تست همه مدل‌ها
tasks = [self.test_model(model_info) for model_info in self.known_models]
results = await asyncio.gather(*tasks, return_exceptions=True)
# پردازش نتایج
summary = {
'total': len(results),
'available': 0,
'loading': 0,
'failed': 0,
'auth_required': 0,
'not_found': 0,
'models': []
}
for result in results:
if isinstance(result, Exception):
logger.error(f"Exception: {result}")
continue
# ذخیره در دیتابیس
self.db.save_metric(result)
# آمار
status = result.get('status', 'unknown')
if status == 'available':
summary['available'] += 1
elif status == 'loading':
summary['loading'] += 1
elif status == 'auth_required':
summary['auth_required'] += 1
elif status == 'not_found':
summary['not_found'] += 1
else:
summary['failed'] += 1
summary['models'].append({
'model_id': result['model_id'],
'status': status,
'response_time_ms': result.get('response_time_ms'),
'success': result.get('success', False)
})
logger.info(f"✅ Scan complete: {summary['available']}/{summary['total']} available")
return summary
def get_models_by_status(self, status: str = None) -> List[Dict[str, Any]]:
"""دریافت مدل‌ها بر اساس وضعیت"""
models = self.db.get_all_models()
if status:
# فیلتر بر اساس آخرین وضعیت
filtered = []
for model in models:
history = self.db.get_model_history(model['model_id'], limit=1)
if history and history[0]['status'] == status:
filtered.append(model)
return filtered
return models
class AIModelsAgent:
    """Background agent that re-scans all models at a fixed interval.

    The agent runs as an asyncio task: ``start()`` schedules it on the running
    event loop and ``stop()`` cancels it. It can be started again after it
    has been stopped.
    """

    def __init__(self, monitor: "AIModelsMonitor", interval_minutes: int = 5):
        """Store the monitor and convert the interval to seconds."""
        self.monitor = monitor
        self.interval = interval_minutes * 60  # seconds
        self.running = False
        self.task = None

    async def run(self):
        """Scan in a loop until ``running`` is cleared or the task is cancelled.

        A failing scan is logged and retried after one minute instead of
        ending the loop.
        """
        self.running = True
        logger.info(f"🤖 AI Models Agent started (interval: {self.interval/60:.0f} minutes)")

        while self.running:
            try:
                logger.info("🔄 Starting periodic scan...")
                result = await self.monitor.scan_all_models()

                logger.info("📊 Scan Results:")
                logger.info(f" Available: {result['available']}")
                logger.info(f" Loading: {result['loading']}")
                logger.info(f" Failed: {result['failed']}")
                logger.info(f" Auth Required: {result['auth_required']}")

                # Wait for the next interval.
                logger.info(f"⏰ Next scan in {self.interval/60:.0f} minutes...")
                await asyncio.sleep(self.interval)
            except asyncio.CancelledError:
                # Never swallow cancellation, otherwise stop() could not end
                # the loop on Python versions where CancelledError derives
                # from Exception.
                raise
            except Exception as e:
                logger.error(f"❌ Agent error: {e}")
                await asyncio.sleep(60)  # wait one minute after an error

    def start(self):
        """Schedule ``run()`` as a task and return it.

        Must be called while an event loop is running. Calling it again while
        the agent is active returns the existing task; after the previous
        task has finished or been stopped, a new one is created.
        """
        if self.task is None or self.task.done():
            self.task = asyncio.create_task(self.run())
        return self.task

    async def stop(self):
        """Cancel the agent task, if any, and wait until it has finished."""
        self.running = False
        # Forget the task first so that a later start() creates a fresh one.
        task, self.task = self.task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        logger.info("🛑 AI Models Agent stopped")
# Global instances
# NOTE: these objects are created when the module is imported. Constructing
# AIModelsDatabase() with its default path creates the "data" directory and
# the "data/ai_models.db" SQLite file (and its tables) as a side effect.
db = AIModelsDatabase()
# The monitor stores its results through the shared database instance.
monitor = AIModelsMonitor(db)
# The agent is only configured here (5-minute interval); nothing in this
# block starts it.
agent = AIModelsAgent(monitor, interval_minutes=5)
# Public API of the module: the three classes and the shared instances.
__all__ = ["AIModelsDatabase", "AIModelsMonitor", "AIModelsAgent", "db", "monitor", "agent"]