# Tone Classification System
# This implementation combines text and acoustic features to detect emotions,
# including sarcasm and figures of speech
# Part 1: Install required packages with improved error handling
import sys
import os
# Function to install packages with error handling
def install_packages():
packages = [
"hf_xet","transformers", "pytorch-lightning", "datasets",
"numpy", "pandas", "matplotlib", "seaborn",
"librosa", "opensmile", "torch", "torchaudio",
"accelerate", "nltk", "scikit-learn"
]
for package in packages:
try:
print(f"Installing {package}...")
import subprocess
            # Install the package quietly; check=True makes pip failures raise
            # so the except below can actually report them
            subprocess.run([sys.executable, '-m', 'pip', 'install', package, '-q'], check=True)
print(f"Successfully installed {package}")
except Exception as e:
print(f"Error installing {package}: {e}")
print("Package installation completed!")
install_packages()
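# Note: on Hugging Face Spaces, dependencies are normally pinned in
# requirements.txt; installing at runtime like this is mainly a convenience
# for throwaway notebook sessions.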
# Part 2: Import libraries with error handling
import numpy as np
import pandas as pd
import torch
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# Check for CUDA availability
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {DEVICE}")
# Try to import libraries that might cause issues with specific error handling
try:
import torchaudio
print("Successfully imported torchaudio")
except Exception as e:
print(f"Error importing torchaudio: {e}")
print("Some audio functionality may be limited")
try:
import librosa
print("Successfully imported librosa")
except Exception as e:
print(f"Error importing librosa: {e}")
print("Audio processing capabilities will be limited")
try:
import opensmile
print("Successfully imported opensmile")
except Exception as e:
print(f"Error importing opensmile: {e}")
print("Will use fallback feature extraction methods")
# Part 3: Define constants
EMOTIONS = ["neutral", "happy", "sad", "angry", "fearful", "disgust", "surprised", "sarcastic"]
MODEL_CACHE_DIR = "./model_cache"
# Create cache directory if it doesn't exist
os.makedirs(MODEL_CACHE_DIR, exist_ok=True)
print(f"Using model cache directory: {MODEL_CACHE_DIR}")
# Part 4: Model Loading with Error Handling and Cache
def load_model_with_cache(model_class, model_name, cache_subdir=""):
"""Load a model with proper error handling and caching"""
cache_path = os.path.join(MODEL_CACHE_DIR, cache_subdir)
os.makedirs(cache_path, exist_ok=True)
print(f"Loading model: {model_name}")
    try:
        # Hugging Face caches snapshots under "models--{org}--{name}"; only force
        # local_files_only when such a snapshot already exists in our cache dir
        local_snapshot = os.path.join(cache_path, "models--" + model_name.replace("/", "--"))
        model = model_class.from_pretrained(
            model_name,
            cache_dir=cache_path,
            local_files_only=os.path.exists(local_snapshot)
        )
print(f"Successfully loaded model: {model_name}")
return model
except KeyboardInterrupt:
print("\nModel download interrupted. Try again or download manually.")
return None
except Exception as e:
print(f"Error loading model {model_name}: {e}")
print("Will try to continue with limited functionality.")
return None
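# Usage sketch for load_model_with_cache; the model name below is only an
# illustrative placeholder, and any class exposing from_pretrained works:
#
#   from transformers import AutoTokenizer
#   tokenizer = load_model_with_cache(AutoTokenizer, "distilbert-base-uncased", "example")
#   if tokenizer is None:
#       print("Fall back to keyword-only analysis")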
# Part 5: Modified Whisper Transcriber with Error Handling
class WhisperTranscriber:
def __init__(self, model_size="tiny"): # Changed from base to tiny for faster loading
from transformers import WhisperProcessor, WhisperForConditionalGeneration
print("Initializing Whisper transcriber...")
try:
self.processor = load_model_with_cache(
WhisperProcessor,
f"openai/whisper-{model_size}",
"whisper"
)
self.model = load_model_with_cache(
WhisperForConditionalGeneration,
f"openai/whisper-{model_size}",
"whisper"
)
if self.model is not None:
self.model = self.model.to(DEVICE)
print("Whisper model loaded successfully and moved to device")
else:
print("Failed to load Whisper model")
except Exception as e:
print(f"Error initializing Whisper: {e}")
self.processor = None
self.model = None
def transcribe(self, audio_path):
if self.processor is None or self.model is None:
print("Whisper not properly initialized. Cannot transcribe.")
return "Error: Transcription failed."
try:
# Load audio
waveform, sample_rate = librosa.load(audio_path, sr=16000)
# Process audio
input_features = self.processor(waveform, sampling_rate=16000, return_tensors="pt").input_features.to(DEVICE)
# Generate transcription
with torch.no_grad():
predicted_ids = self.model.generate(input_features, max_length=100)
# Decode the transcription
transcription = self.processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
return transcription
except Exception as e:
print(f"Error in transcription: {e}")
return "Error: Transcription failed."
# Part 6: Improved Text-based Emotion Analysis with Fallback Options
class TextEmotionClassifier:
def __init__(self):
from transformers import AutoTokenizer, AutoModelForSequenceClassification
print("Initializing text emotion classifier...")
# Primary emotion model
self.emotion_model_name = "j-hartmann/emotion-english-distilroberta-base"
self.tokenizer = load_model_with_cache(
AutoTokenizer,
self.emotion_model_name,
"text_emotion"
)
self.model = load_model_with_cache(
AutoModelForSequenceClassification,
self.emotion_model_name,
"text_emotion"
)
if self.model is not None:
self.model = self.model.to(DEVICE)
# Sentiment model for sarcasm detection
self.sentiment_model_name = "cardiffnlp/twitter-roberta-base-sentiment"
self.sarcasm_tokenizer = load_model_with_cache(
AutoTokenizer,
self.sentiment_model_name,
"sentiment"
)
self.sarcasm_model = load_model_with_cache(
AutoModelForSequenceClassification,
self.sentiment_model_name,
"sentiment"
)
if self.sarcasm_model is not None:
self.sarcasm_model = self.sarcasm_model.to(DEVICE)
# Enhanced keyword-based analyzer as fallback and enhancement
self.keyword_analyzer = EnhancedKeywordEmotionAnalyzer()
def predict_emotion(self, text):
if self.tokenizer is None or self.model is None:
print("Text emotion model not properly initialized.")
# Use keyword-based analysis as primary method in this case
return self.keyword_analyzer.analyze(text)
try:
# Get model predictions
inputs = self.tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(DEVICE)
with torch.no_grad():
outputs = self.model(**inputs)
            # Get probabilities from model
            model_probs = F.softmax(outputs.logits, dim=1).cpu().numpy()[0]
            # The emotion model's label set (anger, disgust, fear, joy, neutral,
            # sadness, surprise) differs from EMOTIONS in naming and order and has
            # no "sarcastic" class, so remap by label before combining; without
            # this, the weighted sum below fails on a 7-vs-8 shape mismatch
            label_map = {"anger": "angry", "fear": "fearful", "joy": "happy",
                         "sadness": "sad", "surprise": "surprised"}
            aligned_probs = np.zeros(len(EMOTIONS))
            for idx, label in self.model.config.id2label.items():
                name = label_map.get(label.lower(), label.lower())
                if name in EMOTIONS:
                    aligned_probs[EMOTIONS.index(name)] = model_probs[int(idx)]
            model_probs = aligned_probs
            # Get keyword-based analysis
            keyword_probs = self.keyword_analyzer.analyze(text)
            # Combine both methods, giving more weight to keyword analysis when the
            # text contains strong emotional keywords
            keyword_strength = self.keyword_analyzer.get_keyword_strength(text)
            # Adaptive weighting based on keyword strength
            keyword_weight = min(0.6, keyword_strength * 0.1)  # Cap at 0.6
            model_weight = 1.0 - keyword_weight
            # Combine predictions
            combined_probs = (model_weight * model_probs) + (keyword_weight * keyword_probs)
            # Normalize to ensure the probabilities sum to 1
            combined_probs = combined_probs / np.sum(combined_probs)
return combined_probs
except Exception as e:
print(f"Error in text emotion prediction: {e}")
# Fallback to keyword analysis
return self.keyword_analyzer.analyze(text)
def detect_sarcasm(self, text):
if self.sarcasm_tokenizer is None or self.sarcasm_model is None:
print("Sarcasm model not properly initialized.")
# Use keyword-based sarcasm detection as fallback
return self.keyword_analyzer.detect_sarcasm(text)
try:
inputs = self.sarcasm_tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(DEVICE)
with torch.no_grad():
outputs = self.sarcasm_model(**inputs)
sentiment_probs = F.softmax(outputs.logits, dim=1).cpu().numpy()[0]
# Enhance with keyword-based sarcasm detection
keyword_sarcasm = self.keyword_analyzer.detect_sarcasm(text)
            # If keyword analysis strongly suggests sarcasm, blend with the model
            # prediction. The keyword detector returns a negative-heavy
            # [negative, neutral, positive] array when sarcasm is likely,
            # matching the sentiment model's label order.
            if keyword_sarcasm[0] > 0.5:  # High negative (sarcasm) signal from keywords
                # Give 40% weight to keyword analysis
                combined_probs = 0.6 * sentiment_probs + 0.4 * keyword_sarcasm
return combined_probs
return sentiment_probs
except Exception as e:
print(f"Error in sarcasm detection: {e}")
# Fallback to keyword analysis
return self.keyword_analyzer.detect_sarcasm(text)
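# Quick sanity check for the text classifier (downloads models on first use,
# and silently falls back to the keyword analyzer if loading fails):
#
#   clf = TextEmotionClassifier()
#   probs = clf.predict_emotion("Oh great, another Monday. Just perfect.")
#   print(dict(zip(EMOTIONS, np.round(probs, 3))))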
# Enhanced keyword-based emotion analyzer
class EnhancedKeywordEmotionAnalyzer:
def __init__(self):
# Enhanced emotion keywords with weights
self.emotion_keywords = {
"happy": [
("happy", 1.0), ("joy", 1.0), ("delight", 0.9), ("excited", 0.9),
("glad", 0.8), ("pleased", 0.8), ("cheerful", 0.9), ("smile", 0.7),
("enjoy", 0.8), ("wonderful", 0.8), ("great", 0.7), ("excellent", 0.8),
("thrilled", 1.0), ("ecstatic", 1.0), ("content", 0.7), ("satisfied", 0.7),
("pleasure", 0.8), ("fantastic", 0.9), ("awesome", 0.9), ("love", 0.9),
("amazing", 0.9), ("perfect", 0.8), ("fun", 0.8), ("delighted", 1.0)
],
"sad": [
("sad", 1.0), ("unhappy", 0.9), ("depressed", 1.0), ("sorrow", 1.0),
("grief", 1.0), ("tearful", 0.9), ("miserable", 1.0), ("disappointed", 0.8),
("upset", 0.8), ("down", 0.7), ("heartbroken", 1.0), ("gloomy", 0.9),
("devastated", 1.0), ("hurt", 0.8), ("blue", 0.7), ("regret", 0.8),
("dejected", 0.9), ("dismal", 0.9), ("lonely", 0.8), ("terrible", 0.8),
("hopeless", 0.9), ("lost", 0.7), ("crying", 0.9), ("tragic", 0.9)
],
"angry": [
("angry", 1.0), ("mad", 0.9), ("furious", 1.0), ("annoyed", 0.8),
("irritated", 0.8), ("enraged", 1.0), ("livid", 1.0), ("outraged", 1.0),
("frustrated", 0.8), ("infuriated", 1.0), ("pissed", 0.9), ("hate", 0.9),
("hostile", 0.9), ("bitter", 0.8), ("resentful", 0.8), ("fuming", 0.9),
("irate", 1.0), ("outraged", 1.0), ("seething", 1.0), ("cross", 0.7),
("exasperated", 0.8), ("disgusted", 0.8), ("indignant", 0.9), ("rage", 1.0)
],
"fearful": [
("afraid", 1.0), ("scared", 1.0), ("frightened", 1.0), ("fear", 0.9),
("terror", 1.0), ("panic", 1.0), ("horrified", 1.0), ("worried", 0.8),
("anxious", 0.9), ("nervous", 0.8), ("terrified", 1.0), ("dread", 0.9),
("alarmed", 0.8), ("petrified", 1.0), ("threatened", 0.8), ("intimidated", 0.8),
("apprehensive", 0.8), ("uneasy", 0.7), ("tense", 0.7), ("stressed", 0.7),
("spooked", 0.9), ("paranoid", 0.9), ("freaked", 0.9), ("jumpy", 0.8)
],
"disgust": [
("disgust", 1.0), ("gross", 0.9), ("repulsed", 1.0), ("revolted", 1.0),
("sick", 0.8), ("nauseous", 0.8), ("yuck", 0.9), ("ew", 0.8),
("nasty", 0.9), ("repugnant", 1.0), ("foul", 0.9), ("appalled", 0.9),
("sickened", 0.9), ("offended", 0.8), ("distaste", 0.9), ("aversion", 0.9),
("abhorrent", 1.0), ("odious", 1.0), ("repellent", 1.0), ("objectionable", 0.8),
("detestable", 1.0), ("loathsome", 1.0), ("vile", 1.0), ("horrid", 0.9)
],
"surprised": [
("surprised", 1.0), ("shocked", 0.9), ("astonished", 1.0), ("amazed", 0.9),
("startled", 0.9), ("stunned", 0.9), ("speechless", 0.8), ("unexpected", 0.8),
("wow", 0.8), ("whoa", 0.8), ("unbelievable", 0.8), ("incredible", 0.8),
("dumbfounded", 1.0), ("flabbergasted", 1.0), ("staggered", 0.9), ("aghast", 0.9),
("astounded", 1.0), ("taken aback", 0.9), ("disbelief", 0.8), ("bewildered", 0.8),
("thunderstruck", 1.0), ("wonder", 0.7), ("sudden", 0.6), ("jaw-dropping", 0.9)
],
"neutral": [
("okay", 0.7), ("fine", 0.7), ("alright", 0.7), ("normal", 0.8),
("calm", 0.8), ("steady", 0.8), ("balanced", 0.8), ("ordinary", 0.8),
("routine", 0.8), ("regular", 0.8), ("standard", 0.8), ("moderate", 0.8),
("usual", 0.8), ("typical", 0.8), ("average", 0.8), ("common", 0.8),
("so-so", 0.7), ("fair", 0.7), ("acceptable", 0.7), ("stable", 0.8),
("unchanged", 0.8), ("plain", 0.7), ("mild", 0.7), ("middle-of-the-road", 0.8)
],
"sarcastic": [
("yeah right", 1.0), ("sure thing", 0.9), ("oh great", 0.9), ("how wonderful", 0.9),
("wow", 0.7), ("really", 0.7), ("obviously", 0.8), ("definitely", 0.7),
("of course", 0.7), ("totally", 0.7), ("exactly", 0.7), ("perfect", 0.7),
("brilliant", 0.8), ("genius", 0.8), ("whatever", 0.8), ("right", 0.7),
("nice job", 0.8), ("good one", 0.8), ("bravo", 0.8), ("slow clap", 1.0),
("im shocked", 0.9), ("never would have guessed", 0.9), ("shocking", 0.7), ("unbelievable", 0.7)
]
}
# Sarcasm indicators
self.sarcasm_indicators = [
"yeah right", "sure thing", "oh great", "riiiight", "suuure",
"*slow clap*", "/s", "wow just wow", "you don't say", "no kidding",
"what a surprise", "shocker", "congratulations", "well done", "genius",
"oh wow", "oh really", "totally", "absolutely", "clearly", "obviously",
"genius idea", "brilliant plan", "fantastic job", "amazing work"
]
# Negation words
self.negations = [
"not", "no", "never", "none", "nothing", "neither", "nor", "nowhere",
"hardly", "scarcely", "barely", "doesn't", "isn't", "wasn't", "shouldn't",
"wouldn't", "couldn't", "won't", "can't", "don't", "didn't", "haven't"
]
# Intensifiers
self.intensifiers = [
"very", "really", "extremely", "absolutely", "completely", "totally",
"utterly", "quite", "particularly", "especially", "remarkably", "truly",
"so", "too", "such", "incredibly", "exceedingly", "extraordinarily"
]
# Compile patterns for more efficient matching
import re
self.emotion_patterns = {}
for emotion, keywords in self.emotion_keywords.items():
self.emotion_patterns[emotion] = [
(re.compile(r'\b' + re.escape(word) + r'\b', re.IGNORECASE), weight)
for word, weight in keywords
]
self.negation_pattern = re.compile(r'\b(' + '|'.join(re.escape(n) for n in self.negations) + r')\s+(\w+)', re.IGNORECASE)
self.intensifier_pattern = re.compile(r'\b(' + '|'.join(re.escape(i) for i in self.intensifiers) + r')\s+(\w+)', re.IGNORECASE)
def analyze(self, text):
"""
Analyze text for emotions using enhanced keyword matching
Returns numpy array of emotion probabilities
"""
# Initialize scores
emotion_scores = {emotion: 0.0 for emotion in EMOTIONS}
# Set base score for neutral
emotion_scores["neutral"] = 1.0
# Convert to lowercase for case-insensitive matching
text_lower = text.lower()
# Process each emotion
for emotion, patterns in self.emotion_patterns.items():
for pattern, weight in patterns:
matches = pattern.findall(text_lower)
if matches:
# Add score based on number of matches and their weights
emotion_scores[emotion] += len(matches) * weight
# Process negations - look for "not happy" patterns
negation_matches = self.negation_pattern.finditer(text_lower)
for match in negation_matches:
negation, word = match.groups()
# Check if the negated word is in any emotion keywords
for emotion, keywords in self.emotion_keywords.items():
if any(word == kw[0] for kw in keywords):
# Reduce score for this emotion and slightly increase opposite emotions
emotion_scores[emotion] -= 0.7
# Increase opposite emotions (e.g., if "not happy", increase "sad")
if emotion == "happy":
emotion_scores["sad"] += 0.3
elif emotion == "sad":
emotion_scores["happy"] += 0.3
# Process intensifiers - "very happy" should increase score
intensifier_matches = self.intensifier_pattern.finditer(text_lower)
for match in intensifier_matches:
intensifier, word = match.groups()
# Check if the intensified word is in any emotion keywords
for emotion, keywords in self.emotion_keywords.items():
if any(word == kw[0] for kw in keywords):
# Increase score for this emotion
emotion_scores[emotion] += 0.5
# Ensure no negative scores
for emotion in emotion_scores:
emotion_scores[emotion] = max(0, emotion_scores[emotion])
# Normalize to probabilities
total = sum(emotion_scores.values())
if total > 0:
probs = {emotion: score/total for emotion, score in emotion_scores.items()}
else:
# If no emotions detected, default to neutral
probs = {emotion: 0.0 for emotion in EMOTIONS}
probs["neutral"] = 1.0
# Convert to numpy array in the same order as EMOTIONS
return np.array([probs[emotion] for emotion in EMOTIONS])
    def detect_sarcasm(self, text):
        """
        Detect sarcasm in text.
        Returns a [negative, neutral, positive] probability array matching the
        sentiment model's label order; a negative-heavy array flags likely
        sarcasm, which the pipeline cross-checks against a happy/neutral text emotion.
        """
text_lower = text.lower()
sarcasm_score = 0.0
# Check for direct sarcasm indicators
for indicator in self.sarcasm_indicators:
if indicator in text_lower:
sarcasm_score += 0.3
# Check for common sarcasm patterns
positive_words = [kw[0] for kw in self.emotion_keywords["happy"]]
has_positive = any(word in text_lower for word in positive_words)
negative_context = any(neg in text_lower for neg in ["terrible", "awful", "horrible", "fail", "disaster", "mess"])
# Positive words in negative context suggests sarcasm
if has_positive and negative_context:
sarcasm_score += 0.4
# Check for excessive punctuation which might indicate sarcasm
if "!!!" in text or "?!" in text:
sarcasm_score += 0.2
# Cap the score
sarcasm_score = min(1.0, sarcasm_score)
        # If sarcasm is detected, return a negative-heavy sentiment array:
        # ostensibly positive wording delivered with negative sentiment is the
        # signature the pipeline checks for (np.argmax(...) == 0, i.e. negative)
        if sarcasm_score > 0.3:
            return np.array([0.8, 0.1, 0.1])  # Strong negative signal flags sarcasm
        else:
            # Balanced array: no strong indication of sarcasm
            return np.array([0.33, 0.34, 0.33])
def get_keyword_strength(self, text):
"""
Measure the strength of emotional keywords in the text
Returns a value between 0 and 10
"""
text_lower = text.lower()
total_matches = 0
weighted_matches = 0
# Count all matches across all emotions with their weights
for emotion, patterns in self.emotion_patterns.items():
for pattern, weight in patterns:
matches = pattern.findall(text_lower)
total_matches += len(matches)
weighted_matches += len(matches) * weight
# Calculate strength score on a scale of 0-10
if total_matches > 0:
avg_weight = weighted_matches / total_matches
# Scale based on number of matches and their average weight
strength = min(10, (total_matches * avg_weight) / 2)
return strength
else:
return 0.0
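# The keyword analyzer is self-contained, so it can be exercised without any
# model downloads, for example:
#
#   analyzer = EnhancedKeywordEmotionAnalyzer()
#   print(dict(zip(EMOTIONS, analyzer.analyze("I'm not happy, this is terrible").round(3))))
#   print(analyzer.detect_sarcasm("Oh great, what a brilliant plan."))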
# Part 7: Acoustic Feature Extraction with Fallback
class AcousticFeatureExtractor:
def __init__(self):
self.use_opensmile = True
try:
import opensmile
# Initialize OpenSMILE with the eGeMAPS feature set instead of ComParE_2016
# eGeMAPS is specifically designed for voice analysis and emotion recognition
self.smile = opensmile.Smile(
feature_set=opensmile.FeatureSet.eGeMAPSv02,
feature_level=opensmile.FeatureLevel.Functionals,
)
print("OpenSMILE feature extractor initialized successfully with eGeMAPS")
except Exception as e:
print(f"Failed to initialize OpenSMILE: {e}")
print("Using librosa for feature extraction instead.")
self.use_opensmile = False
def extract_features(self, audio_path):
try:
if self.use_opensmile:
# Use OpenSMILE for feature extraction
features = self.smile.process_file(audio_path)
return features.values
else:
# Fallback to improved librosa feature extraction
return self._extract_librosa_features(audio_path)
except Exception as e:
print(f"Error in acoustic feature extraction: {e}")
print("Using dummy features as fallback")
# Return dummy features in case of error
return np.zeros(88) # eGeMAPS dimension
def _extract_librosa_features(self, audio_path):
"""Improved librosa feature extraction focusing on emotion-relevant features"""
try:
# Load audio
y, sr = librosa.load(audio_path, sr=22050)
# Extract features specifically relevant to emotion detection
            # 1. Pitch features (fundamental frequency); guard against an empty
            # voiced selection, which would make mean/std return NaN
            pitches, magnitudes = librosa.piptrack(y=y, sr=sr)
            voiced = pitches[magnitudes > np.median(magnitudes)]
            pitch_mean = np.mean(voiced) if voiced.size else 0.0
            pitch_std = np.std(voiced) if voiced.size else 0.0
# 2. Energy/intensity features
rms = librosa.feature.rms(y=y)[0]
energy_mean = np.mean(rms)
energy_std = np.std(rms)
            # 3. Tempo and rhythm features (newer librosa versions may return
            # tempo as a one-element array, so coerce it to a float)
            tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
            tempo = float(np.atleast_1d(tempo)[0])
# 4. Spectral features
spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0]
spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0]
# 5. Voice quality features
zero_crossing_rate = librosa.feature.zero_crossing_rate(y)[0]
# Compute statistics for each feature
features = []
for feature in [spectral_centroid, spectral_bandwidth, spectral_rolloff, zero_crossing_rate]:
features.extend([np.mean(feature), np.std(feature), np.min(feature), np.max(feature)])
# Add pitch and energy features
features.extend([pitch_mean, pitch_std, energy_mean, energy_std, tempo])
# Add MFCCs (critical for speech emotion)
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
for mfcc in mfccs:
features.extend([np.mean(mfcc), np.std(mfcc)])
# Convert to numpy array
features = np.array(features)
# Handle NaN values
features = np.nan_to_num(features)
# Pad or truncate to match eGeMAPS dimension (88)
if len(features) < 88:
features = np.pad(features, (0, 88 - len(features)))
else:
features = features[:88]
return features
except Exception as e:
print(f"Error in librosa feature extraction: {e}")
return np.zeros(88) # Same dimension as eGeMAPS
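# Usage sketch (assumes some local sample.wav). The feature vector is always
# 88-dimensional, whether it comes from eGeMAPS or the librosa fallback, though
# openSMILE returns it as a 2-D (1, 88) array:
#
#   extractor = AcousticFeatureExtractor()
#   feats = np.asarray(extractor.extract_features("sample.wav")).reshape(-1)
#   print(feats.shape)  # -> (88,)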
# Part 8: Acoustic Emotion Classifier
class AcousticEmotionClassifier(nn.Module):
def __init__(self, input_dim, hidden_dim=128, num_classes=len(EMOTIONS)):
super().__init__()
# Normalize input features
self.batch_norm = nn.BatchNorm1d(input_dim)
# Feature extraction layers
self.feature_extractor = nn.Sequential(
nn.Linear(input_dim, hidden_dim * 2),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Dropout(0.3)
)
# Emotion classification head
self.classifier = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim // 2, num_classes)
)
# Initialize weights properly
self._init_weights()
def _init_weights(self):
"""Initialize weights with Xavier initialization"""
for m in self.modules():
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
def forward(self, x):
# Handle different input shapes
if len(x.shape) == 1:
x = x.unsqueeze(0) # Add batch dimension
# Normalize features
x = self.batch_norm(x)
# Extract features
features = self.feature_extractor(x)
# Classify emotions
output = self.classifier(features)
return output
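# AcousticEmotionClassifier above is defined but never trained in this script
# (the pipeline below uses the rule-based audio classifier instead). A minimal
# full-batch training sketch, assuming `features` is a FloatTensor of shape
# [N, 88] and `labels` a LongTensor of shape [N] (N > 1, for BatchNorm) built
# from a labeled corpus such as RAVDESS:
def train_acoustic_classifier(features, labels, epochs=20, lr=1e-3):
    model = AcousticEmotionClassifier(input_dim=features.shape[1]).to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        logits = model(features.to(DEVICE))
        loss = criterion(logits, labels.to(DEVICE))
        loss.backward()
        optimizer.step()
        print(f"Epoch {epoch + 1}/{epochs} - loss: {loss.item():.4f}")
    return model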
class PretrainedAudioClassifier:
"""A rule-based classifier for audio emotion detection until proper training"""
def __init__(self):
# Define acoustic feature thresholds for emotions based on research
# These are simplified heuristics based on acoustic phonetics research
self.feature_thresholds = {
"happy": {
"pitch_mean": (220, 400), # Higher pitch for happiness
"energy_mean": (0.6, 1.0), # Higher energy
"speech_rate": (0.8, 1.0) # Faster speech rate
},
"sad": {
"pitch_mean": (100, 220), # Lower pitch for sadness
"energy_mean": (0.1, 0.5), # Lower energy
"speech_rate": (0.3, 0.7) # Slower speech rate
},
"angry": {
"pitch_mean": (250, 400), # Higher pitch for anger
"energy_mean": (0.7, 1.0), # Higher energy
"speech_rate": (0.7, 1.0) # Faster speech rate
},
"fearful": {
"pitch_mean": (200, 350), # Higher pitch
"energy_mean": (0.4, 0.8), # Medium energy
"speech_rate": (0.6, 0.9) # Medium-fast speech rate
},
"neutral": {
"pitch_mean": (180, 240), # Medium pitch
"energy_mean": (0.3, 0.6), # Medium energy
"speech_rate": (0.4, 0.7) # Medium speech rate
}
}
def extract_key_features(self, audio_path):
"""Extract key acoustic features for rule-based classification"""
try:
y, sr = librosa.load(audio_path, sr=22050)
# Extract pitch
pitches, magnitudes = librosa.piptrack(y=y, sr=sr)
pitch_mean = np.mean(pitches[magnitudes > 0.1]) if np.any(magnitudes > 0.1) else 200
# Normalize pitch to 0-1 range (assuming human pitch range 80-400 Hz)
pitch_mean_norm = (pitch_mean - 80) / (400 - 80)
pitch_mean_norm = max(0, min(1, pitch_mean_norm))
# Extract energy
rms = librosa.feature.rms(y=y)[0]
energy_mean = np.mean(rms)
# Normalize energy
energy_mean_norm = energy_mean / 0.1 # Assuming 0.1 is a reasonable max RMS
energy_mean_norm = max(0, min(1, energy_mean_norm))
# Estimate speech rate from onsets
onset_env = librosa.onset.onset_strength(y=y, sr=sr)
onsets = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sr)
if len(onsets) > 1:
speech_rate = len(onsets) / (len(y) / sr) # Onsets per second
speech_rate_norm = min(1.0, speech_rate / 5.0) # Normalize, assuming 5 onsets/sec is fast
else:
speech_rate_norm = 0.5 # Default to medium if can't detect
return {
"pitch_mean": pitch_mean_norm,
"energy_mean": energy_mean_norm,
"speech_rate": speech_rate_norm
}
except Exception as e:
print(f"Error extracting key features: {e}")
return {
"pitch_mean": 0.5, # Default to medium values
"energy_mean": 0.5,
"speech_rate": 0.5
}
def predict(self, audio_path):
"""Predict emotion based on acoustic features"""
# Extract key features
features = self.extract_key_features(audio_path)
# Calculate match scores for each emotion
emotion_scores = {}
for emotion, thresholds in self.feature_thresholds.items():
score = 0
for feature, (min_val, max_val) in thresholds.items():
# Normalize threshold to 0-1 range
min_norm = (min_val - 80) / (400 - 80) if feature == "pitch_mean" else min_val
max_norm = (max_val - 80) / (400 - 80) if feature == "pitch_mean" else max_val
# Check if feature is in the emotion's range
if min_norm <= features[feature] <= max_norm:
# Higher score if closer to the middle of the range
middle = (min_norm + max_norm) / 2
distance = abs(features[feature] - middle) / ((max_norm - min_norm) / 2)
feature_score = 1 - distance
score += feature_score
else:
# Penalty for being outside the range
score -= 0.5
emotion_scores[emotion] = max(0, score)
# Add small values for other emotions not in our basic set
for emotion in EMOTIONS:
if emotion not in emotion_scores:
emotion_scores[emotion] = 0.1
# Normalize scores to probabilities
total = sum(emotion_scores.values())
if total > 0:
probs = {emotion: score/total for emotion, score in emotion_scores.items()}
else:
# Default to neutral if all scores are 0
probs = {emotion: 0.1 for emotion in EMOTIONS}
probs["neutral"] = 0.5
# Convert to array in the same order as EMOTIONS
return np.array([probs[emotion] for emotion in EMOTIONS])
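# Usage sketch for the rule-based audio classifier (assumes some local sample.wav):
#
#   audio_clf = PretrainedAudioClassifier()
#   probs = audio_clf.predict("sample.wav")
#   print(EMOTIONS[int(np.argmax(probs))])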
# Part 9: Improved Fusion Model for combining text and acoustic predictions
class AdaptiveModalityFusionModel(nn.Module):
def __init__(self, text_dim, acoustic_dim, hidden_dim=128, num_classes=len(EMOTIONS)):
super().__init__()
# Confidence estimators for each modality
self.text_confidence = nn.Sequential(
nn.Linear(text_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1),
nn.Sigmoid()
)
self.acoustic_confidence = nn.Sequential(
nn.Linear(acoustic_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1),
nn.Sigmoid()
)
# Feature transformation
self.text_transform = nn.Linear(text_dim, hidden_dim)
self.acoustic_transform = nn.Linear(acoustic_dim, hidden_dim)
# Final classifier
self.classifier = nn.Sequential(
nn.Linear(hidden_dim, num_classes),
nn.Softmax(dim=1)
)
# Initialize weights
self._init_weights()
def _init_weights(self):
for m in self.modules():
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
def forward(self, text_features, acoustic_features):
# Estimate confidence for each modality
text_conf = self.text_confidence(text_features)
acoustic_conf = self.acoustic_confidence(acoustic_features)
# Normalize confidences to sum to 1
total_conf = text_conf + acoustic_conf
text_weight = text_conf / total_conf
acoustic_weight = acoustic_conf / total_conf
# Transform features
text_transformed = self.text_transform(text_features)
acoustic_transformed = self.acoustic_transform(acoustic_features)
# Weighted combination
combined = text_weight * text_transformed + acoustic_weight * acoustic_transformed
# Classification
output = self.classifier(combined)
return output
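# Shape check for the (currently unused) fusion model with dummy tensors; the
# dimensions match the 8 emotion probabilities and 88 eGeMAPS features used
# elsewhere in this file:
#
#   fusion = AdaptiveModalityFusionModel(text_dim=len(EMOTIONS), acoustic_dim=88)
#   out = fusion(torch.randn(2, len(EMOTIONS)), torch.randn(2, 88))
#   print(out.shape)  # -> torch.Size([2, 8]); rows sum to 1 due to the Softmax head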
# Part 10: Simple Rule-based Fallback Classifier
class RuleBasedClassifier:
"""A simple rule-based classifier for fallback when models fail"""
def predict(self, text):
"""Predict emotion based on simple word matching"""
text = text.lower()
# Simple emotion keywords
emotion_keywords = {
"happy": ["happy", "joy", "delight", "excited", "glad", "pleased", "cheerful", "smile"],
"sad": ["sad", "unhappy", "depressed", "sorrow", "grief", "tearful", "miserable"],
"angry": ["angry", "mad", "furious", "annoyed", "irritated", "enraged", "livid"],
"fearful": ["afraid", "scared", "frightened", "fear", "terror", "panic", "horrified"],
"disgust": ["disgust", "gross", "repulsed", "revolted", "sick", "nauseous"],
"surprised": ["surprised", "shocked", "astonished", "amazed", "startled"],
"sarcastic": ["yeah right", "sure thing", "oh great", "wow", "really", "obviously"]
}
# Count matches for each emotion
emotion_scores = {emotion: 0 for emotion in EMOTIONS}
emotion_scores["neutral"] = 1 # Default to neutral
for emotion, keywords in emotion_keywords.items():
for keyword in keywords:
if keyword in text:
emotion_scores[emotion] += 1
# Return the emotion with highest score
max_emotion = max(emotion_scores, key=emotion_scores.get)
# Convert to probabilities
total = sum(emotion_scores.values())
probs = {emotion: score/total for emotion, score in emotion_scores.items()}
return max_emotion, probs
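# Example:
#
#   emotion, probs = RuleBasedClassifier().predict("yeah right, sure thing")
#   print(emotion)  # -> "sarcastic"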
# Part 11: Complete Emotion Recognition Pipeline with Comprehensive Error Handling
class EmotionRecognitionPipeline:
def __init__(self, acoustic_model_path=None, fusion_model_path=None):
try:
print("Initializing Improved Emotion Recognition Pipeline...")
# Initialize transcriber
self.transcriber = WhisperTranscriber()
# Initialize text classifier
self.text_classifier = TextEmotionClassifier()
# Initialize feature extractor with improved features
self.feature_extractor = AcousticFeatureExtractor()
# Initialize rule-based audio classifier as fallback
self.rule_based_audio = PretrainedAudioClassifier()
# Initialize simple rule-based fallback
self.rule_based = RuleBasedClassifier()
# Define simple fusion strategy
self.use_adaptive_fusion = False
print("Improved Emotion Recognition Pipeline initialized successfully")
except Exception as e:
print(f"Error initializing pipeline: {e}")
print("Some functionality may be limited")
def predict(self, audio_path):
results = {
"transcription": "",
"text_emotions": {emotion: 0.0 for emotion in EMOTIONS},
"acoustic_emotions": {emotion: 0.0 for emotion in EMOTIONS},
"final_emotions": {emotion: 0.0 for emotion in EMOTIONS},
"predicted_emotion": "neutral",
"is_sarcastic": False,
"errors": []
}
# Step 1: Transcribe audio
try:
transcription = self.transcriber.transcribe(audio_path)
results["transcription"] = transcription
print(f"Transcription: {transcription}")
except Exception as e:
error_msg = f"Failed to transcribe audio: {e}"
print(error_msg)
results["errors"].append(error_msg)
results["transcription"] = "Error: Could not transcribe audio"
# Step 2: Analyze text emotions
try:
if results["transcription"].startswith("Error:"):
# Skip text analysis if transcription failed
text_emotions = np.ones(len(EMOTIONS)) / len(EMOTIONS) # Equal probabilities
sarcasm_indicators = np.array([0.33, 0.33, 0.33])
# Try rule-based as fallback
rule_emotion, rule_probs = self.rule_based.predict(results["transcription"])
results["text_emotions"] = rule_probs
else:
text_emotions = self.text_classifier.predict_emotion(results["transcription"])
sarcasm_indicators = self.text_classifier.detect_sarcasm(results["transcription"])
# Format text emotions result
results["text_emotions"] = {EMOTIONS[i]: float(text_emotions[i])
for i in range(min(len(text_emotions), len(EMOTIONS)))}
print(f"Text-based emotions: {results['text_emotions']}")
        except Exception as e:
            error_msg = f"Failed to analyze text emotions: {e}"
            print(error_msg)
            results["errors"].append(error_msg)
            # Use equal probabilities as fallback, and give sarcasm_indicators a
            # neutral default so the sarcasm check below cannot hit a NameError
            results["text_emotions"] = {emotion: 1.0/len(EMOTIONS) for emotion in EMOTIONS}
            sarcasm_indicators = np.array([0.33, 0.34, 0.33])
# Step 3: Use rule-based audio classifier instead of the untrained model
try:
# Get predictions from rule-based classifier
audio_probs = self.rule_based_audio.predict(audio_path)
# Format acoustic emotions result
results["acoustic_emotions"] = {EMOTIONS[i]: float(audio_probs[i])
for i in range(min(len(audio_probs), len(EMOTIONS)))}
print(f"Acoustic-based emotions: {results['acoustic_emotions']}")
except Exception as e:
error_msg = f"Failed to predict acoustic emotions: {e}"
print(error_msg)
results["errors"].append(error_msg)
# Use equal probabilities as fallback
results["acoustic_emotions"] = {emotion: 1.0/len(EMOTIONS) for emotion in EMOTIONS}
audio_probs = np.ones(len(EMOTIONS)) / len(EMOTIONS)
# Step 4: Improved fusion strategy - text-biased weighted average
try:
# Convert dictionaries to arrays
text_array = np.array(list(results["text_emotions"].values()))
audio_array = np.array(list(results["acoustic_emotions"].values()))
# Calculate confidence scores
text_confidence = 1.0 - np.std(text_array) # Higher confidence if distribution is more certain
audio_confidence = 1.0 - np.std(audio_array)
# Bias toward text model since it's working better
text_confidence *= 1.5 # Increase text confidence
# Normalize confidences
total_confidence = text_confidence + audio_confidence
text_weight = text_confidence / total_confidence
audio_weight = audio_confidence / total_confidence
# Weighted average
final_probs = (text_weight * text_array) + (audio_weight * audio_array)
# Format final emotions
results["final_emotions"] = {EMOTIONS[i]: float(final_probs[i])
for i in range(len(EMOTIONS))}
print(f"Fusion weights: Text={text_weight:.2f}, Audio={audio_weight:.2f}")
except Exception as e:
error_msg = f"Failed to fuse predictions: {e}"
print(error_msg)
results["errors"].append(error_msg)
# Fallback to text-only predictions since they're more reliable
results["final_emotions"] = results["text_emotions"]
# Get predicted emotion
try:
emotion_values = list(results["final_emotions"].values())
emotion_idx = np.argmax(emotion_values)
predicted_emotion = EMOTIONS[emotion_idx]
results["predicted_emotion"] = predicted_emotion
            # Check for sarcasm: an ostensibly happy/neutral text paired with a
            # negative sentiment signal (argmax == 0) is treated as sarcastic
            is_sarcastic = False
            if hasattr(sarcasm_indicators, "__len__") and len(sarcasm_indicators) > 0:
                if predicted_emotion in ["happy", "neutral"] and np.argmax(sarcasm_indicators) == 0:
is_sarcastic = True
results["predicted_emotion"] = "sarcastic"
results["is_sarcastic"] = is_sarcastic
except Exception as e:
error_msg = f"Failed to determine final emotion: {e}"
print(error_msg)
results["errors"].append(error_msg)
results["predicted_emotion"] = "neutral" # Default fallback
return results
# Part 12: Demo on a sample audio file (with better error handling)
def demo_on_sample_audio(pipeline, audio_path):
if not os.path.exists(audio_path):
print(f"Error: Audio file not found at {audio_path}")
return
print(f"Analyzing audio file: {audio_path}")
try:
# Predict emotion from audio
result = pipeline.predict(audio_path)
# Print results
print("\n===== EMOTION ANALYSIS RESULTS =====")
print(f"Transcription: {result['transcription']}")
print(f"\nPredicted Emotion: {result['predicted_emotion'].upper()}")
print(f"Is Sarcastic: {'Yes' if result['is_sarcastic'] else 'No'}")
print("\nText-based Emotions:")
for emotion, score in result['text_emotions'].items():
print(f" {emotion}: {score:.4f}")
print("\nAcoustic-based Emotions:")
for emotion, score in result['acoustic_emotions'].items():
print(f" {emotion}: {score:.4f}")
print("\nFinal Fusion Emotions:")
for emotion, score in result['final_emotions'].items():
print(f" {emotion}: {score:.4f}")
if 'errors' in result and result['errors']:
print("\nErrors encountered:")
for error in result['errors']:
print(f" - {error}")
# Plot results for visualization
try:
emotions = list(result['text_emotions'].keys())
text_scores = list(result['text_emotions'].values())
acoustic_scores = list(result['acoustic_emotions'].values())
final_scores = list(result['final_emotions'].values())
plt.figure(figsize=(12, 6))
x = np.arange(len(emotions))
width = 0.25
plt.bar(x - width, text_scores, width, label='Text')
plt.bar(x, acoustic_scores, width, label='Acoustic')
plt.bar(x + width, final_scores, width, label='Final')
plt.xlabel('Emotions')
plt.ylabel('Probability')
plt.title('Emotion Prediction Results')
plt.xticks(x, emotions, rotation=45)
plt.legend()
plt.tight_layout()
plt.show()
except Exception as e:
print(f"Error creating visualization: {e}")
except Exception as e:
print(f"Error in demo: {e}")
# Part 13: Simplified sample loading from the RAVDESS dataset
def load_ravdess_sample():
"""
Download a small sample from RAVDESS dataset for testing
"""
# Create directory for sample data
sample_dir = "./sample_data"
os.makedirs(sample_dir, exist_ok=True)
# Try to download a sample file
try:
import urllib.request
        # Example file from the RAVDESS dataset; by the RAVDESS naming scheme the
        # third field of "03-01-01-..." is the emotion code, and 01 is neutral.
        # Note: Zenodo hosts RAVDESS as zip archives, so this direct-file URL may
        # fail; the except block below handles that case.
        url = "https://zenodo.org/record/1188976/files/Audio_Speech_Actors_01-24/Actor_01/03-01-01-01-01-01-01.wav"
        sample_path = os.path.join(sample_dir, "sample_neutral.wav")
if not os.path.exists(sample_path):
print(f"Downloading sample audio file from RAVDESS dataset...")
urllib.request.urlretrieve(url, sample_path)
print(f"Downloaded sample to {sample_path}")
else:
print(f"Sample file already exists at {sample_path}")
return sample_path
except Exception as e:
print(f"Error downloading RAVDESS sample: {e}")
return None
# Part 14: Simplified CLI demo entry point with proper error handling
# (superseded by the Gradio main() at the bottom of this file)
def main():
print("Starting Tone Classification System...")
try:
# Create the pipeline
pipeline = EmotionRecognitionPipeline()
# Try to load a sample file
sample_audio_path = load_ravdess_sample()
if sample_audio_path and os.path.exists(sample_audio_path):
demo_on_sample_audio(pipeline, sample_audio_path)
else:
print("\nNo sample audio file available.")
print("To use the system, provide an audio file path when calling the demo_on_sample_audio function:")
print("\ndemo_on_sample_audio(pipeline, '/path/to/your/audio.wav')")
except Exception as e:
print(f"Error in main execution: {e}")
print("\nTroubleshooting tips:")
print("1. Check if your audio file exists and is in a supported format (WAV recommended)")
print("2. Ensure you have sufficient memory for model loading")
print("3. Try with a smaller model size in WhisperTranscriber (tiny instead of base)")
print("4. Make sure you have stable internet connection for model downloading")
# Optional notebook helper: upload a file via ipywidgets and analyze it.
# (When the script runs directly, the Gradio app at the bottom of the file is
# the real entry point.)
def upload_and_analyze():
from IPython.display import display
import ipywidgets as widgets
# Create upload widget
upload_widget = widgets.FileUpload(
accept='.wav, .mp3',
multiple=False,
description='Upload Audio File',
button_style='primary'
)
display(upload_widget)
# Create button to trigger analysis
analyze_button = widgets.Button(description='Analyze Audio')
display(analyze_button)
# Create output area for results
output = widgets.Output()
display(output)
def on_analyze_click(b):
with output:
output.clear_output()
if not upload_widget.value:
print("Please upload an audio file first.")
return
            # Get the uploaded file; ipywidgets 7.x exposes a dict keyed by
            # filename, while 8.x exposes a tuple of dicts with 'name'/'content'
            if isinstance(upload_widget.value, dict):
                file_name = next(iter(upload_widget.value.keys()))
                content = upload_widget.value[file_name]['content']
            else:
                uploaded = upload_widget.value[0]
                file_name = uploaded['name']
                content = uploaded['content']
            # Save to temp file
            temp_file = f"./temp_{file_name}"
            with open(temp_file, 'wb') as f:
                f.write(content)
print(f"Analyzing uploaded file: {file_name}")
# Create pipeline and analyze
pipeline = EmotionRecognitionPipeline()
demo_on_sample_audio(pipeline, temp_file)
analyze_button.on_click(on_analyze_click)
# upload_and_analyze() is meant to be called manually from a notebook cell;
# when this script runs directly, the Gradio app below is launched instead.
# Gradio front-end, built on the EmotionRecognitionPipeline defined above
# (os, numpy, torch, and matplotlib are already imported at the top of the file)
import gradio as gr

_PIPELINE = None  # shared pipeline instance, created lazily in analyze_audio
def analyze_audio(audio_path):
"""
Analyze an audio file and return the emotion recognition results
"""
if audio_path is None:
return "Please provide an audio file.", None, None
    try:
        # Reuse a single pipeline across requests; rebuilding it on every call
        # would reload Whisper and the text models each time
        global _PIPELINE
        if _PIPELINE is None:
            _PIPELINE = EmotionRecognitionPipeline()
        pipeline = _PIPELINE
# Predict emotion from audio
result = pipeline.predict(audio_path)
# Format the results for display
transcription = result['transcription']
predicted_emotion = result['predicted_emotion'].upper()
is_sarcastic = 'Yes' if result['is_sarcastic'] else 'No'
# Create text summary
summary = f"Transcription: {transcription}\n\n"
summary += f"Predicted Emotion: {predicted_emotion}\n"
summary += f"Is Sarcastic: {is_sarcastic}\n\n"
summary += "Text-based Emotions:\n"
for emotion, score in result['text_emotions'].items():
summary += f" {emotion}: {score:.4f}\n"
summary += "\nAcoustic-based Emotions:\n"
for emotion, score in result['acoustic_emotions'].items():
summary += f" {emotion}: {score:.4f}\n"
summary += "\nFinal Fusion Emotions:\n"
for emotion, score in result['final_emotions'].items():
summary += f" {emotion}: {score:.4f}\n"
if 'errors' in result and result['errors']:
summary += "\nErrors encountered:\n"
for error in result['errors']:
summary += f" - {error}\n"
# Create visualization
fig = create_emotion_plot(result)
return summary, fig, result['predicted_emotion']
except Exception as e:
return f"Error analyzing audio: {str(e)}", None, "error"
def create_emotion_plot(result):
"""
Create a visualization of the emotion recognition results
"""
emotions = list(result['text_emotions'].keys())
text_scores = list(result['text_emotions'].values())
acoustic_scores = list(result['acoustic_emotions'].values())
final_scores = list(result['final_emotions'].values())
fig = plt.figure(figsize=(10, 6))
x = np.arange(len(emotions))
width = 0.25
plt.bar(x - width, text_scores, width, label='Text')
plt.bar(x, acoustic_scores, width, label='Acoustic')
plt.bar(x + width, final_scores, width, label='Final')
plt.xlabel('Emotions')
plt.ylabel('Probability')
plt.title('Emotion Recognition Results')
plt.xticks(x, emotions, rotation=45)
plt.legend()
plt.tight_layout()
return fig
# Create the Gradio interface with tabs for microphone and file upload
def create_gradio_interface():
with gr.Blocks(title="Tone Classification System") as demo:
gr.Markdown("# Tone Classification System")
gr.Markdown("This system analyzes audio to detect emotions, including sarcasm and figures of speech.")
with gr.Tabs():
with gr.TabItem("Microphone Input"):
with gr.Row():
with gr.Column():
audio_input = gr.Audio(
sources=["microphone"],
type="filepath",
label="Record your voice"
)
analyze_btn = gr.Button("Analyze Recording", variant="primary")
with gr.Column():
result_text = gr.Textbox(label="Analysis Results", lines=15)
emotion_plot = gr.Plot(label="Emotion Probabilities")
emotion_label = gr.Label(label="Detected Emotion")
analyze_btn.click(
fn=analyze_audio,
inputs=audio_input,
outputs=[result_text, emotion_plot, emotion_label]
)
with gr.TabItem("File Upload"):
with gr.Row():
with gr.Column():
file_input = gr.Audio(
sources=["upload"],
type="filepath",
label="Upload audio file (.wav, .mp3)"
)
file_analyze_btn = gr.Button("Analyze File", variant="primary")
with gr.Column():
file_result_text = gr.Textbox(label="Analysis Results", lines=15)
file_emotion_plot = gr.Plot(label="Emotion Probabilities")
file_emotion_label = gr.Label(label="Detected Emotion")
file_analyze_btn.click(
fn=analyze_audio,
inputs=file_input,
outputs=[file_result_text, file_emotion_plot, file_emotion_label]
)
gr.Markdown("## How to Use")
gr.Markdown("""
1. **Microphone Input**: Record your voice and click 'Analyze Recording'
2. **File Upload**: Upload an audio file (.wav or .mp3) and click 'Analyze File'
The system will transcribe the speech, analyze emotions from both text and acoustic features,
and display the results with a visualization of emotion probabilities.
""")
gr.Markdown("## About")
gr.Markdown("""
This tone classification system combines text and acoustic features to detect emotions in speech.
It uses a multi-modal approach with:
- Speech-to-text transcription
- Text-based emotion analysis
- Acoustic feature extraction
- Fusion of both modalities for final prediction
The system can detect: neutral, happy, sad, angry, fearful, disgust, surprised, and sarcastic tones.
""")
return demo
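# When this file runs directly (as on Hugging Face Spaces), main() below starts
# the app; for a temporary public link when running locally, demo.launch() also
# accepts share=True.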
# Main entry point: launches the Gradio interface. This definition intentionally
# supersedes the CLI demo main() from Part 14.
def main():
demo = create_gradio_interface()
demo.launch()
if __name__ == "__main__":
main()