import gradio as gr import torch import spaces import numpy as np import random import os import yaml from pathlib import Path import imageio import tempfile from PIL import Image from huggingface_hub import hf_hub_download import shutil from inference import ( create_ltx_video_pipeline, create_latent_upsampler, load_image_to_tensor_with_resize_and_crop, seed_everething, get_device, calculate_padding, load_media_file ) from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXMultiScalePipeline, LTXVideoPipeline from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy # Configuración del modelo gratuito optimizada config_file_path = "configs/ltxv-13b-0.9.7-distilled.yaml" # Alternativas de modelos gratuitos que puedes usar: AVAILABLE_FREE_MODELS = { "ltx-video": { "repo": "Lightricks/LTX-Video", "config": "configs/ltxv-13b-0.9.7-distilled.yaml" }, "zeroscope": { "repo": "cerspense/zeroscope_v2_576w", "config": None # Usar configuración por defecto }, "animatediff": { "repo": "guoyww/animatediff-motion-adapter-v1-5-2", "config": None } } # Configuración del modelo seleccionado SELECTED_MODEL = "ltx-video" # Cambia esto por el modelo que prefieras MODEL_CONFIG = AVAILABLE_FREE_MODELS[SELECTED_MODEL] # Cargar configuración if MODEL_CONFIG["config"]: with open(MODEL_CONFIG["config"], "r") as file: PIPELINE_CONFIG_YAML = yaml.safe_load(file) else: # Configuración por defecto para modelos sin config específico PIPELINE_CONFIG_YAML = { "max_resolution": 1280, "checkpoint_path": "model.safetensors", "precision": "bfloat16", "text_encoder_model_name_or_path": "google/flan-t5-xl", "sampler": "from_checkpoint", "spatial_upscaler_model_path": None, "decode_timestep": 0.0, "decode_noise_scale": 0.0, "stochastic_sampling": False, "first_pass": { "guidance_scale": 3.0, "timesteps": None, "stg_scale": 0.0, "rescaling_scale": 1.0, "skip_block_list": None } } LTX_REPO = MODEL_CONFIG["repo"] MAX_IMAGE_SIZE = PIPELINE_CONFIG_YAML.get("max_resolution", 1280) MAX_NUM_FRAMES = 257 FPS = 30.0 # Variables globales para modelos cargados pipeline_instance = None latent_upsampler_instance = None models_dir = "downloaded_models_gradio_cpu_init" Path(models_dir).mkdir(parents=True, exist_ok=True) def setup_free_model(): """Configura el modelo gratuito seleccionado""" global pipeline_instance, latent_upsampler_instance print(f"Configurando modelo gratuito: {SELECTED_MODEL}") print(f"Repositorio: {LTX_REPO}") try: # Descargar modelo principal print("Descargando modelo principal (si no está presente)...") if SELECTED_MODEL == "ltx-video": distilled_model_actual_path = hf_hub_download( repo_id=LTX_REPO, filename=PIPELINE_CONFIG_YAML["checkpoint_path"], local_dir=models_dir, local_dir_use_symlinks=False ) PIPELINE_CONFIG_YAML["checkpoint_path"] = distilled_model_actual_path print(f"Ruta del modelo: {distilled_model_actual_path}") # Descargar upscaler espacial si está disponible if PIPELINE_CONFIG_YAML.get("spatial_upscaler_model_path"): SPATIAL_UPSCALER_FILENAME = PIPELINE_CONFIG_YAML["spatial_upscaler_model_path"] spatial_upscaler_actual_path = hf_hub_download( repo_id=LTX_REPO, filename=SPATIAL_UPSCALER_FILENAME, local_dir=models_dir, local_dir_use_symlinks=False ) PIPELINE_CONFIG_YAML["spatial_upscaler_model_path"] = spatial_upscaler_actual_path print(f"Ruta del upscaler espacial: {spatial_upscaler_actual_path}") elif SELECTED_MODEL == "zeroscope": # Configuración específica para Zeroscope print("Configurando Zeroscope...") # Zeroscope usa una configuración diferente from diffusers import DiffusionPipeline pipeline_instance = DiffusionPipeline.from_pretrained( LTX_REPO, torch_dtype=torch.float16 ) return elif SELECTED_MODEL == "animatediff": # Configuración específica para AnimateDiff print("Configurando AnimateDiff...") from diffusers import AnimateDiffPipeline, MotionAdapter adapter = MotionAdapter.from_pretrained(LTX_REPO) pipeline_instance = AnimateDiffPipeline.from_pretrained( "runwayml/stable-diffusion-v1-5", motion_adapter=adapter, torch_dtype=torch.float16 ) return # Crear pipeline LTX Video en CPU print("Creando pipeline LTX Video en CPU...") pipeline_instance = create_ltx_video_pipeline( ckpt_path=PIPELINE_CONFIG_YAML["checkpoint_path"], precision=PIPELINE_CONFIG_YAML["precision"], text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"], sampler=PIPELINE_CONFIG_YAML["sampler"], device="cpu", enhance_prompt=False, prompt_enhancer_image_caption_model_name_or_path=PIPELINE_CONFIG_YAML.get("prompt_enhancer_image_caption_model_name_or_path"), prompt_enhancer_llm_model_name_or_path=PIPELINE_CONFIG_YAML.get("prompt_enhancer_llm_model_name_or_path"), ) print("Pipeline LTX Video creado en CPU.") # Crear upsampler latente si está disponible if PIPELINE_CONFIG_YAML.get("spatial_upscaler_model_path"): print("Creando upsampler latente en CPU...") latent_upsampler_instance = create_latent_upsampler( PIPELINE_CONFIG_YAML["spatial_upscaler_model_path"], device="cpu" ) print("Upsampler latente creado en CPU.") # Mover a dispositivo de inferencia target_inference_device = "cuda" if torch.cuda.is_available() else "cpu" print(f"Dispositivo de inferencia objetivo: {target_inference_device}") pipeline_instance.to(target_inference_device) if latent_upsampler_instance: latent_upsampler_instance.to(target_inference_device) except Exception as e: print(f"Error configurando el modelo: {e}") print("Intentando configuración alternativa...") # Configuración de respaldo setup_fallback_model() def setup_fallback_model(): """Configuración de respaldo usando un modelo más simple""" global pipeline_instance print("Configurando modelo de respaldo...") try: from diffusers import DiffusionPipeline # Usar un modelo más ligero como respaldo pipeline_instance = DiffusionPipeline.from_pretrained( "cerspense/zeroscope_v2_576w", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32 ) print("Modelo de respaldo configurado exitosamente.") except Exception as e: print(f"Error configurando modelo de respaldo: {e}") raise # Configurar el modelo setup_free_model() # Función para cambiar de modelo dinámicamente def switch_model(model_name): """Cambia dinámicamente entre modelos disponibles""" global SELECTED_MODEL, pipeline_instance, latent_upsampler_instance if model_name not in AVAILABLE_FREE_MODELS: raise ValueError(f"Modelo {model_name} no está disponible") print(f"Cambiando a modelo: {model_name}") SELECTED_MODEL = model_name # Limpiar memoria if pipeline_instance: del pipeline_instance if latent_upsampler_instance: del latent_upsampler_instance torch.cuda.empty_cache() if torch.cuda.is_available() else None # Reconfigurar con el nuevo modelo setup_free_model() return f"Modelo cambiado a: {model_name}" # Resto del código permanece igual... MIN_DIM_SLIDER = 256 TARGET_FIXED_SIDE = 768 def calculate_new_dimensions(orig_w, orig_h): """ Calcula nuevas dimensiones para los sliders de altura y anchura basándose en las dimensiones originales del medio. """ if orig_w == 0 or orig_h == 0: return int(TARGET_FIXED_SIDE), int(TARGET_FIXED_SIDE) if orig_w >= orig_h: # Paisaje o cuadrado new_h = TARGET_FIXED_SIDE aspect_ratio = orig_w / orig_h new_w_ideal = new_h * aspect_ratio new_w = round(new_w_ideal / 32) * 32 new_w = max(MIN_DIM_SLIDER, min(new_w, MAX_IMAGE_SIZE)) new_h = max(MIN_DIM_SLIDER, min(new_h, MAX_IMAGE_SIZE)) else: # Retrato new_w = TARGET_FIXED_SIDE aspect_ratio = orig_h / orig_w new_h_ideal = new_w * aspect_ratio new_h = round(new_h_ideal / 32) * 32 new_h = max(MIN_DIM_SLIDER, min(new_h, MAX_IMAGE_SIZE)) new_w = max(MIN_DIM_SLIDER, min(new_w, MAX_IMAGE_SIZE)) return int(new_h), int(new_w) def get_duration(prompt, negative_prompt, input_image_filepath, input_video_filepath, height_ui, width_ui, mode, duration_ui, ui_frames_to_use, seed_ui, randomize_seed, ui_guidance_scale, improve_texture_flag, progress): # Optimización para recursos limitados if duration_ui > 5: # Reducido de 7 a 5 para modelos gratuitos return 60 # Reducido de 75 a 60 else: return 45 # Reducido de 60 a 45 @spaces.GPU(duration=get_duration) def generate(prompt, negative_prompt, input_image_filepath, input_video_filepath, height_ui, width_ui, mode, duration_ui, ui_frames_to_use, seed_ui, randomize_seed, ui_guidance_scale, improve_texture_flag, progress=gr.Progress(track_tqdm=True)): if randomize_seed: seed_ui = random.randint(0, 2**32 - 1) seed_everething(int(seed_ui)) # Optimizar para modelos gratuitos target_frames_ideal = min(duration_ui * FPS, 120) # Limitar frames para recursos target_frames_rounded = round(target_frames_ideal) if target_frames_rounded < 1: target_frames_rounded = 1 n_val = round((float(target_frames_rounded) - 1.0) / 8.0) actual_num_frames = int(n_val * 8 + 1) actual_num_frames = max(9, actual_num_frames) actual_num_frames = min(MAX_NUM_FRAMES, actual_num_frames) # Optimizar resolución para modelos gratuitos actual_height = min(int(height_ui), 512) # Limitar altura actual_width = min(int(width_ui), 768) # Limitar anchura height_padded = ((actual_height - 1) // 32 + 1) * 32 width_padded = ((actual_width - 1) // 32 + 1) * 32 num_frames_padded = ((actual_num_frames - 2) // 8 + 1) * 8 + 1 padding_values = calculate_padding(actual_height, actual_width, height_padded, width_padded) # Configuración optimizada para modelos gratuitos call_kwargs = { "prompt": prompt, "negative_prompt": negative_prompt, "height": height_padded, "width": width_padded, "num_frames": num_frames_padded, "frame_rate": int(FPS), "generator": torch.Generator(device=get_device()).manual_seed(int(seed_ui)), "output_type": "pt", "conditioning_items": None, "media_items": None, "decode_timestep": PIPELINE_CONFIG_YAML.get("decode_timestep", 0.0), "decode_noise_scale": PIPELINE_CONFIG_YAML.get("decode_noise_scale", 0.0), "stochastic_sampling": PIPELINE_CONFIG_YAML.get("stochastic_sampling", False), "image_cond_noise_scale": 0.15, "is_video": True, "vae_per_channel_normalize": True, "mixed_precision": (PIPELINE_CONFIG_YAML.get("precision") == "mixed_precision"), "offload_to_cpu": True, # Activar para ahorrar memoria "enhance_prompt": False, } # Configurar estrategia de capa de salto stg_mode_str = PIPELINE_CONFIG_YAML.get("stg_mode", "attention_values") if stg_mode_str.lower() in ["stg_av", "attention_values"]: call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.AttentionValues elif stg_mode_str.lower() in ["stg_as", "attention_skip"]: call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.AttentionSkip elif stg_mode_str.lower() in ["stg_r", "residual"]: call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.Residual elif stg_mode_str.lower() in ["stg_t", "transformer_block"]: call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.TransformerBlock # Procesar entrada de imagen o video target_inference_device = get_device() if mode == "image-to-video" and input_image_filepath: try: media_tensor = load_image_to_tensor_with_resize_and_crop( input_image_filepath, actual_height, actual_width ) media_tensor = torch.nn.functional.pad(media_tensor, padding_values) call_kwargs["conditioning_items"] = [ConditioningItem(media_tensor.to(target_inference_device), 0, 1.0)] except Exception as e: print(f"Error cargando imagen {input_image_filepath}: {e}") raise gr.Error(f"No se pudo cargar la imagen: {e}") elif mode == "video-to-video" and input_video_filepath: try: call_kwargs["media_items"] = load_media_file( media_path=input_video_filepath, height=actual_height, width=actual_width, max_frames=int(ui_frames_to_use), padding=padding_values ).to(target_inference_device) except Exception as e: print(f"Error cargando video {input_video_filepath}: {e}") raise gr.Error(f"No se pudo cargar el video: {e}") print(f"Moviendo modelos a {target_inference_device} para inferencia...") # Generar video result_images_tensor = None try: if improve_texture_flag and latent_upsampler_instance: # Usar pipeline multi-escala multi_scale_pipeline_obj = LTXMultiScalePipeline(pipeline_instance, latent_upsampler_instance) first_pass_args = PIPELINE_CONFIG_YAML.get("first_pass", {}).copy() first_pass_args["guidance_scale"] = float(ui_guidance_scale) first_pass_args.pop("num_inference_steps", None) second_pass_args = PIPELINE_CONFIG_YAML.get("second_pass", {}).copy() second_pass_args["guidance_scale"] = float(ui_guidance_scale) second_pass_args.pop("num_inference_steps", None) multi_scale_call_kwargs = call_kwargs.copy() multi_scale_call_kwargs.update({ "downscale_factor": PIPELINE_CONFIG_YAML.get("downscale_factor", 2), "first_pass": first_pass_args, "second_pass": second_pass_args, }) print(f"Llamando pipeline multi-escala...") result_images_tensor = multi_scale_pipeline_obj(**multi_scale_call_kwargs).images else: # Usar pipeline simple single_pass_call_kwargs = call_kwargs.copy() first_pass_config = PIPELINE_CONFIG_YAML.get("first_pass", {}) single_pass_call_kwargs["timesteps"] = first_pass_config.get("timesteps") single_pass_call_kwargs["guidance_scale"] = float(ui_guidance_scale) single_pass_call_kwargs["stg_scale"] = first_pass_config.get("stg_scale", 0.0) single_pass_call_kwargs["rescaling_scale"] = first_pass_config.get("rescaling_scale", 1.0) single_pass_call_kwargs["skip_block_list"] = first_pass_config.get("skip_block_list") print(f"Llamando pipeline base...") result_images_tensor = pipeline_instance(**single_pass_call_kwargs).images except Exception as e: print(f"Error en la generación: {e}") raise gr.Error(f"Error en la generación: {e}") if result_images_tensor is None: raise gr.Error("La generación falló.") # Procesar resultado pad_left, pad_right, pad_top, pad_bottom = padding_values slice_h_end = -pad_bottom if pad_bottom > 0 else None slice_w_end = -pad_right if pad_right > 0 else None result_images_tensor = result_images_tensor[ :, :, :actual_num_frames, pad_top:slice_h_end, pad_left:slice_w_end ] video_np = result_images_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy() video_np = np.clip(video_np, 0, 1) video_np = (video_np * 255).astype(np.uint8) # Guardar video temp_dir = tempfile.mkdtemp() timestamp = random.randint(10000,99999) output_video_path = os.path.join(temp_dir, f"output_{timestamp}.mp4") try: with imageio.get_writer(output_video_path, fps=call_kwargs["frame_rate"], macro_block_size=1) as video_writer: for frame_idx in range(video_np.shape[0]): progress(frame_idx / video_np.shape[0], desc="Guardando video") video_writer.append_data(video_np[frame_idx]) except Exception as e: print(f"Error guardando video: {e}") try: with imageio.get_writer(output_video_path, fps=call_kwargs["frame_rate"], format='FFMPEG', codec='libx264', quality=8) as video_writer: for frame_idx in range(video_np.shape[0]): progress(frame_idx / video_np.shape[0], desc="Guardando video (respaldo)") video_writer.append_data(video_np[frame_idx]) except Exception as e2: print(f"Error en respaldo de guardado: {e2}") raise gr.Error(f"Error guardando video: {e2}") return output_video_path, seed_ui # Funciones de actualización de tarea def update_task_image(): return "image-to-video" def update_task_text(): return "text-to-video" def update_task_video(): return "video-to-video" # CSS para la interfaz css=""" #col-container { margin: 0 auto; max-width: 900px; } .model-info { background: #f0f0f0; padding: 10px; border-radius: 5px; margin-bottom: 10px; } """ # Interfaz Gradio with gr.Blocks(css=css) as demo: gr.Markdown("# Generador de Video LTX - Modelos Gratuitos") gr.Markdown("Generación de video de alta calidad usando modelos completamente gratuitos.") with gr.Row(): with gr.Column(): # Selector de modelo with gr.Accordion("Configuración de Modelo", open=False): model_selector = gr.Dropdown( choices=list(AVAILABLE_FREE_MODELS.keys()), value=SELECTED_MODEL, label="Modelo a usar", info="Todos los modelos son completamente gratuitos" ) model_info = gr.Markdown(f"**Modelo actual:** {SELECTED_MODEL}\n**Repositorio:** {LTX_REPO}", elem_classes="model-info") switch_btn = gr.Button("Cambiar Modelo", variant="secondary") with gr.Tab("imagen-a-video") as image_tab: video_i_hidden = gr.Textbox(label="video_i", visible=False, value=None) image_i2v = gr.Image(label="Imagen de Entrada", type="filepath", sources=["upload", "webcam", "clipboard"]) i2v_prompt = gr.Textbox(label="Prompt", value="La criatura de la imagen comienza a moverse", lines=3) i2v_button = gr.Button("Generar Imagen-a-Video", variant="primary") with gr.Tab("texto-a-video") as text_tab: image_n_hidden = gr.Textbox(label="image_n", visible=False, value=None) video_n_hidden = gr.Textbox(label="video_n", visible=False, value=None) t2v_prompt = gr.Textbox(label="Prompt", value="Un majestuoso dragón volando sobre un castillo medieval", lines=3) t2v_button = gr.Button("Generar Texto-a-Video", variant="primary") with gr.Tab("video-a-video", visible=False) as video_tab: image_v_hidden = gr.Textbox(label="image_v", visible=False, value=None) video_v2v = gr.Video(label="Video de Entrada", sources=["upload", "webcam"]) frames_to_use = gr.Slider(label="Frames a usar del video de entrada", minimum=9, maximum=MAX_NUM_FRAMES, value=9, step=8) v2v_prompt = gr.Textbox(label="Prompt", value="Cambiar el estilo a anime cinematográfico", lines=3) v2v_button = gr.Button("Generar Video-a-Video", variant="primary") duration_input = gr.Slider( label="Duración del Video (segundos)", minimum=0.3, maximum=5.0, # Reducido para modelos gratuitos value=2, step=0.1, info="Duración objetivo del video (0.3s a 5.0s)" ) improve_texture = gr.Checkbox( label="Mejorar Textura (multi-escala)", value=False, # Desactivado por defecto para ahorrar recursos info="Usa generación de dos pasadas para mejor calidad, pero es más lento." ) with gr.Column(): output_video = gr.Video(label="Video Generado", interactive=False) with gr.Accordion("Configuración Avanzada", open=False): mode = gr.Dropdown(["texto-a-video", "imagen-a-video", "video-a-video"], label="tarea", value="imagen-a-video", visible=False) negative_prompt_input = gr.Textbox( label="Prompt Negativo", value="peor calidad, movimiento inconsistente, borroso, tembloroso, distorsionado", lines=2 ) with gr.Row(): seed_input = gr.Number(label="Semilla", value=42, precision=0, minimum=0, maximum=2**32-1) randomize_seed_input = gr.Checkbox(label="Semilla Aleatoria", value=True) with gr.Row(): guidance_scale_input = gr.Slider( label="Escala de Guía (CFG)", minimum=1.0, maximum=7.0, # Reducido para modelos gratuitos value=3.0, step=0.1 ) with gr.Row(): height_input = gr.Slider( label="Altura", value=512, step=32, minimum=MIN_DIM_SLIDER, maximum=512, # Limitado para modelos gratuitos info="Debe ser divisible por 32." ) width_input = gr.Slider( label="Anchura", value=704, step=32, minimum=MIN_DIM_SLIDER, maximum=768, # Limitado para modelos gratuitos info="Debe ser divisible por 32." ) # Manejadores de eventos def handle_image_upload_for_dims(image_filepath, current_h, current_w): if not image_filepath: return gr.update(value=current_h), gr.update(value=current_w) try: img = Image.open(image_filepath) orig_w, orig_h = img.size new_h, new_w = calculate_new_dimensions(orig_w, orig_h) # Limitar para modelos gratuitos new_h = min(new_h, 512) new_w = min(new_w, 768) return gr.update(value=new_h), gr.update(value=new_w) except Exception as e: print(f"Error procesando imagen: {e}") return gr.update(value=current_h), gr.update(value=current_w) def handle_video_upload_for_dims(video_filepath, current_h, current_w): if not video_filepath: return gr.update(value=current_h), gr.update(value=current_w) try: video_filepath_str = str(video_filepath) if not os.path.exists(video_filepath_str): return gr.update(value=current_h), gr.update(value=current_w) with imageio.get_reader(video_filepath_str) as reader: meta = reader.get_meta_data() if 'size' in meta: orig_w, orig_h = meta['size'] else: first_frame = reader.get_data(0) orig_h, orig_w = first_frame.shape[0], first_frame.shape[1] new_h, new_w = calculate_new_dimensions(orig_w, orig_h) # Limitar para modelos gratuitos new_h = min(new_h, 512) new_w = min(new_w, 768) return gr.update(value=new_h), gr.update(value=new_w) except Exception as e: print(f"Error procesando video: {e}") return gr.update(value=current_h), gr.update(value=current_w) # Configurar eventos image_i2v.upload( fn=handle_image_upload_for_dims, inputs=[image_i2v, height_input, width_input], outputs=[height_input, width_input] ) video_v2v.upload( fn=handle_video_upload_for_dims, inputs=[video_v2v, height_input, width_input], outputs=[height_input, width_input] ) # Cambio de modelo y actualización de información def handle_model_switch(model_name): try: result = switch_model(model_name) new_repo = AVAILABLE_FREE_MODELS[model_name]["repo"] return gr.update(value=f"**Modelo actual:** {model_name}\n**Repositorio:** {new_repo}") except Exception as e: return gr.update(value=f"Error cambiando modelo: {e}") switch_btn.click( fn=handle_model_switch, inputs=[model_selector], outputs=[model_info] ) # Botón imagen a video i2v_button.click( fn=generate, inputs=[ i2v_prompt, negative_prompt_input, image_i2v, video_i_hidden, height_input, width_input, mode, duration_input, frames_to_use, seed_input, randomize_seed_input, guidance_scale_input, improve_texture ], outputs=[output_video, seed_input] ) # Botón texto a video t2v_button.click( fn=generate, inputs=[ t2v_prompt, negative_prompt_input, image_n_hidden, video_n_hidden, height_input, width_input, mode, duration_input, frames_to_use, seed_input, randomize_seed_input, guidance_scale_input, improve_texture ], outputs=[output_video, seed_input] ) # Botón video a video v2v_button.click( fn=generate, inputs=[ v2v_prompt, negative_prompt_input, image_v_hidden, video_v2v, height_input, width_input, mode, duration_input, frames_to_use, seed_input, randomize_seed_input, guidance_scale_input, improve_texture ], outputs=[output_video, seed_input] ) # Lanzar interfaz if __name__ == "__main__": demo.queue().launch()