import os import re import subprocess import torch from datetime import datetime from pathlib import Path from typing import List, Optional, Tuple import platform class VideoConverter: def __init__(self): # Check the operating system if platform.system() == 'Windows': # Windows-specific FFmpeg path self.ffmpeg_path = os.path.join(os.getcwd(), "ffmpeg", "bin", "ffmpeg.exe") else: # On Linux, assume FFmpeg is installed and available in PATH self.ffmpeg_path = "ffmpeg" self.gpu_available = torch.cuda.is_available() self.formats = ["MP4", "MKV", "AVI", "MOV", "WMV", "FLV", "MPEG"] self.codecs = [ "H.264", "HEVC (H.265)", "MPEG-4 (Part 2)", "MPEG-2", "ProRes Proxy", "ProRes Light", "ProRes Standard", "ProRes HQ" ] def get_codec_params(self, codec: str) -> List[str]: codec_params = { "H.264": ["-c:v", "libx264", "-preset", "medium"], "HEVC (H.265)": ["-c:v", "libx265", "-preset", "medium"], "MPEG-4 (Part 2)": ["-c:v", "mpeg4"], "MPEG-2": ["-c:v", "mpeg2video"], "ProRes Proxy": ["-c:v", "prores_ks", "-profile:v", "0"], "ProRes Light": ["-c:v", "prores_ks", "-profile:v", "1"], "ProRes Standard": ["-c:v", "prores_ks", "-profile:v", "2"], "ProRes HQ": ["-c:v", "prores_ks", "-profile:v", "3"] } return codec_params.get(codec, ["-c:v", "libx264", "-preset", "medium"]) def get_video_duration(self, input_path: str) -> float: cmd = [ self.ffmpeg_path, "-i", input_path ] try: output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True) for line in output.split('\n'): if "Duration" in line: time_str = line.split("Duration: ")[1].split(",")[0] h, m, s = time_str.split(':') return float(h) * 3600 + float(m) * 60 + float(s) except Exception as e: print(f"Error getting video duration: {e}") return 0 def convert_video( self, input_path: str, output_format: str, codec: str, output_dir: str, progress_callback: Optional[callable] = None, output_filename: Optional[str] = None, output_resolution: str = "Same as input", output_bitrate: str = "auto", output_fps: str = "Same as input" ) -> Tuple[bool, str]: try: if not os.path.exists(input_path): return False, f"Input file does not exist: {Path(input_path).name}" if not os.path.exists(output_dir): os.makedirs(output_dir) output_path = os.path.join(output_dir, output_filename) # Base FFmpeg command cmd = [ self.ffmpeg_path, "-y" ] # Add GPU acceleration if available if self.gpu_available: if codec in ["H.264", "HEVC (H.265)"]: cmd.extend(["-hwaccel", "cuda"]) # On Linux, you might need to specify the correct hardware encoder cmd.extend(["-c:v", "h264_nvenc"]) # For NVIDIA GPUs # Add input file cmd.extend([ "-i", input_path ]) # Add codec parameters cmd.extend(self.get_codec_params(codec)) # Add scaling if necessary if output_resolution != "Same as input": resolution_map = { "3840x2160 (4K)": "3840:2160", "2560x1440 (1440p)": "2560:1440", "1920x1080 (1080p)": "1920:1080", "1280x720 (720p)": "1280:720", "854x480 (480p)": "854:480" } scale = resolution_map.get(output_resolution) if scale: cmd.extend(["-vf", f"scale={scale}"]) # Add FPS if specified if output_fps != "Same as input": cmd.extend(["-r", output_fps]) # Add bitrate or CRF settings if output_bitrate.lower() != "auto": cmd.extend(["-b:v", output_bitrate]) else: # Use a default CRF value for quality cmd.extend(["-crf", "23"]) # Optionally, you can add audio codec settings # cmd.extend(["-c:a", "aac", "-b:a", "128k"]) # Add output file cmd.append(output_path) # Print the FFmpeg command for debugging print("FFmpeg command:", ' '.join(cmd)) duration = self.get_video_duration(input_path) print(f"Video duration: {duration} seconds") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Combine stderr into stdout universal_newlines=True ) while True: line = process.stdout.readline() if not line: if process.poll() is not None: break continue # Print the line for debugging print("FFmpeg output:", line.strip()) if "frame=" in line and progress_callback and duration > 0: # Extract time from the output match = re.search(r'time=(\d+:\d+:\d+\.\d+)', line) if match: time_str = match.group(1) h, m, s = time_str.split(':') current_time = float(h) * 3600 + float(m) * 60 + float(s) progress = current_time / duration progress_callback(progress) process.wait() if process.returncode == 0: return True, f"Successfully converted: {output_filename}" else: error_output = process.stdout.read() return False, f"Error converting {Path(input_path).name}: FFmpeg error code {process.returncode}\n{error_output}" except Exception as e: return False, f"Error converting {Path(input_path).name}: {str(e)}"