import logging import gradio as gr import pandas as pd import torch import numpy as np import matplotlib.pyplot as plt from GoogleNews import GoogleNews from transformers import pipeline from datetime import datetime, timedelta import matplotlib import yfinance as yf matplotlib.use('Agg') # Set up logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) SENTIMENT_ANALYSIS_MODEL = ( "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis" ) DEVICE = "cuda" if torch.cuda.is_available() else "cpu" logging.info(f"Using device: {DEVICE}") logging.info("Initializing sentiment analysis model...") sentiment_analyzer = pipeline( "sentiment-analysis", model=SENTIMENT_ANALYSIS_MODEL, device=DEVICE ) logging.info("Model initialized successfully") # 상장 종목 심볼 매핑을 위한 일반적인 종목명 사전 (필요에 따라 확장) COMMON_TICKERS = { "apple": "AAPL", "microsoft": "MSFT", "amazon": "AMZN", "google": "GOOGL", "alphabet": "GOOGL", "facebook": "META", "meta": "META", "tesla": "TSLA", "nvidia": "NVDA", "netflix": "NFLX", "amd": "AMD", "intel": "INTC", "ibm": "IBM", "oracle": "ORCL", "paypal": "PYPL", "adobe": "ADBE", "cisco": "CSCO", "bitcoin": "BTC-USD", "ethereum": "ETH-USD", "dogecoin": "DOGE-USD", "cardano": "ADA-USD", "xrp": "XRP-USD", "litecoin": "LTC-USD", "samsung": "005930.KS", # 한국 삼성전자 "hyundai": "005380.KS", # 현대자동차 "sk hynix": "000660.KS", # SK하이닉스 "lg": "003550.KS", # LG "lge": "066570.KS", # LG전자 "ncsoft": "036570.KS", # 엔씨소프트 "kakao": "035720.KS", # 카카오 "naver": "035420.KS", # 네이버 "현대차": "005380.KS", # 현대자동차 "삼성전자": "005930.KS", # 삼성전자 "삼성": "005930.KS", # 삼성전자 "카카오": "035720.KS", # 카카오 "네이버": "035420.KS", # 네이버 } def fetch_articles(query, max_articles=30): try: logging.info(f"Fetching up to {max_articles} articles for query: '{query}'") googlenews = GoogleNews(lang="en") googlenews.search(query) # 첫 페이지 결과 가져오기 articles = googlenews.result() # 목표 기사 수에 도달할 때까지 추가 페이지 가져오기 page = 2 while len(articles) < max_articles and page <= 10: # 최대 10페이지까지만 시도 logging.info(f"Fetched {len(articles)} articles so far. Getting page {page}...") googlenews.get_page(page) page_results = googlenews.result() # 새 결과가 없으면 중단 if not page_results: logging.info(f"No more results found after page {page-1}") break articles.extend(page_results) page += 1 # 최대 기사 수로 제한 articles = articles[:max_articles] logging.info(f"Successfully fetched {len(articles)} articles") return articles except Exception as e: logging.error( f"Error while searching articles for query: '{query}'. Error: {e}" ) raise gr.Error( f"Unable to search articles for query: '{query}'. Try again later...", duration=5, ) def analyze_article_sentiment(article): logging.info(f"Analyzing sentiment for article: {article['title']}") sentiment = sentiment_analyzer(article["desc"])[0] article["sentiment"] = sentiment return article def calculate_time_weight(article_date_str): """ 기사 시간 기준으로 가중치 계산 - 1시간 내 기사는 24% 가중치 - 시간이 지날수록 1%씩 감소 (최소 1%) - 예: 1시간 내 기사 = 24%, 10시간 전 기사 = 15%, 24시간 전 기사 = 1% - 24시간 이상이면 1%로 고정 """ try: # 기사 날짜 문자열 파싱 (다양한 형식 처리) date_formats = [ '%a, %d %b %Y %H:%M:%S %z', # 기본 GoogleNews 형식 '%Y-%m-%d %H:%M:%S', '%a, %d %b %Y %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z', '%a %b %d, %Y', '%d %b %Y' ] parsed_date = None for format_str in date_formats: try: parsed_date = datetime.strptime(article_date_str, format_str) break except ValueError: continue # 어떤 형식으로도 파싱할 수 없으면 현재 시간 기준 24시간 전으로 가정 if parsed_date is None: logging.warning(f"Could not parse date: {article_date_str}, using default 24h ago") return 0.01 # 최소 가중치 1% # 현재 시간과의 차이 계산 (시간 단위) now = datetime.now() if parsed_date.tzinfo is not None: now = now.replace(tzinfo=parsed_date.tzinfo) hours_diff = (now - parsed_date).total_seconds() / 3600 # 24시간 이내인 경우만 고려 if hours_diff < 1: # 1시간 이내 return 0.24 # 24% 가중치 elif hours_diff < 24: # 1~23시간 # 1시간당 1%씩 감소 (1시간 = 24%, 2시간 = 23%, ...) return max(0.01, 0.24 - ((hours_diff - 1) * 0.01)) else: return 0.01 # 24시간 이상 지난 기사는 1% 가중치 except Exception as e: logging.error(f"Error calculating time weight: {e}") return 0.01 # 오류 발생 시 최소 가중치 적용 def calculate_sentiment_score(sentiment_label, time_weight): """ 감성 레이블에 따른 기본 점수 계산 및 시간 가중치 적용 - positive: +3점 - neutral: 0점 - negative: -3점 시간 가중치는 백분율로 적용 (기본 점수에 가중치 % 만큼 추가) 예: - 1시간 내 긍정 기사: 3점 + (3 * 24%) = 3 + 0.72 = 3.72점 - 10시간 전 부정 기사: -3점 + (-3 * 15%) = -3 - 0.45 = -3.45점 """ base_score = { 'positive': 3, 'neutral': 0, 'negative': -3 }.get(sentiment_label, 0) # 가중치를 적용한 추가 점수 계산 weighted_addition = base_score * time_weight return base_score, weighted_addition def get_stock_ticker(asset_name): """ 자산명으로부터 주식 티커 심볼을 추출 """ logging.info(f"Identifying ticker for: {asset_name}") # 소문자로 변환하여 매핑 확인 asset_lower = asset_name.lower().strip() # 직접 티커로 입력한 경우 (대문자 3-5자 형태) if asset_name.isupper() and 2 <= len(asset_name) <= 6: logging.info(f"Input appears to be a ticker symbol: {asset_name}") return asset_name # 일반적인 종목명 매핑 확인 if asset_lower in COMMON_TICKERS: ticker = COMMON_TICKERS[asset_lower] logging.info(f"Found ticker in common tickers map: {ticker}") return ticker # 여러 단어로 된 이름의 각 부분에 대한 검색도 시도 asset_parts = asset_lower.split() for part in asset_parts: if part in COMMON_TICKERS: ticker = COMMON_TICKERS[part] logging.info(f"Found ticker for part '{part}': {ticker}") return ticker # 그 외의 경우 직접 티커로 시도 potential_ticker = asset_name.upper().replace(" ", "") if 2 <= len(potential_ticker) <= 6: # 실제로 존재하는지 확인 try: logging.info(f"Trying potential ticker: {potential_ticker}") test_data = yf.download(potential_ticker, period="1d", progress=False) if not test_data.empty: logging.info(f"Valid ticker found: {potential_ticker}") return potential_ticker except Exception as e: logging.debug(f"Error testing potential ticker: {e}") # 그 외의 경우 yfinance로 검색 시도 (info 데이터) try: # 일부 티커는 직접 yfinance 기반 검색으로는 오류가 발생할 수 있음 ticker_search = yf.Ticker(asset_name) try: info = ticker_search.info if 'symbol' in info and info['symbol']: ticker = info['symbol'] logging.info(f"Found ticker from info API: {ticker}") return ticker except (ValueError, KeyError, TypeError) as e: logging.debug(f"Error getting ticker info: {e}") pass except Exception as e: logging.debug(f"Error initializing ticker object: {e}") # 추가 시도: 일반적인 미국 증시 티커 형식 확인 major_exchanges = ["", ".KS", ".KQ", "-USD"] # 주요 거래소 접미사 (한국 포함) for exchange in major_exchanges: try: test_ticker = f"{asset_name.upper().replace(' ', '')}{exchange}" logging.info(f"Trying with exchange suffix: {test_ticker}") test_data = yf.download(test_ticker, period="1d", progress=False) if not test_data.empty: logging.info(f"Valid ticker found with suffix: {test_ticker}") return test_ticker except: pass logging.warning(f"Could not identify ticker for: {asset_name}") return None def create_stock_chart(ticker, period="1mo"): """ 주식 티커에 대한 차트 생성 """ try: logging.info(f"Fetching stock data for {ticker}") # Graceful handling for problematic symbols try: stock_data = yf.download(ticker, period=period, progress=False) except Exception as dl_error: logging.error(f"Error downloading stock data: {dl_error}") # Try alternative symbol format if "-" in ticker: alt_ticker = ticker.replace("-", ".") logging.info(f"Trying alternative ticker format: {alt_ticker}") stock_data = yf.download(alt_ticker, period=period, progress=False) else: raise dl_error if len(stock_data) == 0: logging.warning(f"No stock data found for ticker: {ticker}") return None # 데이터 확인 및 디버그 로깅 logging.info(f"Downloaded data shape: {stock_data.shape}") logging.info(f"Data columns: {stock_data.columns.tolist()}") # 그래프 작성 fig, ax = plt.subplots(figsize=(10, 6)) # 종가 그래프 - 멀티인덱스 처리 if isinstance(stock_data.columns, pd.MultiIndex): # 멀티인덱스인 경우 ('Close', ticker) 형태 close_col = ('Close', ticker) if close_col in stock_data.columns: ax.plot(stock_data.index, stock_data[close_col], label='Close Price', color='blue') # 이동평균선 추가 (20일) if len(stock_data) > 20: stock_data['MA20'] = stock_data[close_col].rolling(window=20).mean() ax.plot(stock_data.index, stock_data['MA20'], label='20-day MA', color='orange') # 거래량 서브플롯 추가 (거래량이 있는 경우만) volume_col = ('Volume', ticker) if volume_col in stock_data.columns and not stock_data[volume_col].isna().all(): ax2 = ax.twinx() ax2.bar(stock_data.index, stock_data[volume_col], alpha=0.3, color='gray', label='Volume') ax2.set_ylabel('Volume') # 범례 추가 (거래량 있는 경우) lines, labels = ax.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels() ax.legend(lines + lines2, labels + labels2, loc='upper left') else: # 거래량 없는 경우 종가만 표시 ax.legend(loc='upper left') else: raise ValueError(f"Close column not found in data columns: {stock_data.columns}") else: # 일반 인덱스인 경우 if 'Close' in stock_data.columns: ax.plot(stock_data.index, stock_data['Close'], label='Close Price', color='blue') # 이동평균선 추가 (20일) if len(stock_data) > 20: stock_data['MA20'] = stock_data['Close'].rolling(window=20).mean() ax.plot(stock_data.index, stock_data['MA20'], label='20-day MA', color='orange') # 거래량 서브플롯 추가 (거래량이 있는 경우만) if 'Volume' in stock_data.columns and not stock_data['Volume'].isna().all(): ax2 = ax.twinx() ax2.bar(stock_data.index, stock_data['Volume'], alpha=0.3, color='gray', label='Volume') ax2.set_ylabel('Volume') # 범례 추가 (거래량 있는 경우) lines, labels = ax.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels() ax.legend(lines + lines2, labels + labels2, loc='upper left') else: # 거래량 없는 경우 종가만 표시 ax.legend(loc='upper left') else: raise ValueError(f"Close column not found in data columns: {stock_data.columns}") # 차트 스타일링 ax.set_title(f"{ticker} Stock Price") ax.set_xlabel('Date') ax.set_ylabel('Price') ax.grid(True, alpha=0.3) plt.tight_layout() # 이미지 저장 chart_path = f"stock_chart_{ticker.replace('-', '_').replace('.', '_')}.png" plt.savefig(chart_path) plt.close() logging.info(f"Stock chart created: {chart_path}") return chart_path except Exception as e: logging.error(f"Error creating stock chart for {ticker}: {e}") # 오류 발생 시에도 그래프 생성 시도 (기본 텍스트 안내) try: fig, ax = plt.subplots(figsize=(10, 6)) ax.text(0.5, 0.5, f"Unable to load data for {ticker}\nError: {str(e)}", horizontalalignment='center', verticalalignment='center', transform=ax.transAxes) ax.set_axis_off() chart_path = f"stock_chart_error_{ticker.replace('-', '_').replace('.', '_')}.png" plt.savefig(chart_path) plt.close() return chart_path except: return None def analyze_asset_sentiment(asset_name): logging.info(f"Starting sentiment analysis for asset: {asset_name}") logging.info("Fetching up to 30 articles") articles = fetch_articles(asset_name, max_articles=30) logging.info("Analyzing sentiment of each article") analyzed_articles = [analyze_article_sentiment(article) for article in articles] # 각 기사에 대한 시간 가중치 및 감성 점수 계산 for article in analyzed_articles: time_weight = calculate_time_weight(article["date"]) article["time_weight"] = time_weight sentiment_label = article["sentiment"]["label"] base_score, weighted_addition = calculate_sentiment_score(sentiment_label, time_weight) article["base_score"] = base_score article["weighted_addition"] = weighted_addition article["total_score"] = base_score + weighted_addition logging.info("Sentiment analysis completed") # 종합 점수 계산 및 그래프 생성 sentiment_summary = create_sentiment_summary(analyzed_articles, asset_name) # 주식 티커 확인 및 차트 생성 stock_chart = None ticker = get_stock_ticker(asset_name) if ticker: logging.info(f"Found ticker {ticker} for asset {asset_name}") stock_chart = create_stock_chart(ticker) return convert_to_dataframe(analyzed_articles), sentiment_summary, stock_chart, ticker def create_sentiment_summary(analyzed_articles, asset_name): """ 감성 분석 결과를 요약하고 그래프로 시각화 """ total_articles = len(analyzed_articles) positive_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "positive") neutral_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "neutral") negative_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "negative") # 기본 점수 합계 base_score_sum = sum(a["base_score"] for a in analyzed_articles) # 가중치 적용 점수 합계 weighted_score_sum = sum(a["total_score"] for a in analyzed_articles) # 그래프 생성 fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) # 1. 감성 분포 파이 차트 labels = ['Positive', 'Neutral', 'Negative'] sizes = [positive_count, neutral_count, negative_count] colors = ['green', 'gray', 'red'] ax1.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90) ax1.axis('equal') ax1.set_title(f'Sentiment Distribution for {asset_name}') # 2. 시간별 가중치 적용 점수 (정렬) sorted_articles = sorted(analyzed_articles, key=lambda x: x.get("date", ""), reverse=True) # 최대 표시할 기사 수 (가독성을 위해) max_display = min(15, len(sorted_articles)) display_articles = sorted_articles[:max_display] dates = [a.get("date", "")[:10] for a in display_articles] # 날짜 부분만 표시 scores = [a.get("total_score", 0) for a in display_articles] # 점수에 따른 색상 설정 bar_colors = ['green' if s > 0 else 'red' if s < 0 else 'gray' for s in scores] bars = ax2.bar(range(len(dates)), scores, color=bar_colors) ax2.set_xticks(range(len(dates))) ax2.set_xticklabels(dates, rotation=45, ha='right') ax2.set_ylabel('Weighted Sentiment Score') ax2.set_title(f'Recent Article Scores for {asset_name}') ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3) # 요약 텍스트 추가 summary_text = f""" Analysis Summary for {asset_name}: Total Articles: {total_articles} Positive: {positive_count} ({positive_count/total_articles*100:.1f}%) Neutral: {neutral_count} ({neutral_count/total_articles*100:.1f}%) Negative: {negative_count} ({negative_count/total_articles*100:.1f}%) Base Score Sum: {base_score_sum:.2f} Weighted Score Sum: {weighted_score_sum:.2f} """ plt.figtext(0.5, 0.01, summary_text, ha='center', fontsize=10, bbox={"facecolor":"orange", "alpha":0.2, "pad":5}) plt.tight_layout(rect=[0, 0.1, 1, 0.95]) # 이미지 저장 fig_path = f"sentiment_summary_{asset_name.replace(' ', '_')}.png" plt.savefig(fig_path) plt.close() return fig_path def convert_to_dataframe(analyzed_articles): df = pd.DataFrame(analyzed_articles) df["Title"] = df.apply( lambda row: f'{row["title"]}', axis=1, ) df["Description"] = df["desc"] df["Date"] = df["date"] def sentiment_badge(sentiment): colors = { "negative": "red", "neutral": "gray", "positive": "green", } color = colors.get(sentiment, "grey") return f'{sentiment}' df["Sentiment"] = df["sentiment"].apply(lambda x: sentiment_badge(x["label"])) # 점수 컬럼 추가 df["Base Score"] = df["base_score"] df["Weight"] = df["time_weight"].apply(lambda x: f"{x*100:.0f}%") df["Total Score"] = df["total_score"].apply(lambda x: f"{x:.2f}") return df[["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"]] def main(): with gr.Blocks() as iface: gr.Markdown("# Trading Asset Sentiment Analysis") gr.Markdown( "Enter the name of a trading asset, and I'll fetch recent articles and analyze their sentiment!" ) with gr.Row(): input_asset = gr.Textbox( label="Asset Name", lines=1, placeholder="Enter the name of the trading asset...", ) with gr.Row(): analyze_button = gr.Button("Analyze Sentiment", size="sm") # 예제 입력값을 코드에 정의된 티커 매핑의 키들로 반영 (중복되지 않도록 정렬) examples_list = sorted(set(COMMON_TICKERS.keys()), key=lambda x: x.lower()) gr.Examples( examples=examples_list, inputs=input_asset, ) # 주식 차트 영역 추가 with gr.Row(): with gr.Column(): with gr.Blocks(): gr.Markdown("## Stock Chart") with gr.Row(): stock_chart = gr.Image(type="filepath", label="Stock Price Chart") ticker_info = gr.Textbox(label="Ticker Symbol") with gr.Row(): with gr.Column(): with gr.Blocks(): gr.Markdown("## Sentiment Summary") sentiment_summary = gr.Image(type="filepath", label="Sentiment Analysis Summary") with gr.Row(): with gr.Column(): with gr.Blocks(): gr.Markdown("## Articles and Sentiment Analysis") articles_output = gr.Dataframe( headers=["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"], datatype=["markdown", "html", "markdown", "markdown", "number", "markdown", "markdown"], wrap=False, ) analyze_button.click( analyze_asset_sentiment, inputs=[input_asset], outputs=[articles_output, sentiment_summary, stock_chart, ticker_info], ) logging.info("Launching Gradio interface") iface.queue().launch() if __name__ == "__main__": main()