Coverage for hf_client.py: 0.00%
117 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-25 15:37 +0330
from __future__ import annotations

import os
import time
from collections import deque
from datetime import datetime, timezone
from functools import lru_cache
from typing import Any, Dict, List, Optional
# Master switch for all sentiment features; any of "1"/"true"/"yes" (case-insensitive) enables it.
ENABLE_SENTIMENT = os.getenv("ENABLE_SENTIMENT", "true").lower() in ("1", "true", "yes")
# HuggingFace model id used for social-media text.
SOCIAL_MODEL = os.getenv("SENTIMENT_SOCIAL_MODEL", "ElKulako/cryptobert")
# HuggingFace model id used for news text.
NEWS_MODEL = os.getenv("SENTIMENT_NEWS_MODEL", "kk08/CryptoBERT")
@lru_cache(maxsize=4)
def _pl(model_name: str):
    """Lazily build and memoize a sentiment-analysis pipeline for *model_name*.

    Returns None when sentiment analysis is disabled via ENABLE_SENTIMENT.
    The transformers import is deferred so the module loads without the
    dependency when the feature is off.
    """
    if ENABLE_SENTIMENT:
        from transformers import pipeline
        return pipeline("sentiment-analysis", model=model_name)
    return None
23def _label_to_score(lbl: str) -> float:
24 l = (lbl or "").lower()
25 if "positive" in l or "bullish" in l:
26 return 1.0
27 if "negative" in l or "bearish" in l:
28 return -1.0
29 return 0.0
def analyze_social_sentiment(text: str) -> Dict[str, Any]:
    """Analyze social media text sentiment using CryptoBERT.

    Returns a dict with "sentiment" (lowercased model label), "score"
    (+1/-1/0 polarity), and "confidence". Falls back to a neutral result
    when the feature is disabled, the text is blank, or inference fails
    (in which case an "error" key is added).
    """
    if not ENABLE_SENTIMENT or not text or not text.strip():
        return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0}

    pipe = _pl(SOCIAL_MODEL)
    if pipe is None:
        return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0}

    try:
        # Truncate to 512 chars to stay within the model's input limit.
        prediction = pipe(text[:512])[0]
        label = prediction.get("label", "NEUTRAL")
        return {
            "sentiment": label.lower(),
            "score": _label_to_score(label),
            "confidence": prediction.get("score", 0.0),
        }
    except Exception as exc:
        return {
            "sentiment": "neutral",
            "score": 0.0,
            "confidence": 0.0,
            "error": str(exc),
        }
def analyze_news_sentiment(text: str) -> Dict[str, Any]:
    """Analyze news text sentiment using CryptoBERT.

    Returns a dict with "sentiment" (lowercased model label), "score"
    (+1/-1/0 polarity), and "confidence". Falls back to a neutral result
    when the feature is disabled, the text is blank, or inference fails
    (in which case an "error" key is added).
    """
    if not ENABLE_SENTIMENT or not text or not text.strip():
        return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0}

    pipe = _pl(NEWS_MODEL)
    if pipe is None:
        return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0}

    try:
        # Truncate to 512 chars to stay within the model's input limit.
        prediction = pipe(text[:512])[0]
        label = prediction.get("label", "NEUTRAL")
        return {
            "sentiment": label.lower(),
            "score": _label_to_score(label),
            "confidence": prediction.get("score", 0.0),
        }
    except Exception as exc:
        return {
            "sentiment": "neutral",
            "score": 0.0,
            "confidence": 0.0,
            "error": str(exc),
        }
def batch_analyze_sentiment(
    texts: List[str],
    model_type: str = "social"
) -> List[Dict[str, Any]]:
    """Analyze multiple texts in batch.

    model_type selects the model: "social" (default) or anything else
    routes to the news model. Blank entries and per-text failures each
    produce a neutral row; failures additionally carry an "error" key.
    """
    if not ENABLE_SENTIMENT or not texts:
        return [{"sentiment": "neutral", "score": 0.0, "confidence": 0.0} for _ in texts]

    pipe = _pl(SOCIAL_MODEL if model_type == "social" else NEWS_MODEL)
    if pipe is None:
        return [{"sentiment": "neutral", "score": 0.0, "confidence": 0.0} for _ in texts]

    rows: List[Dict[str, Any]] = []
    for item in texts:
        if not item or not item.strip():
            rows.append({"sentiment": "neutral", "score": 0.0, "confidence": 0.0})
        else:
            try:
                # Truncate to 512 chars to stay within the model's input limit.
                prediction = pipe(item[:512])[0]
                label = prediction.get("label", "NEUTRAL")
                rows.append({
                    "sentiment": label.lower(),
                    "score": _label_to_score(label),
                    "confidence": prediction.get("score", 0.0),
                })
            except Exception as exc:
                rows.append({
                    "sentiment": "neutral",
                    "score": 0.0,
                    "confidence": 0.0,
                    "error": str(exc),
                })
    return rows
