# financial_news_bot/src/services/async_stock_price_predictor.py
import os

# KERAS_BACKEND must be set before keras is imported: keras reads the
# environment variable once at import time.
os.environ.setdefault("KERAS_BACKEND", "jax")

import pickle
import asyncio
import warnings
from datetime import datetime, date, timezone, timedelta
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Any

import numpy as np
import pandas as pd
import aiohttp
import tensorflow as tf
import keras
from sklearn.preprocessing import MinMaxScaler
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from huggingface_hub import hf_hub_download
import yfinance as yf

from src.telegram_bot.logger import main_logger as logger

class AsyncStockPricePredictor:
"""
Asynchronous stock price predictor using Keras 3.0 models from Hugging Face.
This class loads LSTM models and sentiment analysis models directly from
Hugging Face Hub using the new Keras 3.0 model loading API.
"""
REQUIRED_COLUMNS = ['Open', 'High', 'Low', 'Close', 'Volume', 'Sentiment']
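    # Assumed model contract: the LSTM consumes windows of shape
    # (sequence_length, len(REQUIRED_COLUMNS)) and emits one scaled Close
    # value (see prepare_sequences and predict_next_day_price).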
DEFAULT_HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
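    # A browser-like User-Agent is sent because Yahoo's public endpoints
    # commonly reject requests carrying default HTTP-client UAs.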
def __init__(
self,
lstm_model_repo: str = "jengyang/lstm-stock-prediction-model",
scaler_repo: str = "jengyang/lstm-stock-prediction-model",
sentiment_model: str = "TLOB/roberta-base-finetuned-financial-text-classification",
sequence_length: int = 60,
news_lookback_days: int = 7,
device: int = -1,
max_workers: int = 4,
timeout: int = 30,
keras_backend: str = "jax",
use_auth_token: str | None = None
):
"""
Initialize the async stock predictor with Keras 3.0 and HuggingFace models.
Args:
lstm_model_repo: HF repository for LSTM model (Keras 3.0 compatible)
scaler_repo: HF repository for scalers
sentiment_model: HF repository for sentiment analysis
sequence_length: Number of days for LSTM sequence
news_lookback_days: Days of news to analyze
device: Device for transformers (-1 for CPU, 0+ for GPU)
max_workers: Max threads for CPU-bound operations
timeout: HTTP request timeout
keras_backend: Keras backend ("jax", "torch", "tensorflow")
use_auth_token: HF token for private repos
"""
        # Set the Keras backend if not already set. Note that keras reads
        # KERAS_BACKEND once at import time, so this only takes effect if
        # keras has not been imported yet (the module top sets a default).
        if "KERAS_BACKEND" not in os.environ:
            os.environ["KERAS_BACKEND"] = keras_backend
self.sequence_length = sequence_length
self.news_lookback_days = news_lookback_days
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.use_auth_token = use_auth_token or os.getenv("HF_TOKEN")
# Load models from Hugging Face
self._load_keras_models(lstm_model_repo, scaler_repo, sentiment_model, device)
        # Thread pool for CPU-bound operations. A process pool would require
        # the model, the pipeline, and the lambdas passed to run_in_executor
        # to be picklable, which they are not.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
def _load_keras_models(
self,
lstm_repo: str,
scaler_repo: str,
sentiment_repo: str,
device: int
) -> None:
"""Load models from Hugging Face Hub using multiple fallback approaches."""
try:
# Try multiple approaches to load the model
model_loaded = False
# Approach 1: Try Keras 3.0 format first
try:
logger.info(f"Attempting to load Keras 3.0 model from hf://{lstm_repo}")
self.model = keras.saving.load_model(f"hf://{lstm_repo}")
logger.info(
f"Keras 3.0 model loaded successfully with {os.environ.get('KERAS_BACKEND', 'default')} backend")
model_loaded = True
except Exception as e:
logger.warning(f"Keras 3.0 loading failed: {e}")
# Approach 2: Try downloading individual model files
if not model_loaded:
logger.info(f"Trying to download model files from {lstm_repo}")
model_files = [
"model.keras",
"model.h5",
"lstm_model.keras",
"lstm_model.h5",
"saved_model.pb",
"pytorch_model.bin"
]
for filename in model_files:
try:
model_path = hf_hub_download(
repo_id=lstm_repo,
filename=filename,
token=self.use_auth_token
)
logger.info(f"Found model file: {filename}")
                        if filename.endswith(('.keras', '.h5')):
                            if os.environ.get("KERAS_BACKEND") != "tensorflow":
                                # On JAX/PyTorch backends, load via tf.keras,
                                # rebuild the graph in Keras 3, and copy weights
                                tf_model = tf.keras.models.load_model(model_path)
                                self.model = keras.Model.from_config(tf_model.get_config())
                                self.model.set_weights(tf_model.get_weights())
                            else:
                                self.model = keras.saving.load_model(model_path)
model_loaded = True
break
elif filename == 'saved_model.pb':
# Load TensorFlow SavedModel and convert
tf_model = tf.keras.models.load_model(os.path.dirname(model_path))
self.model = keras.Model.from_config(tf_model.get_config())
self.model.set_weights(tf_model.get_weights())
model_loaded = True
break
except Exception as e:
logger.debug(f"Model file {filename} not found or failed to load: {e}")
continue
            # Approach 3: build a simple LSTM locally as a last resort
            if not model_loaded:
                logger.warning(
                    f"Could not load model from {lstm_repo}, "
                    "creating a simple LSTM model as fallback"
                )
                self.model = self._create_fallback_lstm_model()
                model_loaded = True
logger.info("LSTM model loaded successfully")
# Try to load scalers from the same repo or scaler_repo
logger.info(f"Downloading scalers from {scaler_repo}")
scaler_files = [
"scalers.pkl",
"scaler.pkl",
"preprocessing.pkl",
"feature_scalers.pkl",
"minmax_scalers.pkl"
]
scaler_path = None
for filename in scaler_files:
try:
scaler_path = hf_hub_download(
repo_id=scaler_repo,
filename=filename,
token=self.use_auth_token
)
logger.info(f"Found scaler file: {filename}")
break
except Exception as e:
logger.debug(f"Scaler file {filename} not found: {e}")
continue
if scaler_path:
with open(scaler_path, 'rb') as f:
self.scalers = pickle.load(f)
logger.info("Scalers loaded successfully")
                # Validate that required scalers exist. Do not substitute
                # unfitted MinMaxScaler instances for missing columns: calling
                # transform() on them would raise NotFittedError. Instead,
                # prepare_sequences falls back to manual normalization for
                # any column without a fitted scaler.
                missing_scalers = set(self.REQUIRED_COLUMNS) - set(self.scalers.keys())
                if missing_scalers:
                    logger.warning(
                        f"Missing scalers for columns: {missing_scalers}; "
                        "manual normalization will be used for them"
                    )
else:
logger.warning("No scaler file found, will use manual normalization")
self.scalers = {}
# Initialize sentiment analysis pipeline
logger.info(f"Loading sentiment model: {sentiment_repo}")
self.tokenizer = AutoTokenizer.from_pretrained(sentiment_repo)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_repo)
self.sentiment_pipe = pipeline(
"sentiment-analysis",
model=sentiment_model,
tokenizer=self.tokenizer,
device=device
)
logger.info("Sentiment analysis pipeline initialized")
except Exception as e:
logger.error(f"Failed to load models from Hugging Face: {e}")
raise
def _create_fallback_lstm_model(self):
"""Create a simple LSTM model as fallback."""
try:
logger.info("Creating fallback LSTM model")
            # Simple stacked-LSTM architecture matching the expected
            # (sequence_length, n_features) input window
            model = keras.Sequential([
                keras.Input(shape=(self.sequence_length, len(self.REQUIRED_COLUMNS))),
                keras.layers.LSTM(50, return_sequences=True),
                keras.layers.Dropout(0.2),
                keras.layers.LSTM(50, return_sequences=True),
                keras.layers.Dropout(0.2),
                keras.layers.LSTM(50),
                keras.layers.Dropout(0.2),
                keras.layers.Dense(1)
            ])
            model.compile(optimizer='adam', loss='mean_squared_error')
            # Run a dummy forward pass to build the model; the weights keep
            # their random initialization, so outputs are effectively noise
            dummy_input = np.random.random((1, self.sequence_length, len(self.REQUIRED_COLUMNS)))
            model.predict(dummy_input, verbose=0)
logger.warning("Using fallback LSTM model - predictions may not be accurate")
return model
except Exception as e:
logger.error(f"Failed to create fallback model: {e}")
raise
async def fetch_stock_data(
self,
ticker: str,
period: str = "1y",
interval: str = "1d"
) -> pd.DataFrame:
"""
Asynchronously fetch historical stock data from Yahoo Finance.
"""
url = (
f"https://query1.finance.yahoo.com/v8/finance/chart/{ticker.upper()}"
f"?range={period}&interval={interval}&includePrePost=false"
)
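        # Unofficial Yahoo Finance chart endpoint; the JSON payload carries
        # epoch timestamps plus OHLCV arrays under data['chart']['result'][0]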
try:
async with aiohttp.ClientSession(
timeout=self.timeout,
headers=self.DEFAULT_HEADERS
) as session:
async with session.get(url) as response:
response.raise_for_status()
data = await response.json()
result = data['chart']['result'][0]
                    # Parse epoch seconds as tz-aware UTC so tz_localize(None)
                    # below strips the timezone cleanly
                    timestamps = pd.to_datetime(result['timestamp'], unit='s', utc=True)
quotes = result['indicators']['quote'][0]
# Handle missing data
ohlcv_data = {
'Open': quotes.get('open', []),
'High': quotes.get('high', []),
'Low': quotes.get('low', []),
'Close': quotes.get('close', []),
'Volume': quotes.get('volume', [])
}
df = pd.DataFrame(ohlcv_data, index=timestamps)
# Remove timezone info and handle missing values
df.index = df.index.tz_localize(None)
df = df.dropna()
if df.empty:
raise ValueError(f"No valid data found for ticker {ticker}")
logger.info(f"Fetched {len(df)} data points for {ticker}")
return df
except Exception as e:
logger.error(f"Error fetching stock data for {ticker}: {e}")
raise RuntimeError(f"Failed to fetch stock data: {e}")
@staticmethod
async def fetch_prices(ticker: str, period: str = "6mo", interval: str = "1d") -> pd.DataFrame | None:
"""
Fetch historical stock price data from Yahoo Finance.
Args:
ticker: Stock ticker symbol (e.g., 'AAPL')
period: Time period for data (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max)
interval: Data interval (1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo)
Returns:
DataFrame with OHLCV data or None if error occurs
"""
try:
logger.info(f"Fetching data for {ticker}")
            # yf.download is blocking; run it in a worker thread so the event
            # loop is not stalled, and suppress yfinance warnings meanwhile
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                df = await asyncio.to_thread(
                    yf.download, ticker, period=period, interval=interval, progress=False
                )
            if df is None or df.empty:
                logger.error(f"No data found for ticker {ticker}")
                return None
            # Newer yfinance releases may return MultiIndex columns even for a
            # single ticker; flatten to plain OHLCV names if so
            if isinstance(df.columns, pd.MultiIndex):
                df.columns = df.columns.get_level_values(0)
            # Select relevant columns
            df = df[["Open", "High", "Low", "Close", "Volume"]].copy()
            df.dropna(inplace=True)
if len(df) < 60:
logger.warning(f"Insufficient data for {ticker}. Got {len(df)} days, need at least 60")
return None
logger.info(f"Successfully fetched {len(df)} data points for {ticker}")
return df
except Exception as e:
logger.error(f"Error fetching data for {ticker}: {e}")
return None
async def fetch_news(self, ticker: str) -> list[dict[str, Any]]:
"""Fetch recent news for a stock ticker."""
url = f"https://query1.finance.yahoo.com/v6/finance/news?symbols={ticker.upper()}"
try:
async with aiohttp.ClientSession(
timeout=self.timeout,
headers=self.DEFAULT_HEADERS
) as session:
async with session.get(url) as response:
response.raise_for_status()
data = await response.json()
news_items = data.get('items', {}).get('result', [])
logger.info(f"Fetched {len(news_items)} news items for {ticker}")
return news_items
except Exception as e:
logger.warning(f"Error fetching news for {ticker}: {e}")
return []
async def analyze_sentiment_batch(
self,
texts: list[str],
batch_size: int = 10
) -> list[float]:
"""Analyze sentiment for a batch of texts asynchronously."""
if not texts:
return []
async def process_batch(batch: list[str]) -> list[float]:
"""Process a single batch of texts."""
try:
                predictions = await asyncio.get_running_loop().run_in_executor(
                    self.executor,
                    lambda: self.sentiment_pipe(batch, truncation=True, max_length=512)
                )
scores = []
for pred in predictions:
label = pred.get('label', '').lower()
confidence = float(pred.get('score', 0.0))
# Handle different label formats for financial sentiment
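                    # (Assumes generic LABEL_0/LABEL_2 ids follow the common
                    # negative/neutral/positive ordering of financial models;
                    # check the model card if scores look inverted.)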
if any(pos in label for pos in ['positive', 'pos', 'bullish', 'label_2']):
scores.append(confidence)
elif any(neg in label for neg in ['negative', 'neg', 'bearish', 'label_0']):
scores.append(-confidence)
else: # neutral or unknown
scores.append(0.0)
return scores
except Exception as e:
logger.warning(f"Error in sentiment analysis batch: {e}")
return [0.0] * len(batch)
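        # Note: batches are dispatched concurrently but share one pipeline
        # instance; HF pipelines are not guaranteed thread-safe, so heavy
        # concurrency may effectively serialize on the model.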
# Split texts into batches
batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
# Process all batches concurrently
batch_results = await asyncio.gather(*[process_batch(batch) for batch in batches])
# Flatten results
all_scores = []
for batch_scores in batch_results:
all_scores.extend(batch_scores)
return all_scores
    async def compute_daily_sentiment(
        self,
        news_items: list[dict[str, Any]],
        since_date: date
    ) -> dict[date, float]:
"""Compute daily sentiment scores from news items."""
news_by_date = defaultdict(list)
for item in news_items:
# Extract timestamp
timestamp = (
item.get('providerPublishTime') or
item.get('pubDate') or
item.get('published')
)
            if isinstance(timestamp, (int, float)):
                item_date = datetime.fromtimestamp(int(timestamp), tz=timezone.utc).date()
            else:
                try:
                    # Parse as UTC so both naive and tz-aware strings work
                    item_date = pd.to_datetime(timestamp, utc=True).date()
                except Exception:
                    item_date = datetime.now(timezone.utc).date()
            if item_date < since_date:
                continue
            # Combine title and summary
            title = item.get('title', '').strip()
            summary = item.get('summary', '').strip()
            text = f"{title}. {summary}".strip()
            if text and len(text) > 10:  # Filter out very short texts
                news_by_date[item_date].append(text)
# Analyze sentiment for each date
daily_sentiment = {}
for date, texts in news_by_date.items():
if texts:
sentiment_scores = await self.analyze_sentiment_batch(texts)
daily_sentiment[date] = float(np.mean(sentiment_scores))
else:
daily_sentiment[date] = 0.0
logger.info(f"Computed sentiment for {len(daily_sentiment)} days")
return daily_sentiment
def align_sentiment_to_prices(
self,
price_df: pd.DataFrame,
        daily_sentiment: dict[date, float]
) -> pd.Series:
"""Align daily sentiment scores to price DataFrame index."""
sentiment_values = []
for date in price_df.index.date:
sentiment_values.append(daily_sentiment.get(date, 0.0))
return pd.Series(sentiment_values, index=price_df.index, name='Sentiment')
def prepare_sequences(self, df: pd.DataFrame) -> np.ndarray:
"""Prepare input sequences for the LSTM model."""
# Ensure we have required columns in correct order
available_columns = [col for col in self.REQUIRED_COLUMNS if col in df.columns]
df = df[available_columns].copy()
# Scale features
scaled_data = {}
for column in df.columns:
if column in self.scalers:
# Use existing scaler
scaled_data[column] = self.scalers[column].transform(
df[[column]]
).flatten()
else:
# Manual min-max normalization if scaler not available
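                # (Window-local scaling differs from the training-time
                # scalers, so predictions on this path are less reliable.)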
col_values = df[column].values
min_val, max_val = col_values.min(), col_values.max()
if max_val > min_val:
scaled_data[column] = (col_values - min_val) / (max_val - min_val)
else:
scaled_data[column] = np.zeros_like(col_values)
df_scaled = pd.DataFrame(scaled_data, index=df.index)
# Pad with zeros if we have fewer columns than expected
while len(df_scaled.columns) < len(self.REQUIRED_COLUMNS):
missing_col = f"feature_{len(df_scaled.columns)}"
df_scaled[missing_col] = 0.0
# Create sequences
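        # For N rows and window length L this yields N - L + 1 overlapping
        # windows of shape (L, n_features); prediction uses only the last one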
sequences = []
for i in range(len(df_scaled) - self.sequence_length + 1):
sequence = df_scaled.iloc[i:i + self.sequence_length].values
sequences.append(sequence)
return np.array(sequences)
async def predict_next_day_price(
self,
ticker: str,
period: str = "1y",
interval: str = "1d"
) -> dict[str, Any]:
"""Predict the next day's stock price using Keras 3.0 model."""
try:
# Fetch data concurrently
logger.info(f"Starting prediction for {ticker}")
stock_data, news_items = await asyncio.gather(
self.fetch_stock_data(ticker, period, interval),
self.fetch_news(ticker)
)
# Compute sentiment
since_date = stock_data.index[-1].date() - timedelta(days=self.news_lookback_days)
daily_sentiment = await self.compute_daily_sentiment(news_items, since_date)
# Align sentiment with price data
sentiment_series = self.align_sentiment_to_prices(stock_data, daily_sentiment)
combined_data = stock_data.copy()
combined_data['Sentiment'] = sentiment_series
# Validate data length
if len(combined_data) < self.sequence_length:
raise RuntimeError(
f"Insufficient data: {len(combined_data)} days, "
f"need at least {self.sequence_length} days"
)
# Prepare sequences and make prediction
sequences = self.prepare_sequences(combined_data)
last_sequence = sequences[-1:]
            # Run prediction in the executor to avoid blocking the event loop.
            # Keras 3 accepts NumPy input on any backend, but convert to a JAX
            # array explicitly when running on the JAX backend.
            if os.environ.get("KERAS_BACKEND") == "jax":
                import jax.numpy as jnp
                last_sequence = jnp.array(last_sequence)
            scaled_prediction = await asyncio.get_running_loop().run_in_executor(
                self.executor,
                lambda: self.model.predict(last_sequence, verbose=0)
            )
# Convert back to numpy if needed
if hasattr(scaled_prediction, 'numpy'):
scaled_prediction = scaled_prediction.numpy()
elif hasattr(scaled_prediction, '__array__'):
scaled_prediction = np.array(scaled_prediction)
# Inverse transform prediction
if 'Close' in self.scalers:
predicted_price = self.scalers['Close'].inverse_transform(
scaled_prediction.reshape(-1, 1)
)[0][0]
else:
# Manual denormalization if scaler not available
close_data = combined_data['Close'].values
min_val, max_val = close_data.min(), close_data.max()
predicted_price = scaled_prediction[0][0] * (max_val - min_val) + min_val
# Calculate metrics
last_close = float(combined_data['Close'].iloc[-1])
change_percent = ((predicted_price - last_close) / last_close) * 100
# Sentiment summary
if daily_sentiment:
                avg_sentiment = float(np.mean(list(daily_sentiment.values())))
sentiment_label = (
"Positive" if avg_sentiment > 0.1 else
"Negative" if avg_sentiment < -0.1 else
"Neutral"
)
else:
avg_sentiment = 0.0
sentiment_label = "No recent news"
# Trend emoji
trend_emoji = (
"πŸš€" if change_percent > 1 else
"πŸ“‰" if change_percent < -1 else
"➑️"
)
result = {
'ticker': ticker.upper(),
'predicted_price': round(float(predicted_price), 2),
'last_price': round(last_close, 2),
'change_percent': round(change_percent, 2),
'last_date': str(stock_data.index[-1].date()),
'trend_emoji': trend_emoji,
'sentiment_score': round(avg_sentiment, 3),
'sentiment_label': sentiment_label,
'data_points': len(combined_data),
'news_items': len(news_items),
'backend': os.environ.get("KERAS_BACKEND", "tensorflow")
}
logger.info(
f"Prediction completed for {ticker}: ${result['predicted_price']} (backend: {result['backend']})")
return result
except Exception as e:
logger.error(f"Prediction failed for {ticker}: {e}")
raise
def format_telegram_message(self, prediction: dict[str, Any]) -> str:
"""Format prediction result as a Telegram message."""
return (
f"πŸ“Š *Stock Prediction for {prediction['ticker']}*\n"
f"Date: {prediction['last_date']}\n\n"
f"Last closing price: `${prediction['last_price']:.2f}`\n"
f"Predicted next price: *${prediction['predicted_price']:.2f}* {prediction['trend_emoji']}\n\n"
f"Expected change: {prediction['change_percent']:+.2f}%\n\n"
f"πŸ“° News sentiment: {prediction['sentiment_label']}\n"
f"πŸ“ˆ Data points used: {prediction['data_points']}\n"
f"πŸ“‘ News articles: {prediction['news_items']}\n\n"
f"πŸ€– Powered by Keras 3.0 ({prediction.get('backend', 'JAX')}) + HuggingFace"
)
async def cleanup(self) -> None:
"""Cleanup resources."""
if hasattr(self, 'executor'):
self.executor.shutdown(wait=True)
logger.info("Thread executor shut down")
    @staticmethod
    def get_available_backends() -> list[str]:
        """Get list of available Keras backends."""
        import importlib.util

        # find_spec checks for an installed package without importing it
        return [
            backend for backend in ("tensorflow", "jax", "torch")
            if importlib.util.find_spec(backend) is not None
        ]
    @classmethod
    def create_with_backend(cls, backend: str = "jax", **kwargs):
        """Create a predictor with a specific Keras backend.

        Note: keras reads KERAS_BACKEND once at first import, so this only
        switches backends if keras has not yet been imported in the process.
        """
        os.environ["KERAS_BACKEND"] = backend
        return cls(**kwargs)
# Usage example for Telegram bot
async def handle_stock_prediction(ticker: str, predictor: AsyncStockPricePredictor) -> str:
"""Handle stock prediction request for Telegram bot."""
try:
prediction = await predictor.predict_next_day_price(ticker.upper())
return predictor.format_telegram_message(prediction)
except Exception as e:
return f"❌ Error predicting {ticker.upper()}: {str(e)}"
# Example usage
async def main():
"""Example usage of the async stock predictor with Keras 3.0."""
# Show available backends
available_backends = AsyncStockPricePredictor.get_available_backends()
print(f"Available Keras backends: {available_backends}")
# Initialize with Keras 3.0 and JAX backend
predictor = AsyncStockPricePredictor(
lstm_model_repo="jengyang/lstm-stock-prediction-model",
scaler_repo="jengyang/lstm-stock-prediction-model",
sentiment_model="TLOB/roberta-base-finetuned-financial-text-classification",
max_workers=2,
keras_backend="jax" # Can also use "torch" or "tensorflow"
)
try:
# Test prediction
result = await handle_stock_prediction("AAPL", predictor)
print(result)
print(f"\nUsing Keras backend: {os.environ.get('KERAS_BACKEND')}")
finally:
await predictor.cleanup()
if __name__ == "__main__":
asyncio.run(main())