"""Gradio front-end that chains image-to-video clips into one continuous video.

Workflow: generate a clip from a start image, click "Chain" to keep the clip and
capture its last frame as the seed image of the next generation, then click
"Download" to get the concatenated video plus a ZIP of the individual clips.
"""
import mimetypes
import os
import shutil
import subprocess
import time
import uuid
from datetime import datetime, timezone
from typing import List, Optional

import gradio as gr
import requests

# --- config ---
ENDPOINT = "https://moonmath-ai-dev--moonmath-i2v-backend-moonmathinference-run.modal.run"
FFMPEG = "ffmpeg"
OUT, TMP = "outputs", "tmp"
os.makedirs(OUT, exist_ok=True)
os.makedirs(TMP, exist_ok=True)


def ts() -> str:
    """Return the current UTC time formatted as ``YYYYmmdd_HHMMSS``."""
    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")


def fname(p: str, e: str) -> str:
    """Build a file name ``<p>_<timestamp>_<6 hex chars>.<e>``."""
    return f"{p}_{ts()}_{uuid.uuid4().hex[:6]}.{e}"


def abspath(p: str) -> str:
    """Return the absolute form of path *p*."""
    return os.path.abspath(p)


def run_ffmpeg(args: List[str]) -> None:
    """Run the ffmpeg binary with *args*.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status; the message is
            ffmpeg's stderr output.
    """
    proc = subprocess.run(
        [FFMPEG, *args],
        # Detach stdin so ffmpeg can never block waiting for interactive input.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if proc.returncode:
        raise RuntimeError(proc.stderr.decode(errors="ignore"))


def extract_last_frame(video_path: str) -> str:
    """Write the final frame of *video_path* to a PNG in ``TMP`` and return its path.

    Decoding starts one second before the end of the file and ``-update 1``
    makes every decoded frame overwrite the same output image, so the file left
    on disk is the last frame. (Limiting the output to a single frame instead
    would keep the frame one second *before* the end.)
    """
    out = os.path.join(TMP, fname("last", "png"))
    run_ffmpeg(["-sseof", "-1", "-i", video_path, "-update", "1", "-q:v", "2", out])
    return out


def concat_videos(paths: List[str]) -> str:
    """Join *paths* in order into one MP4 under ``OUT`` and return its path.

    A single input is copied as-is. Several inputs are stream-copied through
    ffmpeg's concat demuxer (no re-encode).

    Raises:
        ValueError: if *paths* is empty.
        RuntimeError: if ffmpeg fails.
    """
    if not paths:
        raise ValueError("No videos selected.")
    out = os.path.join(OUT, fname("continuous", "mp4"))
    if len(paths) == 1:
        shutil.copy(paths[0], out)
        return out

    listfile = os.path.join(TMP, f"concat_{uuid.uuid4().hex}.txt")
    try:
        with open(listfile, "w", encoding="utf-8") as f:
            for p in paths:
                # Inside a single-quoted concat entry a literal quote must be
                # written as '\'' or the list file is unparseable.
                quoted = abspath(p).replace("'", "'\\''")
                f.write(f"file '{quoted}'\n")
        run_ffmpeg(["-f", "concat", "-safe", "0", "-i", listfile, "-c", "copy", out])
    finally:
        # Remove the temporary list even when ffmpeg fails.
        if os.path.exists(listfile):
            os.remove(listfile)
    return out


def zip_used(paths: List[str]) -> str:
    """Pack copies of *paths* into a ZIP archive under ``OUT`` and return its path."""
    pack = os.path.join(TMP, f"pack_{uuid.uuid4().hex[:6]}")
    os.makedirs(pack, exist_ok=True)
    try:
        for p in paths:
            shutil.copy(p, pack)
        base = os.path.join(OUT, f"used_{ts()}")
        shutil.make_archive(base, "zip", pack)
    finally:
        # The staging directory is only needed while the archive is built.
        shutil.rmtree(pack, ignore_errors=True)
    return base + ".zip"


def save_video_bytes(content: bytes, content_type: str) -> str:
    """Write *content* to a new file under ``OUT`` and return its path.

    The extension is derived from *content_type*; ``mp4`` is used when the type
    is unknown.
    """
    # Drop parameters such as "; codecs=..." which would defeat the lookup.
    mime = content_type.split(";", 1)[0].strip()
    ext = (mimetypes.guess_extension(mime) or ".mp4").lstrip(".")
    path = os.path.join(OUT, fname("gen", ext))
    with open(path, "wb") as f:
        f.write(content)
    return path


def call_backend(
    prompt: str,
    image_bytes_path: str,
    negative_prompt: Optional[str],
    fps: int,
    vlen: int,
    steps: Optional[int],
    seed: Optional[int],
) -> str:
    """Request one clip from the backend and return the path of the saved video.

    The file at *image_bytes_path* is uploaded as the ``image_bytes`` form
    field; all other arguments are sent as query parameters. When *seed* is
    ``None`` the current Unix time is used.

    The response is handled in one of three ways:
      * ``Content-Type: video/*`` -- the body is saved directly;
      * a JSON object with ``video_url`` -- the URL is downloaded;
      * a body that is not JSON -- treated as mislabelled raw video and saved
        as MP4.

    Raises:
        RuntimeError: on a non-200 backend status, a JSON response without
            ``video_url``, or a non-200 status while downloading the video.
    """
    params = {
        "prompt": prompt,
        "frames_per_second": str(fps),
        "video_length": str(vlen),
    }
    if negative_prompt:
        params["negative_prompt"] = negative_prompt
    if steps is not None:
        params["num_inference_steps"] = str(steps)
    if seed is None:
        seed = int(time.time())
    params["seed"] = str(seed)

    with open(image_bytes_path, "rb") as image_file:
        files = {
            "image_bytes": (
                os.path.basename(image_bytes_path),
                image_file,
                "application/octet-stream",
            )
        }
        r = requests.post(
            ENDPOINT,
            params=params,
            files=files,
            headers={"accept": "application/json"},
            timeout=600,
        )

    if r.status_code != 200:
        raise RuntimeError(f"Backend {r.status_code}: {r.text[:500]}")

    ctype = r.headers.get("Content-Type", "")
    if ctype.startswith("video/"):
        # raw video bytes
        return save_video_bytes(r.content, ctype)

    # expect JSON with { "video_url": ... }
    try:
        payload = r.json()
    except ValueError:
        # fallback if backend mislabels content: the body is not JSON, so
        # assume it is the video itself.
        return save_video_bytes(r.content, "video/mp4")

    url = payload.get("video_url") if isinstance(payload, dict) else None
    if not url:
        # A well-formed JSON body is certainly not a video; report it instead
        # of saving it under a video file name.
        raise RuntimeError(f"Backend response has no video_url: {r.text[:500]}")

    path = os.path.join(OUT, fname("gen", "mp4"))
    with requests.get(url, stream=True, timeout=600) as r2:
        if r2.status_code != 200:
            raise RuntimeError(f"fetch video {r2.status_code}")
        with open(path, "wb") as f:
            for chunk in r2.iter_content(1 << 20):
                if chunk:
                    f.write(chunk)
    return path


# ================= UI =================
with gr.Blocks() as demo:
    gr.Markdown("## Continuous Video — chain last frame → next first frame")

    # state
    used_videos = gr.State([])       # list[str]: clips kept via Chain, in order
    last_seed_img = gr.State(None)   # PNG path (becomes image_bytes)
    current_video = gr.State(None)   # latest generated video

    with gr.Row():
        prompt = gr.Textbox(label="Prompt", placeholder="Describe your shot…", lines=2)
    with gr.Row():
        start_file = gr.File(
            label="Initial start image or video (only for the very first clip)",
            file_types=["image", "video"],
        )
    with gr.Row():
        negative = gr.Textbox(label="Negative prompt (optional)", placeholder="What to avoid…")
    with gr.Row():
        fps = gr.Slider(1, 60, value=24, step=1, label="Frames per second")
        vlen = gr.Slider(1, 12, value=4, step=1, label="Video length (seconds)")
        steps = gr.Slider(1, 100, value=30, step=1, label="Num inference steps (optional)")
        seed = gr.Number(label="Seed (optional)", precision=0)

    video_out = gr.Video(label="Output")
    status_md = gr.Markdown("")  # shows ✅ Saved… after Chain

    with gr.Row():
        gen_btn = gr.Button("Generate", variant="primary")
        chain_btn = gr.Button("Chain (save & clear)")
        dl_btn = gr.Button("Download (concatenate & ZIP)")

    files_out = gr.Files(label="Downloads")

    # ---- handlers ----
    def on_generate(prompt_txt, start_file_obj, neg, fps_val, vlen_val, steps_val, seed_val, seed_img):
        """Generate one clip; return (video preview, current_video state, status update)."""
        if not prompt_txt or not prompt_txt.strip():
            raise gr.Error("Prompt is required.")

        # choose image_bytes source: last captured frame > uploaded start file
        image_path = seed_img
        if not image_path:
            if not start_file_obj:
                raise gr.Error("First generation requires an initial image OR video.")
            # The File component may deliver an object exposing `.name` or a
            # plain path string; accept both.
            image_path = getattr(start_file_obj, "name", start_file_obj)
            # NOTE(review): an uploaded video is sent to the backend unchanged
            # as `image_bytes` -- confirm the backend accepts video input.

        try:
            out = call_backend(
                prompt=prompt_txt.strip(),
                image_bytes_path=image_path,
                negative_prompt=(neg.strip() if (neg and neg.strip()) else None),
                fps=int(fps_val),
                vlen=int(vlen_val),
                steps=int(steps_val) if steps_val else None,
                seed=int(seed_val) if (seed_val not in [None, ""]) else None,
            )
        except Exception as e:
            # UI boundary: surface any failure to the user as a Gradio error.
            raise gr.Error(str(e)) from e

        return out, out, gr.update(value="")  # show video, set state, clear status

    gen_btn.click(
        on_generate,
        inputs=[prompt, start_file, negative, fps, vlen, steps, seed, last_seed_img],
        outputs=[video_out, current_video, status_md],
    )

    def on_chain(curr, used):
        """Keep the current clip and prepare the next generation.

        Appends *curr* to the kept clips (once), captures its last frame as the
        next seed image, clears prompt, negative prompt, start file and the
        preview, and shows a visible 'saved' banner.
        """
        if not curr or not os.path.exists(curr):
            raise gr.Error("Generate a video first.")

        # Copy so the state value passed in is never mutated in place.
        new_used = list(used) if used else []
        if curr not in new_used:
            new_used.append(curr)

        # last frame for next seed
        try:
            seed_img = extract_last_frame(curr)
        except Exception as e:
            # UI boundary: report ffmpeg problems the same way as backend ones.
            raise gr.Error(str(e)) from e

        saved_idx = len(new_used)
        saved_msg = f"✅ **Saved clip #{saved_idx}** — last frame captured. Ready for your next prompt."

        return (
            new_used,                    # used_videos
            seed_img,                    # last_seed_img (image_bytes for next gen)
            gr.update(value=None),       # CLEAR prompt
            gr.update(value=None),       # CLEAR negative prompt
            gr.update(value=None),       # CLEAR start_file
            gr.update(value=None),       # CLEAR video preview
            gr.update(value=saved_msg),  # status banner
        )

    chain_btn.click(
        on_chain,
        inputs=[current_video, used_videos],
        outputs=[used_videos, last_seed_img, prompt, negative, start_file, video_out, status_md],
    )

    def on_download(used):
        """Return the downloadable files: the concatenated video and a ZIP of the clips.

        Each artifact is best-effort; one failing does not block the other.
        """
        if not used:
            raise gr.Error("Nothing to download. Click Chain after generating clips.")
        files = []
        try:
            files.append(concat_videos(used))
        except Exception as e:
            print("concat error:", e)
        try:
            files.append(zip_used(used))
        except Exception as e:
            print("zip error:", e)
        if not files:
            # Both steps failed; tell the user instead of showing nothing.
            raise gr.Error("Could not build the download files.")
        return files

    dl_btn.click(on_download, inputs=[used_videos], outputs=[files_out])

if __name__ == "__main__":
    demo.launch()