class HFClient:
    """HuggingFace Client for sentiment analysis with usage tracking."""

    def __init__(self, max_history: int = 100):
        """Initialize HFClient with usage tracking.

        Args:
            max_history: Maximum number of recent results to keep in history
        """
        self.max_history = max_history
        # deque(maxlen=...) silently evicts the oldest result once full.
        self._history: deque = deque(maxlen=max_history)
        self._stats: Dict[str, Any] = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_latency_ms": 0.0,
            "model_usage": {}
        }

    def analyze_sentiment(
        self,
        text: str,
        model_type: str = "social",
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Analyze sentiment with tracking.

        Args:
            text: Text to analyze
            model_type: Type of model to use ("social" or "news";
                anything other than "social" routes to the news model)
            metadata: Optional metadata to attach to result

        Returns:
            Sentiment analysis result enriched with timestamp, model info,
            latency, and caller metadata. Never raises: failures are
            returned as neutral results carrying an "error" key.
        """
        start_time = time.time()
        self._stats["total_requests"] += 1

        # Count usage per underlying model name.
        model_name = SOCIAL_MODEL if model_type == "social" else NEWS_MODEL
        usage = self._stats["model_usage"]
        usage[model_name] = usage.get(model_name, 0) + 1

        try:
            if model_type == "social":
                result = analyze_social_sentiment(text)
            else:
                result = analyze_news_sentiment(text)

            latency_ms = (time.time() - start_time) * 1000
            self._stats["total_latency_ms"] += latency_ms

            # The analyze_* helpers report failures via an "error" key
            # instead of raising, so success is judged from the result.
            if "error" not in result:
                self._stats["successful_requests"] += 1
            else:
                self._stats["failed_requests"] += 1

            enriched_result = {
                **result,
                # Timezone-aware UTC timestamp; datetime.utcnow() is
                # deprecated since Python 3.12 and returns naive values.
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "model_type": model_type,
                "model_name": model_name,
                "latency_ms": round(latency_ms, 2),
                "metadata": metadata or {}
            }

            self._history.append(enriched_result)
            return enriched_result

        except Exception as e:
            # Defensive catch-all: tracking must never propagate errors.
            self._stats["failed_requests"] += 1
            latency_ms = (time.time() - start_time) * 1000
            self._stats["total_latency_ms"] += latency_ms

            error_result = {
                "sentiment": "neutral",
                "score": 0.0,
                "confidence": 0.0,
                "error": str(e),
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "model_type": model_type,
                "model_name": model_name,
                "latency_ms": round(latency_ms, 2),
                "metadata": metadata or {}
            }

            self._history.append(error_result)
            return error_result

    def get_usage_stats(self) -> Dict[str, Any]:
        """Get usage statistics.

        Returns:
            Dictionary containing request counts, success rate (percent),
            average latency (ms), per-model usage, and history size.
        """
        total_requests = self._stats["total_requests"]
        avg_latency = (
            self._stats["total_latency_ms"] / total_requests
            if total_requests > 0
            else 0.0
        )
        success_rate = (
            self._stats["successful_requests"] / total_requests * 100
            if total_requests > 0
            else 0.0
        )

        return {
            "total_requests": total_requests,
            "successful_requests": self._stats["successful_requests"],
            "failed_requests": self._stats["failed_requests"],
            "success_rate_percent": round(success_rate, 2),
            "avg_latency_ms": round(avg_latency, 2),
            "model_usage": self._stats["model_usage"],
            "history_size": len(self._history),
            "max_history": self.max_history
        }

    def get_recent_results(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Get recent analysis results.

        Args:
            limit: Maximum number of results to return (None for all;
                zero or negative yields an empty list)

        Returns:
            List of recent results, oldest first
        """
        if limit is None or limit >= len(self._history):
            return list(self._history)
        if limit <= 0:
            # Bug fix: list(...)[-0:] is the WHOLE list, so limit=0
            # previously returned the full history instead of nothing.
            return []
        return list(self._history)[-limit:]

    def clear_history(self) -> None:
        """Clear the results history."""
        self._history.clear()

    def reset_stats(self) -> None:
        """Reset all usage statistics and clear the history."""
        self._stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_latency_ms": 0.0,
            "model_usage": {}
        }
        self.clear_history()