Spaces:
Sleeping
Sleeping
import json | |
import shutil | |
import subprocess | |
from typing import List | |
from fastapi import UploadFile, File, Form | |
from fastapi.responses import FileResponse | |
from fastapi import APIRouter, BackgroundTasks | |
import os | |
router = APIRouter() | |
async def cut_videos( | |
videos: List[UploadFile] = File(...), | |
timestamps: str = Form(...), | |
background_tasks: BackgroundTasks = BackgroundTasks() | |
): | |
timestamps_dict = json.loads(timestamps) | |
temp_dir = "./data/video_editor/videos" | |
if not os.path.exists(temp_dir): | |
os.makedirs(temp_dir) | |
save_and_cut(videos, temp_dir, timestamps_dict) | |
clean_temp_dir = remove_temp_files(temp_dir) | |
zip_filename = shutil.make_archive(f"{clean_temp_dir}", 'zip', "./data/video_editor/") | |
background_tasks.add_task(remove_zip_folder, zip_filename) | |
return FileResponse(zip_filename) | |
def save_and_cut(videos, temp_dir, timestamps_dict): | |
for index, video in enumerate(videos): | |
temp_video_path = f"{temp_dir}/{video.filename}-{index}.mp4" | |
with open(temp_video_path, "wb") as f: | |
shutil.copyfileobj(video.file, f) | |
cut_start = timestamps_dict[str(index)]["start"] | |
cut_end = timestamps_dict[str(index)]["end"] | |
# Remove the "-index" from the filename by removing everything after the last "-" | |
filename_without_extension = os.path.splitext(temp_video_path)[0] | |
filename_without_index = filename_without_extension.rsplit("-", 1)[0] | |
output_video_path = f"{filename_without_index}_cut.mp4" | |
subprocess.run([ | |
"ffmpeg", | |
"-i", temp_video_path, | |
"-ss", cut_start, | |
"-t", cut_end, | |
"-y", output_video_path | |
]) | |
# Remove the temporary video file | |
os.remove(temp_video_path) | |
def remove_temp_files(temp_dir): | |
for filename in os.listdir(temp_dir): | |
if "cut" not in filename: | |
os.remove(f"{temp_dir}/{filename}") | |
if "cut" in filename: | |
os.rename(f"{temp_dir}/{filename}", f"{temp_dir}/{filename.replace('_cut.mp4', '')}") | |
return temp_dir | |
def remove_zip_folder(zip_filename): | |
if os.path.exists(zip_filename): | |
shutil.rmtree(zip_filename[:-4]) | |
os.remove(zip_filename) | |