import os
import subprocess
from datetime import datetime
from pathlib import Path

import cv2
import gradio as gr
import numpy as np
import torch
from huggingface_hub import hf_hub_download, snapshot_download
# -----------------------------
# Setup paths and env
# -----------------------------
HF_HOME = "/app/hf_cache"
os.environ["HF_HOME"] = HF_HOME
os.environ["TRANSFORMERS_CACHE"] = HF_HOME
os.makedirs(HF_HOME, exist_ok=True)

PRETRAINED_DIR = "/app/pretrained"
os.makedirs(PRETRAINED_DIR, exist_ok=True)
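
# Note: recent transformers releases treat TRANSFORMERS_CACHE as deprecated in
# favor of HF_HOME; both are set above so either lookup caches under /app/hf_cache.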

# -----------------------------
# Optional Model Download
# -----------------------------
def download_models():
    """Fetch pretrained weights via the repo's download script if they are missing."""
    expected_model = os.path.join(PRETRAINED_DIR, "RAFT/raft-things.pth")
    if not Path(expected_model).exists():
        print("⚙️ Downloading pretrained models...")
        try:
            subprocess.check_call(["bash", "download/download_models.sh"])
            print("✅ Models downloaded.")
        except subprocess.CalledProcessError as e:
            print(f"❌ Model download failed: {e}")
    else:
        print("✅ Pretrained models already exist.")

def visualize_depth_npy_as_video(npy_file, fps):
    """Render a depth .npy file of shape [T, 1, H, W] as a colorized MP4 preview."""
    depth_np = np.load(npy_file)  # Shape: [T, 1, H, W]
    tensor = torch.from_numpy(depth_np)
    T, _, H, W = tensor.shape

    # Prepare video writer at the requested FPS
    video_path = "depth_video_preview.mp4"
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(video_path, fourcc, fps, (W, H))

    for i in range(T):
        frame = tensor[i, 0].numpy()
        # Normalize each frame to [0, 1], then map to a color palette
        norm = (frame - frame.min()) / (frame.max() - frame.min() + 1e-8)
        frame_uint8 = (norm * 255).astype(np.uint8)
        colored = cv2.applyColorMap(frame_uint8, cv2.COLORMAP_INFERNO)
        out.write(colored)

    out.release()
    return video_path
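
# Example usage (illustrative; assumes Step 1 has written a depth map to this path):
#   preview_path = visualize_depth_npy_as_video("/app/output_anchor/depth/output.npy", fps=24)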

# -----------------------------
# Step 1: Get Anchor Video
# -----------------------------
def get_anchor_video(video_path, fps, num_frames, target_pose, mode,
                     radius_scale, near_far_estimated,
                     sampler_name, diffusion_guidance_scale, diffusion_inference_steps,
                     prompt, negative_prompt, refine_prompt,
                     depth_inference_steps, depth_guidance_scale,
                     window_size, overlap, max_res, sample_size,
                     seed_input, height, width, aspect_ratio_inputs,
                     init_dx, init_dy, init_dz):
    """Run the Step 1 pipeline and return (masked video, logs, captions, depth preview)."""
    temp_input_path = "/app/temp_input.mp4"
    output_dir = "/app/output_anchor"
    video_output_path = f"{output_dir}/masked_videos/output.mp4"
    captions_text_file = f"{output_dir}/captions/output.txt"
    depth_file = f"{output_dir}/depth/output.npy"

    if video_path:
        os.system(f"cp '{video_path}' {temp_input_path}")

    try:
        theta, phi, r, x, y = target_pose.strip().split()
    except ValueError:
        return None, "Invalid target pose format. Use: θ φ r x y", "", None

    logs = f"Running inference with target pose: θ={theta}, φ={phi}, r={r}, x={x}, y={y}\n"

    w, h = aspect_ratio_inputs.strip().split(",")
    h_s, w_s = sample_size.strip().split(",")

    command = [
        "python", "/app/inference/v2v_data/inference.py",
        "--video_path", temp_input_path,
        "--stride", "1",
        "--out_dir", output_dir,
        "--radius_scale", str(radius_scale),
        "--camera", "target",
        "--mask",
        "--target_pose", theta, phi, r, x, y,
        "--video_length", str(num_frames),
        "--save_name", "output",
        "--mode", mode,
        "--fps", str(fps),
        "--depth_inference_steps", str(depth_inference_steps),
        "--depth_guidance_scale", str(depth_guidance_scale),
        "--near_far_estimated", str(near_far_estimated),
        "--sampler_name", sampler_name,
        "--diffusion_guidance_scale", str(diffusion_guidance_scale),
        "--diffusion_inference_steps", str(diffusion_inference_steps),
        "--prompt", prompt if prompt else "",
        "--negative_prompt", negative_prompt,
        "--refine_prompt", refine_prompt,
        "--window_size", str(window_size),
        "--overlap", str(overlap),
        "--max_res", str(max_res),
        "--sample_size", h_s.strip(), w_s.strip(),
        "--seed", str(seed_input),
        "--height", str(height),
        "--width", str(width),
        "--target_aspect_ratio", w.strip(), h.strip(),
        "--init_dx", str(init_dx),
        "--init_dy", str(init_dy),
        "--init_dz", str(init_dz),
    ]

    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        logs += result.stdout
    except subprocess.CalledProcessError as e:
        logs += f"❌ Step 1 inference failed:\n{e.stderr}{e.stdout}"
        return None, logs, "", None

    caption_text = ""
    if os.path.exists(captions_text_file):
        with open(captions_text_file, "r") as f:
            caption_text = f.read()

    depth_video_path = visualize_depth_npy_as_video(depth_file, fps) if os.path.exists(depth_file) else None

    return str(video_output_path), logs, caption_text, depth_video_path
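
# Step 1 writes masked_videos/, captions/ and depth/ under /app/output_anchor,
# the same directory Step 2 consumes via --video_root_dir below, so the two
# steps can be run back to back without moving files.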

# -----------------------------
# Step 2: Run Inference
# -----------------------------
def inference(
    fps, num_frames, controlnet_weights, controlnet_guidance_start,
    controlnet_guidance_end, guidance_scale, num_inference_steps, dtype,
    seed, height, width, downscale_coef, vae_channels,
    controlnet_input_channels, controlnet_transformer_num_layers
):
    """Run Step 2: refine the anchor outputs with CogVideoX + ControlNet."""
    model_path = "/app/pretrained/CogVideoX-5b-I2V"
    ckpt_path = "/app/out/EPiC_pretrained/checkpoint-500.pt"
    video_root_dir = "/app/output_anchor"
    out_dir = "/app/output"

    command = [
        "python", "/app/inference/cli_demo_camera_i2v_pcd.py",
        "--video_root_dir", video_root_dir,
        "--base_model_path", model_path,
        "--controlnet_model_path", ckpt_path,
        "--output_path", out_dir,
        "--controlnet_weights", str(controlnet_weights),
        "--controlnet_guidance_start", str(controlnet_guidance_start),
        "--controlnet_guidance_end", str(controlnet_guidance_end),
        "--guidance_scale", str(guidance_scale),
        "--num_inference_steps", str(num_inference_steps),
        "--dtype", dtype,
        "--seed", str(seed),
        "--height", str(height),
        "--width", str(width),
        "--num_frames", str(num_frames),
        "--fps", str(fps),
        "--downscale_coef", str(downscale_coef),
        "--vae_channels", str(vae_channels),
        "--controlnet_input_channels", str(controlnet_input_channels),
        "--controlnet_transformer_num_layers", str(controlnet_transformer_num_layers),
    ]

    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        logs = result.stdout
    except subprocess.CalledProcessError as e:
        logs = f"❌ Step 2 Inference Failed:\nSTDERR:\n{e.stderr}\nSTDOUT:\n{e.stdout}"
        return None, logs

    video_output = f"{out_dir}/00000_{seed}_out.mp4"
    return video_output if os.path.exists(video_output) else None, logs
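
# Example (illustrative call using the UI defaults; keyword names follow the
# signature above):
#   video, logs = inference(
#       fps=24, num_frames=49, controlnet_weights=0.5,
#       controlnet_guidance_start=0.0, controlnet_guidance_end=0.5,
#       guidance_scale=6.0, num_inference_steps=50, dtype="bfloat16",
#       seed=42, height=480, width=720, downscale_coef=8, vae_channels=16,
#       controlnet_input_channels=6, controlnet_transformer_num_layers=8,
#   )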

# -----------------------------
# UI
# -----------------------------
demo = gr.Blocks()

with demo:
    gr.Markdown("## 🎬 EPiC: Cinematic Camera Control")

    with gr.Tabs():
        with gr.TabItem("Step 1: Camera Anchor"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        near_far_estimated = gr.Checkbox(label="Near Far Estimation", value=True)
                        pose_input = gr.Textbox(label="Target Pose (θ φ r x y)", placeholder="e.g., 0 30 -0.6 0 0")
                        fps_input = gr.Number(value=24, label="FPS")
                        aspect_ratio_inputs = gr.Textbox(value="3,4", label="Target Aspect Ratio (e.g., 2,3)")
                        init_dx = gr.Number(value=0.0, label="Start Camera Offset X")
                        init_dy = gr.Number(value=0.0, label="Start Camera Offset Y")
                        init_dz = gr.Number(value=0.0, label="Start Camera Offset Z")
                        num_frames_input = gr.Number(value=49, label="Number of Frames")
                        radius_input = gr.Number(value=1.0, label="Radius Scale")
                        mode_input = gr.Dropdown(choices=["gradual"], value="gradual", label="Camera Mode")
                        sampler_input = gr.Dropdown(choices=["Euler", "Euler A", "DPM++", "PNDM", "DDIM_Cog", "DDIM_Origin"], value="DDIM_Origin", label="Sampler")
                        diff_guidance_input = gr.Number(value=6.0, label="Diffusion Guidance")
                        diff_steps_input = gr.Number(value=50, label="Diffusion Steps")
                        depth_steps_input = gr.Number(value=5, label="Depth Steps")
                        depth_guidance_input = gr.Number(value=1.0, label="Depth Guidance")
                        window_input = gr.Number(value=64, label="Window Size")
                        overlap_input = gr.Number(value=25, label="Overlap")
                        maxres_input = gr.Number(value=720, label="Max Resolution")
                        sample_size = gr.Textbox(label="Sample Size (height, width)", placeholder="e.g., 384, 672", value="384, 672")
                        seed_input = gr.Number(value=43, label="Seed")
                        height = gr.Number(value=480, label="Height")
                        width = gr.Number(value=720, label="Width")
                        prompt_input = gr.Textbox(label="Prompt")
                        neg_prompt_input = gr.Textbox(label="Negative Prompt", value="The video is not of a high quality, it has a low resolution. Watermark present in each frame. The background is solid. Strange body and strange trajectory.")
                        refine_prompt_input = gr.Textbox(label="Refine Prompt", value=" The video is of high quality, and the view is very clear. ")
                with gr.Column():
                    video_input = gr.Video(label="Upload Video (MP4)")
                    step1_button = gr.Button("▶️ Run Step 1")
                    step1_video = gr.Video(label="[Step 1] Masked Video")
                    step1_captions = gr.Textbox(label="[Step 1] Captions", lines=4)
                    step1_logs = gr.Textbox(label="[Step 1] Logs")
                    step1_depth = gr.Video(label="[Step 1] Depth Video", visible=False)  # Hidden by default
with gr.TabItem("Step 2: CogVideoX Refinement"): | |
with gr.Row(): | |
with gr.Column(): | |
with gr.Row(): | |
controlnet_weights_input = gr.Number(value=0.5, label="ControlNet Weights") | |
controlnet_guidance_start_input = gr.Number(value=0.0, label="Guidance Start") | |
controlnet_guidance_end_input = gr.Number(value=0.5, label="Guidance End") | |
guidance_scale_input = gr.Number(value=6.0, label="Guidance Scale") | |
inference_steps_input = gr.Number(value=50, label="Num Inference Steps") | |
dtype_input = gr.Dropdown(choices=["float16", "bfloat16"], value="bfloat16", label="Compute Dtype") | |
seed_input2 = gr.Number(value=42, label="Seed") | |
height_input = gr.Number(value=480, label="Height") | |
width_input = gr.Number(value=720, label="Width") | |
num_frames_input2 = gr.Number(value=49, label="Num Frames") | |
fps_input2 = gr.Number(value=24, label="FPS") | |
downscale_coef_input = gr.Number(value=8, label="Downscale Coef") | |
vae_channels_input = gr.Number(value=16, label="VAE Channels") | |
controlnet_input_channels_input = gr.Number(value=6, label="ControlNet Input Channels") | |
controlnet_layers_input = gr.Number(value=8, label="ControlNet Transformer Layers") | |
with gr.Column(): | |
step2_video = gr.Video(label="[Step 2] Final Refined Video") | |
step2_button = gr.Button("▶️ Run Step 2") | |
step2_logs = gr.Textbox(label="[Step 2] Logs") | |

    step1_button.click(
        get_anchor_video,
        inputs=[
            video_input, fps_input, num_frames_input, pose_input, mode_input,
            radius_input, near_far_estimated,
            sampler_input, diff_guidance_input, diff_steps_input,
            prompt_input, neg_prompt_input, refine_prompt_input,
            depth_steps_input, depth_guidance_input,
            window_input, overlap_input, maxres_input, sample_size,
            seed_input, height, width, aspect_ratio_inputs,
            init_dx, init_dy, init_dz
        ],
        outputs=[step1_video, step1_logs, step1_captions, step1_depth]
    )

    step2_button.click(
        inference,
        inputs=[
            fps_input2, num_frames_input2,
            controlnet_weights_input, controlnet_guidance_start_input,
            controlnet_guidance_end_input, guidance_scale_input,
            inference_steps_input, dtype_input, seed_input2,
            height_input, width_input, downscale_coef_input,
            vae_channels_input, controlnet_input_channels_input,
            controlnet_layers_input
        ],
        outputs=[step2_video, step2_logs]
    )
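
    # Note: the `inputs` lists above are positional; their order must match the
    # parameter order of get_anchor_video and inference exactly.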

if __name__ == "__main__":
    download_models()
    demo.launch(server_name="0.0.0.0", server_port=7860)