# financial_news_bot/src/services/news_iterator.py
import os
import json
import re
from zoneinfo import ZoneInfo
from datetime import datetime
import asyncio
from dataclasses import dataclass
from dotenv import load_dotenv
import finnhub
from google import genai
from src.telegram_bot.logger import main_logger as logger
load_dotenv()


def async_retry(func):
    async def wrapper(self, *args, **kwargs):
        retries = self.RETRY_COUNT
        retry_exceptions = (genai.errors.ClientError,)
        retry_count = 0
        while retry_count < retries:
            try:
                return await func(self, *args, **kwargs)
            except retry_exceptions as e:
                error_str = str(e)
                retry_count += 1
                logger.debug(f"Retrying {func.__name__} ...")
                if "RESOURCE_EXHAUSTED" in error_str:
                    # Quota error: wait for the API-suggested delay before retrying.
                    # The name-mangled call reaches SentimentAnalyzer.__handle_retry_error,
                    # since this decorator is defined outside the class.
                    await self._SentimentAnalyzer__handle_retry_error(error_str)
                else:
                    raise
        raise Exception(f"Max retries exceeded for {func.__name__}")
    return wrapper
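
# Illustrative shape of the quota error string this decorator reacts to; the exact
# payload comes from the Gemini API and may differ, but __handle_retry_error only
# needs the 'retryDelay' fragment it parses with a regex:
#
#   429 RESOURCE_EXHAUSTED ... {'retryDelay': '22s'} ...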


class SentimentDecorator:
    """Wraps a Gemini response and exposes the parsed sentiment JSON."""

    JSON_REGEXP = r"```json\s*(\{.*?\})\s*```"

    def __init__(self, data):
        self.data = data
        self.response_dict = None
        self.__parse_model_response()

    def __parse_model_response(self):
        text = self.data.candidates[0].content.parts[0].text
        json_match = re.search(self.JSON_REGEXP, text, re.DOTALL)
        if json_match is None:
            raise ValueError(f"No JSON block found in model response: {text!r}")
        self.response_dict = json.loads(json_match.group(1))

    def sentiment(self):
        return self.response_dict['sentiment']

    def reasoning(self):
        return self.response_dict['reasoning']
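
# Example of the raw model text __parse_model_response expects (values are
# illustrative; the JSON keys match the PROMPT defined below):
#
#   ```json
#   {"sentiment": "positive", "reasoning": "Strong earnings beat raises guidance."}
#   ```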


class SentimentAnalyzer:
    MODEL = "gemini-2.5-flash"
    PROMPT = """
You are a financial news sentiment analyzer.
Analyze the following news about {ticker} and determine its likely impact on {ticker}'s stock price.
Return ONLY a valid JSON object with:
- "sentiment": one of ["positive", "negative", "neutral"]
- "reasoning": short explanation (max 2 sentences)

News:
"""
    RETRY_COUNT = 2
    DEFAULT_RETRY_DELAY = 22  # seconds; fallback when the API does not suggest a retry delay

    def __init__(self, google_api_key: str | None):
        self.google_api_key = google_api_key
        self.client = genai.Client(api_key=self.google_api_key)

    @async_retry
    async def call(self, post_text, ticker: str = 'NVIDIA'):
        # The genai client is synchronous, so run the request in a worker thread
        # to avoid blocking the event loop.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: self.client.models.generate_content(
                model=self.MODEL,
                contents=f"{self.PROMPT.format(ticker=ticker)} {post_text}"
            )
        )
        return SentimentDecorator(result)

    async def __handle_retry_error(self, error_str):
        # Honour the retry delay suggested by the API, if present in the error text.
        retry_delay_seconds = self.DEFAULT_RETRY_DELAY
        retry_delay_match = re.search(r"'retryDelay':\s*'(\d+)s'", error_str)
        if retry_delay_match:
            retry_delay_seconds = int(retry_delay_match.group(1))
        await asyncio.sleep(retry_delay_seconds)
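
# Minimal usage sketch for SentimentAnalyzer on its own (assumes GOOGLE_KEY is set
# in the environment, as in the __main__ block below; the headline is illustrative):
#
#   analyzer = SentimentAnalyzer(google_api_key=os.getenv("GOOGLE_KEY"))
#   result = await analyzer.call("NVIDIA beats quarterly revenue estimates", ticker="NVDA")
#   print(result.sentiment(), result.reasoning())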


@dataclass
class NewsPostDecorator:
    """Typed view over a single Finnhub company-news item."""
    category: str = ''
    datetime: int = 0  # Unix timestamp from Finnhub
    headline: str = ''
    id: str = ''
    source: str = ''
    summary: str = ''
    url: str = ''

    @classmethod
    def from_dict(cls, data: dict):
        return cls(
            category=data.get('category', ''),
            datetime=data.get('datetime', 0),
            headline=data.get('headline', ''),
            id=data.get('id', ''),
            source=data.get('source', ''),
            summary=data.get('summary', ''),
            url=data.get('url', '')
        )
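
# Roughly what a single Finnhub company_news item looks like before decoration
# (values are illustrative; 'datetime' is a Unix timestamp in seconds):
#
#   NewsPostDecorator.from_dict({
#       "category": "company",
#       "datetime": 1700000000,
#       "headline": "NVIDIA announces ...",
#       "id": 123456,
#       "source": "Example Wire",
#       "summary": "...",
#       "url": "https://example.com/article",
#   })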


class CompanyNewsPostsIterator:
    NEWS_PING_PERIOD = 120  # seconds between polls when no fresh news is available

    def __init__(self, finnhub_api_key: str | None, google_api_key: str | None,
                 sentiments_only: list[str] | None = None,
                 company_symbol: str = "NVDA", time_zone: str = "America/New_York"):
        self.finnhub_api_key = finnhub_api_key
        self.finnhub_client = self.build_finnhub_client()
        self.sentiment_analyzer = SentimentAnalyzer(google_api_key=google_api_key)
        self.company_symbol = company_symbol
        self.time_zone = time_zone
        self.sentiments_only = sentiments_only or ['positive', 'negative']
        self.news_posts = []
        self.latest_timestamp = None

    def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            if not self.news_posts:
                await self.read_news_posts()
            # Still no news posts? Wait and poll again.
            if not self.news_posts:
                await asyncio.sleep(self.NEWS_PING_PERIOD)
                continue
            post_candidate = self.news_posts.pop()
            self.latest_timestamp = post_candidate.datetime
            # The wait time for the next call increases when reusing the quota,
            # so add a small artificial delay to avoid RESOURCE_EXHAUSTED errors.
            await asyncio.sleep(5)
            sentiment_result = await self.analyse_sentiment(post_candidate)
            if sentiment_result.sentiment() not in self.sentiments_only:
                continue
            return post_candidate, sentiment_result

    async def analyse_sentiment(self, post):
        return await self.sentiment_analyzer.call(self.sentiment_context(post), self.company_symbol)

    def sentiment_context(self, post):
        return f"{post.headline}\n {post.summary}"

    async def read_news_posts(self):
        # The finnhub client is synchronous; run the request in a worker thread.
        loop = asyncio.get_running_loop()
        date = self.news_date()
        news_posts = await loop.run_in_executor(
            None,
            lambda: self.finnhub_client.company_news(self.company_symbol, _from=date, to=date)
        )
        decorated_news_posts = [NewsPostDecorator.from_dict(news_post) for news_post in news_posts]
        self.news_posts = self.unread_posts(decorated_news_posts)

    def build_finnhub_client(self):
        return finnhub.Client(api_key=self.finnhub_api_key)

    def unread_posts(self, posts):
        # Assumes the API returns the newest posts first: keep only the posts
        # published after the last one already handed out.
        if not self.latest_timestamp:
            return posts
        idx = next((i for i, post in enumerate(posts) if post.datetime == self.latest_timestamp), 0)
        return posts[:idx]

    def news_date(self):
        return datetime.now(ZoneInfo(self.time_zone)).strftime('%Y-%m-%d')


if __name__ == '__main__':
    iterator = CompanyNewsPostsIterator(
        finnhub_api_key=os.getenv("FINNHUB_API_KEY"),
        google_api_key=os.getenv("GOOGLE_KEY"),
        company_symbol='NVDA')

    async def main():
        async for post, sentiment in iterator:
            # headline and summary are dataclass fields, not methods
            print(f"πŸ“° Headline : {post.headline}")
            print(f"πŸ“ Summary : {post.summary}")
            print(f"πŸ“Š Sentiment: {sentiment.sentiment().capitalize()}")
            print(f"πŸ’‘ Reason : {sentiment.reasoning()}")
            print()

    asyncio.run(main())