import os
import re
import json
import time
import random
import requests
import numpy as np
from PIL import Image
from io import BytesIO
import tempfile
import base64
import uuid
import subprocess
# Check if running in Colab - not needed for the Gradio app
# try:
#     import google.colab
#     IN_COLAB = True
# except ImportError:
#     IN_COLAB = False
# if IN_COLAB:
#     from google.colab import drive
#     drive.mount('/content/drive')
# --- Package Installation ---
def install_packages():
    """Installs required packages if they are not already installed."""
    # Map pip package names to their importable module names; they differ for
    # several of these packages (e.g. yt-dlp -> yt_dlp, openai-whisper -> whisper),
    # so checking __import__ on the pip name alone would always trigger a reinstall.
    packages = {
        "transformers": "transformers",
        "moviepy": "moviepy",
        "pytube": "pytube",
        "pydub": "pydub",
        "yt-dlp": "yt_dlp",
        "gradio": "gradio",
        "huggingface_hub": "huggingface_hub",
        "librosa": "librosa",
        "soundfile": "soundfile",
        "openai-whisper": "whisper",
        "imageio-ffmpeg": "imageio_ffmpeg",
    }
    for package, module_name in packages.items():
        try:
            __import__(module_name)
            print(f"✅ {package} is already installed.")
        except ImportError:
            print(f"⏳ Installing {package}...")
            try:
                subprocess.check_call(["pip", "install", "-q", package])
                print(f"✅ {package} installed successfully.")
            except subprocess.CalledProcessError as e:
                print(f"❌ Error installing {package}: {e}")


install_packages()
# The code below uses the moviepy 1.x API (set_duration, subclip, fx, vfx, ...),
# so import the editor namespace, which exposes VideoFileClip, ImageClip,
# TextClip, ColorClip, CompositeVideoClip, CompositeAudioClip,
# concatenate_videoclips and vfx as bare names.
import moviepy.editor as mp
from moviepy.editor import *
from moviepy.audio.fx.all import volumex
from pydub import AudioSegment  # audio adjustment and silent fallbacks
from tqdm import tqdm  # progress bar over scenes
from huggingface_hub import InferenceClient
import gradio as gr  # Gradio UI
import librosa  # for audio
import soundfile  # for audio
import torch  # Import PyTorch
class AIVideoGenerator:
    def __init__(self):
        """Initialize the AI Video Generator system."""
        self.script = None
        self.scenes = []
        self.audio_clips = []
        self.video_clips = []
        self.final_video = None
        self.temp_dir = tempfile.mkdtemp()
        self.setup_directories()
        self.hf_client = InferenceClient()
        print("🎬 AI Video Generator initialized!")
        print("✅ Temporary directories created")
        print("✅ Hugging Face client initialized")

    def setup_directories(self):
        """Set up the necessary directories for the project."""
        os.makedirs(os.path.join(self.temp_dir, "images"), exist_ok=True)
        os.makedirs(os.path.join(self.temp_dir, "videos"), exist_ok=True)
        os.makedirs(os.path.join(self.temp_dir, "audio"), exist_ok=True)
        os.makedirs(os.path.join(self.temp_dir, "output"), exist_ok=True)
        print(f"📁 Working directory: {self.temp_dir}")

    def generate_script(self, user_prompt, verbose=True):
        """
        Generate a structured script using the Hugging Face Inference API
        with the Qwen2.5-7B-Instruct model.

        Args:
            user_prompt (str): The user's input describing the video they want to create
            verbose (bool): Whether to print the generated script

        Returns:
            dict: A structured JSON storyboard
        """
        if verbose:
            print("🤖 Generating script using Qwen2.5-7B-Instruct...")

        # Prepare the prompt for the model
        system_prompt = """You are an expert screenplay writer and video producer.
Create a detailed JSON storyboard for a video based on the user's input.
The storyboard should be formatted as a valid JSON with the following structure:
{
    "title": "Title of the video",
    "description": "Brief description of the overall video",
    "duration": "Estimated duration in seconds",
    "scenes": [
        {
            "scene_id": 1,
            "title": "Scene title",
            "duration": "Duration in seconds",
            "narration": "Text to be spoken in this scene",
            "tone": "Emotional tone for the narration (cheerful, serious, etc.)",
            "visuals": "Description of what should be shown visually",
            "keywords": ["keyword1", "keyword2", "keyword3"],
            "transition": "Type of transition to next scene"
        },
        ...more scenes...
    ]
}
Make sure:
1. Each scene is 5-15 seconds long
2. The narration matches the visuals
3. Keywords are specific and searchable
4. Transitions are varied (cut, fade, dissolve, etc.)
5. The entire video tells a cohesive story
The output MUST be a valid JSON only with no additional text."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Create a storyboard for a video about: {user_prompt}"}
        ]

        try:
            # Call the Hugging Face Inference API
            response = self.hf_client.chat_completion(
                model="Qwen/Qwen2.5-7B-Instruct",
                messages=messages,
                temperature=0.7,
                max_tokens=2000
            )
            script_text = response.choices[0].message.content

            # Extract JSON from the response (in case there's surrounding text)
            json_match = re.search(r'```json(.*?)```', script_text, re.DOTALL)
            if json_match:
                script_text = json_match.group(1).strip()
            else:
                # Try to find JSON without code blocks
                json_match = re.search(r'({.*})', script_text, re.DOTALL)
                if json_match:
                    script_text = json_match.group(1).strip()

            # Parse the JSON
            self.script = json.loads(script_text)

            if verbose:
                print(f"✅ Script generated with {len(self.script['scenes'])} scenes")
                print(f"🎬 Title: {self.script['title']}")
                print(f"⏱️ Estimated duration: {self.script['duration']}")

            if verbose > 1:
                print("\n📝 Script overview:")
                for i, scene in enumerate(self.script['scenes']):
                    print(f"\nScene {i+1}: {scene['title']} ({scene['duration']}s)")
                    print(f"Narration: {scene['narration'][:100]}...")
                    print(f"Keywords: {', '.join(scene['keywords'])}")

            return self.script

        except Exception as e:
            print(f"❌ Error generating script: {e}")
            # Fallback to a simple script structure
            self.script = {
                "title": f"Video about {user_prompt}",
                "description": f"A video exploring {user_prompt}",
                "duration": "60",
                "scenes": [
                    {
                        "scene_id": 1,
                        "title": "Introduction",
                        "duration": "10",
                        "narration": f"Let's explore {user_prompt} together.",
                        "tone": "neutral",
                        "visuals": f"Imagery related to {user_prompt}",
                        "keywords": [user_prompt, "introduction", "overview"],
                        "transition": "fade"
                    },
                    {
                        "scene_id": 2,
                        "title": "Main Content",
                        "duration": "40",
                        "narration": f"Here are the key points about {user_prompt}.",
                        "tone": "informative",
                        "visuals": f"Detailed imagery of {user_prompt}",
                        "keywords": [user_prompt, "details", "explanation"],
                        "transition": "cut"
                    },
                    {
                        "scene_id": 3,
                        "title": "Conclusion",
                        "duration": "10",
                        "narration": f"That's a brief overview of {user_prompt}.",
                        "tone": "conclusive",
                        "visuals": f"Summary imagery of {user_prompt}",
                        "keywords": [user_prompt, "conclusion", "summary"],
                        "transition": "fade"
                    }
                ]
            }
            print("⚠️ Used fallback script generation")
            return self.script
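
    # A minimal usage sketch for generate_script (illustrative only, not called by
    # the pipeline; the field names follow the storyboard structure requested in
    # system_prompt above):
    #
    #   gen = AIVideoGenerator()
    #   storyboard = gen.generate_script("the water cycle", verbose=False)
    #   for scene in storyboard["scenes"]:
    #       print(scene["scene_id"], scene["title"], scene["duration"], scene["transition"])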

    def fetch_images(self, keywords, num_images=3, verbose=True):
        """
        Fetch images based on keywords using the Unsplash API or Pixabay API.

        Args:
            keywords (list): List of keywords to search for
            num_images (int): Number of images to fetch
            verbose (bool): Whether to print progress

        Returns:
            list: List of image paths saved locally
        """
        if verbose:
            print(f"🖼️ Fetching images for keywords: {', '.join(keywords)}")

        # Join keywords for the search query
        query = " ".join(keywords)
        image_paths = []

        # Try different free image APIs
        image_sources = [
            # Unsplash Source (no API key needed for basic usage)
            lambda q, n: [f"https://source.unsplash.com/1600x900/?{q}&sig={i}" for i in range(n)],
            # Pixabay API with fallback to no-API approach
            lambda q, n: [f"https://pixabay.com/api/?key=demo&q={q}&image_type=photo&per_page={n}"]
        ]

        successful = False
        for source_func in image_sources:
            if successful:
                break
            try:
                urls = source_func(query, num_images)
                for i, url in enumerate(urls):
                    try:
                        response = requests.get(url, timeout=10)
                        # For direct image URLs (Unsplash)
                        if response.headers.get('content-type', '').startswith('image/'):
                            img = Image.open(BytesIO(response.content))
                            filename = os.path.join(self.temp_dir, "images", f"{query.replace(' ', '_')}_{i}.jpg")
                            img.save(filename)
                            image_paths.append(filename)
                        # For API responses (Pixabay)
                        elif response.headers.get('content-type', '').startswith('application/json'):
                            data = response.json()
                            if 'hits' in data and len(data['hits']) > 0:
                                for j, hit in enumerate(data['hits'][:num_images]):
                                    img_url = hit.get('largeImageURL') or hit.get('webformatURL')
                                    if img_url:
                                        img_response = requests.get(img_url, timeout=10)
                                        img = Image.open(BytesIO(img_response.content))
                                        filename = os.path.join(self.temp_dir, "images", f"{query.replace(' ', '_')}_{j}.jpg")
                                        img.save(filename)
                                        image_paths.append(filename)
                        if len(image_paths) >= num_images:
                            successful = True
                            break
                    except Exception as e:
                        if verbose:
                            print(f"⚠️ Error fetching image {i}: {e}")
                        continue
            except Exception as e:
                if verbose:
                    print(f"⚠️ Error with image source: {e}")
                continue

        # If we couldn't get any images, create placeholder images
        if len(image_paths) == 0:
            if verbose:
                print("⚠️ Creating placeholder images")
            for i in range(num_images):
                # Create a colored background with text
                img = Image.new('RGB', (1600, 900), color=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
                filename = os.path.join(self.temp_dir, "images", f"{query.replace(' ', '_')}_{i}.jpg")
                img.save(filename)
                image_paths.append(filename)

        if verbose:
            print(f"✅ Downloaded {len(image_paths)} images")
        return image_paths

    def fetch_videos(self, keywords, duration=10, verbose=True):
        """
        Fetch video clips based on keywords from public sources.

        Args:
            keywords (list): List of keywords to search for
            duration (int): Approximate desired duration in seconds
            verbose (bool): Whether to print progress

        Returns:
            str: Path to the downloaded video
        """
        if verbose:
            print(f"🎥 Fetching videos for keywords: {', '.join(keywords)}")

        query = " ".join(keywords)
        output_path = os.path.join(self.temp_dir, "videos", f"{query.replace(' ', '_')}.mp4")

        # Check if we already have this video
        if os.path.exists(output_path):
            if verbose:
                print("✅ Using cached video")
            return output_path

        # Try to fetch from YouTube
        try:
            # Search YouTube using yt-dlp
            command = f'yt-dlp ytsearch5:"{query}" --print title,duration,webpage_url --flat-playlist > search_results.txt'
            os.system(command)

            # Parse the results
            with open("search_results.txt", "r") as f:
                lines = f.readlines()

            # Find a suitable video (not too long)
            video_url = None
            for i in range(0, len(lines), 3):
                if i + 2 < len(lines):
                    try:
                        title = lines[i].strip()
                        duration_str = lines[i + 1].strip()
                        url = lines[i + 2].strip()
                        # Parse duration
                        if ':' in duration_str:
                            parts = duration_str.split(':')
                            if len(parts) == 2:  # MM:SS
                                video_duration = int(parts[0]) * 60 + int(parts[1])
                            else:  # HH:MM:SS
                                video_duration = int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
                        else:
                            video_duration = int(duration_str)
                        # Select a video that's not too long (< 10 mins)
                        if video_duration < 600:
                            video_url = url
                            break
                    except:
                        continue

            if video_url:
                if verbose:
                    print(f"🎬 Found video: {title}")
                # Download a short clip using yt-dlp
                command = f'yt-dlp "{video_url}" -f "best[height<=720]" --postprocessor-args "ffmpeg:-ss 0 -t {duration + 5}" -o "{output_path}"'
                os.system(command)

                # Verify the file exists and has content
                if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                    if verbose:
                        print(f"✅ Downloaded video clip to {output_path}")
                    return output_path
        except Exception as e:
            if verbose:
                print(f"⚠️ Error downloading YouTube video: {e}")

        # Fallback: generate a video from images
        if verbose:
            print("⚠️ Falling back to creating video from images")

        # Get images for the keywords
        image_paths = self.fetch_images(keywords, num_images=5, verbose=False)

        try:
            # Create a video from the images
            clips = []
            for img_path in image_paths:
                clip = ImageClip(img_path).set_duration(duration / len(image_paths))
                # Add a simple pan effect (Ken Burns effect)
                clip = clip.resize(height=1080).resize(lambda t: 1 + 0.05 * t)
                clips.append(clip)

            concat_clip = concatenate_videoclips(clips, method="compose")
            concat_clip.write_videofile(output_path, fps=24, audio=False, codec="libx264")

            if verbose:
                print(f"✅ Created video from images at {output_path}")
            return output_path
        except Exception as e:
            if verbose:
                print(f"❌ Error creating video from images: {e}")

        # Last resort: create a colored screen video
        try:
            # Create a colored clip with text
            color_clip = ColorClip(size=(1280, 720), color=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
            color_clip = color_clip.set_duration(duration)

            # Add text with keywords
            txt_clip = TextClip(txt=query, fontsize=60, color='white')
            txt_clip = txt_clip.set_position('center').set_duration(duration)

            # Combine clips
            final_clip = CompositeVideoClip([color_clip, txt_clip])
            final_clip.write_videofile(output_path, fps=24, audio=False, codec="libx264")

            if verbose:
                print(f"✅ Created fallback color video at {output_path}")
            return output_path
        except Exception as e:
            if verbose:
                print(f"❌ Error creating fallback video: {e}")
            # Create an empty file as a last resort
            with open(output_path, 'w') as f:
                pass
            return output_path

    def generate_voiceover(self, text, tone="neutral", voice_id=None, verbose=True):
        """
        Generate AI voiceover using Kokoro TTS with dynamic tone adjustment.

        Args:
            text (str): Text to convert to speech
            tone (str): Emotional tone for the narration
            voice_id (str): Optional specific voice ID to use
            verbose (bool): Whether to print progress

        Returns:
            str: Path to the generated audio file
        """
        if verbose:
            print(f"🔊 Generating voiceover for: {text[:50]}...")

        # Create a unique filename based on a content hash
        text_hash = str(hash(text))[:10]
        output_path = os.path.join(self.temp_dir, "audio", f"voiceover_{text_hash}.mp3")

        # Check if we already have this audio
        if os.path.exists(output_path):
            if verbose:
                print("✅ Using cached audio")
            return output_path

        # Map tone to voice parameters
        tone_params = {
            "cheerful": {"pitch": 1.1, "rate": 1.1, "voice_id": "cynthia"},
            "serious": {"pitch": 0.9, "rate": 0.95, "voice_id": "adam"},
            "excited": {"pitch": 1.2, "rate": 1.15, "voice_id": "bella"},
            "calm": {"pitch": 0.95, "rate": 0.9, "voice_id": "daniel"},
            "sad": {"pitch": 0.85, "rate": 0.9, "voice_id": "emily"},
            "neutral": {"pitch": 1.0, "rate": 1.0, "voice_id": "michael"},
            "professional": {"pitch": 1.0, "rate": 1.05, "voice_id": "olivia"}
        }

        # Get tone parameters or use neutral as default
        params = tone_params.get(tone.lower(), tone_params["neutral"])

        # Override voice_id if provided
        if voice_id:
            params["voice_id"] = voice_id

        try:
            # Using the Hugging Face TTS API with Kokoro TTS
            response = self.hf_client.text_to_speech(
                text=text,
                model="Kokoro/Kokoro-TTS-v2"
            )

            # Save the audio
            with open(output_path, "wb") as f:
                f.write(response)

            # Adjust audio parameters using pydub
            try:
                audio = AudioSegment.from_file(output_path)
                # Apply pitch and rate adjustment
                # Note: basic modification using pydub (more advanced would require librosa)
                pitch_adjusted = audio._spawn(audio.raw_data, overrides={
                    "frame_rate": int(audio.frame_rate * params["pitch"])
                })
                rate_adjusted = pitch_adjusted.set_frame_rate(audio.frame_rate)
                # Export the adjusted audio
                rate_adjusted.export(output_path, format="mp3")
            except Exception as e:
                if verbose:
                    print(f"⚠️ Error adjusting audio parameters: {e}")
                # Keep the original audio if adjustment fails

            if verbose:
                print(f"✅ Generated voiceover at {output_path}")
            return output_path
        except Exception as e:
            if verbose:
                print(f"❌ Error generating voiceover with Kokoro TTS: {e}")

            # Fallback to gTTS if available
            try:
                from gtts import gTTS
                tts = gTTS(text=text, lang='en', slow=False)
                tts.save(output_path)
                if verbose:
                    print(f"⚠️ Used fallback gTTS for voiceover at {output_path}")
                return output_path
            except Exception as e2:
                if verbose:
                    print(f"❌ Error with fallback TTS: {e2}")

                # Create an empty audio file
                empty_audio = AudioSegment.silent(duration=len(text.split()) * 500)  # Rough estimate of duration
                empty_audio.export(output_path, format="mp3")
                if verbose:
                    print(f"⚠️ Created silent audio at {output_path}")
                return output_path
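
    # Sketch of a standalone call to generate_voiceover (assumes the Kokoro TTS
    # endpoint or the gTTS fallback is reachable; "cheerful" is one of the keys
    # in tone_params above):
    #
    #   gen = AIVideoGenerator()
    #   mp3_path = gen.generate_voiceover("Welcome to the tour!", tone="cheerful")
    #   # mp3_path points at an MP3 under gen.temp_dir/audio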

    def process_scene(self, scene, verbose=True):
        """
        Process a single scene from the script to create audio and video.

        Args:
            scene (dict): Scene dictionary from the script
            verbose (bool): Whether to print progress

        Returns:
            dict: Scene data with paths to the generated video and audio
        """
        if verbose:
            print(f"\n🎭 Processing Scene {scene['scene_id']}: {scene['title']}")

        # Extract scene information
        narration = scene['narration']
        keywords = scene['keywords']
        duration = float(scene['duration'])
        tone = scene.get('tone', 'neutral')
        visual_description = scene.get('visuals', '')

        # Add the visual description to the keywords for more targeted video retrieval
        enhanced_keywords = keywords.copy()
        if visual_description:
            # Extract key phrases from the visual description
            visual_keywords = [w for w in visual_description.split() if len(w) > 3]
            enhanced_keywords.extend(visual_keywords[:3])  # Add up to 3 keywords from the visual description

        # Generate voiceover with synchronization markers for later alignment
        audio_path = self.generate_voiceover(narration, tone=tone, verbose=verbose)

        # Get the audio duration first to ensure the visual content matches
        audio_clip = AudioFileClip(audio_path)
        audio_duration = audio_clip.duration
        audio_clip.close()

        # Fetch video based on enhanced keywords and precise duration
        video_path = self.fetch_videos(enhanced_keywords, duration=max(audio_duration, duration), verbose=verbose)

        # Load the video to analyze and improve it
        video_clip = VideoFileClip(video_path)

        # Analyze video brightness and contrast - adjust if needed
        try:
            frame = video_clip.get_frame(0)  # Get the first frame
            brightness = np.mean(frame)
            # If the video is too dark or too bright, apply correction
            if brightness < 40:  # Too dark
                video_clip = video_clip.fx(vfx.colorx, 1.3)
            elif brightness > 200:  # Too bright
                video_clip = video_clip.fx(vfx.colorx, 0.8)
        except:
            pass  # Skip if frame analysis fails

        # Ensure the video is long enough for the audio
        target_duration = max(audio_duration, duration)
        if video_clip.duration < target_duration:
            # Instead of simple looping, use a more sophisticated approach
            if verbose:
                print(f"⚠️ Video too short, extending to {target_duration:.1f}s using advanced techniques")

            # If the video is very short, use the ping-pong technique (forward then reverse)
            if video_clip.duration < target_duration / 2:
                clip1 = video_clip
                clip2 = video_clip.fx(vfx.time_mirror)  # Play in reverse
                extended_clip = concatenate_videoclips([clip1, clip2])
                # Loop if still needed
                n_loops = int(np.ceil(target_duration / extended_clip.duration))
                video_clip = extended_clip.fx(vfx.loop, n=n_loops)
            else:
                # Use a loop with a subtle zoom to make it less obvious
                video_clip = video_clip.fx(vfx.loop, n=int(np.ceil(target_duration / video_clip.duration)))
                video_clip = video_clip.fx(vfx.resize, lambda t: 1 + 0.05 * np.sin(t / 2))  # Subtle zoom effect

        # Add gentle motion to static images (if the video seems static)
        if scene.get('detected_static_image', False) or np.random.random() < 0.3:  # Random chance to add motion
            try:
                # Apply a subtle Ken Burns effect (slow pan and zoom)
                start_scale, end_scale = 1.0, 1.05  # Subtle zoom in
                start_pos, end_pos = (0, 0), (10, 5)  # Subtle pan

                # Create a transform function for zoom and pan.
                # NOTE: this helper requires OpenCV (cv2), which is not installed or
                # imported above, and it is never invoked; the simplified resize()
                # call below is what actually runs.
                def transform(get_frame, t):
                    scale = start_scale + (end_scale - start_scale) * t / video_clip.duration
                    pos_x = start_pos[0] + (end_pos[0] - start_pos[0]) * t / video_clip.duration
                    pos_y = start_pos[1] + (end_pos[1] - start_pos[1]) * t / video_clip.duration
                    frame = get_frame(t)
                    h, w = frame.shape[:2]
                    # Apply zoom
                    zoomed = cv2.resize(frame, None, fx=scale, fy=scale)
                    # Calculate new dimensions
                    zh, zw = zoomed.shape[:2]
                    # Calculate the crop area
                    x1 = int(pos_x + (zw - w) / 2)
                    y1 = int(pos_y + (zh - h) / 2)
                    x2 = int(x1 + w)
                    y2 = int(y1 + h)
                    # Ensure bounds
                    x1 = max(0, min(x1, zw - w))
                    y1 = max(0, min(y1, zh - h))
                    x2 = min(zw, x1 + w)
                    y2 = min(zh, y1 + h)
                    # Crop
                    return zoomed[y1:y2, x1:x2]

                # Apply the transform if it doesn't error
                try:
                    # This is a simplified approximation - in reality, we would use moviepy's
                    # proper transform functions, which would require a more complex setup
                    video_clip = video_clip.resize(lambda t: 1 + 0.05 * t / video_clip.duration)
                except:
                    pass
            except:
                pass  # Skip if the transform fails

        # Trim the video to match the target duration
        video_clip = video_clip.subclip(0, target_duration)

        # Save the improved video
        improved_video_path = os.path.join(self.temp_dir, "videos", f"improved_{os.path.basename(video_path)}")
        video_clip.write_videofile(improved_video_path, codec="libx264", audio=False)
        video_clip.close()

        # Return scene data
        scene_data = {
            'scene_id': scene['scene_id'],
            'title': scene['title'],
            'video_path': improved_video_path,
            'audio_path': audio_path,
            'narration': narration,
            'duration': target_duration,
            'transition': scene.get('transition', 'cut'),
            'keywords': keywords,
            'visual_description': visual_description
        }

        if verbose:
            print(f"✅ Scene processed: {target_duration:.1f}s with enhanced visuals")
        return scene_data

    def get_background_music(self, duration, mood="neutral", verbose=True):
        """
        Get background music from open-source repositories.

        Args:
            duration (float): Required duration in seconds
            mood (str): The mood of the music
            verbose (bool): Whether to print progress

        Returns:
            str: Path to the background music file
        """
        if verbose:
            print(f"🎵 Finding background music ({mood}, {duration:.1f}s)")

        output_path = os.path.join(self.temp_dir, "audio", f"background_{mood}_{int(duration)}.mp3")

        # Try to use a pre-defined set of free music URLs
        free_music_urls = {
            "neutral": "https://cdn.pixabay.com/download/audio/2022/01/18/audio_ba33122ff6.mp3?filename=ambient-piano-amp-strings-10711.mp3",
            "cheerful": "https://cdn.pixabay.com/download/audio/2022/04/27/audio_8c0d4a1380.mp3?filename=upbeat-uplifting-corporate-12954.mp3",
            "serious": "https://cdn.pixabay.com/download/audio/2022/01/25/audio_2b5eb3efde.mp3?filename=lifelike-126735.mp3",
            "dramatic": "https://cdn.pixabay.com/download/audio/2022/05/27/audio_f8a876107c.mp3?filename=cinematic-documentary-piano-14007.mp3",
            "inspirational": "https://cdn.pixabay.com/download/audio/2022/09/02/audio_13b3266382.mp3?filename=inspiring-cinematic-ambient-116199.mp3"
        }

        url = free_music_urls.get(mood.lower(), free_music_urls["neutral"])

        try:
            response = requests.get(url, timeout=15)
            if response.status_code == 200:
                with open(output_path, 'wb') as f:
                    f.write(response.content)

                # Load and adjust the music to fit the required duration
                music = AudioSegment.from_file(output_path)

                # If the music is too short, loop it
                if len(music) < duration * 1000:  # Convert to milliseconds
                    num_loops = int(np.ceil(duration * 1000 / len(music)))
                    music = music * num_loops

                # Trim to the required duration
                music = music[:int(duration * 1000)]

                # Fade in and out
                fade_duration = min(3000, int(duration * 1000 / 4))  # 3 seconds or 1/4 of total, whichever is smaller
                music = music.fade_in(fade_duration).fade_out(fade_duration)

                # Lower the volume for background use
                music = music - 12  # Reduce by 12 dB

                # Export the adjusted music
                music.export(output_path, format="mp3")

                if verbose:
                    print("✅ Downloaded and prepared background music")
                return output_path
        except Exception as e:
            if verbose:
                print(f"⚠️ Error getting background music: {e}")

        # If we failed to get music, create silent audio
        silent_audio = AudioSegment.silent(duration=int(duration * 1000))
        silent_audio.export(output_path, format="mp3")
        if verbose:
            print("⚠️ Created silent background track")
        return output_path

    def create_text_overlay(self, text, duration, position="bottom", title=False):
        """
        Create a text overlay clip for the video.

        Args:
            text (str): Text to display
            duration (float): Duration in seconds
            position (str): Position on screen ('top', 'bottom', 'center')
            title (bool): Whether this is a title (larger font)

        Returns:
            TextClip: The text overlay clip
        """
        # Set text properties based on type
        if title:
            fontsize = 60
            color = 'white'
            bg_color = 'rgba(0, 0, 0, 0.7)'
            stroke_color = 'black'
            stroke_width = 2
        else:
            fontsize = 36
            color = 'white'
            bg_color = 'rgba(0, 0, 0, 0.5)'
            stroke_color = 'black'
            stroke_width = 1

        # Create the text clip
        txt_clip = TextClip(
            txt=text,
            fontsize=fontsize,
            color=color,
            stroke_color=stroke_color,
            stroke_width=stroke_width,
            bg_color=bg_color,
            method='caption',
            align='center',
            size=(720, None)  # Width constrained, height auto
        )

        # Set position
        if position == "top":
            pos = ('center', 50)
        elif position == "bottom":
            pos = ('center', 'bottom')
        else:  # center
            pos = 'center'

        # Set duration and position
        txt_clip = txt_clip.set_position(pos).set_duration(duration)

        # Add fade in/out
        fade_duration = min(1.0, duration / 4)
        txt_clip = txt_clip.fadeout(fade_duration).fadein(fade_duration)

        return txt_clip

    def assemble_video(self, verbose=True):
        """
        Assemble the final video from processed scenes.

        Args:
            verbose (bool): Whether to print progress

        Returns:
            str: Path to the final rendered video
        """
        if not self.script:
            raise ValueError("No script generated. Please run generate_script() first.")

        if verbose:
            print("\n🎞️ Assembling final video...")

        # Process each scene
        processed_scenes = []
        total_duration = 0
        for scene in tqdm(self.script['scenes'], desc="Processing scenes"):
            scene_data = self.process_scene(scene, verbose=(verbose > 1))
            processed_scenes.append(scene_data)
            total_duration += scene_data['duration']

        if verbose:
            print(f"✅ Processed {len(processed_scenes)} scenes, total duration: {total_duration:.1f}s")

        # Get background music for the entire video
        bg_music_path = self.get_background_music(total_duration, verbose=verbose)

        # Assemble video clips
        final_clips = []
        for i, scene in enumerate(processed_scenes):
            # Load the video and audio for this scene
            video_clip = VideoFileClip(scene['video_path'])
            audio_clip = AudioFileClip(scene['audio_path'])

            # Trim the video to match the intended duration
            video_clip = video_clip.subclip(0, scene['duration'])

            # Create text overlays
            if i == 0:
                # Title overlay for the first scene
                title_overlay = self.create_text_overlay(
                    self.script['title'],
                    min(5, scene['duration']),
                    position="top",
                    title=True
                )
                # Scene title for the first scene
                scene_overlay = self.create_text_overlay(
                    scene['title'],
                    min(4, scene['duration']),
                    position="bottom",
                    title=False
                )
                # Combine the video with the overlays
                video_clip = CompositeVideoClip([
                    video_clip,
                    title_overlay,
                    scene_overlay
                ])
            else:
                # Scene title overlay
                scene_overlay = self.create_text_overlay(
                    scene['title'],
                    min(4, scene['duration']),
                    position="bottom",
                    title=False
                )
                # Combine the video with the overlay
                video_clip = CompositeVideoClip([video_clip, scene_overlay])

            # Set the audio
            video_clip = video_clip.set_audio(audio_clip)

            # Add a transition effect based on the scene specification
            transition = scene.get('transition', 'cut').lower()
            if i > 0:  # Only apply transitions after the first clip
                if transition == 'fade':
                    video_clip = video_clip.fadein(1)
                elif transition == 'dissolve':
                    # We'll handle dissolve in the concatenation step
                    pass
                elif transition == 'zoom':
                    video_clip = video_clip.resize(lambda t: 1 + 0.05 * (1 - min(t, 1)))
                # 'cut' is the default and needs no special handling

            final_clips.append(video_clip)

        # Concatenate clips with appropriate transitions
        if verbose:
            print("🔄 Applying transitions and concatenating clips...")

        # Determine the crossfade duration for each clip
        transition_durations = []
        for i, scene in enumerate(processed_scenes):
            if i == 0:
                transition_durations.append(0)  # No transition for the first clip
            else:
                transition = scene.get('transition', 'cut').lower()
                if transition == 'dissolve':
                    transition_durations.append(1)  # 1-second dissolve
                else:
                    transition_durations.append(0)  # No crossfade for other transitions

        # concatenate_videoclips() has no per-clip crossfade argument, so apply
        # crossfadein() to each clip that needs a dissolve and concatenate with
        # the "compose" method (the dissolve is approximated by a short fade-in).
        for i, fade in enumerate(transition_durations):
            if fade > 0:
                final_clips[i] = final_clips[i].crossfadein(fade)
        final_video = concatenate_videoclips(final_clips, method="compose")

        # Add background music
        if verbose:
            print("🎵 Adding background music...")
        bg_music = AudioFileClip(bg_music_path)
        bg_music = bg_music.subclip(0, final_video.duration)
        bg_music = bg_music.volumex(0.2)  # Lower volume for background

        # Mix the background music with the existing audio
        final_audio = CompositeAudioClip([final_video.audio, bg_music])
        final_video = final_video.set_audio(final_audio)

        # Add an ending fade out
        final_video = final_video.fadeout(2)

        # Render the final video
        output_path = os.path.join(self.temp_dir, "output", f"{self.script['title'].replace(' ', '_')}.mp4")
        if verbose:
            print(f"💾 Rendering final video to {output_path}...")

        # Use high-quality rendering settings
        final_video.write_videofile(
            output_path,
            fps=24,
            codec="libx264",
            audio_codec="aac",
            preset="medium",
            audio_bitrate="192k",
            bitrate="5000k"
        )

        if verbose:
            print("✅ Video rendering complete!")

        self.final_video = output_path
        return output_path

    def generate_video(self, user_prompt, verbose=True):
        """
        End-to-end function to generate a video from a user prompt.

        Args:
            user_prompt (str): The user's input describing the video they want to create
            verbose (bool): Whether to print progress (Gradio doesn't use this directly)

        Returns:
            str: Path to the final rendered video, or None if generation failed.
        """
        try:
            # Step 1: Generate the script
            self.generate_script(user_prompt, verbose=verbose)

            # Step 2: Assemble and render the video
            output_path = self.assemble_video(verbose=verbose)

            return output_path
        except Exception as e:
            print(f"❌ Error generating video: {e}")
            import traceback
            traceback.print_exc()
            return None
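
# Minimal sketch of using the generator without the Gradio UI (kept as a comment
# so importing this module has no side effects beyond install_packages()):
#
#   generator = AIVideoGenerator()
#   video_path = generator.generate_video("A short documentary about honey bees")
#   if video_path:
#       print(f"Final video written to {video_path}")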

# --- Gradio Interface ---
def run_video_generation(user_prompt, progress=gr.Progress()):
    """
    Wrapper function for Gradio that integrates with the AIVideoGenerator.

    Args:
        user_prompt (str): The user's input describing the desired video.
        progress (gr.Progress): Gradio progress object for tracking.

    Returns:
        str: Path to the generated video file, or a message if an error occurred.
    """
    progress(0, desc="Starting video generation...")
    generator = AIVideoGenerator()
    video_path = generator.generate_video(user_prompt, verbose=False)  # keep console output minimal under Gradio

    if video_path:
        progress(1, desc="Video generation complete!")
        return video_path
    else:
        return "Video generation failed. Please check the logs for details."

# Gradio Interface Setup
if __name__ == '__main__':  # Only launch the UI when run as a script
    with gr.Blocks(title="AI Video Generator") as demo:
        gr.Markdown(
            """
            # AI Video Generator 🎬
            Enter a topic, and let the AI create a short video for you!
            """
        )
        with gr.Row():
            with gr.Column():
                input_prompt = gr.Textbox(
                    label="What video would you like to create?",
                    placeholder="Enter a topic, e.g., 'The history of coffee', 'How to make pizza', 'The life cycle of a butterfly'",
                    lines=3
                )
                generate_button = gr.Button("Generate Video", variant="primary")
            with gr.Column():
                output_video = gr.Video(label="Generated Video", interactive=False)

        # Event handling
        generate_button.click(
            fn=run_video_generation,
            inputs=input_prompt,
            outputs=output_video
        )

        # Examples
        gr.Examples(
            examples=[
                ["The history of the internet"],
                ["How to train a dog"],
                ["A travel guide to Paris"],
                ["The benefits of meditation"],
                ["The future of artificial intelligence"]
            ],
            inputs=input_prompt
        )

    demo.launch()