# Page header captured together with the file (not part of the program):
# jbilcke-hf (HF Staff) - "Upload handler.py" - 5e47316 (verified)
from dataclasses import dataclass
from pathlib import Path
import logging
import base64
import random
import gc
import os
import numpy as np
import torch
from typing import Dict, Any, Optional, List, Union, Tuple
import json
from safetensors import safe_open
from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder
from ltx_video.models.transformers.transformer3d import Transformer3DModel
from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier
from ltx_video.schedulers.rf import RectifiedFlowScheduler, TimestepShifter
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXVideoPipeline, LTXMultiScalePipeline
from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy
from ltx_video.models.autoencoders.latent_upsampler import LatentUpsampler
from transformers import T5EncoderModel, T5Tokenizer, AutoModelForCausalLM, AutoProcessor, AutoTokenizer
from varnish import Varnish
from varnish.utils import is_truthy, process_input_image
# Configure logging
# NOTE: basicConfig() runs at import time, so importing this module configures the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Get token from environment
# NOTE(review): hf_token is not referenced anywhere else in the visible file - confirm it is still needed.
hf_token = os.getenv("HF_API_TOKEN")
# Constraints
# Upper bounds applied by GenerationConfig.validate_and_adjust(); 1280x768 and 768x1280 are accepted as-is.
MAX_LARGE_SIDE = 1280
MAX_SMALL_SIDE = 768 # should be 720 but it must be divisible by 32
MAX_FRAMES = (8 * 21) + 1 # = 169; visual glitches appear after about 169 frames, so we cap it
# Default timesteps for multi-scale pipeline (from ltxv-2b-0.9.8-distilled config)
# Used when a request keeps num_inference_steps at 8 and supplies no custom timesteps.
DEFAULT_FIRST_PASS_TIMESTEPS = [1.0000, 0.9937, 0.9875, 0.9812, 0.9750, 0.9094, 0.7250]
DEFAULT_SECOND_PASS_TIMESTEPS = [0.9094, 0.7250, 0.4219]
# Allowed timesteps from the 0.9.8 model (these are the only valid timesteps)
# Ordered from most noisy (1.0) to least noisy (0.4219). The request handler relies on
# index 5 of this list being 0.9094 when it builds the second-pass schedule.
ALLOWED_TIMESTEPS = [1.0, 0.9937, 0.9875, 0.9812, 0.975, 0.9094, 0.725, 0.4219]
# Check environment variable for pipeline support
# NOTE(review): support_image_prompt is not read anywhere else in the visible file - confirm it is still needed.
support_image_prompt = is_truthy(os.getenv("SUPPORT_INPUT_IMAGE_PROMPT"))
def generate_valid_timesteps(num_steps: int, allowed_timesteps: List[float], start_high: bool = True) -> List[float]:
    """Select ``num_steps`` timesteps from ``allowed_timesteps``.

    Args:
        num_steps: Number of timesteps wanted. Zero or a negative value yields
            an empty list.
        allowed_timesteps: Candidate timesteps, ordered from the highest (most
            noisy) to the lowest value.
        start_high: When True, the selection is spread evenly over the whole
            list, always including its first and last entries. When False, the
            last ``num_steps`` entries are returned (useful for a second pass
            that continues from an already denoised state).

    Returns:
        A new list (never the ``allowed_timesteps`` object itself), so callers
        may modify the result without corrupting the shared list.
    """
    # Guard first: `allowed_timesteps[-0:]` would otherwise return the WHOLE
    # list for num_steps == 0, and negative counts would slice from the front.
    if num_steps <= 0:
        return []

    total = len(allowed_timesteps)
    if num_steps >= total:
        # Copy so the caller never aliases the (module-level) source list.
        return list(allowed_timesteps)

    if num_steps == 1:
        # For a single step use the highest timestep (most noisy) if start_high is True
        return [allowed_timesteps[0] if start_high else allowed_timesteps[-1]]

    if start_high:
        # Evenly spaced indices from 0 to total - 1; integer floor division
        # gives the same result as int(i * (total - 1) / (num_steps - 1)).
        last_index = total - 1
        return [allowed_timesteps[i * last_index // (num_steps - 1)] for i in range(num_steps)]

    # Not starting high (e.g. second pass continuation): take the tail.
    return list(allowed_timesteps[-num_steps:])
@dataclass
class GenerationConfig:
    """Configuration for video generation.

    Holds every tunable of one generation request: the prompt, the video model
    settings, the multi-scale pipeline settings and the post-processing
    (varnish) settings. Call :meth:`validate_and_adjust` after construction to
    bring the values into the supported ranges.
    """

    # general content settings
    prompt: str = ""
    negative_prompt: str = "saturated, highlight, overexposed, highlighted, overlit, shaking, too bright, worst quality, inconsistent motion, blurry, jittery, distorted, cropped, watermarked, watermark, logo, subtitle, subtitles, lowres"

    # video model settings (will be used during generation of the initial raw video clip)
    width: int = 1216  # 768
    height: int = 704  # 416

    # this is a hack to fool LTX-Video into believing our input image is an actual video frame with poor encoding quality
    # after a quick benchmark using the value 70 seems like a sweet spot
    input_image_quality: int = 70

    # users may tend to always set this to the max, to get as much useable content as possible
    # (which is MAX_FRAMES, i.e. (8 * 21) + 1 = 169).
    # The value must be a multiple of 8, plus 1 frame.
    # visual glitches appear after about 169 frames, so we don't need more actually
    num_frames: int = (8 * 14) + 1

    # values between 3.0 and 4.0 are nice
    guidance_scale: float = 3.0
    num_inference_steps: int = 8

    # Multi-scale pipeline settings
    pipeline_type: str = "multi-scale"  # "base" or "multi-scale"
    downscale_factor: float = 0.6666666
    first_pass_timesteps: Optional[List[float]] = None  # Custom timesteps for first pass
    second_pass_timesteps: Optional[List[float]] = None  # Custom timesteps for second pass

    # reproducible generation settings
    seed: int = -1  # -1 means random seed

    # varnish settings (will be used for post-processing after the raw video clip has been generated)
    fps: int = 30  # FPS of the final video (only applied at the very end, when converting to mp4)
    double_num_frames: bool = False  # if True, the number of frames will be multiplied by 2 using RIFE
    super_resolution: bool = False  # if True, the resolution will be multiplied by 2 using Real_ESRGAN
    grain_amount: float = 0.0  # be careful, adding film grain can negatively impact video compression

    # audio settings
    enable_audio: bool = False  # Whether to generate audio
    audio_prompt: str = ""  # Text prompt for audio generation
    audio_negative_prompt: str = "voices, voice, talking, speaking, speech"  # Negative prompt for audio generation

    # The range of the CRF scale is 0–51, where:
    # 0 is lossless (for 8 bit only, for 10 bit use -qp 0)
    # 23 is the default
    # 51 is worst quality possible
    # A lower value generally leads to higher quality, and a subjectively sane range is 17–28.
    # Consider 17 or 18 to be visually lossless or nearly so;
    # it should look the same or nearly the same as the input but it isn't technically lossless.
    # The range is exponential, so increasing the CRF value +6 results in roughly half the bitrate / file size, while -6 leads to roughly twice the bitrate.
    quality: int = 18

    # STG (Spatiotemporal Guidance) settings
    stg_scale: float = 0.0
    stg_rescale: float = 1.0
    stg_mode: str = "attention_values"  # Can be "attention_values", "attention_skip", "residual", or "transformer_block"

    # VAE noise augmentation
    decode_timestep: float = 0.05
    decode_noise_scale: float = 0.025

    # Other advanced settings
    image_cond_noise_scale: float = 0.15
    mixed_precision: bool = True  # Use mixed precision for inference
    stochastic_sampling: bool = False  # Use stochastic sampling

    # Sampling settings
    sampler: Optional[str] = "from_checkpoint"  # "uniform" or "linear-quadratic" or None (use default from checkpoint)

    # Prompt enhancement
    enhance_prompt: bool = False  # Whether to enhance the prompt using an LLM
    prompt_enhancement_words_threshold: int = 50  # Enhance prompt only if it has fewer words than this

    def validate_and_adjust(self) -> 'GenerationConfig':
        """Validate and adjust parameters to meet constraints.

        Adjustments performed in place:

        * ``width`` / ``height`` are snapped to multiples of 32 within
          ``[128, MAX_LARGE_SIDE]``; resolutions above the pixel budget are
          scaled down proportionally. 1280x768 and 768x1280 are kept as-is.
        * ``num_frames`` is rounded down to the form ``8k + 1`` and clamped to
          ``[1, MAX_FRAMES]``.
        * ``seed`` is replaced by a random 32-bit value when it is ``-1``
          (or ``None`` / any other negative value).
        * ``stg_mode`` aliases are normalized; an unknown mode falls back to
          ``"attention_values"`` with a warning.
        * ``enhance_prompt`` is switched off when the prompt already has at
          least ``prompt_enhancement_words_threshold`` words.

        Returns:
            ``self``, to allow chaining right after construction.
        """

        def snap(value: float) -> int:
            # Nearest multiple of 32, kept within [128, MAX_LARGE_SIDE]
            return max(128, min(MAX_LARGE_SIDE, round(value / 32) * 32))

        # First check if it's one of our explicitly allowed resolutions
        explicitly_allowed = (
            (self.width == MAX_LARGE_SIDE and self.height == MAX_SMALL_SIDE)
            or (self.width == MAX_SMALL_SIDE and self.height == MAX_LARGE_SIDE)
        )
        if not explicitly_allowed:
            # For other resolutions, ensure total pixels don't exceed the budget
            # (MAX_SMALL_SIDE * MAX_LARGE_SIDE, i.e. 768 * 1280 = 983040)
            max_total_pixels = MAX_SMALL_SIDE * MAX_LARGE_SIDE
            total_pixels = self.width * self.height
            if total_pixels > max_total_pixels:
                # Scale both sides by the same factor to preserve the aspect ratio
                scale = (max_total_pixels / total_pixels) ** 0.5
                self.width = snap(self.width * scale)
                self.height = snap(self.height * scale)
            else:
                self.width = snap(self.width)
                self.height = snap(self.height)

        # Adjust number of frames to be in format 8k + 1.
        # k is clamped at 0 so a zero/negative request yields 1 frame rather
        # than a negative frame count.
        k = max(0, (self.num_frames - 1) // 8)
        self.num_frames = min((k * 8) + 1, MAX_FRAMES)

        # Set random seed if not specified. None and negative values are also
        # treated as "random": they are not valid seeds for the seeding calls
        # performed by the request handler.
        if self.seed is None or self.seed < 0:
            self.seed = random.randint(0, 2**32 - 1)

        # Normalize the STG mode (short aliases are accepted)
        stg_aliases = {
            "stg_av": "attention_values",
            "attention_values": "attention_values",
            "stg_as": "attention_skip",
            "attention_skip": "attention_skip",
            "stg_r": "residual",
            "residual": "residual",
            "stg_t": "transformer_block",
            "transformer_block": "transformer_block",
        }
        canonical_mode = stg_aliases.get(str(self.stg_mode or "").lower())
        if canonical_mode is None:
            # An unknown mode used to pass through unchanged and made the
            # request fail much later; fall back to the default instead.
            logger.warning(f"Unknown stg_mode {self.stg_mode!r}, falling back to 'attention_values'")
            canonical_mode = "attention_values"
        self.stg_mode = canonical_mode

        # Check if we should enhance the prompt
        if self.enhance_prompt and self.prompt:
            prompt_word_count = len(self.prompt.split())
            if prompt_word_count >= self.prompt_enhancement_words_threshold:
                logger.info(f"Prompt has {prompt_word_count} words, which exceeds the threshold of {self.prompt_enhancement_words_threshold}. Prompt enhancement disabled.")
                self.enhance_prompt = False

        return self
def load_image_to_tensor_with_resize_and_crop(
    image_input: Union[str, bytes],
    target_height: int = 704,
    target_width: int = 1216,
    quality: int = 100
) -> torch.Tensor:
    """Decode an image, center-crop it to the target aspect ratio and return it as a tensor.

    Args:
        image_input: A file path (str), a ``data:`` URI with a base64 payload (str),
            or the encoded image itself (bytes)
        target_height: Height of the returned frame
        target_width: Width of the returned frame
        quality: JPEG quality for an optional re-encode; values below 100 trigger a
            JPEG round-trip (to simulate a lower quality video frame)

    Returns:
        Float tensor of shape (1, 3, 1, target_height, target_width), with the
        8-bit RGB values mapped to [-1, 1].

    Raises:
        ValueError: if ``image_input`` is neither a str nor bytes.
    """
    from PIL import Image
    import io
    import numpy as np

    # Work out what Image.open() should read from
    if isinstance(image_input, bytes):
        source = io.BytesIO(image_input)
    elif isinstance(image_input, str):
        if image_input.startswith('data:'):
            # "data:<mime>;base64,<payload>" -> keep only the payload
            _, payload = image_input.split(",", 1)
            source = io.BytesIO(base64.b64decode(payload))
        else:
            source = image_input
    else:
        raise ValueError("image_input must be either a file path, bytes, or base64 data URI")
    picture = Image.open(source).convert("RGB")

    # Optional JPEG round-trip (to simulate a video frame)
    if quality < 100:
        jpeg_bytes = io.BytesIO()
        picture.save(jpeg_bytes, format="JPEG", quality=quality)
        jpeg_bytes.seek(0)
        picture = Image.open(jpeg_bytes).convert("RGB")

    # Largest centered crop that has the target aspect ratio
    src_width, src_height = picture.size
    wanted_ratio = target_width / target_height
    if src_width / src_height > wanted_ratio:
        # Source is too wide: keep the full height, trim left and right
        crop_width, crop_height = int(src_height * wanted_ratio), src_height
        left, top = (src_width - crop_width) // 2, 0
    else:
        # Source is too tall (or already matches): keep the full width, trim top and bottom
        crop_width, crop_height = src_width, int(src_width / wanted_ratio)
        left, top = 0, (src_height - crop_height) // 2
    picture = picture.crop((left, top, left + crop_width, top + crop_height))
    picture = picture.resize((target_width, target_height))

    # HWC array -> CHW float tensor in [-1, 1]
    pixels = torch.tensor(np.array(picture)).permute(2, 0, 1).float()
    pixels = (pixels / 127.5) - 1.0

    # Add the batch axis (front) and the frame axis (after channels)
    return pixels.unsqueeze(0).unsqueeze(2)
def calculate_padding(
    source_height: int, source_width: int, target_height: int, target_width: int
) -> tuple[int, int, int, int]:
    """Split the size difference between source and target into per-side padding.

    The difference on each axis is divided between the two sides; when it is
    odd, the extra unit goes to the bottom / right side.

    Returns:
        The tuple (left, right, top, bottom).
    """
    # divmod(diff, 2) gives the smaller half and the leftover (0 or 1)
    top, vertical_extra = divmod(target_height - source_height, 2)
    left, horizontal_extra = divmod(target_width - source_width, 2)
    return (left, left + horizontal_extra, top, top + vertical_extra)
def prepare_conditioning(
    conditioning_media_paths: List[str],
    conditioning_strengths: List[float],
    conditioning_start_frames: List[int],
    height: int,
    width: int,
    num_frames: int,
    input_image_quality: int = 100,
    pipeline: Optional[LTXVideoPipeline] = None,
) -> Optional[List[ConditioningItem]]:
    """Build one ConditioningItem per conditioning image.

    The three input lists are walked in lockstep; iteration stops at the end
    of the shortest one.

    Args:
        conditioning_media_paths: Image inputs accepted by
            load_image_to_tensor_with_resize_and_crop
        conditioning_strengths: Strength for each image
        conditioning_start_frames: Start frame for each image
        height: Height the images are cropped/resized to
        width: Width the images are cropped/resized to
        num_frames: Total number of frames of the video being generated
        input_image_quality: JPEG quality forwarded to the image loader
        pipeline: When given (and truthy), its trim_conditioning_sequence()
            is called for every image

    Returns:
        The list of conditioning items (empty when no inputs were given).
    """
    items = []
    entries = zip(conditioning_media_paths, conditioning_strengths, conditioning_start_frames)
    for media, weight, first_frame in entries:
        # Load and process the conditioning image
        tensor = load_image_to_tensor_with_resize_and_crop(
            media, height, width, quality=input_image_quality
        )
        if pipeline:
            # An image always counts as a single frame. The trimmed count that
            # comes back is not used afterwards.
            pipeline.trim_conditioning_sequence(first_frame, 1, num_frames)
        items.append(ConditioningItem(tensor, first_frame, weight))
    return items
def create_ltx_video_pipeline(
    config: GenerationConfig,
    device: str = "cuda"
) -> Union[LTXVideoPipeline, LTXMultiScalePipeline]:
    """Create and configure the LTX video pipeline.

    Loads the VAE, transformer, scheduler, text encoder and tokenizer from
    fixed paths under ``/repository``, moves the models to ``device`` in
    bfloat16 and assembles them into an ``LTXVideoPipeline``. When
    ``config.pipeline_type`` is ``"multi-scale"`` and the spatial upscaler
    checkpoint exists, the pipeline is wrapped in an ``LTXMultiScalePipeline``.

    Args:
        config: Generation settings. Reads ``sampler``, ``enhance_prompt`` and
            ``pipeline_type``; ``enhance_prompt`` is set to False on the config
            when the prompt enhancer models cannot be loaded.
        device: Device the models are moved to.

    Returns:
        The base pipeline, or the multi-scale wrapper around it.

    Raises:
        FileNotFoundError: if the main checkpoint file does not exist.
    """
    ckpt_path = "/repository/ltxv-2b-0.9.8-distilled.safetensors"
    spatial_upscaler_path = "/repository/ltxv-spatial-upscaler-0.9.8.safetensors"

    # Explicit check instead of `assert`, which is stripped under `python -O`
    if not os.path.exists(ckpt_path):
        raise FileNotFoundError(f"Ckpt path provided (--ckpt_path) {ckpt_path} does not exist")

    # Get allowed inference steps from the checkpoint metadata if available
    allowed_inference_steps = None
    with safe_open(ckpt_path, framework="pt") as f:
        metadata = f.metadata() or {}
    config_str = metadata.get("config")
    if config_str:
        allowed_inference_steps = json.loads(config_str).get("allowed_inference_steps", None)
    else:
        # A checkpoint without embedded config is tolerated: no step restriction is applied
        logger.warning(f"No 'config' metadata found in {ckpt_path}; allowed_inference_steps left unset")

    # Initialize model components
    vae = CausalVideoAutoencoder.from_pretrained(ckpt_path)
    transformer = Transformer3DModel.from_pretrained(ckpt_path)

    # Scheduler: None, "" and "from_checkpoint" (the GenerationConfig default)
    # all mean "use the scheduler stored in the checkpoint". Previously the
    # default string was truthy and was silently mapped to "LinearQuadratic".
    sampler_name = (config.sampler or "").lower()
    if sampler_name in ("", "from_checkpoint"):
        scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path)
    else:
        scheduler = RectifiedFlowScheduler(
            sampler=("Uniform" if sampler_name == "uniform" else "LinearQuadratic")
        )

    text_encoder = T5EncoderModel.from_pretrained("/repository/text_encoder")
    patchifier = SymmetricPatchifier(patch_size=1)
    tokenizer = T5Tokenizer.from_pretrained("/repository/tokenizer")

    # Move models to the correct device, then set up precision
    vae = vae.to(device).to(torch.bfloat16)
    transformer = transformer.to(device).to(torch.bfloat16)
    text_encoder = text_encoder.to(device).to(torch.bfloat16)

    # Prompt enhancer components stay None unless enhancement is requested
    prompt_enhancer_components = {
        "prompt_enhancer_image_caption_model": None,
        "prompt_enhancer_image_caption_processor": None,
        "prompt_enhancer_llm_model": None,
        "prompt_enhancer_llm_tokenizer": None
    }
    if config.enhance_prompt:
        try:
            caption_model_id = "MiaoshouAI/Florence-2-large-PromptGen-v2.0"
            llm_model_id = "unsloth/Llama-3.2-3B-Instruct"
            loaded_components = {
                "prompt_enhancer_image_caption_model": AutoModelForCausalLM.from_pretrained(
                    caption_model_id,
                    trust_remote_code=True
                ),
                "prompt_enhancer_image_caption_processor": AutoProcessor.from_pretrained(
                    caption_model_id,
                    trust_remote_code=True
                ),
                "prompt_enhancer_llm_model": AutoModelForCausalLM.from_pretrained(
                    llm_model_id,
                    torch_dtype="bfloat16",
                ),
                "prompt_enhancer_llm_tokenizer": AutoTokenizer.from_pretrained(
                    llm_model_id,
                ),
            }
            # Only replace the defaults once ALL four components loaded
            prompt_enhancer_components = loaded_components
        except Exception as e:
            # Deliberate best effort: generation still works without enhancement
            logger.warning(f"Failed to load prompt enhancer models: {e}")
            config.enhance_prompt = False

    # Construct the pipeline
    pipeline = LTXVideoPipeline(
        transformer=transformer,
        patchifier=patchifier,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        scheduler=scheduler,
        vae=vae,
        allowed_inference_steps=allowed_inference_steps,
        **prompt_enhancer_components
    )

    # If multi-scale pipeline, wrap with spatial upscaler
    if config.pipeline_type == "multi-scale":
        if os.path.exists(spatial_upscaler_path):
            latent_upsampler = LatentUpsampler.from_pretrained(spatial_upscaler_path)
            latent_upsampler = latent_upsampler.to(device).to(torch.bfloat16)
            pipeline = LTXMultiScalePipeline(pipeline, latent_upsampler=latent_upsampler)
        else:
            logger.warning(f"Spatial upscaler not found at {spatial_upscaler_path}, falling back to base pipeline")

    return pipeline
class EndpointHandler:
    """Handler for the LTX Video endpoint.

    Builds the Varnish post-processor and, through a warm-up inference, the
    LTX pipeline at construction time. Each call turns a request dictionary
    into a GenerationConfig, generates the frames, post-processes them with
    Varnish and returns the video as a data URI plus metadata.

    The LTX pipeline is created once and then reused, so settings that are
    only consumed by create_ltx_video_pipeline() (``sampler``,
    ``enhance_prompt`` model loading, whether the multi-scale wrapper is
    built) are fixed by whichever configuration created it.
    """

    # Request parameters that are copied into GenerationConfig; each one falls
    # back to the GenerationConfig class default when absent from the request.
    _PARAM_NAMES = (
        # video model settings
        "negative_prompt", "width", "height", "input_image_quality", "num_frames",
        "guidance_scale", "num_inference_steps",
        # STG settings
        "stg_scale", "stg_rescale", "stg_mode",
        # VAE noise settings
        "decode_timestep", "decode_noise_scale", "image_cond_noise_scale",
        # reproducible generation settings
        "seed",
        # varnish settings
        "fps", "double_num_frames", "super_resolution", "grain_amount",
        "enable_audio", "audio_prompt", "audio_negative_prompt", "quality",
        # advanced settings
        "mixed_precision", "stochastic_sampling", "sampler",
        # multi-scale pipeline settings
        "pipeline_type", "downscale_factor", "first_pass_timesteps", "second_pass_timesteps",
        # prompt enhancement
        "enhance_prompt", "prompt_enhancement_words_threshold",
    )

    def __init__(self, model_path: str = "/repository/"):
        """Initialize the endpoint handler

        Args:
            model_path: Path to model weights (not used: all paths are hard-coded under /repository)
        """
        # Enable TF32 for potential speedup on Ampere GPUs
        torch.backends.cuda.matmul.allow_tf32 = True

        # Initialize Varnish for post-processing
        self.varnish = Varnish(
            device="cuda",
            model_base_dir="/repository/varnish",
            enable_mmaudio=False,  # Disable audio generation for now, since it is broken
        )

        # The LTX pipeline is created lazily; in practice the warm-up below
        # creates it, and requests only create it if the warm-up failed early.
        self.pipeline = None

        # Perform warm-up inference
        logger.info("Performing warm-up inference...")
        self._warmup()
        logger.info("Warm-up completed!")

    @staticmethod
    def _seed_everything(seed: int):
        """Seed the Python, NumPy and torch RNGs and return a seeded CUDA generator."""
        random.seed(seed)
        # NumPy only accepts seeds in [0, 2**32 - 1]; fold larger values into range
        np.random.seed(seed % (2**32))
        torch.manual_seed(seed)
        return torch.Generator(device='cuda').manual_seed(seed)

    @staticmethod
    def _resolve_skip_layer_strategy(stg_mode: str):
        """Map a canonical STG mode name to its SkipLayerStrategy member."""
        strategies = {
            "attention_values": SkipLayerStrategy.AttentionValues,
            "attention_skip": SkipLayerStrategy.AttentionSkip,
            "residual": SkipLayerStrategy.Residual,
            "transformer_block": SkipLayerStrategy.TransformerBlock,
        }
        if stg_mode not in strategies:
            # Without a fallback the strategy variable would be unbound and the
            # request would fail with an unrelated-looking error.
            logger.warning(f"Unknown stg_mode {stg_mode!r}, using 'attention_values'")
            return SkipLayerStrategy.AttentionValues
        return strategies[stg_mode]

    @staticmethod
    def _build_pass_configs(config: "GenerationConfig") -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Build the first/second pass dictionaries for the multi-scale pipeline."""
        # The default guidance (3.0) is replaced by 1; any other value is used as given
        pass_guidance = config.guidance_scale if config.guidance_scale != 3.0 else 1

        def new_pass() -> Dict[str, Any]:
            return {
                "guidance_scale": pass_guidance,
                "stg_scale": config.stg_scale,
                "rescaling_scale": config.stg_rescale,
                "skip_block_list": [42],
            }

        first_pass, second_pass = new_pass(), new_pass()
        steps = config.num_inference_steps

        if config.first_pass_timesteps is not None:
            # Case 1: Use custom timesteps provided by user
            first_timesteps = config.first_pass_timesteps
            second_timesteps = config.second_pass_timesteps or DEFAULT_SECOND_PASS_TIMESTEPS
        elif steps != 8:
            # Case 2: Use num_inference_steps with valid timesteps only
            if steps <= 4:
                # For very few steps, use a simple split with key timesteps.
                # Any value <= 4 that is not 1, 2 or 3 gets the 4-step split.
                few_step_splits = {
                    1: ([1.0], [0.4219]),
                    2: ([1.0], [0.9094, 0.4219]),
                    3: ([1.0, 0.9094], [0.9094, 0.4219]),
                }
                four_step_split = ([1.0, 0.975, 0.9094], [0.9094, 0.725, 0.4219])
                first_timesteps, second_timesteps = few_step_splits.get(steps, four_step_split)
            else:
                # For more steps, give roughly 70% of them to the first pass
                first_pass_steps = max(1, int(steps * 0.7))
                second_pass_steps = max(1, steps - first_pass_steps)
                # First pass starts at 1.0
                first_timesteps = generate_valid_timesteps(first_pass_steps, ALLOWED_TIMESTEPS)
                # Second pass should start at a high noise level (0.9094) but NOT 1.0
                start_idx = 5  # This is 0.9094 in ALLOWED_TIMESTEPS
                if second_pass_steps == 1:
                    second_timesteps = [ALLOWED_TIMESTEPS[start_idx]]
                else:
                    # Start from 0.9094 and go down, limited by the end of the list
                    end_idx = min(len(ALLOWED_TIMESTEPS), start_idx + second_pass_steps)
                    second_timesteps = ALLOWED_TIMESTEPS[start_idx:end_idx]
        else:
            # Case 3: Use default optimized timesteps
            first_timesteps = DEFAULT_FIRST_PASS_TIMESTEPS
            second_timesteps = DEFAULT_SECOND_PASS_TIMESTEPS

        # Always hand copies to the pipeline so that the module-level defaults
        # can never be modified through a request.
        first_pass["timesteps"] = list(first_timesteps)
        second_pass["timesteps"] = list(second_timesteps)
        return first_pass, second_pass

    @staticmethod
    def _frames_to_uint8(frames):
        """Convert pipeline output to the frame layout handed to Varnish.

        Input layout is [batch, channels, frames, height, width] with values in
        [0, 1]; output is [frames, channels, height, width] as uint8.
        """
        frames = frames.squeeze(0)  # Remove batch: [channels, frames, height, width]
        frames = frames.permute(1, 0, 2, 3)  # Reorder to: [frames, channels, height, width]
        # Scale to [0, 255]; the clamp prevents wrap-around in the uint8 cast
        # should a value fall slightly outside [0, 1]
        return (frames * 255.0).clamp(0, 255).to(torch.uint8)

    def _warmup(self):
        """Perform a warm-up inference to prepare the model for future requests.

        Failures are logged as warnings and never propagate, so a broken
        warm-up does not prevent the handler from being constructed.
        """
        try:
            # Create a simple test configuration
            test_config = GenerationConfig(
                prompt="an astronaut is riding a cow in the desert, during golden hour",
                negative_prompt="worst quality, lowres",
                width=768,  # Using smaller resolution for faster warm-up
                height=416,
                num_frames=33,  # Just enough frames for a valid video
                guidance_scale=1.0,
                num_inference_steps=4,  # Fewer steps for faster warm-up
                seed=42,  # Fixed seed for consistent warm-up
                fps=16,  # Lower FPS for faster processing
                enable_audio=False,  # No audio for warm-up
                mixed_precision=True,
            ).validate_and_adjust()

            # Create the pipeline if it doesn't exist
            if self.pipeline is None:
                self.pipeline = create_ltx_video_pipeline(test_config)

            # Run a quick inference
            with torch.amp.autocast(device_type='cuda', dtype=torch.bfloat16), torch.no_grad():
                generator = self._seed_everything(test_config.seed)

                shared_kwargs = dict(
                    height=test_config.height,
                    width=test_config.width,
                    num_frames=test_config.num_frames,
                    frame_rate=test_config.fps,
                    prompt=test_config.prompt,
                    negative_prompt=test_config.negative_prompt,
                    generator=generator,
                    output_type="pt",
                    mixed_precision=test_config.mixed_precision,
                    is_video=True,
                    vae_per_channel_normalize=True,
                )

                if test_config.pipeline_type == "multi-scale" and isinstance(self.pipeline, LTXMultiScalePipeline):
                    # Multi-scale pipeline warm-up, with fewer timesteps than the defaults
                    first_pass = {
                        "timesteps": DEFAULT_FIRST_PASS_TIMESTEPS[:4],
                        "guidance_scale": 1,
                        "stg_scale": 0,
                        "rescaling_scale": 1,
                        "skip_block_list": [42]
                    }
                    second_pass = {
                        "timesteps": DEFAULT_SECOND_PASS_TIMESTEPS[:2],
                        "guidance_scale": 1,
                        "stg_scale": 0,
                        "rescaling_scale": 1,
                        "skip_block_list": [42]
                    }
                    result = self.pipeline(
                        downscale_factor=test_config.downscale_factor,
                        first_pass=first_pass,
                        second_pass=second_pass,
                        **shared_kwargs
                    )
                else:
                    # Base pipeline warm-up
                    result = self.pipeline(
                        guidance_scale=test_config.guidance_scale,
                        num_inference_steps=test_config.num_inference_steps,
                        **shared_kwargs
                    )

                # Only the shape is needed; keeping the frames themselves alive
                # would stop empty_cache() from releasing their memory
                frame_shape = tuple(result.images.shape)

                # Clean up
                del result
                torch.cuda.empty_cache()
                gc.collect()

                logger.info(f"Warm-up successful! Generated {frame_shape[2]} frames at {frame_shape[3]}x{frame_shape[4]}")
        except Exception as e:
            # Log the error but don't fail initialization
            import traceback
            error_message = f"Warm-up failed (but this is non-critical): {str(e)}\n{traceback.format_exc()}"
            logger.warning(error_message)

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process inference requests

        Args:
            data: Request data. ``data["inputs"]`` is either the prompt string or
                a dict with ``"prompt"`` and/or ``"image"``; ``data["parameters"]``
                optionally overrides GenerationConfig fields.

        Returns:
            Dictionary with the generated video (``"video"``, a data URI), its
            ``"content-type"`` and ``"metadata"``.

        Raises:
            ValueError: if neither a prompt nor an image is provided, or
                ``"inputs"`` has an unsupported type.
            RuntimeError: if generation or post-processing fails; the original
                exception is attached as the cause.
        """
        # Extract inputs and parameters. `or {}` also covers explicit nulls.
        inputs = data.get("inputs") or {}
        # Support both formats:
        # 1. {"inputs": {"prompt": "...", "image": "..."}}
        # 2. {"inputs": "..."} (prompt only)
        if isinstance(inputs, str):
            input_prompt = inputs
            input_image = None
        elif isinstance(inputs, dict):
            input_prompt = inputs.get("prompt", "")
            input_image = inputs.get("image")
        else:
            raise ValueError('"inputs" must be a prompt string or an object with "prompt" and/or "image"')

        if not input_prompt and not input_image:
            raise ValueError("Either prompt or image must be provided")

        params = data.get("parameters") or {}

        # Create and validate configuration
        config = GenerationConfig(
            prompt=input_prompt,
            **{name: params.get(name, getattr(GenerationConfig, name)) for name in self._PARAM_NAMES}
        ).validate_and_adjust()

        result = None
        frames = None
        try:
            with torch.amp.autocast(device_type='cuda', dtype=torch.bfloat16), torch.no_grad():
                # Set random seeds for reproducibility
                generator = self._seed_everything(config.seed)

                # Create pipeline if not already created (normally done by the warm-up)
                if self.pipeline is None:
                    self.pipeline = create_ltx_video_pipeline(config)

                # Prepare conditioning items if an image is provided
                conditioning_items = None
                if input_image:
                    conditioning_items = [
                        ConditioningItem(
                            load_image_to_tensor_with_resize_and_crop(
                                input_image,
                                config.height,
                                config.width,
                                quality=config.input_image_quality
                            ),
                            0,  # Start frame
                            1.0  # Conditioning strength
                        )
                    ]

                # Arguments shared by the multi-scale and the base call
                shared_kwargs = dict(
                    height=config.height,
                    width=config.width,
                    num_frames=config.num_frames,
                    frame_rate=config.fps,
                    prompt=config.prompt,
                    negative_prompt=config.negative_prompt,
                    generator=generator,
                    output_type="pt",  # Return as PyTorch tensor
                    skip_layer_strategy=self._resolve_skip_layer_strategy(config.stg_mode),
                    conditioning_items=conditioning_items,
                    decode_timestep=config.decode_timestep,
                    decode_noise_scale=config.decode_noise_scale,
                    image_cond_noise_scale=config.image_cond_noise_scale,
                    mixed_precision=config.mixed_precision,
                    is_video=True,
                    vae_per_channel_normalize=True,
                    stochastic_sampling=config.stochastic_sampling,
                    enhance_prompt=config.enhance_prompt,
                )

                # Generate video with LTX pipeline
                if config.pipeline_type == "multi-scale" and isinstance(self.pipeline, LTXMultiScalePipeline):
                    # Use the multi-scale pipeline with two-pass generation
                    first_pass, second_pass = self._build_pass_configs(config)
                    result = self.pipeline(
                        downscale_factor=config.downscale_factor,
                        first_pass=first_pass,
                        second_pass=second_pass,
                        **shared_kwargs
                    )
                else:
                    # Use the base pipeline (single pass). The cached pipeline may be
                    # the multi-scale wrapper even though "base" was requested; the
                    # wrapper expects different arguments, so call the pipeline it wraps.
                    base_pipeline = self.pipeline
                    if isinstance(base_pipeline, LTXMultiScalePipeline):
                        # NOTE(review): assumes the wrapper exposes the wrapped pipeline as
                        # `video_pipeline` - confirm against ltx_video; falls back to the
                        # wrapper itself (previous behavior) when the attribute is absent.
                        base_pipeline = getattr(base_pipeline, "video_pipeline", base_pipeline)
                    result = base_pipeline(
                        guidance_scale=config.guidance_scale,
                        num_inference_steps=config.num_inference_steps,
                        stg_scale=config.stg_scale,
                        do_rescaling=config.stg_rescale != 1.0,
                        rescaling_scale=config.stg_rescale,
                        **shared_kwargs
                    )

                # Convert LTX output format to varnish-compatible format
                frames = self._frames_to_uint8(result.images)
                # The raw pipeline output is no longer needed; drop it before post-processing
                result = None

                # Varnish exposes coroutines; drive them on an event loop
                import asyncio
                try:
                    loop = asyncio.get_event_loop()
                except RuntimeError:
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)

                # Process with Varnish for post-processing
                varnish_result = loop.run_until_complete(
                    self.varnish(
                        frames,
                        fps=config.fps,
                        double_num_frames=config.double_num_frames,
                        super_resolution=config.super_resolution,
                        grain_amount=config.grain_amount,
                        enable_audio=config.enable_audio,
                        audio_prompt=config.audio_prompt or config.prompt,
                        audio_negative_prompt=config.audio_negative_prompt,
                    )
                )

                # Get the final video as a data URI
                video_uri = loop.run_until_complete(
                    varnish_result.write(
                        type="data-uri",
                        quality=config.quality
                    )
                )

                # Prepare metadata about the generated video
                metadata = {
                    "width": varnish_result.metadata.width,
                    "height": varnish_result.metadata.height,
                    "num_frames": varnish_result.metadata.frame_count,
                    "fps": varnish_result.metadata.fps,
                    "duration": varnish_result.metadata.duration,
                    "seed": config.seed,
                    "prompt": config.prompt,
                }

                return {
                    "video": video_uri,
                    "content-type": "video/mp4",
                    "metadata": metadata
                }
        except Exception as e:
            # Log the error and reraise, keeping the original exception as the cause
            import traceback
            error_message = f"Error generating video: {str(e)}\n{traceback.format_exc()}"
            logger.error(error_message)
            raise RuntimeError(error_message) from e
        finally:
            # Clean up to prevent CUDA OOM errors - also after a failed request.
            # Drop the tensor references first so their memory can be released.
            result = None
            frames = None
            torch.cuda.empty_cache()
            gc.collect()