import os import shutil import tempfile from datetime import datetime from typing import List, Tuple import gradio as gr import cv2 from PIL import Image import numpy as np SUPPORTED_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".mpeg", ".mpg", ".wmv", ".flv"} MAX_FILE_SIZE_MB = 500 # 最大ファイルサイズ(MB) def _safe_filename(path: str) -> str: base = os.path.basename(path) stem, _ = os.path.splitext(base) stem = "".join(c for c in stem if c.isalnum() or c in ("-", "_"))[:60] return stem or "video" def check_file_size(file_path: str) -> bool: """ファイルサイズをチェックする""" if not os.path.exists(file_path): return False file_size_mb = os.path.getsize(file_path) / (1024 * 1024) return file_size_mb <= MAX_FILE_SIZE_MB def extract_frames(files: List[str]) -> Tuple[List[str], List[str], List[Tuple[str, Image.Image]], str]: if not files: return [], [], [], "⚠️ ファイルがアップロードされていません。" workdir = tempfile.mkdtemp(prefix="frames_") out_images, out_zips, gallery_items = [], [], [] status_messages = [] for path in files: if not path or not os.path.exists(path): status_messages.append(f"⚠️ ファイルが見つかりません: {os.path.basename(path)}") continue # ファイルサイズチェック if not check_file_size(path): file_size_mb = os.path.getsize(path) / (1024 * 1024) status_messages.append(f"❌ ファイルサイズが大きすぎます ({file_size_mb:.1f}MB > {MAX_FILE_SIZE_MB}MB): {os.path.basename(path)}") continue _, ext = os.path.splitext(path) if ext.lower() not in SUPPORTED_EXTS: status_messages.append(f"⚠️ サポートされていない形式: {os.path.basename(path)}") continue safe = _safe_filename(path) vid_dir = os.path.join(workdir, safe) os.makedirs(vid_dir, exist_ok=True) try: # OpenCVでビデオを開く cap = cv2.VideoCapture(path) if not cap.isOpened(): status_messages.append(f"❌ ビデオファイルを開けませんでした: {safe}") continue # ビデオ情報を取得 fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) print(f"Processing {safe}: {total_frames} frames at {fps:.2f} fps") # 最初のフレームを取得 ret, first_frame = cap.read() if not ret: status_messages.append(f"❌ 最初のフレームを読み込めませんでした: {safe}") cap.release() continue # BGRからRGBに変換 first_frame_rgb = cv2.cvtColor(first_frame, cv2.COLOR_BGR2RGB) img_first = Image.fromarray(first_frame_rgb) # 最後のフレームを取得 img_last = img_first # デフォルト値 if total_frames > 1: # 複数の試行で最後のフレームを取得 for offset in range(min(10, total_frames)): frame_pos = max(0, total_frames - 1 - offset) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_pos) ret, last_frame = cap.read() if ret: last_frame_rgb = cv2.cvtColor(last_frame, cv2.COLOR_BGR2RGB) img_last = Image.fromarray(last_frame_rgb) print(f"Got last frame at position {frame_pos}") break else: print(f"Warning: Could not read last frame, using first frame instead") cap.release() # 画像を保存 ts = datetime.now().strftime("%Y%m%d_%H%M%S") first_path = os.path.join(vid_dir, f"{safe}_first_{ts}.png") last_path = os.path.join(vid_dir, f"{safe}_last_{ts}.png") # PNG品質を調整してファイルサイズを抑制 img_first.save(first_path, "PNG", optimize=True) img_last.save(last_path, "PNG", optimize=True) out_images.extend([first_path, last_path]) gallery_items.extend([ (first_path, f"{safe} — FIRST FRAME"), (last_path, f"{safe} — LAST FRAME") ]) # ZIPファイルを作成 zip_path = shutil.make_archive( os.path.join(workdir, f"{safe}_frames_{ts}"), 'zip', vid_dir ) out_zips.append(zip_path) status_messages.append(f"✅ 処理成功: {safe}") except Exception as e: status_messages.append(f"❌ エラーが発生しました ({safe}): {str(e)}") print(f"Error processing {path}: {str(e)}") import traceback traceback.print_exc() continue # ステータスメッセージをまとめる if gallery_items: final_status = f"✅ {len(gallery_items)//2}個のビデオから正常にフレームを抽出しました。\n" else: final_status = "⚠️ フレームを抽出できませんでした。\n" if status_messages: final_status += "\n".join(status_messages) return out_images, out_zips, gallery_items, final_status # Gradioインターフェース with gr.Blocks( title="First & Last Frame Extractor", theme=gr.themes.Soft(), analytics_enabled=False, # アナリティクスを無効化 css=""" .gradio-container { max-width: 1200px !important; margin: auto !important; } """ ) as demo: gr.Markdown(f""" # 🎞️ First & Last Frame Extractor Upload one or more videos to extract the **first** and **last** frame. **Supported formats:** MP4, MOV, AVI, MKV, WebM, M4V, MPEG, MPG, WMV, FLV **Maximum file size:** {MAX_FILE_SIZE_MB}MB per file """) with gr.Row(): with gr.Column(): video_files = gr.Files( label="Upload videos", file_count="multiple", file_types=["video"], type="filepath" ) run_btn = gr.Button("🎬 Extract Frames", variant="primary", size="lg") with gr.Row(): with gr.Tab("📸 Preview"): gallery = gr.Gallery( label="Extracted Frames", columns=2, rows=2, height=500, object_fit="contain", show_label=True ) with gr.Tab("💾 Downloads"): with gr.Column(): gr.Markdown("### Individual Images") images_out = gr.Files(label="PNG files", file_count="multiple") gr.Markdown("### ZIP Archives") zips_out = gr.Files(label="ZIP files (contains both frames per video)", file_count="multiple") # ステータスメッセージ status_msg = gr.Markdown("") def process_videos(files): if not files: return [], [], [], "⚠️ 少なくとも1つのビデオファイルをアップロードしてください。" try: imgs, zips, gallery_items, status = extract_frames(files) return gallery_items, imgs, zips, status except Exception as e: return [], [], [], f"❌ エラー: {str(e)}" run_btn.click( fn=process_videos, inputs=[video_files], outputs=[gallery, images_out, zips_out, status_msg] ) # サンプル説明 gr.Markdown(f""" --- ### 📝 使用方法: 1. ファイルアップローダーを使用して1つ以上のビデオファイルをアップロード 2. "Extract Frames"ボタンをクリック 3. Previewタブで抽出されたフレームを確認 4. Downloadsタブから個別画像またはZIPアーカイブをダウンロード ### ⚡ 機能: - 各ビデオの最初と最後のフレームを抽出 - 複数のビデオ形式をサポート - 複数ビデオの一括処理 - 個別PNGまたはZIPアーカイブとしてダウンロード - ファイルサイズ制限: {MAX_FILE_SIZE_MB}MB ### ⚠️ 注意事項: - 大きなファイルはアップロード前にサイズを確認してください - ネットワーク環境によってはアップロードに時間がかかる場合があります """) if __name__ == "__main__": # Pydanticバージョン問題の回避策 try: import pydantic print(f"Pydantic version: {pydantic.__version__}") # Pydantic 2.11以降の場合の警告と対処 from packaging import version if version.parse(pydantic.__version__) >= version.parse("2.11.0"): print("⚠️ Warning: Pydantic 2.11+ detected. Downgrading to compatible version...") print("Please run: pip install 'pydantic>=2.5.0,<2.11.0'") print(f"Gradio version: {gr.__version__}") except ImportError as e: print(f"Import error: {e}") # より安定した設定でlaunch demo.queue( max_size=20, # キューサイズを制限 default_concurrency_limit=3 # 並行処理数をより保守的に設定 ).launch( server_name="0.0.0.0", server_port=7860, share=False, max_file_size="500mb", # Gradioレベルでのファイルサイズ制限 max_threads=8, # スレッド数を調整 show_api=False, # API情報の表示を無効化(schema問題回避) debug=False # デバッグモードを無効化 )