import logging
import gradio as gr
import pandas as pd
import torch
import matplotlib
matplotlib.use("Agg")  # select a non-interactive backend before pyplot is imported
import matplotlib.pyplot as plt
from GoogleNews import GoogleNews
from transformers import pipeline
from datetime import datetime
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
SENTIMENT_ANALYSIS_MODEL = (
"mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
logging.info(f"Using device: {DEVICE}")
logging.info("Initializing sentiment analysis model...")
sentiment_analyzer = pipeline(
"sentiment-analysis", model=SENTIMENT_ANALYSIS_MODEL, device=DEVICE
)
logging.info("Model initialized successfully")
def fetch_articles(query, max_articles=30):
try:
logging.info(f"Fetching up to {max_articles} articles for query: '{query}'")
googlenews = GoogleNews(lang="en")
googlenews.search(query)
        # Fetch the first page of results
        articles = googlenews.result()
        # Fetch additional pages until the target article count is reached
        page = 2
        while len(articles) < max_articles and page <= 10:  # try at most 10 pages
            logging.info(f"Fetched {len(articles)} articles so far. Getting page {page}...")
            googlenews.get_page(page)
            # result() returns the list accumulated across all pages fetched so far,
            # so replace the working list rather than extending it (extending would duplicate entries)
            page_results = googlenews.result()
            # Stop if the new page added nothing
            if len(page_results) <= len(articles):
                logging.info(f"No more results found after page {page - 1}")
                break
            articles = page_results
            page += 1
        # Cap at the requested maximum
        articles = articles[:max_articles]
logging.info(f"Successfully fetched {len(articles)} articles")
return articles
except Exception as e:
logging.error(
f"Error while searching articles for query: '{query}'. Error: {e}"
)
raise gr.Error(
f"Unable to search articles for query: '{query}'. Try again later...",
duration=5,
)
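# Each GoogleNews result is a dict; the keys consumed downstream ('title',
# 'desc', 'date', 'link') are assumed from the library's result format, e.g.
# fetch_articles("Bitcoin", max_articles=10)[0]["title"].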
def analyze_article_sentiment(article):
    logging.info(f"Analyzing sentiment for article: {article['title']}")
    # Fall back to the title if the description is missing or empty
    text = article.get("desc") or article.get("title", "")
    # The pipeline returns a list like [{'label': 'positive', 'score': 0.98}]
    sentiment = sentiment_analyzer(text)[0]
    article["sentiment"] = sentiment
    return article
def calculate_time_weight(article_date_str):
"""
기사 시간 기준으로 가중치 계산
- 1시간당 1%씩 감소, 최대 24시간까지만 고려
- 1시간 내 기사: 24% 가중치
- 10시간 전 기사: 15% 가중치
- 24시간 이상 전 기사: 1% 가중치
"""
try:
        # Parse the article date string, trying several known formats
        date_formats = [
            '%a, %d %b %Y %H:%M:%S %z',  # default GoogleNews format
'%Y-%m-%d %H:%M:%S',
'%a, %d %b %Y %H:%M:%S',
'%Y-%m-%dT%H:%M:%S%z',
'%a %b %d, %Y',
'%d %b %Y'
]
parsed_date = None
for format_str in date_formats:
try:
parsed_date = datetime.strptime(article_date_str, format_str)
break
except ValueError:
continue
        # If no format matched, fall back to the minimum weight
        if parsed_date is None:
            logging.warning(f"Could not parse date: {article_date_str}, using minimum weight")
            return 0.01  # 1% floor
        # Compute the article's age in hours
        if parsed_date.tzinfo is not None:
            # Use an aware "now" in the article's timezone; stamping tzinfo onto a
            # naive local time would silently skew the difference
            now = datetime.now(parsed_date.tzinfo)
        else:
            now = datetime.now()
        hours_diff = (now - parsed_date).total_seconds() / 3600
        # Only the first 24 hours count toward the weight
        if hours_diff <= 24:
            weight = 0.24 - (0.01 * int(hours_diff))  # minus 1 percentage point per hour
            return max(0.01, weight)  # never drop below 1%
        else:
            return 0.01  # articles older than 24 hours get the 1% floor
    except Exception as e:
        logging.error(f"Error calculating time weight: {e}")
        return 0.01  # minimum weight on any failure
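# Worked example: an article 3.5 hours old gets weight 0.24 - 0.01 * int(3.5) = 0.21;
# the date below is illustrative, not taken from a real feed:
#   calculate_time_weight('Mon, 01 Jan 2024 09:00:00 +0000')  # ~0.21 at 12:30 UTC that day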
def calculate_sentiment_score(sentiment_label, time_weight):
"""
감성 레이블에 따른 기본 점수 계산 및 시간 가중치 적용
- positive: +3점
- neutral: 0점
- negative: -3점
"""
base_score = {
'positive': 3,
'neutral': 0,
'negative': -3
}.get(sentiment_label, 0)
    # Weighted bonus: more recent articles add (or subtract) more
weighted_addition = base_score * time_weight
return base_score, weighted_addition
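# Worked example: a positive article three hours old has base_score 3 and
# time_weight 0.21, so calculate_sentiment_score("positive", 0.21) returns
# (3, 0.63) and the article's total_score becomes 3.63.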
def analyze_asset_sentiment(asset_name):
logging.info(f"Starting sentiment analysis for asset: {asset_name}")
logging.info("Fetching up to 30 articles")
articles = fetch_articles(asset_name, max_articles=30)
logging.info("Analyzing sentiment of each article")
analyzed_articles = [analyze_article_sentiment(article) for article in articles]
    # Compute the time weight and sentiment scores for each article
for article in analyzed_articles:
time_weight = calculate_time_weight(article["date"])
article["time_weight"] = time_weight
sentiment_label = article["sentiment"]["label"]
base_score, weighted_addition = calculate_sentiment_score(sentiment_label, time_weight)
article["base_score"] = base_score
article["weighted_addition"] = weighted_addition
article["total_score"] = base_score + weighted_addition
logging.info("Sentiment analysis completed")
    # Build the overall summary and chart
sentiment_summary = create_sentiment_summary(analyzed_articles, asset_name)
return convert_to_dataframe(analyzed_articles), sentiment_summary
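# The pair returned above feeds the two Gradio outputs wired up at the bottom:
# the DataFrame populates articles_output, the chart path populates sentiment_summary.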
def create_sentiment_summary(analyzed_articles, asset_name):
"""
감성 분석 결과를 요약하고 그래프로 시각화
"""
    total_articles = len(analyzed_articles)
    # Guard against an empty result set, which would otherwise divide by zero below
    if total_articles == 0:
        raise gr.Error("No articles were found to summarize. Try a different query.", duration=5)
positive_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "positive")
neutral_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "neutral")
negative_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "negative")
    # Sum of unweighted base scores
    base_score_sum = sum(a["base_score"] for a in analyzed_articles)
    # Sum of time-weighted total scores
    weighted_score_sum = sum(a["total_score"] for a in analyzed_articles)
    # Build the figure
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
    # 1. Pie chart of the sentiment distribution
labels = ['Positive', 'Neutral', 'Negative']
sizes = [positive_count, neutral_count, negative_count]
colors = ['green', 'gray', 'red']
ax1.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
ax1.axis('equal')
ax1.set_title(f'Sentiment Distribution for {asset_name}')
    # 2. Weighted scores of the most recent articles (sorted by date string, newest first)
    sorted_articles = sorted(analyzed_articles, key=lambda x: x.get("date", ""), reverse=True)
    # Cap the number of bars for readability
max_display = min(15, len(sorted_articles))
display_articles = sorted_articles[:max_display]
    dates = [a.get("date", "")[:10] for a in display_articles]  # truncate labels to keep the axis readable
scores = [a.get("total_score", 0) for a in display_articles]
    # Color bars by score sign: green positive, red negative, gray neutral
    bar_colors = ['green' if s > 0 else 'red' if s < 0 else 'gray' for s in scores]
    ax2.bar(range(len(dates)), scores, color=bar_colors)
ax2.set_xticks(range(len(dates)))
ax2.set_xticklabels(dates, rotation=45, ha='right')
ax2.set_ylabel('Weighted Sentiment Score')
ax2.set_title(f'Recent Article Scores for {asset_name}')
ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    # Append the summary text below the charts
summary_text = f"""
Analysis Summary for {asset_name}:
Total Articles: {total_articles}
Positive: {positive_count} ({positive_count/total_articles*100:.1f}%)
Neutral: {neutral_count} ({neutral_count/total_articles*100:.1f}%)
Negative: {negative_count} ({negative_count/total_articles*100:.1f}%)
Base Score Sum: {base_score_sum:.2f}
Weighted Score Sum: {weighted_score_sum:.2f}
"""
plt.figtext(0.5, 0.01, summary_text, ha='center', fontsize=10, bbox={"facecolor":"orange", "alpha":0.2, "pad":5})
plt.tight_layout(rect=[0, 0.1, 1, 0.95])
    # Save the figure to disk so Gradio can display it via a file path
    fig_path = f"sentiment_summary_{asset_name.replace(' ', '_')}.png"
    plt.savefig(fig_path)
    plt.close(fig)
return fig_path
def convert_to_dataframe(analyzed_articles):
df = pd.DataFrame(analyzed_articles)
df["Title"] = df.apply(
lambda row: f'{row["title"]}',
axis=1,
)
df["Description"] = df["desc"]
df["Date"] = df["date"]
    def sentiment_badge(sentiment):
        colors = {
            "negative": "red",
            "neutral": "gray",
            "positive": "green",
        }
        color = colors.get(sentiment, "grey")
        # Color-code the label so the table shows sentiment at a glance
        return f'<span style="color: {color}; font-weight: bold;">{sentiment}</span>'
df["Sentiment"] = df["sentiment"].apply(lambda x: sentiment_badge(x["label"]))
    # Add the score columns
df["Base Score"] = df["base_score"]
df["Weight"] = df["time_weight"].apply(lambda x: f"{x*100:.0f}%")
df["Total Score"] = df["total_score"].apply(lambda x: f"{x:.2f}")
return df[["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"]]
with gr.Blocks() as iface:
gr.Markdown("# Trading Asset Sentiment Analysis")
gr.Markdown(
"Enter the name of a trading asset, and I'll fetch recent articles and analyze their sentiment!"
)
with gr.Row():
input_asset = gr.Textbox(
label="Asset Name",
lines=1,
placeholder="Enter the name of the trading asset...",
)
with gr.Row():
analyze_button = gr.Button("Analyze Sentiment", size="sm")
gr.Examples(
examples=[
"Bitcoin",
"Tesla",
"Apple",
"Amazon",
],
inputs=input_asset,
)
with gr.Row():
with gr.Column():
            with gr.Group():
gr.Markdown("## Sentiment Summary")
sentiment_summary = gr.Image(type="filepath", label="Sentiment Analysis Summary")
with gr.Row():
with gr.Column():
            with gr.Group():
gr.Markdown("## Articles and Sentiment Analysis")
articles_output = gr.Dataframe(
headers=["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"],
datatype=["markdown", "html", "markdown", "markdown", "number", "markdown", "markdown"],
wrap=False,
)
analyze_button.click(
analyze_asset_sentiment,
inputs=[input_asset],
outputs=[articles_output, sentiment_summary],
)
logging.info("Launching Gradio interface")
iface.queue().launch()