# app.py — Shot Grammar Adapter — Proof (clean + auth)
import os
import json
import tempfile
import zipfile
from typing import List, Tuple

os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")

import numpy as np
import gradio as gr
import imageio
import torch

torch.set_num_threads(2)

# -----------------------------
# Config
# -----------------------------
MODEL_ID = os.getenv("VIDEO_MODEL_ID", "damo-vilab/text-to-video-ms-1.7b")
DATA_PATH = os.getenv("SHOTS_JSONL", "shots_public_subset.jsonl")
assert os.path.exists(DATA_PATH), f"Missing data file: {DATA_PATH}"

DEFAULT_NUM_FRAMES = int(os.getenv("DEF_FRAMES", 12))
DEFAULT_GUIDANCE = float(os.getenv("DEF_GUIDANCE", 6.0))
DEFAULT_STEPS = int(os.getenv("DEF_STEPS", 10))
DEFAULT_SIZE = int(os.getenv("DEF_SIZE", 256))
DEFAULT_FPS = int(os.getenv("DEF_FPS", 8))
MAX_BATCH = int(os.getenv("MAX_BATCH", 12))

# -----------------------------
# Data load
# -----------------------------
ROWS = []
with open(DATA_PATH, "r", encoding="utf-8") as f:
    for line in f:
        s = line.strip()
        if not s:
            continue
        ROWS.append(json.loads(s))

INDEX = {r["shot_id"]: r for r in ROWS}
SHOT_IDS = list(INDEX.keys())
assert SHOT_IDS, "No shots found in JSONL."


def pretty_features(row: dict) -> dict:
    feat = row.get("features", {})
    return {
        "ep_id": row.get("ep_id"),
        "shot_id": row.get("shot_id"),
        "size": feat.get("size"),
        "angle": feat.get("angle"),
        "motion": feat.get("motion"),
        "relation": feat.get("relation"),
        "duration": feat.get("duration"),
        "prompt": row.get("prompt"),
    }
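
# For reference, a single JSONL row in the shape the loader and
# pretty_features() above expect. The values are invented for illustration;
# only the keys are taken from the code:
#
# {"ep_id": "ep01",
#  "shot_id": "ep01_s003",
#  "prompt": "wide shot, slow dolly-in on a rain-soaked street at night",
#  "features": {"size": "wide", "angle": "eye-level", "motion": "dolly-in",
#               "relation": "two-shot", "duration": 2.4}}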

# -----------------------------
# Pipeline (CPU-optimized)
# -----------------------------
from diffusers import DiffusionPipeline

device = "cpu"
pipe = DiffusionPipeline.from_pretrained(MODEL_ID, torch_dtype=torch.float32)
pipe.to(device)
if hasattr(pipe, "enable_attention_slicing"):
    pipe.enable_attention_slicing()
if hasattr(pipe, "set_progress_bar_config"):
    pipe.set_progress_bar_config(disable=True)


# -----------------------------
# Utils
# -----------------------------
def _result_to_frames(result):
    """
    Normalize pipeline output to List[PIL.Image.Image].
    Handles dict/attr/list/np.ndarray/torch.Tensor variants.
    """
    from PIL import Image

    def _to_uint8(arr):
        if arr.dtype != np.uint8:
            if arr.max() <= 1.0 and arr.min() >= 0.0:
                arr = (arr * 255.0).clip(0, 255).astype(np.uint8)
            else:
                arr = arr.clip(0, 255).astype(np.uint8)
        return arr

    def _as_list_of_arrays(x):
        if isinstance(x, (list, tuple)):
            out = []
            for f in x:
                if isinstance(f, Image.Image):
                    out.append(np.array(f))
                elif isinstance(f, np.ndarray):
                    out.append(f)
                else:
                    raise TypeError(f"Unsupported frame element type: {type(f)}")
            return out
        if isinstance(x, np.ndarray):
            if x.ndim == 3:
                x = x[None, ...]
            elif x.ndim != 4:
                raise ValueError(f"Unexpected ndarray shape: {x.shape}")
            return [f for f in x]
        if isinstance(x, torch.Tensor):
            t = x.detach().cpu()
            if t.ndim == 4:
                if t.shape[1] in (1, 3, 4):  # NCHW -> NHWC
                    t = t.permute(0, 2, 3, 1)
            elif t.ndim == 3:
                if t.shape[0] in (1, 3, 4):  # CHW -> 1HWC
                    t = t.permute(1, 2, 0).unsqueeze(0)
                else:  # HWC -> 1HWC
                    t = t.unsqueeze(0)
            else:
                raise ValueError(f"Unexpected tensor shape: {tuple(t.shape)}")
            return [f for f in t.numpy()]
        if isinstance(x, Image.Image):
            return [np.array(x)]
        raise TypeError(f"Unsupported result type: {type(x)}")

    # 1) Pick the first candidate frame container from dict keys / attributes.
    candidate = None
    if isinstance(result, dict):
        for k in ("frames", "images", "frames_list", "videos"):
            if k in result and result[k] is not None:
                candidate = result[k]
                break
    else:
        for attr in ("frames", "images", "videos"):
            if hasattr(result, attr):
                v = getattr(result, attr)
                if v is not None:
                    candidate = v
                    break
    if candidate is None:
        candidate = result

    # 2) Normalize every frame to uint8 RGB and wrap as PIL images.
    arr_list = _as_list_of_arrays(candidate)
    out = []
    for arr in arr_list:
        arr = _to_uint8(arr)
        if arr.ndim == 2:  # grayscale -> RGB
            arr = np.stack([arr] * 3, axis=-1)
        out.append(Image.fromarray(arr))
    return out
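
# Optional smoke test for _result_to_frames. Illustrative only: the SELF_TEST
# flag and the synthetic tensor are not part of the app; they just demonstrate
# that an NCHW float tensor in [0, 1] comes back as uint8 RGB PIL frames.
if os.getenv("SELF_TEST"):
    from PIL import Image as _Image

    _t = torch.rand(2, 3, 64, 64)  # 2 frames, CHW, float in [0, 1]
    _frames = _result_to_frames({"frames": _t})
    assert len(_frames) == 2
    assert isinstance(_frames[0], _Image.Image) and _frames[0].size == (64, 64)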

def save_gif(frames, fps: int = DEFAULT_FPS, shot_id: str = "clip") -> str:
    frames_np = [np.array(f) for f in frames]
    tmpdir = tempfile.mkdtemp()
    out_path = os.path.join(tmpdir, f"{shot_id}.gif")
    imageio.mimsave(out_path, frames_np, duration=1.0 / fps)
    return out_path


def save_mp4(frames, fps: int = DEFAULT_FPS, shot_id: str = "clip") -> str:
    tmpdir = tempfile.mkdtemp()
    out_path = os.path.join(tmpdir, f"{shot_id}.mp4")
    writer = imageio.get_writer(out_path, format="FFMPEG", fps=fps, codec="mpeg4", quality=6)
    for f in frames:
        writer.append_data(np.array(f))  # PIL.Image -> uint8 ndarray
    writer.close()
    return out_path


# -----------------------------
# Inference
# -----------------------------
def infer_one(shot_id: str, num_frames: int, guidance: float, steps: int, size: int) -> Tuple[str, str]:
    row = INDEX[shot_id]
    result = pipe(
        row["prompt"],
        num_frames=int(num_frames),
        num_inference_steps=int(steps),
        guidance_scale=float(guidance),
        height=int(size),
        width=int(size),
    )
    frames = _result_to_frames(result)
    gif_path = save_gif(frames, fps=DEFAULT_FPS, shot_id=shot_id)
    meta_json = json.dumps(pretty_features(row), ensure_ascii=False, indent=2)
    return gif_path, meta_json


def infer_batch(shot_ids: List[str], num_frames: int, guidance: float, steps: int, size: int) -> str:
    if not shot_ids:
        raise gr.Error("Pick at least one shot for batch.")
    outputs, metas = [], []
    for sid in shot_ids:
        row = INDEX[sid]
        result = pipe(
            row["prompt"],
            num_frames=int(num_frames),
            num_inference_steps=int(steps),
            guidance_scale=float(guidance),
            height=int(size),
            width=int(size),
        )
        frames = _result_to_frames(result)
        mp4_path = save_mp4(frames, fps=DEFAULT_FPS, shot_id=sid)
        outputs.append(mp4_path)
        metas.append(pretty_features(row))

    meta_path = tempfile.mktemp(suffix=".json")
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(metas, f, ensure_ascii=False, indent=2)

    zip_path = tempfile.mktemp(suffix=".zip")
    with zipfile.ZipFile(zip_path, "w") as z:
        for p in outputs:
            z.write(p, arcname=os.path.basename(p))
        # metadata.json must be added while the archive is still open.
        z.write(meta_path, arcname="metadata.json")
    return zip_path


# -----------------------------
# UI
# -----------------------------
PRESET_MAP = {
    "Low • fastest (CPU)": dict(frames=8, steps=10, guidance=6.0, size=256),
    "Med • balanced": dict(frames=12, steps=12, guidance=6.5, size=256),
    "High • slower": dict(frames=16, steps=14, guidance=7.0, size=320),
}

with gr.Blocks(title="Shot Grammar Adapter — Proof") as demo:
    gr.Markdown(
        "🔒 **Shot Grammar Adapter — Proof** \n"
        "Team-only proof page. If you need access, contact the owner.\n\n"
        "# Shot Grammar Adapter — Proof\n"
        "Turn shot grammar JSON into controllable video generations.\n"
        "**Preview = GIF (fast) · Batch = MP4 (ZIP)**"
    )
    with gr.Row():
        with gr.Column(scale=1):
            preset = gr.Dropdown(choices=list(PRESET_MAP.keys()), value="Low • fastest (CPU)", label="Preset")
            shot_dropdown = gr.Dropdown(choices=SHOT_IDS, value=SHOT_IDS[0], label="Select Shot ID")
            with gr.Row():
                num_frames = gr.Slider(8, 32, value=DEFAULT_NUM_FRAMES, step=1, label="Frames")
                steps = gr.Slider(8, 24, value=DEFAULT_STEPS, step=1, label="Steps")
            with gr.Row():
                guidance = gr.Slider(1.0, 12.0, value=DEFAULT_GUIDANCE, step=0.5, label="Guidance")
                size = gr.Slider(256, 384, value=DEFAULT_SIZE, step=64, label="Resolution (square)")
            run_btn = gr.Button("Generate (GIF Preview)")
            info_json = gr.Code(label="Selected Shot JSON (features + prompt)", interactive=False, language="json")
        with gr.Column(scale=1):
            gif_out = gr.Image(label="Generated GIF Preview", type="filepath", interactive=False)
            file_out = gr.File(label="Download GIF", interactive=False)

    gr.Markdown("### Batch: Build a Proof Reel (MP4 + metadata.json → ZIP)")
    with gr.Row():
        batch_select = gr.CheckboxGroup(
            choices=SHOT_IDS[:MAX_BATCH],
            value=SHOT_IDS[: min(6, MAX_BATCH)],
            label=f"Pick up to {MAX_BATCH} shots",
        )
    build_btn = gr.Button("Build ZIP")
    zip_out = gr.File(label="Download Proof Reel (ZIP)", interactive=False)

    def apply_preset(name):
        p = PRESET_MAP[name]
        return p["frames"], p["steps"], p["guidance"], p["size"]

    preset.change(apply_preset, inputs=[preset], outputs=[num_frames, steps, guidance, size])

    def _run_one(sid, nf, gs, st, sz):
        gif_path, meta_json = infer_one(sid, nf, gs, st, sz)
        return meta_json, gif_path, gif_path

    run_btn.click(
        _run_one,
        inputs=[shot_dropdown, num_frames, guidance, steps, size],
        outputs=[info_json, gif_out, file_out],
    )

    def _run_batch(sids, nf, gs, st, sz):
        return infer_batch(sids, nf, gs, st, sz)

    build_btn.click(
        _run_batch,
        inputs=[batch_select, num_frames, guidance, steps, size],
        outputs=[zip_out],
    )

# -----------------------------
# Launch (at the very bottom, and only here)
# -----------------------------
demo.queue(max_size=8)
demo.launch(show_api=False)
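
# Run notes (assumed local usage; the values below are examples, not required
# defaults): every knob above is read from the environment at import time, e.g.
#   SHOTS_JSONL=shots_public_subset.jsonl DEF_STEPS=8 DEF_SIZE=256 python app.py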