# Audio_Splitter / app.py
# maliahson's picture
# Create app.py
# c2dbc7b verified
# raw
# history blame
# 3.38 kB
import gradio as gr
import math
import time
import numpy as np
from pydub import AudioSegment
import io
import zipfile
import os
def numpy_to_mp3(audio_array, sampling_rate):
    """Encode a NumPy audio buffer as MP3 and return the encoded bytes.

    Args:
        audio_array: NumPy array of samples. A 1-D array is treated as mono;
            a 2-D array is treated as ``(samples, channels)`` (the layout
            Gradio documents for ``type="numpy"`` audio). Floating-point
            input is peak-normalized and converted to 16-bit integers;
            integer input is encoded with its own item size as sample width.
        sampling_rate: Sample rate of ``audio_array`` in Hz.

    Returns:
        bytes: The MP3 data, exported at a 320k bitrate.
    """
    if np.issubdtype(audio_array.dtype, np.floating):
        # np.max raises on a zero-size array, so treat "no samples" as silence.
        peak = np.max(np.abs(audio_array)) if audio_array.size else 0.0
        # Only rescale when there is signal: dividing a silent chunk by a
        # zero peak would yield NaN and garbage after the int16 cast.
        if peak > 0:
            audio_array = audio_array / peak * 32767  # scale to 16-bit range
        audio_array = audio_array.astype(np.int16)

    # tobytes() emits C-order bytes, i.e. interleaved frames for a
    # (samples, channels) array, which is what AudioSegment expects as long
    # as the channel count is declared correctly.
    channels = audio_array.shape[1] if audio_array.ndim > 1 else 1

    audio_segment = AudioSegment(
        audio_array.tobytes(),
        frame_rate=sampling_rate,
        sample_width=audio_array.dtype.itemsize,
        channels=channels,
    )

    # Export to an in-memory buffer; a high bitrate is used to maximize quality.
    with io.BytesIO() as mp3_io:
        audio_segment.export(mp3_io, format="mp3", bitrate="320k")
        return mp3_io.getvalue()
def stream(audio, chunk_length_s):
    """Split audio into fixed-length MP3 chunks and bundle them into a zip.

    Args:
        audio: ``(sampling_rate, samples)`` tuple, or ``None`` when no audio
            was provided.
        chunk_length_s: Requested chunk duration in seconds; values above 30
            are capped at 30.

    Returns:
        tuple: ``(zip_filename, first_time, run_time)`` where ``zip_filename``
        is the path of the zip archive (``"audio_chunks.zip"`` in the current
        working directory, overwritten on every call), ``first_time`` is the
        number of seconds elapsed when the first chunk had been written, and
        ``run_time`` is the number of seconds elapsed when the last chunk had
        been written. Both times are rounded to two decimals.

    Raises:
        gr.Error: If no audio was provided, the audio has no samples, or the
            chunk length is not positive.
    """
    start_time = time.time()

    if audio is None:
        raise gr.Error("Please upload an audio file first.")
    sampling_rate, array = audio

    # Ensure the chunk length does not exceed 30 seconds.
    chunk_length_s = min(chunk_length_s, 30)
    if chunk_length_s <= 0:
        raise gr.Error("Chunk length must be greater than zero.")

    # At least one sample per chunk, so the batch count below can never
    # divide by zero.
    chunk_length = max(1, int(chunk_length_s * sampling_rate))
    # NOTE(review): this pause looks like a leftover from a streaming demo
    # ("always stream outputs faster than it takes to process"). It is kept
    # because it influences the reported timings - confirm it is still wanted.
    time_length = chunk_length_s / 2

    audio_length = len(array)
    if audio_length == 0:
        raise gr.Error("The uploaded audio is empty.")
    num_batches = math.ceil(audio_length / chunk_length)

    zip_filename = "audio_chunks.zip"
    first_time = None
    # Write each encoded chunk straight into the archive. This avoids a
    # shared scratch directory whose leftover files (after a failed run)
    # would make later cleanup fail.
    with zipfile.ZipFile(zip_filename, "w") as zipf:
        for idx in range(num_batches):
            time.sleep(time_length)
            start_pos = idx * chunk_length
            # Slicing clamps at the end of the array, so the final chunk is
            # simply shorter.
            chunk = array[start_pos : start_pos + chunk_length]
            chunk_mp3 = numpy_to_mp3(chunk, sampling_rate=sampling_rate)
            zipf.writestr(f"chunk_{idx + 1}.mp3", chunk_mp3)
            if first_time is None:
                first_time = round(time.time() - start_time, 2)
        run_time = round(time.time() - start_time, 2)

    return zip_filename, first_time, run_time
# Build the UI: inputs in the left column, results in the right column.
with gr.Blocks() as demo:
    with gr.Row():
        # Input side: audio upload, chunk-length selector and trigger button.
        with gr.Column():
            audio_in = gr.Audio(
                value="librispeech.wav",
                sources=["upload"],
                type="numpy",
            )
            chunk_length = gr.Slider(
                label="Chunk length (s)",
                minimum=2,
                maximum=30,
                step=2,
                value=2,
            )
            run_button = gr.Button("Stream audio")
        # Output side: downloadable archive plus the two timing read-outs.
        with gr.Column():
            zip_out = gr.File(label="Download Zip of Chunks")
            first_time = gr.Textbox(label="Time to first chunk (s)")
            run_time = gr.Textbox(label="Time to current chunk (s)")

    # Wire the button to `stream`; its three return values fill the outputs
    # in the order listed.
    run_button.click(
        fn=stream,
        inputs=[audio_in, chunk_length],
        outputs=[zip_out, first_time, run_time],
    )

demo.launch()