import os
import sys
import time
import shutil
import glob
import datetime
import base64
import io
from random import choice
from typing import List

from fastapi import FastAPI, Request, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse, FileResponse, HTMLResponse
import uvicorn

import torch
import torchvision
from torchvision import transforms
from torch import nn
import numpy as np
import cv2
import face_recognition
from PIL import Image as pImage

import matplotlib
matplotlib.use('Agg')  # Use non-GUI backend for matplotlib (set before importing pyplot)
import matplotlib.pyplot as plt
app = FastAPI()

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Create directories if they don't exist
os.makedirs("uploaded_images", exist_ok=True)
os.makedirs("static", exist_ok=True)
os.makedirs("uploaded_videos", exist_ok=True)
os.makedirs("models", exist_ok=True)

# Mount static files
app.mount("/uploaded_images", StaticFiles(directory="uploaded_images"), name="uploaded_images")
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/assets", StaticFiles(directory="frontend/dist/assets"), name="assets")
# Configuration
im_size = 112
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
sm = nn.Softmax(dim=1)
inv_normalize = transforms.Normalize(
    mean=-1 * np.divide(mean, std), std=np.divide([1, 1, 1], std))
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((im_size, im_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)])
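# Illustrative sketch of the pipeline above (assumes a dummy HxWxC uint8 frame
# like the ones cv2.VideoCapture yields; shapes are what the transforms produce):
#   frame = np.zeros((240, 320, 3), dtype=np.uint8)
#   t = train_transforms(frame)   # tensor of shape (3, 112, 112), normalized
#   img = inv_normalize(t)        # undoes the normalization for display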
ALLOWED_VIDEO_EXTENSIONS = {'mp4', 'gif', 'webm', 'avi', '3gp', 'wmv', 'flv', 'mkv'}
# Use the GPU if one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Model(nn.Module):
    def __init__(self, num_classes, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super(Model, self).__init__()
        model = torchvision.models.resnext50_32x4d(weights=torchvision.models.ResNeXt50_32X4D_Weights.DEFAULT)
        self.model = nn.Sequential(*list(model.children())[:-2])
        # NOTE: the fourth positional argument of nn.LSTM is `bias`, not
        # `bidirectional`, so this builds a bias-free unidirectional LSTM.
        # Kept as-is so the published checkpoint's state dict still loads.
        self.lstm = nn.LSTM(latent_dim, hidden_dim, lstm_layers, bidirectional)
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        self.linear1 = nn.Linear(2048, num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        batch_size, seq_length, c, h, w = x.shape
        x = x.view(batch_size * seq_length, c, h, w)
        fmap = self.model(x)  # (batch*seq, 2048, h', w') feature maps
        x = self.avgpool(fmap)
        x = x.view(batch_size, seq_length, 2048)
        # nn.LSTM defaults to seq-first input; with batch_size == 1 at
        # inference, the (batch, seq, feature) layout still works out.
        x_lstm, _ = self.lstm(x, None)
        return fmap, self.dp(self.linear1(x_lstm[:, -1, :]))
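# Shape sanity check for the forward pass (illustrative, not run at import;
# the 4x4 spatial size assumes 112x112 inputs through the ResNeXt-50 backbone):
#   x = torch.randn(1, 20, 3, 112, 112)   # (batch, seq, C, H, W)
#   fmap, logits = Model(2)(x)
#   fmap.shape    -> torch.Size([20, 2048, 4, 4])
#   logits.shape  -> torch.Size([1, 2])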
class ValidationDataset(torch.utils.data.Dataset):
    def __init__(self, video_names, sequence_length=60, transform=None):
        self.video_names = video_names
        self.transform = transform
        self.count = sequence_length

    def __len__(self):
        return len(self.video_names)

    def __getitem__(self, idx):
        video_path = self.video_names[idx]
        frames = []
        for frame in self.frame_extract(video_path):
            faces = face_recognition.face_locations(frame)
            try:
                top, right, bottom, left = faces[0]
                frame = frame[top:bottom, left:right, :]
            except IndexError:
                # No face detected; fall back to the full frame
                pass
            frames.append(self.transform(frame))
            if len(frames) == self.count:
                break
        if not frames:
            raise ValueError(f"No frames could be read from {video_path}")
        frames = torch.stack(frames)
        frames = frames[:self.count]
        return frames.unsqueeze(0)  # Shape: (1, seq_len, C, H, W)

    def frame_extract(self, path):
        vidObj = cv2.VideoCapture(path)
        success = True
        while success:
            success, image = vidObj.read()
            if success:
                yield image
def allowed_video_file(filename):
    return filename.split('.')[-1].lower() in ALLOWED_VIDEO_EXTENSIONS
def load_model(sequence_length=20):
    """Load the model from Hugging Face Hub if it is not available locally.

    `sequence_length` is accepted for API symmetry but is not needed to
    load the weights.
    """
    model_path = os.path.join("models", "model.pt")
    if not os.path.exists(model_path):
        try:
            from huggingface_hub import hf_hub_download
            model_path = hf_hub_download(repo_id="tayyabimam/Deepfake",
                                         filename="model.pt",
                                         local_dir="models")
        except Exception as e:
            raise Exception(f"Failed to download model: {str(e)}")

    # Load model
    model = Model(2).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    return model
def im_convert(tensor, video_file_name=""):
    """Convert a normalized tensor back to a displayable image."""
    image = tensor.to("cpu").clone().detach()
    image = image.squeeze()
    image = inv_normalize(image)
    image = image.numpy()
    image = image.transpose(1, 2, 0)  # (C, H, W) -> (H, W, C)
    image = image.clip(0, 1)
    return image
def generate_gradcam_heatmap(model, img, video_file_name=""):
    """Generate a class-activation heatmap showing where the model focuses.

    Despite the name, this is CAM-style: it weights the last feature maps by
    the classifier weights rather than by gradients.
    """
    # Forward pass
    fmap, logits = model(img)

    # Softmax on logits
    logits_softmax = sm(logits)
    confidence, prediction = torch.max(logits_softmax, 1)
    confidence_val = confidence.item() * 100
    pred_idx = prediction.item()

    # Get classifier weights and the feature maps of the last frame
    weight_softmax = model.linear1.weight.detach().cpu().numpy()
    fmap_last = fmap[-1].detach().cpu().numpy()
    nc, h, w = fmap_last.shape
    fmap_reshaped = fmap_last.reshape(nc, h * w)

    # Compute the heatmap and normalize it to [0, 1]
    heatmap_raw = np.dot(fmap_reshaped.T, weight_softmax[pred_idx, :].T)
    heatmap_raw -= heatmap_raw.min()
    heatmap_raw /= (heatmap_raw.max() + 1e-8)  # avoid division by zero on flat maps
    heatmap_img = np.uint8(255 * heatmap_raw.reshape(h, w))

    # Resize heatmap to the model input size and colorize it
    heatmap_resized = cv2.resize(heatmap_img, (im_size, im_size))
    heatmap_colored = cv2.applyColorMap(heatmap_resized, cv2.COLORMAP_JET)

    # Convert the last input frame back to a displayable image. Frames come
    # from OpenCV, so the channel order here is BGR.
    original_img = im_convert(img[:, -1, :, :, :])
    original_img_uint8 = (original_img * 255).astype(np.uint8)

    # Overlay heatmap on the original image (both BGR: applyColorMap is BGR too)
    overlay = cv2.addWeighted(original_img_uint8, 0.6, heatmap_colored, 0.4, 0)

    # Save the side-by-side figure
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    result_filename = f"result_{timestamp}.jpg"
    save_path = os.path.join("static", result_filename)

    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(original_img[:, :, ::-1])  # BGR -> RGB for display
    plt.title("Original")
    plt.axis('off')
    plt.subplot(1, 2, 2)
    plt.imshow(cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB))
    plt.title(f"{'FAKE' if pred_idx == 1 else 'REAL'} ({confidence_val:.2f}%)")
    plt.axis('off')
    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()

    return {
        "prediction": "FAKE" if pred_idx == 1 else "REAL",
        "confidence": confidence_val,
        "heatmap_url": f"/static/{result_filename}",
        "original_filename": video_file_name
    }
def predict_with_gradcam(model, img, video_file_name=""):
    """Predict with a class-activation visualization."""
    return generate_gradcam_heatmap(model, img, video_file_name)
# NOTE: the route decorators on the endpoints below are reconstructed; they
# appear to have been lost in extraction. This path is inferred from the
# api_root docstring further down.
@app.post("/api/upload-video")
async def api_upload_video(file: UploadFile = File(...), sequence_length: int = 20):
    """API endpoint for video upload and analysis."""
    if not allowed_video_file(file.filename):
        raise HTTPException(status_code=400, detail="Invalid file format. Supported formats: mp4, gif, webm, avi, 3gp, wmv, flv, mkv")

    # Save uploaded file (basename only, to avoid path traversal via the filename)
    temp_file = os.path.join("uploaded_videos", os.path.basename(file.filename))
    with open(temp_file, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    try:
        # Process the video
        result = process_video(temp_file, sequence_length)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def process_video(video_file, sequence_length):
    """Process a video for deepfake detection."""
    # Load model (note: reloaded on every request; see the sketch below)
    model = load_model(sequence_length)

    # Prepare dataset
    test_dataset = ValidationDataset(video_names=[video_file],
                                     sequence_length=sequence_length,
                                     transform=train_transforms)

    # Get frames
    frames = test_dataset[0]
    frames = frames.to(device)

    # Make prediction with the heatmap visualization
    result = predict_with_gradcam(model, frames, os.path.basename(video_file))
    return result
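# Usage sketch (illustrative; assumes a local "sample.mp4" exists). Loading the
# model once at module level instead of inside process_video would avoid
# re-reading the checkpoint on every request:
#   result = process_video("uploaded_videos/sample.mp4", sequence_length=20)
#   result["prediction"], result["confidence"], result["heatmap_url"]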
@app.get("/")
async def root():
    return FileResponse("frontend/dist/index.html")

@app.get("/api")
async def api_root():
    """Root endpoint with API documentation."""
    return {
        "message": "Welcome to DeepSight DeepFake Detection API",
        "usage": "POST /api/upload-video with a video file to detect deepfakes"
    }

# Catch-all for client-side routes; registered last so it cannot shadow the
# exact routes above. (Route paths here are assumptions; see the note above.)
@app.get("/{path:path}")
async def serve_frontend(path: str):
    # First check whether the path is a real file in the frontend dist
    if os.path.isfile(f"frontend/dist/{path}"):
        return FileResponse(f"frontend/dist/{path}")
    # Otherwise fall back to index.html (single-page-app routing)
    return FileResponse("frontend/dist/index.html")
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
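# Example client call (illustrative sketch; assumes the server is running on
# localhost:7860 and that a local "sample.mp4" exists):
#   import requests
#   with open("sample.mp4", "rb") as f:
#       r = requests.post("http://localhost:7860/api/upload-video",
#                         files={"file": ("sample.mp4", f, "video/mp4")},
#                         params={"sequence_length": 20})
#   print(r.json())  # {"prediction": ..., "confidence": ..., "heatmap_url": ..., ...}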