import torch
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np
from wordcloud import WordCloud
from collections import Counter, defaultdict
import re
import json
import csv
import io
import tempfile
from datetime import datetime
import logging
from functools import lru_cache
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
import nltk
from nltk.corpus import stopwords
import langdetect
import pandas as pd
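# Runtime notes: the transformer weights are fetched from the Hugging Face Hub on first run,
# and exporting history to .xlsx via pandas assumes an Excel writer such as openpyxl is installed.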


@dataclass
class Config:
    MAX_HISTORY_SIZE: int = 500
    BATCH_SIZE_LIMIT: int = 30
    MAX_TEXT_LENGTH: int = 512
    CACHE_SIZE: int = 64

    # Plain class attributes (not dataclass fields); the rest of the code reads them
    # through the shared `config` instance, e.g. config.SUPPORTED_LANGUAGES.
    SUPPORTED_LANGUAGES = {
        'auto': 'Auto Detect',
        'en': 'English',
        'zh': 'Chinese',
        'es': 'Spanish',
        'fr': 'French',
        'de': 'German',
        'sv': 'Swedish'
    }

    MODELS = {
        'en': "cardiffnlp/twitter-roberta-base-sentiment-latest",
        'multilingual': "cardiffnlp/twitter-xlm-roberta-base-sentiment"
    }

    THEMES = {
        'default': {'pos': '#4CAF50', 'neg': '#F44336', 'neu': '#FF9800'},
        'ocean': {'pos': '#0077BE', 'neg': '#FF6B35', 'neu': '#00BCD4'},
        'dark': {'pos': '#66BB6A', 'neg': '#EF5350', 'neu': '#FFA726'},
        'rainbow': {'pos': '#9C27B0', 'neg': '#E91E63', 'neu': '#FF5722'}
    }


config = Config()


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load English stop words; fall back to a small built-in set if the NLTK data is unavailable.
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    STOP_WORDS = set(stopwords.words('english'))
except Exception:
    STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}


class ModelManager:
    """Manages sentiment models and language detection."""

    def __init__(self):
        self.models = {}
        self.tokenizers = {}
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self._load_default_model()

    def _load_default_model(self):
        """Load the default multilingual model."""
        try:
            model_name = config.MODELS['multilingual']
            self.tokenizers['default'] = AutoTokenizer.from_pretrained(model_name)
            self.models['default'] = AutoModelForSequenceClassification.from_pretrained(model_name)
            self.models['default'].to(self.device)
            logger.info(f"Default model loaded: {model_name}")
        except Exception as e:
            logger.error(f"Failed to load default model: {e}")
            raise

    def get_model(self, language='en'):
        """Get the model for a specific language.

        Only the multilingual default model is loaded, so it is returned for
        every language, including unsupported ones.
        """
        return self.models['default'], self.tokenizers['default']

    @staticmethod
    def detect_language(text: str) -> str:
        """Detect the text's language, defaulting to English on failure."""
        try:
            detected = langdetect.detect(text)
            return detected if detected in config.SUPPORTED_LANGUAGES else 'en'
        except Exception:
            return 'en'


model_manager = ModelManager()
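# Instantiating ModelManager at import time loads the transformer weights immediately,
# so startup blocks until the checkpoint is downloaded (first run) and loaded.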


class HistoryManager:
    """Manages analysis history."""

    def __init__(self):
        self._history = []

    def add_entry(self, entry: Dict):
        self._history.append(entry)
        if len(self._history) > config.MAX_HISTORY_SIZE:
            self._history = self._history[-config.MAX_HISTORY_SIZE:]

    def get_history(self) -> List[Dict]:
        return self._history.copy()

    def clear(self) -> int:
        count = len(self._history)
        self._history.clear()
        return count

    def get_stats(self) -> Dict:
        if not self._history:
            return {}

        sentiments = [item['sentiment'] for item in self._history]
        confidences = [item['confidence'] for item in self._history]

        return {
            'total_analyses': len(self._history),
            'positive_count': sentiments.count('Positive'),
            'negative_count': sentiments.count('Negative'),
            'neutral_count': sentiments.count('Neutral'),
            'avg_confidence': np.mean(confidences),
            'languages_detected': len(set(item.get('language', 'en') for item in self._history))
        }


history_manager = HistoryManager()
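# The history lives in process memory only: it is capped at config.MAX_HISTORY_SIZE entries,
# resets when the app restarts, and, being module-level state, is shared by every user of a
# running instance.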


class TextProcessor:
    """Enhanced text processing utilities."""

    @staticmethod
    @lru_cache(maxsize=config.CACHE_SIZE)
    def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
        """Lowercase the text, optionally strip punctuation/numbers, and drop stop words."""
        text = text.lower().strip()

        if remove_numbers:
            text = re.sub(r'\d+', '', text)

        if remove_punctuation:
            text = re.sub(r'[^\w\s]', '', text)

        words = text.split()
        cleaned_words = [w for w in words if w not in STOP_WORDS and len(w) > 2]
        return ' '.join(cleaned_words)

    @staticmethod
    def extract_keywords(text: str, top_k: int = 5) -> List[str]:
        """Extract the most frequent non-stop words from the text."""
        cleaned = TextProcessor.clean_text(text)
        words = cleaned.split()
        word_freq = Counter(words)
        return [word for word, _ in word_freq.most_common(top_k)]
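# Example (illustrative): TextProcessor.extract_keywords("The movie was great, great acting!")
# -> roughly ['great', 'movie', 'acting']; the exact list depends on which stop word set loaded.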


class SentimentAnalyzer:
    """Enhanced sentiment analysis."""

    @staticmethod
    def analyze_text(text: str, language: str = 'auto', preprocessing_options: Optional[Dict] = None) -> Dict:
        """Analyze a single text with language support."""
        if not text.strip():
            raise ValueError("Empty text provided")

        if language == 'auto':
            detected_lang = model_manager.detect_language(text)
        else:
            detected_lang = language

        model, tokenizer = model_manager.get_model(detected_lang)

        options = preprocessing_options or {}
        processed_text = text
        if options.get('clean_text', False):
            processed_text = TextProcessor.clean_text(
                text,
                options.get('remove_punctuation', True),
                options.get('remove_numbers', False)
            )

        try:
            inputs = tokenizer(processed_text, return_tensors="pt", padding=True,
                               truncation=True, max_length=config.MAX_TEXT_LENGTH).to(model_manager.device)

            with torch.no_grad():
                outputs = model(**inputs)
                probs = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()[0]

            # The cardiffnlp sentiment models are 3-class with label order
            # [negative, neutral, positive]; fall back to binary handling otherwise.
            if len(probs) == 3:
                sentiment_idx = np.argmax(probs)
                sentiment_labels = ['Negative', 'Neutral', 'Positive']
                sentiment = sentiment_labels[sentiment_idx]
                confidence = float(probs[sentiment_idx])

                result = {
                    'sentiment': sentiment,
                    'confidence': confidence,
                    'neg_prob': float(probs[0]),
                    'neu_prob': float(probs[1]),
                    'pos_prob': float(probs[2]),
                    'has_neutral': True
                }
            else:
                pred = np.argmax(probs)
                sentiment = "Positive" if pred == 1 else "Negative"
                confidence = float(probs[pred])

                result = {
                    'sentiment': sentiment,
                    'confidence': confidence,
                    'neg_prob': float(probs[0]),
                    'pos_prob': float(probs[1]),
                    'neu_prob': 0.0,
                    'has_neutral': False
                }

            result.update({
                'language': detected_lang,
                'keywords': TextProcessor.extract_keywords(text),
                'word_count': len(text.split()),
                'char_count': len(text)
            })

            return result

        except Exception as e:
            logger.error(f"Analysis failed: {e}")
            raise
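# Example (illustrative): SentimentAnalyzer.analyze_text("I loved this film!") returns a dict
# like {'sentiment': 'Positive', 'confidence': 0.9..., 'pos_prob': ..., 'neu_prob': ...,
# 'neg_prob': ..., 'language': 'en', 'keywords': [...], 'word_count': 4, 'char_count': 18};
# the exact probabilities depend on the loaded model.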


class PlotlyVisualizer:
    """Enhanced visualizations with Plotly."""

    @staticmethod
    def create_sentiment_gauge(result: Dict, theme: str = 'default') -> go.Figure:
        """Create a sentiment gauge."""
        colors = config.THEMES[theme]

        # Pick the bar color from the predicted class so Neutral is not drawn as negative.
        bar_color = {'Positive': colors['pos'], 'Neutral': colors['neu'], 'Negative': colors['neg']}.get(
            result['sentiment'], colors['neu'])

        if result['has_neutral']:
            fig = go.Figure(go.Indicator(
                mode="gauge+number+delta",
                value=result['pos_prob'] * 100,
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': f"Sentiment: {result['sentiment']}"},
                delta={'reference': 50},
                gauge={
                    'axis': {'range': [None, 100]},
                    'bar': {'color': bar_color},
                    'steps': [
                        {'range': [0, 33], 'color': colors['neg']},
                        {'range': [33, 67], 'color': colors['neu']},
                        {'range': [67, 100], 'color': colors['pos']}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 4},
                        'thickness': 0.75,
                        'value': 90
                    }
                }
            ))
        else:
            fig = go.Figure(go.Indicator(
                mode="gauge+number",
                value=result['confidence'] * 100,
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': f"Confidence: {result['sentiment']}"},
                gauge={
                    'axis': {'range': [None, 100]},
                    'bar': {'color': bar_color},
                    'steps': [
                        {'range': [0, 50], 'color': "lightgray"},
                        {'range': [50, 100], 'color': "gray"}
                    ]
                }
            ))

        fig.update_layout(height=400, font={'size': 16})
        return fig

    @staticmethod
    def create_probability_bars(result: Dict, theme: str = 'default') -> go.Figure:
        """Create a probability bar chart."""
        colors = config.THEMES[theme]

        if result['has_neutral']:
            labels = ['Negative', 'Neutral', 'Positive']
            values = [result['neg_prob'], result['neu_prob'], result['pos_prob']]
            bar_colors = [colors['neg'], colors['neu'], colors['pos']]
        else:
            labels = ['Negative', 'Positive']
            values = [result['neg_prob'], result['pos_prob']]
            bar_colors = [colors['neg'], colors['pos']]

        fig = go.Figure(data=[
            go.Bar(x=labels, y=values, marker_color=bar_colors, text=[f'{v:.3f}' for v in values])
        ])

        fig.update_traces(texttemplate='%{text}', textposition='outside')
        fig.update_layout(
            title="Sentiment Probabilities",
            yaxis_title="Probability",
            height=400,
            showlegend=False
        )

        return fig

    @staticmethod
    def create_history_dashboard(history: List[Dict]) -> go.Figure:
        """Create a comprehensive history dashboard."""
        if len(history) < 2:
            return go.Figure()

        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=['Sentiment Timeline', 'Confidence Distribution',
                            'Language Distribution', 'Sentiment Summary'],
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"type": "pie"}, {"type": "bar"}]]
        )

        indices = list(range(len(history)))
        pos_probs = [item['pos_prob'] for item in history]
        confidences = [item['confidence'] for item in history]
        sentiments = [item['sentiment'] for item in history]
        languages = [item.get('language', 'en') for item in history]

        # Color points by predicted class (green/orange/red for positive/neutral/negative).
        sentiment_colors = {'Positive': '#4CAF50', 'Neutral': '#FF9800', 'Negative': '#F44336'}
        colors = [sentiment_colors.get(s, '#F44336') for s in sentiments]

        # Timeline of positive probability
        fig.add_trace(
            go.Scatter(x=indices, y=pos_probs, mode='lines+markers',
                       marker=dict(color=colors, size=8),
                       name='Positive Probability'),
            row=1, col=1
        )

        # Confidence histogram
        fig.add_trace(
            go.Histogram(x=confidences, nbinsx=10, name='Confidence'),
            row=1, col=2
        )

        # Language distribution pie
        lang_counts = Counter(languages)
        fig.add_trace(
            go.Pie(labels=list(lang_counts.keys()), values=list(lang_counts.values()),
                   name="Languages"),
            row=2, col=1
        )

        # Sentiment summary bars
        sent_counts = Counter(sentiments)
        fig.add_trace(
            go.Bar(x=list(sent_counts.keys()), y=list(sent_counts.values()),
                   marker_color=[sentiment_colors.get(k, '#F44336') for k in sent_counts.keys()]),
            row=2, col=2
        )

        fig.update_layout(height=800, showlegend=False)
        return fig


def analyze_single_text(text: str, language: str, theme: str, clean_text: bool,
                        remove_punct: bool, remove_nums: bool):
    """Analyze a single text and build the outputs for the single-analysis tab."""
    try:
        if not text.strip():
            return "Please enter text", None, None, "No analysis performed"

        language_map = {
            'Auto Detect': 'auto',
            'English': 'en',
            'Chinese': 'zh',
            'Spanish': 'es',
            'French': 'fr',
            'German': 'de',
            'Swedish': 'sv'
        }
        language_code = language_map.get(language, 'auto')

        preprocessing_options = {
            'clean_text': clean_text,
            'remove_punctuation': remove_punct,
            'remove_numbers': remove_nums
        }

        result = SentimentAnalyzer.analyze_text(text, language_code, preprocessing_options)

        history_entry = {
            'text': text[:100] + '...' if len(text) > 100 else text,
            'full_text': text,
            'sentiment': result['sentiment'],
            'confidence': result['confidence'],
            'pos_prob': result['pos_prob'],
            'neg_prob': result['neg_prob'],
            'neu_prob': result.get('neu_prob', 0),
            'language': result['language'],
            'timestamp': datetime.now().isoformat()
        }
        history_manager.add_entry(history_entry)

        gauge_fig = PlotlyVisualizer.create_sentiment_gauge(result, theme)
        bars_fig = PlotlyVisualizer.create_probability_bars(result, theme)

        info_text = f"""
**Analysis Results:**
- **Sentiment:** {result['sentiment']} ({result['confidence']:.3f} confidence)
- **Language:** {result['language'].upper()}
- **Keywords:** {', '.join(result['keywords'])}
- **Stats:** {result['word_count']} words, {result['char_count']} characters
"""

        return info_text, gauge_fig, bars_fig, "Analysis completed successfully"

    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        return f"Error: {str(e)}", None, None, "Analysis failed"


def get_history_stats():
    """Summarize the analysis history as markdown."""
    stats = history_manager.get_stats()
    if not stats:
        return "No analysis history available"

    return f"""
**History Statistics:**
- Total Analyses: {stats['total_analyses']}
- Positive: {stats['positive_count']} | Neutral: {stats.get('neutral_count', 0)} | Negative: {stats['negative_count']}
- Average Confidence: {stats['avg_confidence']:.3f}
- Languages Detected: {stats['languages_detected']}
"""


def plot_history_dashboard():
    """Create the history dashboard figure."""
    history = history_manager.get_history()
    if len(history) < 2:
        return None, "Need at least 2 analyses for dashboard"

    fig = PlotlyVisualizer.create_history_dashboard(history)
    return fig, f"Dashboard showing {len(history)} analyses"


def export_history_excel():
    """Export the analysis history to an Excel file."""
    history = history_manager.get_history()
    if not history:
        return None, "No history to export"

    try:
        df = pd.DataFrame(history)
        # Writing .xlsx assumes an Excel engine (e.g. openpyxl) is available to pandas.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx')
        df.to_excel(temp_file.name, index=False)
        return temp_file.name, f"Exported {len(history)} entries to Excel"
    except Exception as e:
        return None, f"Export failed: {str(e)}"


def clear_all_history():
    """Clear the analysis history."""
    count = history_manager.clear()
    return f"Cleared {count} entries from history"


# Multilingual example inputs for the Gradio Examples component (mixed positive/negative reviews).
SAMPLE_TEXTS = [
    # English (mixed/negative)
    ["The film had its moments, but overall it felt a bit too long and lacked emotional depth."],
    # English (positive)
    ["I was completely blown away by the movie — the performances were raw and powerful, and the story stayed with me long after the credits rolled."],
    # Chinese (negative)
    ["这部电影节奏拖沓,剧情老套,完全没有让我产生任何共鸣,是一次失望的观影体验。"],
    # Spanish (positive)
    ["Una obra maestra del cine contemporáneo, con actuaciones sobresalientes, un guion bien escrito y una dirección impecable."],
    # French (negative)
    ["Je m'attendais à beaucoup mieux. Le scénario était confus, les dialogues ennuyeux, et je me suis presque endormi au milieu du film."],
    # German (positive)
    ["Der Film war ein emotionales Erlebnis mit großartigen Bildern, einem mitreißenden Soundtrack und einer Geschichte, die zum Nachdenken anregt."],
    # Swedish (negative)
    ["Filmen var en besvikelse – tråkig handling, överdrivet skådespeleri och ett slut som inte gav något avslut alls."]
]


with gr.Blocks(theme=gr.themes.Soft(), title="Advanced Sentiment Analyzer") as demo:
    gr.Markdown("# 🎭 Multilingual Sentiment Analyzer")
    gr.Markdown("Analyze sentiment with multiple languages, themes, and advanced visualizations")

    with gr.Tab("📝 Single Analysis"):
        with gr.Row():
            with gr.Column(scale=2):
                text_input = gr.Textbox(
                    label="Text to Analyze",
                    placeholder="Enter your text here... (supports multiple languages)",
                    lines=4
                )

                with gr.Row():
                    language_select = gr.Dropdown(
                        choices=['Auto Detect', 'English', 'Chinese', 'Spanish', 'French', 'German', 'Swedish'],
                        value='Auto Detect',
                        label="Language"
                    )
                    theme_select = gr.Dropdown(
                        choices=list(config.THEMES.keys()),
                        value='default',
                        label="Theme"
                    )

                with gr.Row():
                    clean_text = gr.Checkbox(label="Clean Text", value=False)
                    remove_punct = gr.Checkbox(label="Remove Punctuation", value=True)
                    remove_nums = gr.Checkbox(label="Remove Numbers", value=False)

                analyze_btn = gr.Button("🔍 Analyze", variant="primary", size="lg")

                gr.Examples(
                    examples=SAMPLE_TEXTS,
                    inputs=text_input,
                    label="Sample Texts (Multiple Languages)"
                )

            with gr.Column(scale=1):
                result_info = gr.Markdown("Enter text and click Analyze")

        with gr.Row():
            gauge_plot = gr.Plot(label="Sentiment Gauge")
            bars_plot = gr.Plot(label="Probability Distribution")

        status_output = gr.Textbox(label="Status", interactive=False)

    with gr.Tab("📊 History & Analytics"):
        with gr.Row():
            stats_btn = gr.Button("📈 Get Statistics")
            dashboard_btn = gr.Button("📊 View Dashboard")
            clear_btn = gr.Button("🗑️ Clear History", variant="stop")

        with gr.Row():
            export_excel_btn = gr.Button("📁 Export Excel")

        stats_output = gr.Markdown("Click 'Get Statistics' to view analysis history")
        dashboard_plot = gr.Plot(label="Analytics Dashboard")
        excel_file = gr.File(label="Download Excel Report")
        history_status = gr.Textbox(label="Status", interactive=False)

    analyze_btn.click(
        analyze_single_text,
        inputs=[text_input, language_select, theme_select, clean_text, remove_punct, remove_nums],
        outputs=[result_info, gauge_plot, bars_plot, status_output]
    )

    stats_btn.click(
        get_history_stats,
        outputs=stats_output
    )

    dashboard_btn.click(
        plot_history_dashboard,
        outputs=[dashboard_plot, history_status]
    )

    export_excel_btn.click(
        export_history_excel,
        outputs=[excel_file, history_status]
    )

    clear_btn.click(
        clear_all_history,
        outputs=history_status
    )


if __name__ == "__main__":
    demo.launch(share=True)
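# Note: share=True asks Gradio to create a temporary public *.gradio.live link in addition
# to the local server; set share=False to keep the app local-only.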