import asyncio
import json
import os
import re
from dataclasses import dataclass
from datetime import datetime
from functools import wraps
from zoneinfo import ZoneInfo

import finnhub
from dotenv import load_dotenv
from google import genai

from src.telegram_bot.logger import main_logger as logger

load_dotenv()


def async_retry(func):
    """Retry decorator for async methods of :class:`SentimentAnalyzer`.

    Retries up to ``self.RETRY_COUNT`` times, but only for Gemini client
    errors that report RESOURCE_EXHAUSTED (rate limiting); any other client
    error is re-raised immediately.
    """
    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        attempts = 0
        while attempts < self.RETRY_COUNT:
            try:
                return await func(self, *args, **kwargs)
            except genai.errors.ClientError as e:
                error_str = str(e)
                attempts += 1
                logger.debug(f"Retrying {func.__name__} ...")
                if "RESOURCE_EXHAUSTED" in error_str:
                    # Sleep for the delay suggested by the API before retrying.
                    await self._handle_retry_error(error_str)
                else:
                    raise
        raise Exception("Max retries exceeded")
    return wrapper


class SentimentDecorator:
    """Wraps a raw Gemini response and exposes the parsed sentiment JSON."""

    # The model is instructed to answer with a fenced ```json ... ``` block.
    JSON_REGEXP = r"```json\s*(\{.*?\})\s*```"

    def __init__(self, data):
        self.data = data
        self.response_dict = None
        self.__parse_model_response()

    def __parse_model_response(self):
        """Extract and parse the fenced JSON object from the model output.

        Raises:
            ValueError: if the response contains no fenced JSON block.
            json.JSONDecodeError: if the block is not valid JSON.
        """
        text = self.data.candidates[0].content.parts[0].text
        json_match = re.search(self.JSON_REGEXP, text, re.DOTALL)
        if json_match is None:
            raise ValueError(f"No JSON block found in model response: {text!r}")
        self.response_dict = json.loads(json_match.group(1))

    def sentiment(self):
        """One of 'positive', 'negative', 'neutral' (as returned by the model)."""
        return self.response_dict['sentiment']

    def reasoning(self):
        """Short free-text explanation produced by the model."""
        return self.response_dict['reasoning']


class SentimentAnalyzer:
    """Classifies the sentiment of a news snippet for a ticker via Gemini."""

    MODEL = "gemini-2.5-flash"
    # NOTE: the original prompt contained the shell-escape artifact "'\''s";
    # it is fixed to a plain apostrophe here.
    PROMPT = (
        "You are a financial news sentiment analyzer.\n"
        "Analyze the following news about {ticker} and determine its likely "
        "impact on {ticker}'s stock price.\n"
        "Return ONLY a valid JSON object with:\n"
        '- "sentiment": one of ["positive", "negative", "neutral"]\n'
        '- "reasoning": short explanation (max 2 sentences)\n'
        "News: "
    )
    RETRY_COUNT = 2
    DEFAULT_RETRY_DELAY = 22  # seconds; used when the API suggests no delay

    def __init__(self, google_api_key: str | None):
        self.google_api_key = google_api_key
        self.client = genai.Client(api_key=self.google_api_key)

    @async_retry
    async def call(self, post_text, ticker: str = 'NVIDIA'):
        """Analyze ``post_text`` and return a :class:`SentimentDecorator`.

        The genai SDK call is blocking, so it is pushed to the default
        executor to avoid stalling the event loop.
        """
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: self.client.models.generate_content(
                model=self.MODEL,
                contents=f"{self.PROMPT.format(ticker=ticker)} {post_text}"
            )
        )
        return SentimentDecorator(result)

    async def _handle_retry_error(self, error_str):
        """Sleep for the retry delay embedded in a RESOURCE_EXHAUSTED error.

        Falls back to ``DEFAULT_RETRY_DELAY`` when the error payload carries
        no ``'retryDelay': 'NNs'`` hint.
        """
        retry_delay_seconds = self.DEFAULT_RETRY_DELAY
        retry_delay_match = re.search(r"'retryDelay':\s*'(\d+)s'", error_str)
        if retry_delay_match:
            retry_delay_seconds = int(retry_delay_match.group(1))
        await asyncio.sleep(retry_delay_seconds)


