|
|
from typing import Dict, Any
|
|
|
from datetime import datetime, timedelta
|
|
|
from sqlalchemy import create_engine, desc, func
|
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
from contextlib import contextmanager
|
|
|
|
|
|
from database.models_hub import MarketPrice
|
|
|
|
|
|
|
|
|
class MarketSummaryService:
    """Serve per-symbol market summaries read from a SQLite database.

    The latest ``MarketPrice`` row for a symbol is converted into a plain
    dict and kept in an in-memory cache for ``cache_ttl`` seconds so that
    repeated requests do not hit the database.
    """

    def __init__(self, db_path: str = "data/crypto_hub.db"):
        """Create the engine/session factory for the SQLite file at *db_path*."""
        self.db_path = db_path
        db_url = f"sqlite:///{self.db_path}"
        # check_same_thread=False disables SQLite's same-thread check so the
        # connection may be used from threads other than its creator.
        self.engine = create_engine(db_url, connect_args={"check_same_thread": False})
        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        # cache key -> (summary dict, naive UTC datetime when it was stored)
        self.cache = {}
        # Lifetime of a cache entry, in seconds.
        self.cache_ttl = 60

    @contextmanager
    def get_session(self):
        """Yield a new session and always close it when the caller is done."""
        session = self.SessionLocal()
        try:
            yield session
        finally:
            session.close()

    @staticmethod
    def _to_float(value):
        """Return *value* as a float, or ``None`` when it is ``None``.

        An explicit ``is None`` test is used so that legitimate zeros
        (e.g. a 0% price change) are reported as ``0.0`` instead of ``None``.
        """
        return None if value is None else float(value)

    def _build_summary(self, price_data) -> Dict[str, Any]:
        """Convert one price row into the 'available' summary payload."""
        return {
            'status': 'available',
            'symbol': price_data.symbol,
            'price_usd': self._to_float(price_data.price_usd),
            'market_cap_usd': self._to_float(price_data.market_cap),
            'circulating_supply': self._to_float(price_data.circulating_supply),
            'total_volume_24h_usd': self._to_float(price_data.volume_24h),
            'price_change_24h_pct': self._to_float(price_data.change_24h),
            'liquidity_score': self._calculate_liquidity_score(price_data),
            'last_updated': price_data.collected_at.isoformat(),
        }

    def get_market_summary(self, symbol: str) -> Dict[str, Any]:
        """Return the latest market summary for *symbol*.

        The returned dict always has a ``status`` key:

        * ``'available'``   - a row was found; price fields are included.
        * ``'unavailable'`` - no row exists for the symbol.
        * ``'degraded'``    - the lookup raised; ``error`` holds the message.

        Only ``'available'`` results are cached.
        """
        normalized = symbol.upper()
        # Key on the normalized symbol so "btc" and "BTC" share one entry,
        # matching the case-insensitive lookup performed by the query below.
        cache_key = f"market_{normalized}"

        cached = self.cache.get(cache_key)
        if cached is not None:
            cached_data, cached_time = cached
            # total_seconds() covers the whole interval; timedelta.seconds
            # ignores the days component and would make day-old entries
            # look fresh again.
            age = (datetime.utcnow() - cached_time).total_seconds()
            if age < self.cache_ttl:
                return cached_data

        try:
            with self.get_session() as session:
                price_data = (
                    session.query(MarketPrice)
                    .filter(MarketPrice.symbol == normalized)
                    .order_by(desc(MarketPrice.collected_at))
                    .first()
                )

                if price_data is None:
                    return {
                        'status': 'unavailable',
                        'symbol': normalized,
                        'message': 'No data available for this symbol'
                    }

                # Build the payload while the session is still open so the
                # row's attributes are read before the session is closed.
                result = self._build_summary(price_data)

            self.cache[cache_key] = (result, datetime.utcnow())
            return result

        except Exception as e:
            # Best-effort API: report the failure in the payload rather than
            # raising, so batch callers can keep processing other symbols.
            return {
                'status': 'degraded',
                'symbol': normalized,
                'error': str(e)
            }

    def get_multiple_summaries(self, symbols: list) -> Dict[str, Any]:
        """Fetch summaries for every entry in *symbols* and tally their statuses.

        ``data`` maps each symbol exactly as passed in to its summary.
        Any status other than ``'available'`` or ``'degraded'`` (for example
        ``'unavailable'``) is counted under ``'offline'``.
        """
        results = {}
        available = 0
        degraded = 0
        offline = 0

        for symbol in symbols:
            summary = self.get_market_summary(symbol)
            results[symbol] = summary

            status = summary['status']
            if status == 'available':
                available += 1
            elif status == 'degraded':
                degraded += 1
            else:
                offline += 1

        return {
            'status': 'available',
            'summary': {
                'total': len(symbols),
                'available': available,
                'degraded': degraded,
                'offline': offline
            },
            'data': results
        }

    def _calculate_liquidity_score(self, price_data) -> float:
        """Return 24h volume as a percentage of market cap, clamped to [0, 100].

        Returns ``0.0`` when either value is missing, zero, or when the
        market cap is not positive. The result is rounded to two decimals.
        """
        volume = price_data.volume_24h
        market_cap = price_data.market_cap
        if not volume or not market_cap:
            return 0.0

        # Convert up front so mixed Decimal/float column values cannot raise
        # TypeError and the result is always a plain float.
        volume = float(volume)
        market_cap = float(market_cap)
        if market_cap <= 0:
            return 0.0

        score = volume / market_cap * 100
        # Clamp on both sides: a negative volume (bad data) must not yield
        # a negative score.
        return round(min(max(score, 0.0), 100.0), 2)
|
|
|
|