import streamlit as st
import cv2
import numpy as np
import time
import plotly.graph_objects as go
from transformers import pipeline
from PIL import Image
import torch
from collections import deque
import os
import tempfile
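
# Assumed runtime dependencies (exact pip package names are an assumption based on
# the imports above): streamlit, opencv-python, numpy, plotly, transformers, torch, pillow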
# Set page config
st.set_page_config(
    page_title="Emotion Detection",
    page_icon="😀",
    layout="wide"
)

# --- App Title and Description ---
st.title("Emotion Detection")
st.write("""
This app detects emotions in real time using your webcam, a video file, or an image.
If your webcam isn't working, try the simulation mode or upload a video file.
""")
# --- Load Models ---
# Cache the models so they are loaded only once per session instead of on every rerun.
@st.cache_resource
def load_emotion_detector(model_name="dima806/facial_emotions_image_detection"):
    """Load the emotion detection model."""
    with st.spinner(f"Loading emotion detection model ({model_name})..."):
        classifier = pipeline("image-classification", model=model_name)
    return classifier

@st.cache_resource
def load_face_detector():
    """Load the face detector model."""
    with st.spinner("Loading face detection model..."):
        # Load OpenCV's Haar-cascade face detector
        face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    return face_cascade
# --- Sidebar: Model and Settings ---
st.sidebar.header("Settings")

# Model selection
model_options = {
    "Facial Emotions (Default)": "dima806/facial_emotions_image_detection",
    "Facial Expressions": "juliensimon/distilbert-emotion"
}
selected_model = st.sidebar.selectbox(
    "Choose Emotion Model",
    list(model_options.keys())
)

# Input method selection, including video upload and simulation
input_method = st.sidebar.radio(
    "Choose Input Method",
    ["Real-time Webcam", "Upload an Image", "Capture Image", "Upload Video", "Simulation Mode"]
)
# Confidence threshold
confidence_threshold = st.sidebar.slider(
    "Confidence Threshold",
    min_value=0.0,
    max_value=1.0,
    value=0.5,
    step=0.05
)

# Face detection toggle
use_face_detection = st.sidebar.checkbox("Enable Face Detection", value=True)
# History length and processing speed for real-time tracking
if input_method in ["Real-time Webcam", "Upload Video", "Simulation Mode"]:
    history_length = st.sidebar.slider(
        "Emotion History Length (seconds)",
        min_value=5,
        max_value=60,
        value=10,
        step=5
    )
    # Playback speed multiplier used when processing video files and simulation
    processing_speed = st.sidebar.slider(
        "Processing Speed",
        min_value=0.5,
        max_value=3.0,
        value=1.0,
        step=0.5
    )

# Load the selected model
classifier = load_emotion_detector(model_options[selected_model])
face_detector = load_face_detector()
# --- Utility Functions ---
def detect_faces(image):
    """Detect faces in an image using OpenCV."""
    # Convert PIL Image to OpenCV format
    if isinstance(image, Image.Image):
        opencv_image = np.array(image)
        opencv_image = opencv_image[:, :, ::-1].copy()  # Convert RGB to BGR
    else:
        opencv_image = image

    # Convert to grayscale for face detection
    gray = cv2.cvtColor(opencv_image, cv2.COLOR_BGR2GRAY)

    # Detect faces
    faces = face_detector.detectMultiScale(
        gray,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30)
    )
    return faces, opencv_image
def process_image_for_emotion(image, face=None):
    """Process image for emotion detection."""
    if isinstance(image, np.ndarray):
        # Convert OpenCV image to PIL
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(image)
    if face is not None:
        # Crop to face region
        x, y, w, h = face
        image = image.crop((x, y, x + w, y + h))
    return image
def predict_emotion(image):
    """Predict emotion from an image."""
    try:
        results = classifier(image)
        return results[0]  # Return top prediction
    except Exception as e:
        st.error(f"Error during emotion prediction: {str(e)}")
        return {"label": "Error", "score": 0.0}
def draw_faces_with_emotions(image, faces, emotions):
    """Draw rectangles around faces and label them with emotions."""
    img = image.copy()

    # Define colors for different emotions (BGR format)
    emotion_colors = {
        "happy": (0, 255, 0),       # Green
        "sad": (255, 0, 0),         # Blue
        "neutral": (255, 255, 0),   # Cyan
        "angry": (0, 0, 255),       # Red
        "surprise": (0, 165, 255),  # Orange
        "fear": (128, 0, 128),      # Purple
        "disgust": (42, 42, 165)    # Brown
    }
    # Default color for unknown emotions
    default_color = (255, 255, 255)  # White

    for (x, y, w, h), emotion in zip(faces, emotions):
        # Get color based on emotion (lowercase and remove any prefix)
        emotion_key = emotion["label"].lower().split("_")[-1]
        color = emotion_colors.get(emotion_key, default_color)

        # Draw rectangle around face
        cv2.rectangle(img, (x, y), (x + w, y + h), color, 2)

        # Add emotion label and confidence
        label = f"{emotion['label']} ({emotion['score']:.2f})"
        cv2.putText(img, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    return img
def generate_simulated_face(frame_num, canvas_size=(640, 480)):
    """Generate a simulated face with changing expressions."""
    # Create a blank canvas
    canvas = np.ones((canvas_size[1], canvas_size[0], 3), dtype=np.uint8) * 230

    # Calculate center position and face size
    center_x, center_y = canvas_size[0] // 2, canvas_size[1] // 2
    face_radius = min(canvas_size) // 4

    # Face movement based on frame number
    movement_x = int(np.sin(frame_num * 0.02) * 50)
    movement_y = int(np.cos(frame_num * 0.03) * 30)
    face_x = center_x + movement_x
    face_y = center_y + movement_y

    # Draw face circle
    cv2.circle(canvas, (face_x, face_y), face_radius, (220, 210, 180), -1)

    # Draw eyes
    eye_y = face_y - int(face_radius * 0.2)
    left_eye_x = face_x - int(face_radius * 0.5)
    right_eye_x = face_x + int(face_radius * 0.5)
    eye_size = max(5, face_radius // 8)

    # Blink during the last few frames of every 50-frame cycle
    if frame_num % 50 > 45:
        cv2.ellipse(canvas, (left_eye_x, eye_y), (eye_size, 1), 0, 0, 360, (30, 30, 30), -1)
        cv2.ellipse(canvas, (right_eye_x, eye_y), (eye_size, 1), 0, 0, 360, (30, 30, 30), -1)
    else:
        cv2.circle(canvas, (left_eye_x, eye_y), eye_size, (255, 255, 255), -1)
        cv2.circle(canvas, (right_eye_x, eye_y), eye_size, (255, 255, 255), -1)
        cv2.circle(canvas, (left_eye_x, eye_y), eye_size - 2, (70, 70, 70), -1)
        cv2.circle(canvas, (right_eye_x, eye_y), eye_size - 2, (70, 70, 70), -1)

    # Draw mouth - change shape based on frame number to simulate different emotions
    mouth_y = face_y + int(face_radius * 0.3)
    mouth_width = int(face_radius * 0.6)
    mouth_height = int(face_radius * 0.2)

    # Cycle through different emotions based on frame number
    emotion_cycle = (frame_num // 100) % 4
    if emotion_cycle == 0:  # Happy
        # Smile
        cv2.ellipse(canvas, (face_x, mouth_y), (mouth_width, mouth_height),
                    0, 0, 180, (50, 50, 50), 2)
    elif emotion_cycle == 1:  # Sad
        # Frown
        cv2.ellipse(canvas, (face_x, mouth_y + mouth_height),
                    (mouth_width, mouth_height), 0, 180, 360, (50, 50, 50), 2)
    elif emotion_cycle == 2:  # Surprised
        # O-shaped mouth
        cv2.circle(canvas, (face_x, mouth_y), mouth_height, (50, 50, 50), 2)
    else:  # Neutral
        # Straight line
        cv2.line(canvas, (face_x - mouth_width // 2, mouth_y),
                 (face_x + mouth_width // 2, mouth_y), (50, 50, 50), 2)

    # Add text showing which emotion is being simulated
    emotions = ["Happy", "Sad", "Surprised", "Neutral"]
    cv2.putText(canvas, f"Simulating: {emotions[emotion_cycle]}",
                (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (50, 50, 50), 2)
    cv2.putText(canvas, "Simulation Mode - No webcam required",
                (20, canvas_size[1] - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 1)
    return canvas
def process_video_feed(feed_source, is_simulation=False):
    """Process a video feed (webcam, video file, or simulation)."""
    # Create placeholders
    video_placeholder = st.empty()
    metrics_placeholder = st.empty()
    chart_placeholder = st.empty()

    # Initialize session state for tracking emotions over time
    if 'emotion_history' not in st.session_state:
        st.session_state.emotion_history = {}
        st.session_state.last_update_time = time.time()
        st.session_state.frame_count = 0
        st.session_state.simulation_frame = 0

    # Start/Stop button
    start_button = st.button("Start" if 'running' not in st.session_state or not st.session_state.running else "Stop")
    if start_button:
        st.session_state.running = not st.session_state.get('running', False)

    # If running, capture and process the video feed
    if st.session_state.get('running', False):
        try:
            # Initialize video source
            if is_simulation:
                # No video source needs to be opened for simulation
                pass
            else:
                cap = feed_source
                # Check if the video source opened successfully
                if not cap.isOpened():
                    st.error("Could not open video source. Please check your settings.")
                    st.session_state.running = False
                    return

            # Create deques for tracking emotions
            emotion_deques = {}
            timestamp_deque = deque(maxlen=30 * history_length)  # Store timestamps for history_length seconds at ~30 fps
            while st.session_state.get('running', False):
                # Get frame
                if is_simulation:
                    # Generate a simulated frame
                    frame = generate_simulated_face(st.session_state.simulation_frame)
                    st.session_state.simulation_frame += 1
                    ret = True
                else:
                    # Read from the video source
                    ret, frame = cap.read()

                if not ret:
                    if is_simulation:
                        st.error("Simulation error")
                    elif input_method == "Upload Video":
                        # For video files, loop back to the beginning
                        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                        continue
                    else:
                        st.error("Failed to capture frame from video source")
                    break

                # For webcam input, flip horizontally for a more natural view
                if input_method == "Real-time Webcam" and not is_simulation:
                    frame = cv2.flip(frame, 1)

                # Increment frame count for FPS calculation
                st.session_state.frame_count += 1

                # Detect faces
                if use_face_detection:
                    faces, _ = detect_faces(frame)
                    if len(faces) > 0:
                        # Process each face
                        emotions = []
                        for face in faces:
                            face_img = process_image_for_emotion(frame, face)
                            emotions.append(predict_emotion(face_img))

                        # Draw faces with emotions
                        frame = draw_faces_with_emotions(frame, faces, emotions)

                        # Update emotion history
                        current_time = time.time()
                        timestamp_deque.append(current_time)
                        for i, emotion in enumerate(emotions):
                            if emotion["score"] >= confidence_threshold:
                                face_id = f"Face {i+1}"
                                if face_id not in emotion_deques:
                                    emotion_deques[face_id] = deque(maxlen=30 * history_length)
                                emotion_deques[face_id].append({
                                    "emotion": emotion["label"],
                                    "confidence": emotion["score"],
                                    "time": current_time
                                })
                    else:
                        # No faces detected in this frame; nothing to update
                        pass
                else:
                    # Process the whole frame
                    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    emotion = predict_emotion(pil_image)

                    # Display the emotion on the frame
                    cv2.putText(
                        frame,
                        f"{emotion['label']} ({emotion['score']:.2f})",
                        (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1,
                        (0, 255, 0),
                        2
                    )

                    # Update emotion history
                    current_time = time.time()
                    timestamp_deque.append(current_time)
                    if "Frame" not in emotion_deques:
                        emotion_deques["Frame"] = deque(maxlen=30 * history_length)
                    emotion_deques["Frame"].append({
                        "emotion": emotion["label"],
                        "confidence": emotion["score"],
                        "time": current_time
                    })
                # Calculate FPS
                current_time = time.time()
                time_diff = current_time - st.session_state.last_update_time
                if time_diff >= 1.0:  # Update every second
                    fps = st.session_state.frame_count / time_diff
                    st.session_state.last_update_time = current_time
                    st.session_state.frame_count = 0

                    # Update metrics
                    with metrics_placeholder.container():
                        cols = st.columns(3)
                        cols[0].metric("FPS", f"{fps:.1f}")
                        cols[1].metric("Faces Detected", len(faces) if use_face_detection else "N/A")

                # Display the frame
                video_placeholder.image(frame, channels="BGR", use_column_width=True)

                # Update the emotion history chart periodically
                if len(timestamp_deque) > 0 and time_diff >= 0.5:  # Update chart every 0.5 seconds
                    with chart_placeholder.container():
                        # Create tabs for each face
                        if len(emotion_deques) > 0:
                            tabs = st.tabs(list(emotion_deques.keys()))
                            for i, (face_id, emotion_data) in enumerate(emotion_deques.items()):
                                with tabs[i]:
                                    if len(emotion_data) > 0:
                                        # Count occurrences of each emotion
                                        emotion_counts = {}
                                        for entry in emotion_data:
                                            emotion = entry["emotion"]
                                            if emotion not in emotion_counts:
                                                emotion_counts[emotion] = 0
                                            emotion_counts[emotion] += 1

                                        # Create a pie chart for the emotion distribution
                                        fig = go.Figure(data=[go.Pie(
                                            labels=list(emotion_counts.keys()),
                                            values=list(emotion_counts.values()),
                                            hole=.3
                                        )])
                                        fig.update_layout(title=f"Emotion Distribution - {face_id}")
                                        st.plotly_chart(fig, use_container_width=True)

                                        # Create a line chart of emotion confidence over time
                                        emotions = list(emotion_data)[-20:]  # Get the last 20 entries
                                        times = [(e["time"] - emotions[0]["time"]) for e in emotions]
                                        confidences = [e["confidence"] for e in emotions]
                                        emotion_labels = [e["emotion"] for e in emotions]

                                        fig = go.Figure()
                                        fig.add_trace(go.Scatter(
                                            x=times,
                                            y=confidences,
                                            mode='lines+markers',
                                            text=emotion_labels,
                                            hoverinfo='text+y'
                                        ))
                                        fig.update_layout(
                                            title=f"Emotion Confidence Over Time - {face_id}",
                                            xaxis_title="Time (seconds)",
                                            yaxis_title="Confidence",
                                            yaxis=dict(range=[0, 1])
                                        )
                                        st.plotly_chart(fig, use_container_width=True)
                                    else:
                                        st.info(f"No emotion data available for {face_id} yet.")
                        else:
                            st.info("No emotion data available yet.")

                # Control the processing speed for video files and simulation
                if input_method in ["Upload Video", "Simulation Mode"]:
                    time.sleep(0.03 / processing_speed)  # Adjust delay based on the processing speed setting
            # Release resources when done
            if not is_simulation and cap.isOpened():
                cap.release()
        except Exception as e:
            st.error(f"Error during processing: {str(e)}")
            st.session_state.running = False
    else:
        # Display a placeholder image when not running
        placeholder_img = np.zeros((300, 500, 3), dtype=np.uint8)
        cv2.putText(
            placeholder_img,
            "Click 'Start' to begin",
            (80, 150),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (255, 255, 255),
            2
        )
        video_placeholder.image(placeholder_img, channels="BGR", use_column_width=True)
# --- Process uploaded image ---
def process_static_image(image):
    col1, col2 = st.columns(2)
    with col1:
        st.image(image, caption="Image", use_column_width=True)

    # Process the image
    if use_face_detection:
        faces, opencv_image = detect_faces(image)
        if len(faces) > 0:
            emotions = []
            for face in faces:
                face_img = process_image_for_emotion(image, face)
                emotions.append(predict_emotion(face_img))

            # Draw faces with emotions
            result_image = draw_faces_with_emotions(opencv_image, faces, emotions)
            with col2:
                st.image(result_image, caption="Detected Emotions", channels="BGR", use_column_width=True)

            # Display predictions
            st.subheader("Detected Emotions:")
            for i, (emotion, face) in enumerate(zip(emotions, faces)):
                if emotion["score"] >= confidence_threshold:
                    st.write(f"Face {i+1}: **{emotion['label']}** (Confidence: {emotion['score']:.2f})")

                    # Show confidence bars
                    top_emotions = classifier(process_image_for_emotion(image, face))
                    labels = [item["label"] for item in top_emotions]
                    scores = [item["score"] for item in top_emotions]

                    fig = go.Figure(go.Bar(
                        x=scores,
                        y=labels,
                        orientation='h'
                    ))
                    fig.update_layout(
                        title=f"Emotion Confidence - Face {i+1}",
                        xaxis_title="Confidence",
                        yaxis_title="Emotion",
                        height=300
                    )
                    st.plotly_chart(fig, use_container_width=True)
        else:
            st.warning("No faces detected in the image. Try another image or disable face detection.")
    else:
        # Process the whole image
        prediction = predict_emotion(image)
        st.subheader("Prediction:")
        st.write(f"**Emotion:** {prediction['label']}")
        st.write(f"**Confidence:** {prediction['score']:.2f}")
# --- Main App Logic ---
if input_method == "Upload an Image":
    uploaded_file = st.file_uploader("Choose an image file", type=["jpg", "jpeg", "png"])
    if uploaded_file is not None:
        # Load and display the image
        image = Image.open(uploaded_file).convert("RGB")
        process_static_image(image)

elif input_method == "Capture Image":
    picture = st.camera_input("Capture an Image")
    if picture is not None:
        image = Image.open(picture).convert("RGB")
        process_static_image(image)

elif input_method == "Upload Video":
    uploaded_video = st.file_uploader("Upload a video file", type=["mp4", "avi", "mov", "mkv"])
    if uploaded_video is not None:
        # Save the uploaded video to a temporary file
        tfile = tempfile.NamedTemporaryFile(delete=False)
        tfile.write(uploaded_video.read())
        tfile.close()  # Flush and close so OpenCV can open the file

        # Open the video file
        cap = cv2.VideoCapture(tfile.name)

        # Process the video
        process_video_feed(cap)

        # Clean up the temporary file
        os.unlink(tfile.name)

elif input_method == "Simulation Mode":
    st.info("Simulation mode uses a generated animated face. No webcam required!")
    process_video_feed(None, is_simulation=True)
elif input_method == "Real-time Webcam":
    try:
        # First check whether we can access the webcam
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            st.error("Could not open webcam. Please try the Simulation Mode instead.")
            st.info("If you're using Streamlit in a browser, make sure you've granted camera permissions.")

            # Show troubleshooting tips
            with st.expander("Webcam Troubleshooting Tips"):
                st.markdown("""
                1. **Check Browser Permissions**: Make sure your browser has permission to access your camera.
                2. **Close Other Applications**: Other applications might be using your webcam.
                3. **Refresh the Page**: Sometimes simply refreshing can resolve the issue.
                4. **Try a Different Browser**: Some browsers handle webcam access better than others.
                5. **Use Simulation Mode**: If you cannot get the webcam working, use the Simulation Mode.
                """)
        else:
            # Webcam available, process it
            process_video_feed(cap)
    except Exception as e:
        st.error(f"Error accessing webcam: {str(e)}")
        st.info("Please try the Simulation Mode instead, which doesn't require webcam access.")