import spaces
import gradio as gr
import edge_tts
import asyncio
import tempfile
import os
import re
from pathlib import Path
from pydub.silence import detect_nonsilent
from pydub import AudioSegment

default_voice_short = ""
check1 = False


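# Trim leading/trailing silence from a pydub AudioSegment, keeping a small
# amount of padding so speech onsets and tails are not clipped.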
def strip_silence(audio: AudioSegment, silence_thresh=-40, min_silence_len=100, silence_padding_ms=100):
    nonsilent = detect_nonsilent(audio, min_silence_len=min_silence_len, silence_thresh=silence_thresh)

    if not nonsilent:
        return AudioSegment.silent(duration=silence_padding_ms)

    start_trim = nonsilent[0][0]
    end_trim = nonsilent[-1][1]

    # Keep a little padding around the detected speech so it is not clipped.
    start_trim = max(0, start_trim - silence_padding_ms)
    end_trim = min(len(audio), end_trim + silence_padding_ms)

    print(f"check1: {check1}")
    return audio[start_trim:end_trim]


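# Generate a silent mono 24 kHz MP3 clip of the given duration and return its temp-file path.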
def get_silence(duration_ms=1000):
    silent_audio = AudioSegment.silent(
        duration=duration_ms,
        frame_rate=24000
    )

    silent_audio = silent_audio.set_channels(1)
    silent_audio = silent_audio.set_sample_width(4)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        silent_audio.export(
            tmp_file.name,
            format="mp3",
            bitrate="48k",
            parameters=[
                "-ac", "1",
                "-ar", "24000",
                "-sample_fmt", "s32",
                "-codec:a", "libmp3lame"
            ]
        )
        return tmp_file.name


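# Fetch the available Edge TTS voices, keyed by a display name ("ShortName - Locale (Gender)").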
async def get_voices():
    voices = await edge_tts.list_voices()
    return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}


async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch):
    """Generate audio for a text segment, handling voice prefixes, retries, and fallback."""
    global default_voice_short
    global check1

    print(f"Text: {text_segment}")

    # Prefix -> (voice short name, pitch adjustment in Hz, rate adjustment in %)
    voice_map = {
        "1F": ("en-GB-SoniaNeural", 25, 0),
        "2F": ("en-US-JennyNeural", 0, 0),
        "3F": ("en-HK-YanNeural", 0, 0),
        "4F": ("en-US-EmmaNeural", 0, 0),
        "1M": ("en-AU-WilliamNeural", 0, 0),
        "2M": ("en-GB-RyanNeural", 0, 0),
        "3M": ("en-US-BrianMultilingualNeural", 0, 0),
        "4M": ("en-GB-ThomasNeural", 0, 0),
        "1O": ("en-GB-RyanNeural", -20, -10),
        "1C": ("en-GB-MaisieNeural", 0, 0),
        "1V": ("vi-VN-HoaiMyNeural", 0, 0),
        "2V": ("vi-VN-NamMinhNeural", 0, 0),
        "3V": ("en-US-EmmaMultilingualNeural", 0, 0),
        "4V": ("en-US-BrianMultilingualNeural", 0, 0),
        "5V": ("en-US-AvaMultilingualNeural", 0, 0),
        "6V": ("en-US-AndrewMultilingualNeural", 0, 0),
        "7V": ("de-DE-SeraphinaMultilingualNeural", 0, 0),
        "8V": ("ko-KR-HyunsuMultilingualNeural", 0, 0),
    }

    if default_voice_short == "":
        current_voice_full = default_voice
        current_voice_short = current_voice_full.split(" - ")[0] if current_voice_full else ""
    else:
        current_voice_short = default_voice_short

    current_rate = rate
    current_pitch = pitch
    processed_text = text_segment.strip()

    detect = False

    # A two-character prefix (e.g. "1F") selects a voice for this segment only.
    prefix = processed_text[:2]
    if prefix in voice_map:
        current_voice_short, pitch_adj, rate_adj = voice_map[prefix]
        current_pitch += pitch_adj
        current_rate += rate_adj
        detect = True

    # A prefix followed by a number (e.g. "1F-20") also applies a pitch offset in Hz;
    # a value of 0 locks this voice in as the default for the rest of the transcript.
    match = re.search(r'[A-Za-z]+\-?\d+', processed_text)
    if match:
        group = match.group()
        prefix_only = ''.join(filter(str.isalpha, group))
        number = int(''.join(ch for ch in group if ch.isdigit() or ch == '-'))
        if number == 0:
            default_voice_short = current_voice_short
        current_pitch += number
        processed_text = re.sub(r'[A-Za-z]+\-?\d+', '', processed_text, count=1).strip()
        processed_text = processed_text[len(prefix_only):].strip()
    elif detect:
        processed_text = processed_text[2:].strip()

    if processed_text:
        rate_str = f"{current_rate:+d}%"
        pitch_str = f"{current_pitch:+d}Hz"

        # Try Edge TTS up to three times; on the final failure return a short
        # silent clip so the rest of the transcript can still be assembled.
        for attempt in range(3):
            try:
                communicate = edge_tts.Communicate(processed_text, current_voice_short, rate=rate_str, pitch=pitch_str)
                with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
                    audio_path = tmp_file.name
                await communicate.save(audio_path)

                audio = AudioSegment.from_mp3(audio_path)
                if not check1:
                    print("Not the last part of the sentence - short trailing silence")
                    audio = strip_silence(audio, silence_thresh=-40, min_silence_len=50, silence_padding_ms=10)
                else:
                    audio = strip_silence(audio, silence_thresh=-40, min_silence_len=100, silence_padding_ms=200)
                    print("Last part of the sentence - long trailing silence")
                stripped_path = tempfile.mktemp(suffix=".mp3")
                audio.export(stripped_path, format="mp3")
                return stripped_path
            except Exception as e:
                print(f"Edge TTS failed (attempt {attempt + 1}): {e}")
                if attempt == 2:
                    silent_audio = AudioSegment.silent(duration=500)
                    fallback_path = tempfile.mktemp(suffix=".mp3")
                    silent_audio.export(fallback_path, format="mp3")
                    return fallback_path
                await asyncio.sleep(0.5)

    return None


async def process_transcript_line(line, default_voice, rate, pitch):
    """Process a single transcript line with an HH:MM:SS,mmm timestamp and quoted text segments."""
    global check1

    match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line)
    if match:
        hours, minutes, seconds, milliseconds, text_parts = match.groups()
        start_time_ms = (
            int(hours) * 3600000 +
            int(minutes) * 60000 +
            int(seconds) * 1000 +
            int(milliseconds)
        )
        audio_segments = []
        # Split on quote characters so quoted and unquoted text alternate.
        split_parts = re.split(r'(")', text_parts)

        check1 = False
        process_next = False
        for part in split_parts:
            if part == '"':
                process_next = not process_next
                check1 = False
                continue
            if process_next and part.strip():
                audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch)
                if audio_path:
                    audio_segments.append(audio_path)
            elif not process_next and part.strip():
                if part == split_parts[-1]:
                    # The final unquoted part closes the sentence, so keep a longer trailing silence.
                    check1 = True
                audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch)
                if audio_path:
                    audio_segments.append(audio_path)

        return start_time_ms, audio_segments
    return None, None


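# Build the full track: synthesize each timestamped line, merge lines whose audio
# overruns the next timestamp, then overlay everything onto a silent base track.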
async def transcript_to_speech(transcript_text, voice, rate, pitch):
    if not transcript_text.strip():
        return None, gr.Warning("Please enter transcript text.")
    if not voice:
        return None, gr.Warning("Please select a voice.")

    lines = transcript_text.strip().split('\n')
    timed_audio_segments = []
    max_end_time_ms = 0
    previous_end_time_ms = 0
    i = 0

    while i < len(lines):
        start_time, audio_paths = await process_transcript_line(lines[i], voice, rate, pitch)
        if start_time is not None and audio_paths:
            combined_line_audio = AudioSegment.empty()
            for path in audio_paths:
                try:
                    audio = AudioSegment.from_mp3(path)
                    combined_line_audio += audio
                    os.remove(path)
                except FileNotFoundError:
                    print(f"Warning: Audio file not found: {path}")

            current_audio_duration = len(combined_line_audio)
            intended_start_time = start_time

            # If this line's audio runs past the next timestamp, merge the
            # following lines into it until the combined audio fits.
            if i + 1 < len(lines):
                next_start_time_line = lines[i + 1]
                next_start_time_match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+.*', next_start_time_line)
                if next_start_time_match:
                    next_h, next_m, next_s, next_ms = next_start_time_match.groups()
                    next_start_time_ms = (int(next_h) * 3600000 + int(next_m) * 60000 + int(next_s) * 1000 + int(next_ms))
                    duration_to_next = next_start_time_ms - start_time
                else:
                    duration_to_next = float('inf')

                if current_audio_duration > duration_to_next:
                    j = i + 1
                    while j < len(lines):
                        next_start_time, next_audio_paths = await process_transcript_line(lines[j], voice, rate, pitch)
                        if next_start_time is not None and next_audio_paths:
                            for next_path in next_audio_paths:
                                try:
                                    next_audio = AudioSegment.from_mp3(next_path)
                                    combined_line_audio += next_audio
                                    os.remove(next_path)
                                except FileNotFoundError:
                                    print(f"Warning: Audio file not found: {next_path}")
                            current_audio_duration = len(combined_line_audio)

                            if j + 1 < len(lines):
                                next_start_time_line_2 = lines[j + 1]
                                next_start_time_match_2 = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+.*', next_start_time_line_2)
                                if next_start_time_match_2:
                                    next_h_2, next_m_2, next_s_2, next_ms_2 = next_start_time_match_2.groups()
                                    next_start_time_ms_2 = (int(next_h_2) * 3600000 + int(next_m_2) * 60000 + int(next_s_2) * 1000 + int(next_ms_2))
                                    duration_to_next_2 = next_start_time_ms_2 - start_time
                                    if current_audio_duration <= duration_to_next_2:
                                        break
                                else:
                                    break
                            j += 1
                        else:
                            break
                    i = j

            timed_audio_segments.append({'start': intended_start_time, 'audio': combined_line_audio})
            previous_end_time_ms = max(previous_end_time_ms, intended_start_time + current_audio_duration)
            max_end_time_ms = max(max_end_time_ms, previous_end_time_ms)
        elif audio_paths:
            for path in audio_paths:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass
        i += 1

    if not timed_audio_segments:
        return None, "No processable audio segments found."

    print("Combining audio - final stage.")
    # Overlay each line's audio onto a silent track at its intended start time.
    final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000)
    for segment in timed_audio_segments:
        final_audio = final_audio.overlay(segment['audio'], position=segment['start'])

    combined_audio_path = tempfile.mktemp(suffix=".mp3")
    final_audio.export(combined_audio_path, format="mp3")
    global default_voice_short
    default_voice_short = ""
    print("Job done! Resetting the voice back to the default.")
    return combined_audio_path, None


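# Gradio entry point; runs the async pipeline synchronously for the Interface callback.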
@spaces.GPU
def tts_interface(transcript, voice, rate, pitch):
    audio, warning = asyncio.run(transcript_to_speech(transcript, voice, rate, pitch))
    return audio, warning


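# Build the Gradio Interface, including the voice-prefix reference in the description.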
async def create_demo():
    voices = await get_voices()
    default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)"
    description = """
    Process timestamped text (HH:MM:SS,milliseconds) with voice changes inside quotes.
    Format: `HH:MM:SS,milliseconds "VoicePrefix Text" more text "1F Different Voice"`
    Example:
    ```
    00:00:00,000 "This is the default voice." more default. "1F Now a female voice." and back to default.
    00:00:05,000 "1C Yes," said the child, "it is fun!"
    ```
    ***************************************************************************************************
    <b> 1F : en-GB-SoniaNeural
    2F : en-US-JennyNeural
    3F : en-HK-YanNeural
    4F : en-US-EmmaNeural
    1M : en-AU-WilliamNeural
    2M : en-GB-RyanNeural
    3M : en-US-BrianMultilingualNeural
    4M : en-GB-ThomasNeural
    1O : en-GB-RyanNeural
    1C : en-GB-MaisieNeural
    1V : vi-VN-HoaiMyNeural
    2V : vi-VN-NamMinhNeural
    3V : en-US-EmmaMultilingualNeural
    4V : en-US-BrianMultilingualNeural
    5V : en-US-AvaMultilingualNeural
    6V : en-US-AndrewMultilingualNeural
    7V : de-DE-SeraphinaMultilingualNeural
    8V : ko-KR-HyunsuMultilingualNeural </b>
    ****************************************************************************************************
    """
    demo = gr.Interface(
        fn=tts_interface,
        inputs=[
            gr.Textbox(label="Timestamped Text with Voice Changes", lines=10, placeholder='00:00:00,000 "Text" more text "1F Different Voice"'),
            gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Default Voice", value=default_voice),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate Adjustment (%)", step=1),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Pitch Adjustment (Hz)", step=1)
        ],
        outputs=[
            gr.Audio(label="Generated Audio", type="filepath"),
            gr.Markdown(label="Warning", visible=False)
        ],
        title="TTS with HH:MM:SS,milliseconds and In-Quote Voice Switching",
        description=description,
        analytics_enabled=False,
        allow_flagging="never"
    )
    return demo


if __name__ == "__main__":
    demo = asyncio.run(create_demo())
    demo.launch()