import gradio as gr
import json
from difflib import Differ
import ffmpeg
import os
from pathlib import Path
import time
import aiohttp
import asyncio


# Set True if you're using the Hugging Face Inference API
API_BACKEND = True
# Updated model identifier
MODEL = "openai/whisper-small"

if API_BACKEND:
    from dotenv import load_dotenv
    import base64

    load_dotenv(Path(".env"))
    HF_TOKEN = os.environ["HF_TOKEN"]
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    API_URL = f'https://api-inference.huggingface.co/models/{MODEL}'

videos_out_path = Path("./videos_out")
videos_out_path.mkdir(parents=True, exist_ok=True)

samples_data = sorted(Path('examples').glob('*.json'))
SAMPLES = []
for file in samples_data:
    with open(file) as f:
        sample = json.load(f)
    SAMPLES.append(sample)
VIDEOS = list(map(lambda x: [x['video']], SAMPLES))

total_inferences_since_reboot = 415
total_cuts_since_reboot = 1539


async def speech_to_text(video_file_path):
    """Extract the audio track, send it to the ASR backend and return the
    transcription plus per-chunk timestamps."""
    global total_inferences_since_reboot
    if video_file_path is None:
        raise ValueError("Error no video input")

    video_path = Path(video_file_path)
    try:
        # Convert the video's audio track to 16 kHz mono WAV, captured in memory
        audio_memory, _ = ffmpeg.input(video_path).output(
            '-', format="wav", ac=1, ar='16k'
        ).overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True)
    except Exception as e:
        raise RuntimeError("Error converting video to audio") from e

    ping("speech_to_text")
    if API_BACKEND:
        inference_response = None
        for i in range(10):
            for tries in range(4):
                print(f'Transcribing from API attempt {tries}')
                try:
                    inference_response = await query_api(audio_memory)
                    print(inference_response)
                    transcription = inference_response["text"].lower()
                    timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
                                  for chunk in inference_response['chunks']]

                    total_inferences_since_reboot += 1
                    print("\n\ntotal_inferences_since_reboot: ",
                          total_inferences_since_reboot, "\n\n")
                    return (transcription, transcription, timestamps)
                except Exception as e:
                    print(e)
                    # Guard against query_api itself failing, in which case
                    # inference_response may still be None here.
                    if inference_response and 'error' in inference_response and 'estimated_time' in inference_response:
                        wait_time = inference_response['estimated_time']
                        print("Waiting for model to load....", wait_time)
                        await asyncio.sleep(wait_time + 5.0)
                    elif inference_response and 'error' in inference_response:
                        raise RuntimeError("Error Fetching API",
                                           inference_response['error'])
                    else:
                        break
            else:
                raise RuntimeError("Error Fetching API")
    else:
        # Local (non-API) inference is not implemented here; a hedged sketch follows this function.
        pass

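
# The non-API branch above is left unimplemented. Below is a minimal sketch of what a
# local backend *could* look like, assuming the `transformers` package is installed and
# that the chosen Whisper checkpoint supports timestamp output through the ASR pipeline.
# `speech_to_text_local` is a hypothetical helper: it is not wired into the Gradio UI and
# only illustrates how to produce the same (transcription, transcription, timestamps) tuple.
def speech_to_text_local(audio_bytes: bytes):
    """Sketch of a local alternative to the Inference API call (not used by the app)."""
    from transformers import pipeline  # lazy import: API-only deployments don't need transformers

    asr = pipeline("automatic-speech-recognition", model=MODEL)
    # return_timestamps=True yields {"text": ..., "chunks": [{"text": ..., "timestamp": (start, end)}, ...]},
    # the same shape that speech_to_text() expects from the API backend.
    result = asr(audio_bytes, return_timestamps=True)
    transcription = result["text"].lower()
    timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
                  for chunk in result["chunks"]]
    return (transcription, transcription, timestamps)
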

async def cut_timestamps_to_video(video_in, transcription, text_in, timestamps):
    """Diff the edited text against the original transcription and keep only the
    video segments whose characters survived the edit."""
    global total_cuts_since_reboot

    if video_in is None or text_in is None or transcription is None:
        raise ValueError("Inputs undefined")

    video_path = Path(video_in)
    video_file_name = video_path.stem

    d = Differ()
    # Character-level diff; '+' entries are additions, which are not allowed
    diff_chars = d.compare(transcription, text_in)
    filtered = list(filter(lambda x: x[0] != '+', diff_chars))

    # Group consecutive kept characters; every '-' (deleted character) starts a new group
    idx = 0
    grouped = {}
    for (a, b) in zip(filtered, timestamps):
        if a[0] != '-':
            if idx in grouped:
                grouped[idx].append(b)
            else:
                grouped[idx] = []
                grouped[idx].append(b)
        else:
            idx += 1

    # Cut each group from its first start time to its last end time
    timestamps_to_cut = [[v[0][1], v[-1][2]] for v in grouped.values()]
    between_str = '+'.join(
        map(lambda t: f'between(t,{t[0]},{t[1]})', timestamps_to_cut))

    if timestamps_to_cut:
        video_file = ffmpeg.input(video_in)
        video = video_file.video.filter(
            "select", f'({between_str})').filter("setpts", "N/FRAME_RATE/TB")
        audio = video_file.audio.filter(
            "aselect", f'({between_str})').filter("asetpts", "N/SR/TB")

        output_video = f'./videos_out/{video_file_name}.mp4'
        ffmpeg.concat(video, audio, v=1, a=1).output(
            output_video).overwrite_output().global_args('-loglevel', 'quiet').run()
    else:
        output_video = video_in

    tokens = [(token[2:], token[0] if token[0] != " " else None)
              for token in filtered]

    total_cuts_since_reboot += 1
    ping("video_cuts")
    print("\n\ntotal_cuts_since_reboot: ", total_cuts_since_reboot, "\n\n")
    return (tokens, output_video)


async def query_api(audio_bytes: bytes):
    """Send base64-encoded audio to the Hugging Face Inference API."""
    # Note: "char"-level timestamps are a CTC-model feature; Whisper checkpoints
    # typically return word- or segment-level timestamps, so this parameter may
    # need adjusting for the model configured above.
    payload = json.dumps({
        "inputs": base64.b64encode(audio_bytes).decode("utf-8"),
        "parameters": {
            "return_timestamps": "char",
            "chunk_length_s": 10,
            "stride_length_s": [4, 2]
        },
        "options": {"use_gpu": False}
    }).encode("utf-8")

    async with aiohttp.ClientSession() as session:
        async with session.post(API_URL, headers=headers, data=payload) as response:
            print("API Response: ", response.status)
            if response.headers['Content-Type'] == 'application/json':
                return await response.json()
            elif response.headers['Content-Type'] == 'application/octet-stream':
                return await response.read()
            elif response.headers['Content-Type'] == 'text/plain':
                return await response.text()
            else:
                raise RuntimeError("Error Fetching API")

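
# For reference, a successful JSON response from the Inference API is expected to look
# roughly like the structure below (values are illustrative, not real output); this is
# what speech_to_text() unpacks into (transcription, timestamps):
#
#   {
#     "text": "hello world",
#     "chunks": [
#       {"text": "h", "timestamp": [0.01, 0.05]},
#       {"text": "e", "timestamp": [0.05, 0.08]},
#       ...
#     ]
#   }
#
# While the model is still loading, the API instead returns
# {"error": "...", "estimated_time": <seconds>}, which speech_to_text() uses to wait and retry.
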

def ping(name):
    """Fire-and-forget telemetry ping for this Space."""
    url = f'https://huggingface.co/api/telemetry/spaces/radames/edit-video-by-editing-text/{name}'
    print("ping: ", url)

    async def req():
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                print("pong: ", response.status)
    asyncio.create_task(req())


# ---- Gradio Layout -----
video_in = gr.Video(label="Video file", elem_id="video-container")
text_in = gr.Textbox(label="Transcription", lines=10, interactive=True)
video_out = gr.Video(label="Video Out")
diff_out = gr.HighlightedText(label="Cuts Diffs", combine_adjacent=True)
examples = gr.Dataset(components=[video_in], samples=VIDEOS, type="index")

css = """
#cut_btn, #reset_btn { align-self:stretch; }
#\\31 3 { max-width: 540px; }
.output-markdown {max-width: 65ch !important;}
#video-container{ max-width: 40rem; }
"""

with gr.Blocks(css=css) as demo:
    transcription_var = gr.State()
    timestamps_var = gr.State()
    with gr.Row():
        with gr.Column():
            gr.Markdown("""
            # Edit Video By Editing Text
            This project is a quick proof of concept of a simple video editor where the edits
            are made by editing the audio transcription.
            Using the [Huggingface Automatic Speech Recognition Pipeline](https://huggingface.co/tasks/automatic-speech-recognition)
            with a fine-tuned [Wav2Vec2 model using Connectionist Temporal Classification (CTC)](https://huggingface.co/facebook/wav2vec2-large-960h-lv60-self),
            you can predict not only the text transcription but also the [character- or word-based timestamps](https://huggingface.co/docs/transformers/v4.19.2/en/main_classes/pipelines#transformers.AutomaticSpeechRecognitionPipeline.__call__.return_timestamps).
            """)

    with gr.Row():
        examples.render()

        def load_example(id):
            video = SAMPLES[id]['video']
            transcription = SAMPLES[id]['transcription'].lower()
            timestamps = SAMPLES[id]['timestamps']
            return (video, transcription, transcription, timestamps)

        examples.click(
            load_example,
            inputs=[examples],
            outputs=[video_in, text_in, transcription_var, timestamps_var],
            queue=False)

    with gr.Row():
        with gr.Column():
            video_in.render()
            transcribe_btn = gr.Button("Transcribe Audio")
            transcribe_btn.click(speech_to_text, [video_in], [
                text_in, transcription_var, timestamps_var])

    with gr.Row():
        gr.Markdown("""
        ### Now edit as text
        After running the video transcription, you can make cuts to the text below
        (only cuts, not additions!)""")

    with gr.Row():
        with gr.Column():
            text_in.render()
            with gr.Row():
                cut_btn = gr.Button("Cut to video", elem_id="cut_btn")
                # send audio path and hidden variables
                cut_btn.click(cut_timestamps_to_video, [
                    video_in, transcription_var, text_in, timestamps_var],
                    [diff_out, video_out])

                reset_transcription = gr.Button(
                    "Reset to last transcription", elem_id="reset_btn")
                reset_transcription.click(
                    lambda x: x, transcription_var, text_in)
        with gr.Column():
            video_out.render()
            diff_out.render()

    with gr.Row():
        gr.Markdown("""
        #### Video Credits
        1. [Cooking](https://vimeo.com/573792389)
        1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0)
        1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8)
        """)

demo.queue()
if __name__ == "__main__":
    demo.launch(debug=True)
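
# To run this app locally (assumed typical setup; adjust as needed):
#   1. Install ffmpeg plus the Python dependencies imported above: gradio, ffmpeg-python, aiohttp, python-dotenv
#   2. Provide HF_TOKEN=<your Hugging Face access token> in a .env file, since API_BACKEND is True
#      and the Inference API call is authenticated
#   3. Make sure an examples/ directory with the sample *.json files (and their videos) exists,
#      because SAMPLES is loaded from it at import time
#   4. Run this script with Python; the Gradio UI starts via demo.launch(debug=True)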