# Captured from a Hugging Face Spaces page; Space status at capture time: "Runtime error".
import gradio as gr | |
import json | |
from difflib import Differ | |
import ffmpeg | |
import os | |
from pathlib import Path | |
import time | |
import aiohttp | |
import asyncio | |
# Set to True when transcription goes through the Hugging Face Inference API.
API_BACKEND = True
# Updated model identifier.
MODEL = "openai/whisper-small"

if API_BACKEND:
    from dotenv import load_dotenv
    import base64

    # Load variables from a local .env file (if any) before reading HF_TOKEN.
    load_dotenv(Path(".env"))

    # A missing HF_TOKEN fails here, at import time, with a KeyError.
    HF_TOKEN = os.environ["HF_TOKEN"]
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    API_URL = f'https://api-inference.huggingface.co/models/{MODEL}'

# Directory that receives the cut videos; created on start-up if missing.
videos_out_path = Path("./videos_out")
videos_out_path.mkdir(parents=True, exist_ok=True)

# Every examples/*.json file holds one sample; the layout code reads its
# 'video', 'transcription' and 'timestamps' keys.
samples_data = sorted(Path('examples').glob('*.json'))
SAMPLES = []
for file in samples_data:
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(file, encoding="utf-8") as f:
        SAMPLES.append(json.load(f))

# One single-element row per sample, the shape handed to gr.Dataset below.
VIDEOS = [[sample['video']] for sample in SAMPLES]