@dataclass
class NewsPostDecorator:
    """Typed view over a raw Finnhub company-news dict."""
    category: str = ''
    datetime: int = 0  # unix timestamp of the post, as supplied by Finnhub
    headline: str = ''
    id: str = ''
    source: str = ''
    summary: str = ''
    url: str = ''

    @classmethod
    def from_dict(cls, data: dict):
        """Build an instance from a Finnhub news dict, tolerating missing keys."""
        return cls(
            category=data.get('category', ''),
            datetime=data.get('datetime', 0),
            headline=data.get('headline', ''),
            id=data.get('id', ''),
            source=data.get('source', ''),
            summary=data.get('summary', ''),
            url=data.get('url', '')
        )


class CompanyNewsPostsIterator:
    """Async iterator yielding (post, sentiment) pairs for a company's news.

    Polls Finnhub for today's company news, runs each unread post through
    :class:`SentimentAnalyzer`, and yields only posts whose sentiment is in
    ``sentiments_only``.
    """

    NEWS_PING_PERIOD = 120  # seconds to wait before re-polling when no news

    def __init__(self, finhub_api_key: str | None, google_api_key: str | None,
                 sentiments_only: list[str] | None = None,
                 company_symbol: str = "NVDA",
                 time_zone: str = "America/New_York"):
        self.finhub_api_key = finhub_api_key
        self.finnhub_client = self.build_finnhub_client()
        self.sentiment_analyzer = SentimentAnalyzer(google_api_key=google_api_key)
        self.company_symbol = company_symbol
        self.time_zone = time_zone
        self.sentiments_only = sentiments_only or ['positive', 'negative']
        self.news_posts = []
        self.latest_timestamp = None  # datetime of the last post handed out

    def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            if not self.news_posts:
                await self.read_news_posts()
            # Still nothing new? Back off before polling again.
            if not self.news_posts:
                await asyncio.sleep(self.NEWS_PING_PERIOD)
                continue
            post_candidate = self.news_posts.pop()
            self.latest_timestamp = post_candidate.datetime
            # The wait time for the next call increases when reusing the quota.
            # Some artificial delay not to provoke RESOURCE_EXHAUSTED error.
            await asyncio.sleep(5)
            sentiment_result = await self.analyse_sentiment(post_candidate)
            if sentiment_result.sentiment() not in self.sentiments_only:
                continue
            return post_candidate, sentiment_result

    async def analyse_sentiment(self, post):
        """Run sentiment analysis over the post's headline + summary."""
        return await self.sentiment_analyzer.call(self.sentiment_context(post),
                                                  self.company_symbol)

    def sentiment_context(self, post):
        """Text snippet the model sees: headline plus summary."""
        return f"{post.headline}\n {post.summary}"

    async def read_news_posts(self):
        """Fetch today's company news (blocking SDK call, run in executor)."""
        loop = asyncio.get_running_loop()
        date = self.news_date()
        news_posts = await loop.run_in_executor(
            None,
            lambda: self.finnhub_client.company_news(self.company_symbol,
                                                     _from=date, to=date)
        )
        decorated_news_posts = [NewsPostDecorator.from_dict(news_post)
                                for news_post in news_posts]
        self.news_posts = self.unread_posts(decorated_news_posts)

    def build_finnhub_client(self):
        return finnhub.Client(api_key=self.finhub_api_key)

    def unread_posts(self, posts):
        """Posts newer than the last one handed out.

        Finnhub returns posts newest-first; everything before the index of
        ``latest_timestamp`` is unread. If the marker post is no longer in
        today's feed, nothing is returned (idx defaults to 0) — presumably a
        deliberate "skip rather than repeat" choice; confirm with the author.
        """
        if not self.latest_timestamp:
            return posts
        idx = next((i for i, post in enumerate(posts)
                    if post.datetime == self.latest_timestamp), 0)
        return posts[:idx]

    def news_date(self):
        """Today's date (YYYY-MM-DD) in the configured exchange time zone."""
        return datetime.now(ZoneInfo(self.time_zone)).strftime('%Y-%m-%d')


if __name__ == '__main__':
    iterator = CompanyNewsPostsIterator(
        finhub_api_key=os.getenv("FINNHUB_API_KEY"),
        google_api_key=os.getenv("GOOGLE_KEY"),
        company_symbol='NVDA')

    async def main():
        async for post, sentiment in iterator:
            # NewsPostDecorator fields are attributes, not methods
            # (original code called post.headline() and crashed with TypeError).
            print(f"📰 Headline : {post.headline}")
            print(f"📝 Summary : {post.summary}")
            print(f"📊 Sentiment: {sentiment.sentiment().capitalize()}")
            print(f"💡 Reason : {sentiment.reasoning()}")
            print()

    asyncio.run(main())