import os

# Install a PyTorch nightly build (CUDA 12.6) plus `spaces` and `peft` before the heavy imports below.
os.system(
    'pip install --upgrade --pre --extra-index-url https://download.pytorch.org/whl/nightly/cu126 "torch<2.9" spaces peft'
)

# --- 1. Model Download and Setup (Diffusers Backend) ---
import spaces
import torch
from diffusers import FlowMatchEulerDiscreteScheduler
from diffusers.image_processor import VaeImageProcessor  # used to re-initialize pipe.image_processor below
from diffusers.pipelines.wan.pipeline_wan_i2v import WanImageToVideoPipeline
from diffusers.models.transformers.transformer_wan import WanTransformer3DModel
from diffusers.utils.export_utils import export_to_video
import gradio as gr
import tempfile
import numpy as np
from PIL import Image
import random
import gc
from typing import Dict, Any, Optional
from gradio_client import Client, handle_file


def patched_wan_transformer_forward(
    self,
    hidden_states: torch.FloatTensor,
    timestep: Optional[torch.LongTensor] = None,
    encoder_hidden_states: Optional[torch.FloatTensor] = None,
    added_cond_kwargs: Optional[Dict[str, torch.Tensor]] = None,
    eco_latent_conditioning: Optional[Dict[str, Any]] = None,
    **kwargs,
):
    """Replacement forward that concatenates the start/last-frame latents (and an
    optional "echo" latent from the previous clip) onto the noisy latents along
    dim=1 (channels) before delegating to the original forward."""
    start_latent = added_cond_kwargs['start_latent']
    last_latent = added_cond_kwargs['last_latent']
    device, dtype = hidden_states.device, hidden_states.dtype
    start_latent = start_latent.to(device, dtype)
    last_latent = last_latent.to(device, dtype)

    combined_input = torch.cat([hidden_states, start_latent, last_latent], dim=1)

    if eco_latent_conditioning is not None and 'latent' in eco_latent_conditioning:
        eco_latent = eco_latent_conditioning['latent'].to(device, dtype)
        eco_strength = eco_latent_conditioning['strength']
        combined_input = torch.cat([combined_input, eco_latent * eco_strength], dim=1)

    # Defensive: make sure the custom kwarg never reaches the original forward.
    kwargs.pop('eco_latent_conditioning', None)

    return self.original_forward(
        hidden_states=combined_input,
        timestep=timestep,
        encoder_hidden_states=encoder_hidden_states,
        added_cond_kwargs=added_cond_kwargs,
        **kwargs
    )


def apply_monkey_patch():
    print("Applying Causal Physics Monkey Patch to WanTransformer3DModel...")
    if not hasattr(WanTransformer3DModel, 'original_forward'):
        WanTransformer3DModel.original_forward = WanTransformer3DModel.forward
        WanTransformer3DModel.forward = patched_wan_transformer_forward
        print("Monkey Patch applied successfully.")
    else:
        print("Monkey Patch already applied.")
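# NOTE (sketch, not part of the original code): the patched forward above assumes the
# caller supplies the extra conditioning roughly like this --
#
#     added_cond_kwargs = {"start_latent": start_latent, "last_latent": end_latent}
#     eco_latent_conditioning = {"latent": eco_latent, "strength": 0.5}
#
# with every conditioning latent shaped so the torch.cat(..., dim=1) calls line up with
# the noisy latents. The stock WanImageToVideoPipeline is not known to forward these
# kwargs on its own; generate_video() below passes them into pipe(...) on the assumption
# that the pipeline build in use accepts and relays them.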
MODEL_ID = "Wan-AI/Wan2.2-I2V-A14B-Diffusers"

MAX_DIMENSION = 832
MIN_DIMENSION = 480
DIMENSION_MULTIPLE = 16
SQUARE_SIZE = 480

MAX_SEED = np.iinfo(np.int32).max
FIXED_FPS = 16
MIN_FRAMES_MODEL = 8
MAX_FRAMES_MODEL = 81
MIN_DURATION = round(MIN_FRAMES_MODEL / FIXED_FPS, 1)
MAX_DURATION = round(MAX_FRAMES_MODEL / FIXED_FPS, 1)

# Standard Wan negative prompt (kept in Chinese, as expected by the model).
default_negative_prompt = "色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走,过曝,"

print("Loading models into memory. This may take a few minutes...")

pipe = WanImageToVideoPipeline.from_pretrained(
    MODEL_ID,
    torch_dtype=torch.bfloat16,
    load_in_8bit=True,
    device_map="balanced",
)
pipe.scheduler = FlowMatchEulerDiscreteScheduler.from_config(pipe.scheduler.config, shift=8.0)
# Manually attach an image processor so PIL inputs can be converted to tensors in
# _preprocess_image_to_latent() below.
pipe.image_processor = VaeImageProcessor(vae_scale_factor=pipe.vae.config.scaling_factor)

apply_monkey_patch()

print("Applying 8-step Lightning LoRA...")
try:
    pipe.load_lora_weights(
        "Kijai/WanVideo_comfy",
        weight_name="Lightx2v/lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors",
        adapter_name="lightx2v",
    )
    pipe.load_lora_weights(
        "Kijai/WanVideo_comfy",
        weight_name="Lightx2v/lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors",
        adapter_name="lightx2v_2",
        load_into_transformer_2=True,
    )
    pipe.set_adapters(["lightx2v", "lightx2v_2"], adapter_weights=[1.0, 1.0])
    print("Fusing LoRA weights into the main model...")
    pipe.fuse_lora(adapter_names=["lightx2v"], lora_scale=3.0, components=["transformer"])
    pipe.fuse_lora(adapter_names=["lightx2v_2"], lora_scale=1.0, components=["transformer_2"])
    pipe.unload_lora_weights()
    print("Lightning LoRA successfully fused.")
except Exception as e:
    print(f"WARNING: failed to load or fuse the LoRA. Generation may be slow or low quality. Error: {e}")

print("All models loaded, patched, and ready. Gradio app is ready.")


def process_image_for_video(image: Image.Image) -> Image.Image:
    """Resize an input image so both sides fall within the supported range and are
    multiples of DIMENSION_MULTIPLE."""
    width, height = image.size
    if width == height:
        return image.resize((SQUARE_SIZE, SQUARE_SIZE), Image.Resampling.LANCZOS)

    aspect_ratio = width / height
    new_width, new_height = width, height

    if new_width > MAX_DIMENSION or new_height > MAX_DIMENSION:
        scale = MAX_DIMENSION / new_width if aspect_ratio > 1 else MAX_DIMENSION / new_height
        new_width *= scale
        new_height *= scale
    if new_width < MIN_DIMENSION or new_height < MIN_DIMENSION:
        scale = MIN_DIMENSION / new_height if aspect_ratio > 1 else MIN_DIMENSION / new_width
        new_width *= scale
        new_height *= scale

    final_width = int(round(new_width / DIMENSION_MULTIPLE) * DIMENSION_MULTIPLE)
    final_height = int(round(new_height / DIMENSION_MULTIPLE) * DIMENSION_MULTIPLE)
    final_width = max(final_width, MIN_DIMENSION if aspect_ratio < 1 else SQUARE_SIZE)
    final_height = max(final_height, MIN_DIMENSION if aspect_ratio > 1 else SQUARE_SIZE)
    return image.resize((final_width, final_height), Image.Resampling.LANCZOS)


def resize_and_crop_to_match(target_image, reference_image):
    """Scale target_image to cover reference_image's size, then center-crop to match it exactly."""
    ref_width, ref_height = reference_image.size
    target_width, target_height = target_image.size
    scale = max(ref_width / target_width, ref_height / target_height)
    new_width, new_height = int(target_width * scale), int(target_height * scale)
    resized = target_image.resize((new_width, new_height), Image.Resampling.LANCZOS)
    left, top = (new_width - ref_width) // 2, (new_height - ref_height) // 2
    return resized.crop((left, top, left + ref_width, top + ref_height))


def _preprocess_image_to_latent(image: Image.Image) -> torch.FloatTensor:
    """Encode a PIL image into the VAE latent space used for conditioning."""
    image_tensor = pipe.image_processor.preprocess(image).to(pipe.device, torch.bfloat16)
    latent = pipe.vae.encode(image_tensor).latent_dist.sample()
    return latent * pipe.vae.config.scaling_factor
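# generate_video() below wires the three temporal anchors together:
#   * the "Echo" image (previous frame) becomes eco_latent, scaled by the user-chosen
#     strength so it acts as motion inertia from the past,
#   * the "Start" image defines the present frame and the output resolution,
#   * the "End" image is the future target the transition should land on.
# All three are encoded to latents and handed to the (monkey-patched) pipeline.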
def generate_video(
    eco_image_pil,
    start_image_pil,
    end_image_pil,
    prompt,
    eco_strength,
    negative_prompt=default_negative_prompt,
    duration_seconds=2.1,
    steps=8,
    guidance_scale=1,
    guidance_scale_2=1,
    seed=42,
    randomize_seed=False,
    progress=gr.Progress(track_tqdm=True),
):
    if not all([eco_image_pil, start_image_pil, end_image_pil]):
        raise gr.Error("Please provide all three images: Echo, Start, and End.")

    progress(0.1, desc="Preprocessing images...")
    processed_start_image = process_image_for_video(start_image_pil)
    processed_end_image = resize_and_crop_to_match(end_image_pil, processed_start_image)
    processed_eco_image = resize_and_crop_to_match(eco_image_pil, processed_start_image)
    target_height, target_width = processed_start_image.height, processed_start_image.width

    progress(0.2, desc="Encoding images into latent space...")
    start_latent = _preprocess_image_to_latent(processed_start_image)
    end_latent = _preprocess_image_to_latent(processed_end_image)
    eco_latent = _preprocess_image_to_latent(processed_eco_image)
    eco_conditioning = {'latent': eco_latent, 'strength': eco_strength}

    current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
    num_frames = int(np.clip(int(round(duration_seconds * FIXED_FPS)), MIN_FRAMES_MODEL, MAX_FRAMES_MODEL))

    progress(0.4, desc=f"Generating {num_frames} frames (seed: {current_seed})...")
    generator = torch.Generator().manual_seed(current_seed)
    output_frames_list = pipe(
        image=start_latent,
        last_image=end_latent,
        eco_latent_conditioning=eco_conditioning,
        prompt=prompt,
        negative_prompt=negative_prompt,
        height=target_height,
        width=target_width,
        num_frames=num_frames,
        guidance_scale=float(guidance_scale),
        guidance_scale_2=float(guidance_scale_2),
        num_inference_steps=int(steps),
        generator=generator,
    ).frames[0]

    progress(0.9, desc="Encoding video...")
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile:
        video_path = tmpfile.name
    export_to_video(output_frames_list, video_path, fps=FIXED_FPS)
    progress(1.0, desc="Done!")
    return video_path, current_seed


css = '''
h1 { text-align: center; }
.gradio-container { max-width: 1200px !important; margin: auto !important; }
.gr-button-primary { background: linear-gradient(90deg, #4F46E5, #8B5CF6) !important; }
.image-column { min-width: 300px !important; }
'''
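# Usage sketch (not part of the original app): how generate_video() could be called
# directly, without the Gradio UI. The file names below are hypothetical placeholders
# and this helper is never invoked by the app itself.
def _example_direct_call():
    eco = Image.open("eco_frame.png")      # hypothetical previous frame
    start = Image.open("start_frame.png")  # hypothetical current frame
    end = Image.open("end_frame.png")      # hypothetical target frame
    video_path, used_seed = generate_video(
        eco_image_pil=eco,
        start_image_pil=start,
        end_image_pil=end,
        prompt="The camera slowly pans as the subject walks toward the door.",
        eco_strength=0.5,
        duration_seconds=2.1,
        steps=8,
        seed=42,
        randomize_seed=False,
    )
    print(f"Video written to {video_path} (seed {used_seed})")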
with gr.Blocks(theme=gr.themes.Soft(), css=css) as demo:
    gr.Markdown("# 🎬 ADUC-SDR Director's Interface for Wan2.2")
    gr.Markdown("Control generation with 3 temporal anchors: Past (Echo), Present (Start), and Future (Destination).")

    with gr.Row():
        with gr.Column(elem_classes="image-column"):
            gr.Markdown("### Past Anchor")
            eco_image_input = gr.Image(type="pil", label="Causal Echo (Previous Frame)", sources=["upload", "clipboard"])
            eco_strength_slider = gr.Slider(minimum=0.0, maximum=2.0, value=0.5, step=0.05, label="Echo Strength (Inertia)")
        with gr.Column(elem_classes="image-column"):
            gr.Markdown("### Present Anchor")
            start_image_input = gr.Image(type="pil", label="Start Frame", sources=["upload", "clipboard"])
        with gr.Column(elem_classes="image-column"):
            gr.Markdown("### Future Anchor")
            end_image_input = gr.Image(type="pil", label="End Frame (Destination)", sources=["upload", "clipboard"])

    with gr.Row():
        with gr.Column(scale=3):
            prompt_input = gr.Textbox(
                label="Transition Prompt",
                lines=3,
                placeholder="Describe the action that connects the start frame to the end frame...",
            )
        with gr.Column(scale=1):
            generate_button = gr.Button("🎬 Generate Video", variant="primary", scale=2)

    with gr.Row():
        with gr.Column():
            output_video = gr.Video(label="Generated Video")
        with gr.Column():
            with gr.Accordion("Advanced Options", open=False):
                duration_seconds_input = gr.Slider(minimum=MIN_DURATION, maximum=MAX_DURATION, step=0.1, value=4.0, label="Video Duration (s)")
                negative_prompt_input = gr.Textbox(label="Negative Prompt", value=default_negative_prompt, lines=2)
                steps_slider = gr.Slider(minimum=1, maximum=30, step=1, value=8, label="Inference Steps")
                guidance_scale_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1.0, label="CFG Scale - High Noise")
                guidance_scale_2_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1.0, label="CFG Scale - Low Noise")
                with gr.Row():
                    seed_input = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=42)
                    randomize_seed_checkbox = gr.Checkbox(label="Randomize", value=True)

    ui_inputs = [
        eco_image_input, start_image_input, end_image_input, prompt_input, eco_strength_slider,
        negative_prompt_input, duration_seconds_input, steps_slider,
        guidance_scale_input, guidance_scale_2_input, seed_input, randomize_seed_checkbox,
    ]
    ui_outputs = [output_video, seed_input]
    generate_button.click(fn=generate_video, inputs=ui_inputs, outputs=ui_outputs)

if __name__ == "__main__":
    demo.queue().launch()
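# Note (not from the original file): on Hugging Face Spaces this module is run automatically
# as app.py; locally it can be started with `python app.py`. demo.queue() enables Gradio's
# request queue, which is recommended for long-running GPU jobs like this one.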