# Usage counters, incremented by the handlers.
# NOTE(review): they start from hard-coded non-zero values — confirm intent.
total_inferences_since_reboot = 415
total_cuts_since_reboot = 1539
async def speech_to_text(video_file_path):
    """Transcribe the audio track of a video through the inference API.

    The audio is extracted with ffmpeg as mono 16 kHz WAV and handed to
    ``query_api``.  When the API answers with an ``error`` together with an
    ``estimated_time``, the call sleeps for that time plus five seconds and
    tries again.

    Args:
        video_file_path: Path of the video to transcribe, or ``None`` when
            no video was provided.

    Returns:
        ``(transcription, transcription, timestamps)``: the lower-cased text
        (returned twice because the caller wires it to both the editable
        textbox and the stored state) and a list of ``[text, start, end]``
        entries, one per chunk of the API response.  When ``API_BACKEND`` is
        false nothing is transcribed and ``None`` is returned.

    Raises:
        ValueError: ``video_file_path`` is ``None``.
        RuntimeError: the audio conversion failed, the API reported an error
            without an estimated wait, or every retry was used up.
    """
    global total_inferences_since_reboot

    if video_file_path is None:
        raise ValueError("Error no video input")

    video_path = Path(video_file_path)
    try:
        # Write the WAV to stdout ('-') and capture it, so the audio stays in
        # memory instead of going through a temporary file.
        audio_memory, _ = (
            ffmpeg.input(video_path)
            .output('-', format="wav", ac=1, ar='16k')
            .overwrite_output()
            .global_args('-loglevel', 'quiet')
            .run(capture_stdout=True)
        )
    except Exception as e:
        raise RuntimeError("Error converting video to audio") from e

    ping("speech_to_text")

    if not API_BACKEND:
        # Local model handling would go here, but is not applicable for the
        # Whisper model without Hugging Face pipeline support.
        return None

    for _ in range(10):
        for tries in range(4):
            print(f'Transcribing from API attempt {tries}')
            # Reset per attempt: the handler below must never look at a stale
            # (or unbound) response when query_api itself raises.
            inference_response = None
            try:
                inference_response = await query_api(audio_memory)
                print(inference_response)
                transcription = inference_response["text"].lower()
                timestamps = [
                    [chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
                    for chunk in inference_response['chunks']
                ]
            except Exception as e:
                print(e)
                # query_api may also return bytes or str; only a dict can
                # carry the API's error description.
                error_info = inference_response if isinstance(inference_response, dict) else {}
                if 'error' in error_info and 'estimated_time' in error_info:
                    wait_time = error_info['estimated_time']
                    print("Waiting for model to load....", wait_time)
                    await asyncio.sleep(wait_time + 5.0)
                elif 'error' in error_info:
                    raise RuntimeError("Error Fetching API", error_info['error']) from e
                else:
                    # Nothing usable in the response: abandon this round and
                    # start a fresh one in the outer loop.
                    break
            else:
                total_inferences_since_reboot += 1
                print("\n\ntotal_inferences_since_reboot: ", total_inferences_since_reboot, "\n\n")
                return (transcription, transcription, timestamps)
        else:
            # Four waits in a row without a successful transcription.
            raise RuntimeError("Error Fetching API")

    # Every round ended without a usable response; fail loudly instead of
    # handing None to the caller.
    raise RuntimeError("Error Fetching API")
async def cut_timestamps_to_video(video_in, transcription, text_in, timestamps):
    """Cut a video down to the parts whose text survived the user's edit.

    ``transcription`` and ``text_in`` are diffed character by character.
    Characters that were not deleted keep their timestamp; each run of kept
    characters becomes one time range, and ffmpeg keeps only the frames and
    audio samples inside those ranges.

    Args:
        video_in: Path of the source video.
        transcription: The original transcription.
        text_in: The transcription after the user's deletions.
        timestamps: Entries whose index 1 is a start time and index 2 an end
            time.  They are paired by position with the diff lines, so this
            presumably holds one entry per character of ``transcription`` —
            TODO confirm against the API response.

    Returns:
        ``(tokens, output_video)``: ``tokens`` is a list of ``(text, tag)``
        pairs for the diff display, with ``tag`` set to ``None`` for kept
        text and to the diff marker (e.g. ``'-'``) otherwise;
        ``output_video`` is the path of the cut video, or ``video_in`` itself
        when there was nothing to cut.

    Raises:
        ValueError: any of the inputs is ``None``.
    """
    global total_cuts_since_reboot

    # Validate before touching the path: Path(None) would raise TypeError and
    # hide the intended error message.
    if any(value is None for value in (video_in, transcription, text_in, timestamps)):
        raise ValueError("Inputs undefined")

    video_file_name = Path(video_in).stem

    # Character-level diff; insertions ('+') are dropped because only
    # deletions can be mapped back onto the video.
    filtered = [line for line in Differ().compare(transcription, text_in) if line[0] != '+']

    # Collect the timestamps of consecutive kept characters under one key;
    # every deleted character starts a new key.
    idx = 0
    grouped = {}
    for diff_line, stamp in zip(filtered, timestamps):
        if diff_line[0] != '-':
            grouped.setdefault(idx, []).append(stamp)
        else:
            idx += 1

    # Each group spans from the start of its first entry to the end of its last.
    timestamps_to_cut = [[v[0][1], v[-1][2]] for v in grouped.values()]

    if timestamps_to_cut:
        between_str = '+'.join(f'between(t,{start},{end})' for start, end in timestamps_to_cut)
        video_file = ffmpeg.input(video_in)
        # select/aselect keep what falls inside the ranges; setpts/asetpts
        # then rewrite the timestamps of what is left.
        video = video_file.video.filter("select", f'({between_str})').filter("setpts", "N/FRAME_RATE/TB")
        audio = video_file.audio.filter("aselect", f'({between_str})').filter("asetpts", "N/SR/TB")
        output_video = f'./videos_out/{video_file_name}.mp4'
        (
            ffmpeg.concat(video, audio, v=1, a=1)
            .output(output_video)
            .overwrite_output()
            .global_args('-loglevel', 'quiet')
            .run()
        )
    else:
        output_video = video_in

    # Strip the two-character diff prefix; a blank marker means "unchanged".
    tokens = [(token[2:], token[0] if token[0] != " " else None) for token in filtered]

    total_cuts_since_reboot += 1
    ping("video_cuts")
    print("\n\ntotal_cuts_since_reboot: ", total_cuts_since_reboot, "\n\n")
    return (tokens, output_video)
async def query_api(audio_bytes: bytes):
    """Send audio to the inference API and return the decoded response.

    Args:
        audio_bytes: Raw audio data; it is base64-encoded into the JSON
            request body.

    Returns:
        The parsed JSON for an ``application/json`` response, the raw bytes
        for ``application/octet-stream``, or the text for ``text/plain``.

    Raises:
        RuntimeError: the response has any other (or no) content type.
    """
    # NOTE(review): "char" timestamps are documented for CTC models in the
    # transformers ASR pipeline — confirm the configured MODEL supports them.
    payload = json.dumps({
        "inputs": base64.b64encode(audio_bytes).decode("utf-8"),
        "parameters": {
            "return_timestamps": "char",
            "chunk_length_s": 10,
            "stride_length_s": [4, 2],
        },
        "options": {"use_gpu": False},
    }).encode("utf-8")

    async with aiohttp.ClientSession() as session:
        async with session.post(API_URL, headers=headers, data=payload) as response:
            print("API Response: ", response.status)
            # Compare the bare media type only: servers may append parameters
            # such as "; charset=utf-8", and the header may be absent.
            content_type = response.headers.get('Content-Type', '').split(';', 1)[0].strip().lower()
            if content_type == 'application/json':
                return await response.json()
            elif content_type == 'application/octet-stream':
                return await response.read()
            elif content_type == 'text/plain':
                return await response.text()
            else:
                raise RuntimeError("Error Fetching API")
# Strong references to in-flight telemetry tasks.  The event loop only keeps
# weak references to tasks, so an otherwise unreferenced task may be
# garbage-collected before it finishes.
_ping_tasks = set()


def ping(name):
    """Fire a best-effort telemetry request for the event ``name``.

    The request is scheduled as a background task on the running event loop
    and is not awaited, so this must be called from code running inside the
    loop (``asyncio.create_task`` raises ``RuntimeError`` otherwise).

    Args:
        name: Event name appended to the telemetry URL.
    """
    url = f'https://huggingface.co/api/telemetry/spaces/radames/edit-video-by-editing-text/{name}'
    print("ping: ", url)

    async def req():
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    print("pong: ", response.status)
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Telemetry must never disturb the app: report and move on.
            print("ping failed: ", e)

    task = asyncio.create_task(req())
    _ping_tasks.add(task)
    # Drop the reference once the request has completed.
    task.add_done_callback(_ping_tasks.discard)
# ---- Gradio Layout -----
# The components are instantiated first and only placed on the page later,
# through .render(), inside the Blocks context below.
video_in = gr.Video(label="Video file", elem_id="video-container")
text_in = gr.Textbox(label="Transcription", lines=10, interactive=True)
video_out = gr.Video(label="Video Out")
diff_out = gr.HighlightedText(label="Cuts Diffs", combine_adjacent=True)
examples = gr.Dataset(components=[video_in], samples=VIDEOS, type="index")

css = """
#cut_btn, #reset_btn { align-self:stretch; }
#\\31 3 { max-width: 540px; }
.output-markdown {max-width: 65ch !important;}
#video-container{
max-width: 40rem;
}
"""

# Page copy, kept out of the layout code so the nesting below stays readable.
INTRO_MARKDOWN = """
# Edit Video By Editing Text
This project is a quick proof of concept of a simple video editor where the edits
are made by editing the audio transcription.
Using the [Huggingface Automatic Speech Recognition Pipeline](https://huggingface.co/tasks/automatic-speech-recognition)
with a fine tuned [Wav2Vec2 model using Connectionist Temporal Classification (CTC)](https://huggingface.co/facebook/wav2vec2-large-960h-lv60-self)
you can predict not only the text transcription but also the [character or word base timestamps](https://huggingface.co/docs/transformers/v4.19.2/en/main_classes/pipelines#transformers.AutomaticSpeechRecognitionPipeline.__call__.return_timestamps)
"""

EDIT_HINT_MARKDOWN = """
### Now edit as text
After running the video transcription, you can make cuts to the text below (only cuts, not additions!)"""

CREDITS_MARKDOWN = """
#### Video Credits
1. [Cooking](https://vimeo.com/573792389)
1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0)
1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8)
"""

with gr.Blocks(css=css) as demo:
    # Hidden per-session state: the last transcription and its timestamps.
    transcription_var = gr.State()
    timestamps_var = gr.State()

    with gr.Row():
        with gr.Column():
            gr.Markdown(INTRO_MARKDOWN)

    with gr.Row():
        examples.render()

        def load_example(sample_index):
            """Return the video, transcription (twice) and timestamps of one sample."""
            sample = SAMPLES[sample_index]
            video, text = sample['video'], sample['transcription'].lower()
            return (video, text, text, sample['timestamps'])

        examples.click(
            load_example,
            inputs=[examples],
            outputs=[video_in, text_in, transcription_var, timestamps_var],
            queue=False,
        )

    with gr.Row():
        with gr.Column():
            video_in.render()
            transcribe_btn = gr.Button("Transcribe Audio")
            transcribe_btn.click(
                speech_to_text,
                inputs=[video_in],
                outputs=[text_in, transcription_var, timestamps_var],
            )

    with gr.Row():
        gr.Markdown(EDIT_HINT_MARKDOWN)

    with gr.Row():
        with gr.Column():
            text_in.render()
            with gr.Row():
                cut_btn = gr.Button("Cut to video", elem_id="cut_btn")
                # Send the video path and the hidden state along with the edited text.
                cut_btn.click(
                    cut_timestamps_to_video,
                    inputs=[video_in, transcription_var, text_in, timestamps_var],
                    outputs=[diff_out, video_out],
                )

                reset_transcription = gr.Button(
                    "Reset to last trascription", elem_id="reset_btn")
                # Copy the stored transcription back into the textbox.
                reset_transcription.click(
                    lambda stored: stored,
                    inputs=transcription_var,
                    outputs=text_in,
                )
        with gr.Column():
            video_out.render()
            diff_out.render()

    with gr.Row():
        gr.Markdown(CREDITS_MARKDOWN)

demo.queue()

if __name__ == "__main__":
    demo.launch(debug=True)