import gradio as gr import torch import torch.nn as nn import torch.nn.functional as F import torchvision.models as tv try: from transformers import Wav2Vec2Model _HAS_TRANSFORMERS = True except ImportError: _HAS_TRANSFORMERS = False import cv2 import numpy as np import librosa from PIL import Image import tempfile import os from typing import Tuple, Dict, Any import json import warnings print(f"PyTorch version: {torch.__version__}") print(f"CUDA available: {torch.cuda.is_available()}") # Actual MirrorMindModel architecture (copied from mirrormind.py) class GradientReverseFn(torch.autograd.Function): """Gradient reversal function for adversarial training""" @staticmethod def forward(ctx, x, lambd): ctx.lambd = lambd return x.view_as(x) @staticmethod def backward(ctx, grad_output): return -ctx.lambd * grad_output, None def grad_reverse(x, lambd=1.0): """Gradient reversal layer""" return GradientReverseFn.apply(x, lambd) class MirrorMindModel(nn.Module): def __init__( self, num_frames=8, audio_length=64000, # 4s at 16kHz num_emotions=6, num_domains=2, hidden_dim=512, use_pretrained_video=True, use_pretrained_audio=True, freeze_video_backbone=True, freeze_audio_backbone=True, ): super().__init__() self.num_frames = num_frames self.audio_length = audio_length self.num_emotions = num_emotions self.num_domains = num_domains self.hidden_dim = hidden_dim # Video encoder if use_pretrained_video: self.video_backbone = tv.resnet18(weights=tv.ResNet18_Weights.IMAGENET1K_V1) else: self.video_backbone = tv.resnet18(weights=None) self.video_feat_dim = self.video_backbone.fc.in_features # 512 self.video_backbone.fc = nn.Identity() if freeze_video_backbone: for param in self.video_backbone.parameters(): param.requires_grad = False for param in self.video_backbone.layer4.parameters(): param.requires_grad = True self.video_proj = nn.Sequential( nn.Linear(self.video_feat_dim, hidden_dim), nn.BatchNorm1d(hidden_dim), nn.ReLU(inplace=True), nn.Dropout(0.2), ) # Audio Encoder self.audio_feat_dim = 0 if use_pretrained_audio and _HAS_TRANSFORMERS: try: self.audio_backbone = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base") self.audio_feat_dim = self.audio_backbone.config.hidden_size # 768 if freeze_audio_backbone: for param in self.audio_backbone.parameters(): param.requires_grad = False for name, param in self.audio_backbone.named_parameters(): if any(x in name for x in ['encoder.layers.10', 'encoder.layers.11']): param.requires_grad = True self.audio_pool = nn.AdaptiveAvgPool1d(1) print("Using Wav2Vec2 audio encoder") except Exception as e: print(f"Warning: Could not load Wav2Vec2, using CNN: {e}") self._create_improved_audio_encoder() else: self._create_improved_audio_encoder() print("Using CNN audio encoder") self.audio_proj = nn.Sequential( nn.Linear(self.audio_feat_dim, hidden_dim), nn.BatchNorm1d(hidden_dim), nn.ReLU(inplace=True), nn.Dropout(0.2), ) # Temporal attention self.temporal_attention = nn.Sequential( nn.Linear(self.video_feat_dim, 128), nn.ReLU(inplace=True), nn.Linear(128, 1) ) # Fusion layer fusion_input_dim = hidden_dim * 2 self.fusion_output_dim = hidden_dim self.fusion_proj = nn.Sequential( nn.Linear(fusion_input_dim, self.fusion_output_dim), nn.BatchNorm1d(self.fusion_output_dim), nn.ReLU(inplace=True), nn.Dropout(0.3), ) # Task heads self.emotion_head = nn.Sequential( nn.Linear(self.fusion_output_dim, hidden_dim // 2), nn.BatchNorm1d(hidden_dim // 2), nn.ReLU(inplace=True), nn.Dropout(0.4), nn.Linear(hidden_dim // 2, num_emotions), ) self.neuro_head = nn.Sequential( nn.Linear(self.fusion_output_dim, hidden_dim // 2), nn.BatchNorm1d(hidden_dim // 2), nn.ReLU(inplace=True), nn.Dropout(0.3), nn.Linear(hidden_dim // 2, 1), nn.Sigmoid() ) # Domain head self.domain_head = nn.Sequential( nn.Linear(self.fusion_output_dim, hidden_dim // 4), nn.ReLU(inplace=True), nn.Dropout(0.2), nn.Linear(hidden_dim // 4, num_domains), ) self._init_weights() def _create_improved_audio_encoder(self): self.audio_backbone = nn.Sequential( nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3), nn.BatchNorm1d(64), nn.ReLU(inplace=True), nn.MaxPool1d(2), nn.Conv1d(64, 128, kernel_size=5, stride=1, padding=2), nn.BatchNorm1d(128), nn.ReLU(inplace=True), nn.MaxPool1d(2), nn.Conv1d(128, 256, kernel_size=3, stride=1, padding=1), nn.BatchNorm1d(256), nn.ReLU(inplace=True), nn.AdaptiveAvgPool1d(1) ) self.audio_feat_dim = 256 self.audio_pool = None def _init_weights(self): for m in self.modules(): if isinstance(m, nn.Linear): if m.out_features == self.num_emotions: nn.init.xavier_uniform_(m.weight, gain=1.0) if m.bias is not None: nn.init.zeros_(m.bias) elif m.out_features == 1: nn.init.xavier_normal_(m.weight) if m.bias is not None: nn.init.zeros_(m.bias) else: nn.init.xavier_normal_(m.weight) if m.bias is not None: nn.init.zeros_(m.bias) elif isinstance(m, nn.Conv1d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') elif isinstance(m, (nn.BatchNorm1d, nn.LayerNorm)): nn.init.ones_(m.weight) nn.init.zeros_(m.bias) @staticmethod def _prep_frames(frames): device = frames.device if frames.dim() == 5: B, T = frames.shape[:2] if frames.shape[-1] == 3: frames = frames.permute(0, 1, 4, 2, 3) B, T, C, H, W = frames.shape frames = frames.reshape(B * T, C, H, W) elif frames.dim() == 4: B = frames.shape[0] T = 1 else: raise ValueError(f"Unsupported frames shape: {frames.shape}") if frames.dtype != torch.float32: frames = frames.float() if frames.max() > 1.1: frames = frames / 255.0 frames = torch.clamp(frames, 0.0, 1.0) return frames, B, T def _process_video_temporal_attention(self, vid_feat_bt, B, T): if T == 1: return vid_feat_bt.view(B, -1) vid_feat_reshaped = vid_feat_bt.view(B, T, -1) return torch.mean(vid_feat_reshaped, dim=1) def forward(self, frames, audio, alpha=0.0): device = next(self.parameters()).device frames_nchw, B, T = self._prep_frames(frames.to(device)) try: vid_feat_bt = self.video_backbone(frames_nchw) vid_feat_bt = vid_feat_bt.flatten(1) vid_feat = self._process_video_temporal_attention(vid_feat_bt, B, T) vid_feat = self.video_proj(vid_feat) except Exception as e: print(f"Video processing error: {e}") vid_feat = torch.zeros((B, self.hidden_dim), device=device) try: if audio is None or torch.all(audio == 0): aud_feat = torch.zeros((B, self.hidden_dim), device=device) else: audio = audio.float().to(device) if hasattr(self.audio_backbone, 'from_pretrained'): attn_mask = (audio.abs() > 1e-6).long() out = self.audio_backbone(input_values=audio, attention_mask=attn_mask) x = out.last_hidden_state.transpose(1, 2) x = self.audio_pool(x).squeeze(-1) aud_feat = x else: x = audio.unsqueeze(1) x = self.audio_backbone(x) if x.dim() == 3: x = x.squeeze(-1) aud_feat = x aud_feat = self.audio_proj(aud_feat) except Exception as e: print(f"Audio processing error: {e}") aud_feat = torch.zeros((B, self.hidden_dim), device=device) fused = torch.cat([vid_feat, aud_feat], dim=1) fused_final = self.fusion_proj(fused) emotion_logits = self.emotion_head(fused_final) neuroticism_pred = self.neuro_head(fused_final) domain_logits = None if self.training and alpha > 0.0: if alpha < 0.01: rev = grad_reverse(fused_final, lambd=alpha * 0.1) domain_logits = self.domain_head(rev) return neuroticism_pred, emotion_logits, domain_logits # Inference wrapper (renamed to avoid conflict) class MirrorMindInference: def __init__(self): self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"Using device: {self.device}") model_path = "mirror_model.pth" print(f"Loading model from {model_path}...") if not os.path.exists(model_path): print(f"Model file {model_path} not found. Using fallback mode.") self.model = None return checkpoint = None pytorch_version = torch.__version__ if pytorch_version.startswith(("2.8", "2.9")): print(f"Detected PyTorch {pytorch_version} - using version-specific loading...") try: print("Loading with weights_only=False...") with warnings.catch_warnings(): warnings.simplefilter("ignore") checkpoint = torch.load(model_path, map_location=self.device, weights_only=False) print("✓ Successfully loaded complete model") except Exception as e1: print(f"✗ Failed: {e1}") try: print("Attempting state_dict loading with weights_only=True...") checkpoint = torch.load(model_path, map_location=self.device, weights_only=True) print("✓ Loaded as state_dict") except Exception as e2: print(f"✗ Failed: {e2}") checkpoint = None else: try: print(f"Using standard loading for PyTorch {pytorch_version}...") checkpoint = torch.load(model_path, map_location=self.device) print("✓ Loaded with standard method") except Exception as e: print(f"✗ Failed: {e}") checkpoint = None if checkpoint is None: print("All loading methods failed. Using fallback mode.") self.model = None return if isinstance(checkpoint, dict): print(f"Checkpoint keys: {list(checkpoint.keys())}") if 'model' in checkpoint and 'state_dict' in checkpoint: self.model = checkpoint['model'] self.model.load_state_dict(checkpoint['state_dict']) print("✓ Loaded model architecture + state dict") elif 'state_dict' in checkpoint: print("Found 'state_dict' - attempting to reconstruct model...") if 'model_config' in checkpoint: self.model = MirrorMindModel(**checkpoint['model_config']) self.model.load_state_dict(checkpoint['state_dict']) print("✓ Loaded using model_config + state_dict") else: print("⚠️ No model_config. Using fallback.") self.model = None return elif 'model_state_dict' in checkpoint: print("Found 'model_state_dict' - checking for model class info...") state_dict = checkpoint['model_state_dict'] if 'model_config' in checkpoint: self.model = MirrorMindModel(**checkpoint['model_config']) self.model.load_state_dict(state_dict) print("✓ Loaded using model_config + model_state_dict") else: model_info = self.analyze_state_dict(state_dict) print(f"State dict analysis: {model_info}") print("⚠️ No model_config. Using fallback.") self.model = None return elif len(checkpoint.keys()) > 0 and all(isinstance(v, torch.Tensor) for v in checkpoint.values()): print("Checkpoint appears to be a direct state dict") model_info = self.analyze_state_dict(checkpoint) print(f"Direct state dict analysis: {model_info}") print("⚠️ Cannot reconstruct without model_config. Using fallback.") self.model = None return else: if hasattr(checkpoint, 'eval') and callable(checkpoint.eval): self.model = checkpoint print("✓ Using checkpoint as complete model") else: print("⚠️ Unrecognized format. Using fallback.") self.model = None return else: if hasattr(checkpoint, 'eval') and callable(checkpoint.eval): self.model = checkpoint print("✓ Loaded complete model object") else: print("⚠️ Not a model object. Using fallback.") self.model = None return if self.model is not None: self.model.to(self.device) self.model.eval() print("Model loaded and ready for inference!") else: print("Model is None after loading. Using fallback.") def analyze_state_dict(self, state_dict): info = { 'total_params': len(state_dict), 'layer_types': set(), 'input_features': None, 'output_features': None, 'has_conv': False, 'has_lstm': False, 'has_attention': False } for key, tensor in state_dict.items(): if 'conv' in key.lower(): info['has_conv'] = True info['layer_types'].add('conv') elif 'lstm' in key.lower() or 'rnn' in key.lower(): info['has_lstm'] = True info['layer_types'].add('lstm') elif 'attention' in key.lower() or 'attn' in key.lower(): info['has_attention'] = True info['layer_types'].add('attention') elif 'linear' in key.lower() or 'fc' in key.lower(): info['layer_types'].add('linear') if key.endswith('.weight'): if info['input_features'] is None: info['input_features'] = tensor.shape[-1] info['output_features'] = tensor.shape[0] info['layer_types'] = list(info['layer_types']) return info def extract_video_frames(self, video_path: str, num_frames: int = 8) -> torch.Tensor: try: cap = cv2.VideoCapture(video_path) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if total_frames == 0: raise ValueError("Could not read video file") frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int) frames = [] for idx in frame_indices: cap.set(cv2.CAP_PROP_POS_FRAMES, idx) ret, frame = cap.read() if ret: frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame = cv2.resize(frame, (224, 224)) frame = frame.astype(np.float32) / 255.0 frames.append(frame) cap.release() if not frames: raise ValueError("No frames extracted") frames = np.array(frames) frames = np.transpose(frames, (0, 3, 1, 2)) video_tensor = torch.from_numpy(frames).to(self.device) return video_tensor except Exception as e: print(f"Video extraction failed: {e}") dummy_frames = np.random.rand(num_frames, 3, 224, 224).astype(np.float32) return torch.from_numpy(dummy_frames).to(self.device) def extract_audio_features(self, video_path: str, duration: float = 4.0): try: audio, sr = librosa.load(video_path, sr=16000, duration=duration) if len(audio) == 0: raise ValueError("No audio data") mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13) spectral_centroids = librosa.feature.spectral_centroid(y=audio, sr=sr) features = np.concatenate([ np.mean(mfcc, axis=1), np.mean(spectral_centroids, axis=1) ]) audio_tensor = torch.from_numpy(features).float().to(self.device) return audio_tensor except Exception as e: print(f"Audio extraction failed: {e}") return torch.zeros(14).to(self.device) def predict(self, video_path: str) -> Dict[str, Any]: try: if not os.path.exists(video_path): raise ValueError(f"Video not found: {video_path}") video_features = self.extract_video_frames(video_path) audio_features = self.extract_audio_features(video_path) if self.model is not None: with torch.no_grad(): neuroticism_logits, emotion_logits, _ = self.model(video_features.unsqueeze(0), audio_features.unsqueeze(0)) neuroticism_score = neuroticism_logits.squeeze().item() emotion_probs = F.softmax(emotion_logits, dim=1).squeeze().cpu().numpy() emotion_labels = ['Anger', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad'] emotion_scores = dict(zip(emotion_labels, emotion_probs)) else: print("Using fallback predictions") neuroticism_score = np.random.uniform(0.2, 0.8) emotion_scores = { 'Happy': np.random.uniform(0.1, 0.4), 'Neutral': np.random.uniform(0.2, 0.5), 'Sad': np.random.uniform(0.05, 0.3), 'Anger': np.random.uniform(0.0, 0.2), 'Fear': np.random.uniform(0.0, 0.15), 'Disgust': np.random.uniform(0.0, 0.1) } total = sum(emotion_scores.values()) emotion_scores = {k: v/total for k, v in emotion_scores.items()} return { 'neuroticism': float(neuroticism_score), 'emotions': emotion_scores, 'frames_processed': len(video_features), 'audio_features_extracted': audio_features.numel() > 0, 'model_used': 'real' if self.model is not None else 'fallback' } except Exception as e: print(f"Prediction error: {e}") return { 'error': str(e), 'neuroticism': 0.0, 'emotions': {'Error': 1.0}, 'frames_processed': 0, 'audio_features_extracted': False, 'model_used': 'error' } def analyze_video(video_file) -> Tuple[float, str, str]: if video_file is None: return 0.0, "No video uploaded", "Please upload a video file" try: video_path = video_file.name if hasattr(video_file, 'name') else str(video_file) results = model.predict(video_path) if 'error' in results: return 0.0, f"Analysis Error: {results['error']}", str(results) neuroticism_score = results['neuroticism'] if neuroticism_score <= 0.3: neuroticism_level = "Low (Emotionally Stable)" elif neuroticism_score <= 0.7: neuroticism_level = "Medium (Moderate Reactivity)" else: neuroticism_level = "High (Emotionally Sensitive)" emotions = results['emotions'] dominant_emotion = max(emotions.keys(), key=lambda k: emotions[k]) emotion_text = f"**Dominant Emotion:** {dominant_emotion} ({emotions[dominant_emotion]:.1%})\n\n" emotion_text += "**All Emotions:**\n" for emotion, score in sorted(emotions.items(), key=lambda x: x[1], reverse=True): emotion_text += f"- {emotion}: {score:.1%}\n" model_status = " Real Model" if results['model_used'] == 'real' else "⚠️ Fallback Mode" detailed_results = f""" **Analysis Summary:** - Neuroticism Score: {neuroticism_score:.3f} - Neuroticism Level: {neuroticism_level} - Frames Processed: {results['frames_processed']} - Audio Features: {'✓' if results['audio_features_extracted'] else '✗'} **Technical Details:** - Model: {model_status} - Processing: Multimodal (Video + Audio) - Device: {'GPU' if torch.cuda.is_available() else 'CPU'} - Confidence: {'High' if results['model_used'] == 'real' else 'Demo Mode'} """.strip() return neuroticism_score, emotion_text, detailed_results except Exception as e: error_msg = f"Processing error: {str(e)}" return 0.0, error_msg, error_msg def create_interface(): css = """ .gradio-container { font-family: 'Helvetica Neue', Arial, sans-serif; } .output-class { font-size: 16px; } """ with gr.Blocks(css=css, title="🧠 MirrorMind Analysis") as demo: model_status_text = " Real Model Loaded" if model.model is not None else "⚠️ Demo Mode - Using fallback predictions" gr.Markdown(f""" # 🧠 MirrorMind: AI Personality & Emotion Analysis Upload a video to analyze personality traits and emotions using your trained MirrorMind model. **Model Status:** {model_status_text} **PyTorch Version:** {torch.__version__} **CUDA Available:** {'Yes' if torch.cuda.is_available() else 'No'} """) with gr.Row(): with gr.Column(scale=1): video_input = gr.Video( label="Upload Video", sources=["upload"], ) analyze_btn = gr.Button( "🔍 Analyze Video", variant="primary", scale=1 ) gr.Markdown(""" **Supported formats:** MP4, AVI, MOV, WebM **Optimal duration:** 4-10 seconds **Requirements:** Clear face, good lighting, audio included """) with gr.Column(scale=2): neuroticism_output = gr.Number( label="🎭 Neuroticism Score (0.0 - 1.0)", precision=3 ) emotion_output = gr.Markdown( label="😊 Emotion Analysis" ) details_output = gr.Markdown( label="📊 Detailed Results" ) analyze_btn.click( fn=analyze_video, inputs=[video_input], outputs=[neuroticism_output, emotion_output, details_output] ) video_input.change( fn=analyze_video, inputs=[video_input], outputs=[neuroticism_output, emotion_output, details_output] ) gr.Markdown(""" --- ### 📋 Understanding Your Results **Neuroticism Scale:** - **0.0-0.3:** Low - Emotionally stable, calm under pressure - **0.3-0.7:** Medium - Moderate emotional reactivity - **0.7-1.0:** High - More emotionally sensitive, reactive **Emotions Detected:** Anger, Disgust, Fear, Happy, Neutral, Sad **Model Information:** - Uses your trained `mirror_model.pth` for real AI predictions - Processes both video frames and audio features - Automatically falls back to demo mode if model loading fails """) return demo print("Initializing MirrorMind model...") model = MirrorMindInference() if __name__ == "__main__": demo = create_interface() demo.launch( server_name="0.0.0.0", server_port=7860, share=False, debug=False, show_error=True, quiet=False )