# financial_news_bot/src/services/news_pooling_service.py
"""
News polling service for fetching and processing Finnhub news
THIS MODULE SHOULD BE REFACTORED
"""
import asyncio
import httpx
from datetime import datetime, timedelta
from typing import Any
from itertools import islice
import hashlib
from datasets import Dataset, load_dataset
from huggingface_hub import login
import pandas as pd
from pytz import timezone
from src.telegram_bot.config import Config
from src.telegram_bot.logger import main_logger as logger
class NewsPollingService:
    def __init__(self, chat_id, finnhub_api_key: str | None = None, hf_token: str | None = None, hf_repo: str | None = None):
self._api_key = Config.FINNHUB_API_KEY or finnhub_api_key
self._base_url = "https://finnhub.io/api/v1"
self._http_client: httpx.AsyncClient | None = None
self._polling_task: asyncio.Task | None = None
self._is_running = False
self._poll_interval = 300 # 5 minutes
self._subscribers = [] # List of callbacks for new news
self._hf_token = Config.HF_TOKEN or hf_token
self._hf_repo = Config.HF_DATASET_REPO or hf_repo
        self._hf_dataset_repo = self._hf_repo or 'researchengineering/finnhub-news'  # configured repo, with a hardcoded fallback
self._chat_id = chat_id
self._article_batch_size = 7 # number of articles per batch
self._batch_delay = 200 # delay between batches in seconds
async def initialize(self):
"""Initialize the polling service"""
self._http_client = httpx.AsyncClient(
timeout=httpx.Timeout(30.0),
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
)
logger.info("News polling service initialized")
async def cleanup(self):
"""Cleanup resources"""
await self.stop_polling()
if self._http_client:
await self._http_client.aclose()
logger.info("News polling service cleaned up")
def subscribe_to_news(self, callback):
"""Subscribe to new news notifications"""
self._subscribers.append(callback)
def unsubscribe_from_news(self, callback):
"""Unsubscribe from news notifications"""
if callback in self._subscribers:
self._subscribers.remove(callback)
async def _notify_subscribers(self, new_articles: list[dict[str, Any]]):
"""Notify all subscribers about new articles"""
if not new_articles or not self._subscribers:
return
for callback in self._subscribers[:]: # Copy list to avoid modification during iteration
try:
if asyncio.iscoroutinefunction(callback):
await callback(self._chat_id, new_articles)
else:
callback(self._chat_id, new_articles)
except Exception as e:
logger.error(f"Error notifying subscriber: {e}")
async def fetch_general_news(self, category: str = "general", min_id: str = "0") -> list[dict[str, Any]]:
"""Fetch general news from Finnhub"""
try:
url = f"{self._base_url}/news"
params = {
"category": category,
"token": self._api_key,
"minId": min_id
}
logger.info(f"Fetching news: category={category}, minId={min_id}")
response = await self._http_client.get(url, params=params)
response.raise_for_status()
articles = response.json()
logger.info(f"Fetched {len(articles)} articles from Finnhub")
return articles
except Exception as e:
logger.error(f"Error fetching general news: {e}")
return []
async def fetch_company_news(self, symbol: str, from_date: str, to_date: str) -> list[dict[str, Any]]:
"""Fetch company-specific news from Finnhub"""
try:
url = f"{self._base_url}/company-news"
params = {
"symbol": symbol,
"from": from_date,
"to": to_date,
"token": self._api_key
}
logger.info(f"Fetching company news: symbol={symbol}, from={from_date}, to={to_date}")
response = await self._http_client.get(url, params=params)
response.raise_for_status()
articles = response.json()
logger.info(f"Fetched {len(articles)} company articles for {symbol}")
return articles
except Exception as e:
logger.error(f"Error fetching company news for {symbol}: {e}")
return []
async def fetch_market_news(self, category: str = "forex") -> list[dict[str, Any]]:
"""Fetch market news (forex, crypto, merger)"""
try:
url = f"{self._base_url}/news"
params = {
"category": category,
"token": self._api_key
}
response = await self._http_client.get(url, params=params)
response.raise_for_status()
articles = response.json()
logger.info(f"Fetched {len(articles)} {category} articles")
return articles
except Exception as e:
logger.error(f"Error fetching {category} news: {e}")
return []
    async def load_existing_articles(self, only_this_chat: bool = True) -> pd.DataFrame:
        """Load the existing dataset from HF, optionally filtered to this chat"""
        if not self._hf_token or not self._hf_dataset_repo:
            logger.warning("Hugging Face config not set.")
            return pd.DataFrame()
        try:
            login(self._hf_token)
            existing = load_dataset(self._hf_dataset_repo, split="train")
            df = existing.to_pandas()
            if only_this_chat and 'chat_id' in df.columns:
                return df[df['chat_id'] == self._chat_id]
            return df
except Exception as e:
logger.warning(f"Could not load HF dataset: {e}")
return pd.DataFrame()
async def save_articles_to_hf(self, articles: list[dict[str, Any]]):
"""Save new articles to Hugging Face Dataset"""
if not self._hf_token or not self._hf_dataset_repo:
logger.warning("HF_TOKEN or HF_DATASET_REPO not set.")
return
login(self._hf_token)
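        # Build a deterministic id from publish time + URL so re-fetched articles dedupe on later pushes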
for article in articles:
uid = f"{article.get('datetime', '')}_{article.get('url', '')}"
article['id'] = hashlib.sha256(uid.encode()).hexdigest()
article['persisted_at'] = datetime.utcnow().isoformat()
article['chat_id'] = self._chat_id
df_new = pd.DataFrame(articles)
        # push_to_hub replaces the whole train split, so merge with ALL existing rows, not just this chat's
        df_existing = await self.load_existing_articles(only_this_chat=False)
df_combined = pd.concat([df_existing, df_new], ignore_index=True)
df_combined.drop_duplicates(subset=["id"], inplace=True)
dataset = Dataset.from_pandas(df_combined)
dataset.push_to_hub(self._hf_dataset_repo, private=True)
logger.info(f"Pushed {len(df_new)} new articles to HF dataset: {self._hf_dataset_repo}")
async def get_last_processed_timestamp(self) -> int:
"""Get latest timestamp from HF Dataset"""
df = await self.load_existing_articles()
if df.empty or "datetime" not in df.columns:
return 0
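        # Finnhub 'datetime' values are Unix timestamps in seconds, so the column max is the newest article already persisted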
return int(df["datetime"].max())
def _chunked(self, iterable, size):
"""Yield successive chunks from iterable of length size."""
iterator = iter(iterable)
for first in iterator:
yield [first] + list(islice(iterator, size - 1))
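        # e.g. list(self._chunked([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]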
async def process_new_articles(self, tickers: list[str]) -> int:
"""Process and persist new articles to Hugging Face Datasets"""
try:
last_timestamp = await self.get_last_processed_timestamp()
logger.info(f"Processing articles since timestamp: {last_timestamp}")
#now = datetime.now(timezone.utc)
eastern = timezone("US/Eastern")
now_eastern = datetime.now(eastern)
# Determine "current_date" based on market hours
if now_eastern.hour < 9 or (now_eastern.hour == 9 and now_eastern.minute < 30):
# Before 9:30 AM → use yesterday's date
current_date = (now_eastern - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
else:
# Market open or later → use today's date
current_date = now_eastern.replace(hour=0, minute=0, second=0, microsecond=0)
# Convert last_timestamp to Eastern-aware datetime
from_date = datetime.fromtimestamp(last_timestamp, eastern) if last_timestamp > 0 else current_date - timedelta(days=1)
# Format dates for API usage
from_str = from_date.strftime("%Y-%m-%d")
to_str = current_date.strftime("%Y-%m-%d")
all_new_articles = []
'''
# 1. General news
general_news = await self.fetch_general_news("general")
new_general = [a for a in general_news if a.get('datetime', 0) > last_timestamp]
all_new_articles.extend(new_general)
# 2. Market news
for category in ["forex", "crypto", "merger"]:
try:
cat_news = await self.fetch_market_news(category)
new_cat = [
{**a, 'category': category}
for a in cat_news
if a.get('datetime', 0) > last_timestamp
]
all_new_articles.extend(new_cat)
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error fetching {category} news: {e}")
'''
# 3. Company news
for ticker in tickers:
try:
company_news = await self.fetch_company_news(ticker, from_str, to_str)
new_company = [
{**a, 'category': f'company_{ticker.lower()}', 'related': [ticker]}
for a in company_news
if a.get('datetime', 0) > last_timestamp
]
all_new_articles.extend(new_company)
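                    # Brief pause between symbols, presumably to stay within Finnhub's per-minute rate limit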
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error fetching news for {ticker}: {e}")
# Deduplication
seen = set()
unique_articles = []
for a in all_new_articles:
key = (a.get('url', ''), a.get('headline', ''))
if key not in seen and key != ('', ''):
seen.add(key)
unique_articles.append(a)
if unique_articles:
unique_articles.sort(key=lambda x: x.get('datetime', 0))
if len(unique_articles) > 20:
logger.warning(f"Too many new articles ({len(unique_articles)}), limiting to 5")
unique_articles = unique_articles[:5]
await self.save_articles_to_hf(unique_articles)
# Notify in batches
for batch in self._chunked(unique_articles, self._article_batch_size):
await self._notify_subscribers(batch)
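                    # Space out batches (self._batch_delay seconds), presumably to avoid flooding the chat or downstream processing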
await asyncio.sleep(self._batch_delay)
logger.info(f"Processed and saved {len(unique_articles)} articles.")
return len(unique_articles)
else:
logger.info("No new articles found.")
return 0
except Exception as e:
logger.error(f"Error processing new articles: {e}")
return 0
async def _polling_loop(self, tickers: list[str]):
"""Main polling loop"""
logger.info("Starting news polling loop")
while self._is_running:
try:
start_time = datetime.now()
new_count = await self.process_new_articles(tickers)
processing_time = (datetime.now() - start_time).total_seconds()
logger.info(f"Polling completed in {processing_time:.2f}s, found {new_count} new articles")
# Wait for next poll
await asyncio.sleep(self._poll_interval)
except asyncio.CancelledError:
logger.info("Polling loop cancelled")
break
except Exception as e:
logger.error(f"Error in polling loop: {e}")
# Wait a bit before retrying
await asyncio.sleep(min(60, self._poll_interval))
async def start_polling(self, tickers: list[str], interval: int = 300):
"""Start the polling process"""
if self._is_running:
logger.warning("Polling is already running")
return
self._poll_interval = interval
self._is_running = True
self._polling_task = asyncio.create_task(self._polling_loop(tickers))
logger.info(f"Started news polling with {interval}s interval")
async def stop_polling(self):
"""Stop the polling process"""
if not self._is_running:
return
self._is_running = False
if self._polling_task and not self._polling_task.done():
self._polling_task.cancel()
try:
await self._polling_task
except asyncio.CancelledError:
pass
logger.info("Stopped news polling")
async def force_poll_now(self, tickers: list[str]) -> int:
"""Force an immediate poll"""
logger.info("Forcing immediate news poll")
return await self.process_new_articles(tickers)
async def get_polling_status(self) -> dict[str, Any]:
"""Get current polling status"""
        # This service holds no database handle itself; only report stats if one was attached externally
        db = getattr(self, "db", None)
        stats = await db.get_database_stats() if db else {}
return {
"is_running": self._is_running,
"poll_interval": self._poll_interval,
"subscribers_count": len(self._subscribers),
"database_stats": stats,
"task_status": "running" if self._polling_task and not self._polling_task.done() else "stopped"
}