import logging import gradio as gr import pandas as pd import torch import numpy as np import matplotlib.pyplot as plt from GoogleNews import GoogleNews from transformers import pipeline from datetime import datetime, timedelta import matplotlib import yfinance as yf matplotlib.use('Agg') # Set up logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) SENTIMENT_ANALYSIS_MODEL = ( "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis" ) DEVICE = "cuda" if torch.cuda.is_available() else "cpu" logging.info(f"Using device: {DEVICE}") logging.info("Initializing sentiment analysis model...") sentiment_analyzer = pipeline( "sentiment-analysis", model=SENTIMENT_ANALYSIS_MODEL, device=DEVICE ) logging.info("Model initialized successfully") # 상장 종목 심볼 매핑을 위한 일반적인 종목명 사전 (필요에 따라 확장) COMMON_TICKERS = { "apple": "AAPL", "microsoft": "MSFT", "amazon": "AMZN", "google": "GOOGL", "alphabet": "GOOGL", "facebook": "META", "meta": "META", "tesla": "TSLA", "nvidia": "NVDA", "bitcoin": "BTC-USD", "ethereum": "ETH-USD", "samsung": "005930.KS", # 한국 삼성전자 "hyundai": "005380.KS", # 현대자동차 "sk hynix": "000660.KS", # SK하이닉스 } def fetch_articles(query, max_articles=30): try: logging.info(f"Fetching up to {max_articles} articles for query: '{query}'") googlenews = GoogleNews(lang="en") googlenews.search(query) # 첫 페이지 결과 가져오기 articles = googlenews.result() # 목표 기사 수에 도달할 때까지 추가 페이지 가져오기 page = 2 while len(articles) < max_articles and page <= 10: # 최대 10페이지까지만 시도 logging.info(f"Fetched {len(articles)} articles so far. Getting page {page}...") googlenews.get_page(page) page_results = googlenews.result() # 새 결과가 없으면 중단 if not page_results: logging.info(f"No more results found after page {page-1}") break articles.extend(page_results) page += 1 # 최대 기사 수로 제한 articles = articles[:max_articles] logging.info(f"Successfully fetched {len(articles)} articles") return articles except Exception as e: logging.error( f"Error while searching articles for query: '{query}'. Error: {e}" ) raise gr.Error( f"Unable to search articles for query: '{query}'. Try again later...", duration=5, ) def analyze_article_sentiment(article): logging.info(f"Analyzing sentiment for article: {article['title']}") sentiment = sentiment_analyzer(article["desc"])[0] article["sentiment"] = sentiment return article def calculate_time_weight(article_date_str): """ 기사 시간 기준으로 가중치 계산 - 1시간 내 기사는 24% 가중치 - 시간이 지날수록 1%씩 감소 (최소 1%) - 예: 1시간 내 기사 = 24%, 10시간 전 기사 = 15%, 24시간 전 기사 = 1% - 24시간 이상이면 1%로 고정 """ try: # 기사 날짜 문자열 파싱 (다양한 형식 처리) date_formats = [ '%a, %d %b %Y %H:%M:%S %z', # 기본 GoogleNews 형식 '%Y-%m-%d %H:%M:%S', '%a, %d %b %Y %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z', '%a %b %d, %Y', '%d %b %Y' ] parsed_date = None for format_str in date_formats: try: parsed_date = datetime.strptime(article_date_str, format_str) break except ValueError: continue # 어떤 형식으로도 파싱할 수 없으면 현재 시간 기준 24시간 전으로 가정 if parsed_date is None: logging.warning(f"Could not parse date: {article_date_str}, using default 24h ago") return 0.01 # 최소 가중치 1% # 현재 시간과의 차이 계산 (시간 단위) now = datetime.now() if parsed_date.tzinfo is not None: now = now.replace(tzinfo=parsed_date.tzinfo) hours_diff = (now - parsed_date).total_seconds() / 3600 # 24시간 이내인 경우만 고려 if hours_diff < 1: # 1시간 이내 return 0.24 # 24% 가중치 elif hours_diff < 24: # 1~23시간 # 1시간당 1%씩 감소 (1시간 = 24%, 2시간 = 23%, ...) return max(0.01, 0.24 - ((hours_diff - 1) * 0.01)) else: return 0.01 # 24시간 이상 지난 기사는 1% 가중치 except Exception as e: logging.error(f"Error calculating time weight: {e}") return 0.01 # 오류 발생 시 최소 가중치 적용 def calculate_sentiment_score(sentiment_label, time_weight): """ 감성 레이블에 따른 기본 점수 계산 및 시간 가중치 적용 - positive: +3점 - neutral: 0점 - negative: -3점 시간 가중치는 백분율로 적용 (기본 점수에 가중치 % 만큼 추가) 예: - 1시간 내 긍정 기사: 3점 + (3 * 24%) = 3 + 0.72 = 3.72점 - 10시간 전 부정 기사: -3점 + (-3 * 15%) = -3 - 0.45 = -3.45점 """ base_score = { 'positive': 3, 'neutral': 0, 'negative': -3 }.get(sentiment_label, 0) # 가중치를 적용한 추가 점수 계산 weighted_addition = base_score * time_weight return base_score, weighted_addition def get_stock_ticker(asset_name): """ 자산명으로부터 주식 티커 심볼을 추출 """ # 소문자로 변환하여 매핑 확인 asset_lower = asset_name.lower() # 직접 티커로 입력한 경우 (대문자 3-5자 형태) if asset_name.isupper() and 3 <= len(asset_name) <= 5: return asset_name # 일반적인 종목명 매핑 확인 if asset_lower in COMMON_TICKERS: return COMMON_TICKERS[asset_lower] # 그 외의 경우 yfinance로 검색 시도 try: ticker_search = yf.Ticker(asset_name) # 기본 정보 가져와서 유효한 티커인지 확인 info = ticker_search.info if 'symbol' in info: return info['symbol'] except: pass return None def create_stock_chart(ticker, period="1mo"): """ 주식 티커에 대한 차트 생성 """ try: logging.info(f"Fetching stock data for {ticker}") stock_data = yf.download(ticker, period=period) if stock_data.empty: logging.warning(f"No stock data found for ticker: {ticker}") return None fig, ax = plt.subplots(figsize=(10, 6)) # 종가 그래프 ax.plot(stock_data.index, stock_data['Close'], label='Close Price', color='blue') # 이동평균선 추가 (20일) if len(stock_data) > 20: stock_data['MA20'] = stock_data['Close'].rolling(window=20).mean() ax.plot(stock_data.index, stock_data['MA20'], label='20-day MA', color='orange') # 거래량 서브플롯 추가 ax2 = ax.twinx() ax2.bar(stock_data.index, stock_data['Volume'], alpha=0.3, color='gray', label='Volume') ax2.set_ylabel('Volume') # 차트 스타일링 ax.set_title(f"{ticker} Stock Price") ax.set_xlabel('Date') ax.set_ylabel('Price') ax.grid(True, alpha=0.3) # 범례 추가 lines, labels = ax.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels() ax.legend(lines + lines2, labels + labels2, loc='upper left') plt.tight_layout() # 이미지 저장 chart_path = f"stock_chart_{ticker.replace('-', '_')}.png" plt.savefig(chart_path) plt.close() logging.info(f"Stock chart created: {chart_path}") return chart_path except Exception as e: logging.error(f"Error creating stock chart for {ticker}: {e}") return None def analyze_asset_sentiment(asset_name): logging.info(f"Starting sentiment analysis for asset: {asset_name}") logging.info("Fetching up to 30 articles") articles = fetch_articles(asset_name, max_articles=30) logging.info("Analyzing sentiment of each article") analyzed_articles = [analyze_article_sentiment(article) for article in articles] # 각 기사에 대한 시간 가중치 및 감성 점수 계산 for article in analyzed_articles: time_weight = calculate_time_weight(article["date"]) article["time_weight"] = time_weight sentiment_label = article["sentiment"]["label"] base_score, weighted_addition = calculate_sentiment_score(sentiment_label, time_weight) article["base_score"] = base_score article["weighted_addition"] = weighted_addition article["total_score"] = base_score + weighted_addition logging.info("Sentiment analysis completed") # 종합 점수 계산 및 그래프 생성 sentiment_summary = create_sentiment_summary(analyzed_articles, asset_name) # 주식 티커 확인 및 차트 생성 stock_chart = None ticker = get_stock_ticker(asset_name) if ticker: logging.info(f"Found ticker {ticker} for asset {asset_name}") stock_chart = create_stock_chart(ticker) return convert_to_dataframe(analyzed_articles), sentiment_summary, stock_chart, ticker def create_sentiment_summary(analyzed_articles, asset_name): """ 감성 분석 결과를 요약하고 그래프로 시각화 """ total_articles = len(analyzed_articles) positive_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "positive") neutral_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "neutral") negative_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "negative") # 기본 점수 합계 base_score_sum = sum(a["base_score"] for a in analyzed_articles) # 가중치 적용 점수 합계 weighted_score_sum = sum(a["total_score"] for a in analyzed_articles) # 그래프 생성 fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) # 1. 감성 분포 파이 차트 labels = ['Positive', 'Neutral', 'Negative'] sizes = [positive_count, neutral_count, negative_count] colors = ['green', 'gray', 'red'] ax1.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90) ax1.axis('equal') ax1.set_title(f'Sentiment Distribution for {asset_name}') # 2. 시간별 가중치 적용 점수 (정렬) sorted_articles = sorted(analyzed_articles, key=lambda x: x.get("date", ""), reverse=True) # 최대 표시할 기사 수 (가독성을 위해) max_display = min(15, len(sorted_articles)) display_articles = sorted_articles[:max_display] dates = [a.get("date", "")[:10] for a in display_articles] # 날짜 부분만 표시 scores = [a.get("total_score", 0) for a in display_articles] # 점수에 따른 색상 설정 bar_colors = ['green' if s > 0 else 'red' if s < 0 else 'gray' for s in scores] bars = ax2.bar(range(len(dates)), scores, color=bar_colors) ax2.set_xticks(range(len(dates))) ax2.set_xticklabels(dates, rotation=45, ha='right') ax2.set_ylabel('Weighted Sentiment Score') ax2.set_title(f'Recent Article Scores for {asset_name}') ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3) # 요약 텍스트 추가 summary_text = f""" Analysis Summary for {asset_name}: Total Articles: {total_articles} Positive: {positive_count} ({positive_count/total_articles*100:.1f}%) Neutral: {neutral_count} ({neutral_count/total_articles*100:.1f}%) Negative: {negative_count} ({negative_count/total_articles*100:.1f}%) Base Score Sum: {base_score_sum:.2f} Weighted Score Sum: {weighted_score_sum:.2f} """ plt.figtext(0.5, 0.01, summary_text, ha='center', fontsize=10, bbox={"facecolor":"orange", "alpha":0.2, "pad":5}) plt.tight_layout(rect=[0, 0.1, 1, 0.95]) # 이미지 저장 fig_path = f"sentiment_summary_{asset_name.replace(' ', '_')}.png" plt.savefig(fig_path) plt.close() return fig_path def convert_to_dataframe(analyzed_articles): df = pd.DataFrame(analyzed_articles) df["Title"] = df.apply( lambda row: f'{row["title"]}', axis=1, ) df["Description"] = df["desc"] df["Date"] = df["date"] def sentiment_badge(sentiment): colors = { "negative": "red", "neutral": "gray", "positive": "green", } color = colors.get(sentiment, "grey") return f'{sentiment}' df["Sentiment"] = df["sentiment"].apply(lambda x: sentiment_badge(x["label"])) # 점수 컬럼 추가 df["Base Score"] = df["base_score"] df["Weight"] = df["time_weight"].apply(lambda x: f"{x*100:.0f}%") df["Total Score"] = df["total_score"].apply(lambda x: f"{x:.2f}") return df[["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"]] with gr.Blocks() as iface: gr.Markdown("# Trading Asset Sentiment Analysis") gr.Markdown( "Enter the name of a trading asset, and I'll fetch recent articles and analyze their sentiment!" ) with gr.Row(): input_asset = gr.Textbox( label="Asset Name", lines=1, placeholder="Enter the name of the trading asset...", ) with gr.Row(): analyze_button = gr.Button("Analyze Sentiment", size="sm") gr.Examples( examples=[ "Bitcoin", "Tesla", "Apple", "Amazon", ], inputs=input_asset, ) # 주식 차트 영역 추가 with gr.Row(): with gr.Column(): with gr.Blocks(): gr.Markdown("## Stock Chart") with gr.Row(): stock_chart = gr.Image(type="filepath", label="Stock Price Chart") ticker_info = gr.Textbox(label="Ticker Symbol") with gr.Row(): with gr.Column(): with gr.Blocks(): gr.Markdown("## Sentiment Summary") sentiment_summary = gr.Image(type="filepath", label="Sentiment Analysis Summary") with gr.Row(): with gr.Column(): with gr.Blocks(): gr.Markdown("## Articles and Sentiment Analysis") articles_output = gr.Dataframe( headers=["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"], datatype=["markdown", "html", "markdown", "markdown", "number", "markdown", "markdown"], wrap=False, ) analyze_button.click( analyze_asset_sentiment, inputs=[input_asset], outputs=[articles_output, sentiment_summary, stock_chart, ticker_info], ) logging.info("Launching Gradio interface") iface.queue().launch()