cnph001 committed on
Commit
298c01a
·
verified ·
1 Parent(s): 0a995d3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +108 -127
app.py CHANGED
@@ -11,32 +11,35 @@ import librosa
11
  import soundfile as sf
12
  import numpy as np
13
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  def get_silence(duration_ms=1000):
15
- # Create silent audio segment with specified parameters
16
- silent_audio = AudioSegment.silent(
17
  duration=duration_ms,
18
- frame_rate=24000 # 24kHz sampling rate
 
 
19
  )
20
- # Set audio parameters
21
- silent_audio = silent_audio.set_channels(1) # Mono
22
- silent_audio = silent_audio.set_sample_width(4) # 32-bit (4 bytes per sample)
23
- with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
24
- # Export with specific bitrate and codec parameters
25
- silent_audio.export(
26
- tmp_file.name,
27
- format="mp3",
28
- bitrate="48k",
29
- parameters=[
30
- "-ac", "1", # Mono
31
- "-ar", "24000", # Sample rate
32
- "-sample_fmt", "s32", # 32-bit samples
33
- "-codec:a", "libmp3lame" # MP3 codec
34
- ]
35
- )
36
- return tmp_file.name
37
 
38
- # Get all available voices
39
  async def get_voices():
 
40
  try:
41
  voices = await edge_tts.list_voices()
42
  return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
@@ -46,30 +49,12 @@ async def get_voices():
46
 
47
  async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch, target_duration_ms=None, speed_adjustment_factor=1.0):
48
  """Generates audio for a text segment, handling voice prefixes and adjusting rate for duration."""
49
- current_voice_full = default_voice
50
- current_voice_short = current_voice_full.split(" - ")[0] if current_voice_full else ""
51
  current_rate = rate
52
  current_pitch = pitch
53
- processed_text = text_segment.strip()
54
- print(f"Processing this text segment: {processed_text}") # Debug
55
- voice_map = {
56
- "1F": "en-GB-SoniaNeural",
57
- "2M": "en-GB-RyanNeural",
58
- "3M": "en-US-BrianMultilingualNeural",
59
- "2F": "en-US-JennyNeural",
60
- "1M": "en-AU-WilliamNeural",
61
- "3F": "en-HK-YanNeural",
62
- "4M": "en-GB-ThomasNeural",
63
- "4F": "en-US-EmmaNeural",
64
- "1O": "en-GB-RyanNeural", # Old Man
65
- "1C": "en-GB-MaisieNeural", # Child
66
- "1V": "vi-VN-HoaiMyNeural", # Vietnamese (Female)
67
- "2V": "vi-VN-NamMinhNeural", # Vietnamese (Male)
68
- "3V": "vi-VN-HoaiMyNeural", # Vietnamese (Female)
69
- "4V": "vi-VN-NamMinhNeural", # Vietnamese (Male)
70
- }
71
- detect = 0
72
- for prefix, voice_short in voice_map.items():
73
  if processed_text.startswith(prefix):
74
  current_voice_short = voice_short
75
  if prefix in ["1F", "3F", "1V", "3V"]:
@@ -77,20 +62,17 @@ async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pi
77
  elif prefix in ["1O", "4V"]:
78
  current_pitch = -20
79
  current_rate = -10
80
- detect = 1
81
  processed_text = processed_text[len(prefix):].strip()
82
  break
 
83
  match = re.search(r'([A-Za-z]+)-?(\d+)', processed_text)
84
- if match:
85
- prefix_pitch = match.group(1)
86
- number = int(match.group(2))
87
- if prefix_pitch in voice_map:
88
- current_pitch += number
89
- processed_text = re.sub(r'[A-Za-z]+-?\d+', '', processed_text, count=1).strip()
90
- elif detect:
91
- processed_text = processed_text.lstrip('-0123456789').strip() # Remove potential leftover numbers
92
- elif detect:
93
- processed_text = processed_text[2:].strip()
94
  if processed_text:
95
  rate_str = f"{current_rate:+d}%"
96
  pitch_str = f"{current_pitch:+d}Hz"
@@ -99,110 +81,109 @@ async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pi
99
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
100
  audio_path = tmp_file.name
101
  await communicate.save(audio_path)
102
- if target_duration_ms is not None and os.path.exists(audio_path):
103
- audio = AudioSegment.from_mp3(audio_path)
104
- audio_duration_ms = len(audio)
105
- #print(f"Generated audio duration: {audio_duration_ms}ms, Target duration: {target_duration_ms}ms") # Debug
106
- if audio_duration_ms > target_duration_ms and target_duration_ms > 0:
107
- speed_factor = (audio_duration_ms / target_duration_ms) * speed_adjustment_factor
108
- #print(f"Speed factor (after user adjustment): {speed_factor}") # Debug
109
- if speed_factor > 0:
110
- if speed_factor < 1.0:
111
- speed_factor = 1.0
112
- y, sr = librosa.load(audio_path, sr=None)
113
- y_stretched = librosa.effects.time_stretch(y, rate=speed_factor)
114
- sf.write(audio_path, y_stretched, sr)
115
- else:
116
- print("Generated audio is not longer than target duration, no speed adjustment.") # Debug
117
- return audio_path
118
  except Exception as e:
119
  print(f"Edge TTS error processing '{processed_text}': {e}")
120
  return None
121
  return None
122
 
123
  async def process_transcript_line(line, default_voice, rate, pitch, speed_adjustment_factor):
124
- """Processes a single transcript line with HH:MM:SS,milliseconds - HH:MM:SS,milliseconds timestamp."""
125
- match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+-\s+(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line)
126
  if match:
127
- start_h, start_m, start_s, start_ms, end_h, end_m, end_s, end_ms, text_parts = match.groups()
128
- start_time_ms = (
129
- int(start_h) * 3600000 +
130
- int(start_m) * 60000 +
131
- int(start_s) * 1000 +
132
- int(start_ms)
133
- )
134
- end_time_ms = (
135
- int(end_h) * 3600000 +
136
- int(end_m) * 60000 +
137
- int(end_s) * 1000 +
138
- int(end_ms)
139
- )
140
  duration_ms = end_time_ms - start_time_ms
 
141
  audio_segments = []
142
- split_parts = re.split(r'[“”"]', text_parts)
143
- process_next = False
144
- for part in split_parts:
145
  if part == '"':
146
- process_next = not process_next
147
  continue
148
- if process_next and part.strip():
149
- audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch, duration_ms, speed_adjustment_factor)
150
- if audio_path:
151
- audio_segments.append(audio_path)
152
- elif not process_next and part.strip():
153
- audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch, duration_ms, speed_adjustment_factor)
154
  if audio_path:
155
  audio_segments.append(audio_path)
156
  return start_time_ms, audio_segments, duration_ms
157
  return None, None, None
158
 
159
  async def transcript_to_speech(transcript_text, voice, rate, pitch, speed_adjustment_factor):
 
160
  if not transcript_text.strip():
161
  return None, gr.Warning("Please enter transcript text.")
162
  if not voice:
163
  return None, gr.Warning("Please select a voice.")
 
164
  lines = transcript_text.strip().split('\n')
165
  timed_audio_segments = []
166
  max_end_time_ms = 0
167
- for line in lines:
168
- start_time, audio_paths, duration = await process_transcript_line(line, voice, rate, pitch, speed_adjustment_factor)
169
- if start_time is not None and audio_paths:
170
- combined_line_audio = AudioSegment.empty()
171
- current_time_ms = start_time
172
- segment_duration = duration / len(audio_paths) if audio_paths else 0
173
- for path in audio_paths:
174
- if path: # Only process if audio_path is not None (meaning TTS was successful)
175
- try:
176
- audio = AudioSegment.from_mp3(path)
177
- combined_line_audio += audio
178
- os.remove(path)
179
- except FileNotFoundError:
180
- print(f"Warning: Audio file not found: {path}")
181
- if combined_line_audio:
182
- timed_audio_segments.append({'start': start_time, 'audio': combined_line_audio})
183
- max_end_time_ms = max(max_end_time_ms, start_time + len(combined_line_audio))
184
- elif audio_paths:
185
- for path in audio_paths:
186
- if path:
187
- try:
188
- os.remove(path)
189
- except FileNotFoundError:
190
- pass # Clean up even if no timestamp
191
- if not timed_audio_segments:
192
- return None, "No processable audio segments found."
193
- final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000)
194
- for segment in timed_audio_segments:
195
- final_audio = final_audio.overlay(segment['audio'], position=segment['start'])
196
- combined_audio_path = tempfile.mktemp(suffix=".mp3")
197
- final_audio.export(combined_audio_path, format="mp3")
198
- return combined_audio_path, None
 
 
 
 
 
 
 
199
 
200
  @spaces.GPU
201
  def tts_interface(transcript, voice, rate, pitch, speed_adjustment_factor):
 
202
  audio, warning = asyncio.run(transcript_to_speech(transcript, voice, rate, pitch, speed_adjustment_factor))
203
  return audio, warning
204
 
205
  async def create_demo():
 
206
  voices = await get_voices()
207
  default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)"
208
  description = """
 
11
  import soundfile as sf
12
  import numpy as np
13
 
# Maps the two-character speaker prefix that may start a text segment
# (for example "1F" or "2M") to the Edge TTS voice short name used for it.
# Several prefixes intentionally share a voice.
VOICE_MAP = {
    "1F": "en-GB-SoniaNeural",
    "2M": "en-GB-RyanNeural",
    "3M": "en-US-BrianMultilingualNeural",
    "2F": "en-US-JennyNeural",
    "1M": "en-AU-WilliamNeural",
    "3F": "en-HK-YanNeural",
    "4M": "en-GB-ThomasNeural",
    "4F": "en-US-EmmaNeural",
    "1O": "en-GB-RyanNeural",  # Old Man
    "1C": "en-GB-MaisieNeural",  # Child
    "1V": "vi-VN-HoaiMyNeural",  # Vietnamese (Female)
    "2V": "vi-VN-NamMinhNeural",  # Vietnamese (Male)
    "3V": "vi-VN-HoaiMyNeural",  # Vietnamese (Female)
    "4V": "vi-VN-NamMinhNeural",  # Vietnamese (Male)
}
31
+
def get_silence(duration_ms=1000):
    """Return a silent pydub ``AudioSegment``.

    Args:
        duration_ms: Length of the silence in milliseconds.

    Returns:
        A 24 kHz, mono segment with 4-byte (32-bit) samples.
    """
    # AudioSegment.silent() only accepts ``duration`` and ``frame_rate``;
    # channel count and sample width have to be applied afterwards.
    silence = AudioSegment.silent(duration=duration_ms, frame_rate=24000)
    return silence.set_channels(1).set_sample_width(4)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
 
 
41
  async def get_voices():
42
+ """Lists available Edge TTS voices."""
43
  try:
44
  voices = await edge_tts.list_voices()
45
  return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
 
49
 
50
  async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch, target_duration_ms=None, speed_adjustment_factor=1.0):
51
  """Generates audio for a text segment, handling voice prefixes and adjusting rate for duration."""
52
+ processed_text = text_segment.strip()
53
+ current_voice_short = default_voice.split(" - ")[0] if default_voice else ""
54
  current_rate = rate
55
  current_pitch = pitch
56
+
57
+ for prefix, voice_short in VOICE_MAP.items():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  if processed_text.startswith(prefix):
59
  current_voice_short = voice_short
60
  if prefix in ["1F", "3F", "1V", "3V"]:
 
62
  elif prefix in ["1O", "4V"]:
63
  current_pitch = -20
64
  current_rate = -10
 
65
  processed_text = processed_text[len(prefix):].strip()
66
  break
67
+
68
  match = re.search(r'([A-Za-z]+)-?(\d+)', processed_text)
69
+ if match and match.group(1) in VOICE_MAP:
70
+ pitch_adjustment = int(match.group(2))
71
+ current_pitch += pitch_adjustment
72
+ processed_text = re.sub(r'[A-Za-z]+-?\d+', '', processed_text, count=1).strip()
73
+ elif any(processed_text.startswith(prefix) for prefix in VOICE_MAP): # Handle leftover prefixes
74
+ processed_text = re.sub(r'^[A-Za-z]{1,2}', '', processed_text).lstrip('-').strip()
75
+
 
 
 
76
  if processed_text:
77
  rate_str = f"{current_rate:+d}%"
78
  pitch_str = f"{current_pitch:+d}Hz"
 
81
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
82
  audio_path = tmp_file.name
83
  await communicate.save(audio_path)
84
+
85
+ if target_duration_ms is not None and os.path.exists(audio_path) and target_duration_ms > 0:
86
+ audio = AudioSegment.from_mp3(audio_path)
87
+ audio_duration_ms = len(audio)
88
+ if audio_duration_ms > target_duration_ms:
89
+ speed_factor = (audio_duration_ms / target_duration_ms) * speed_adjustment_factor
90
+ if speed_factor > 0 and speed_factor >= 1.0:
91
+ y, sr = librosa.load(audio_path, sr=None)
92
+ y_stretched = librosa.effects.time_stretch(y, rate=speed_factor)
93
+ sf.write(audio_path, y_stretched, sr)
94
+ return audio_path
 
 
 
 
 
95
  except Exception as e:
96
  print(f"Edge TTS error processing '{processed_text}': {e}")
97
  return None
98
  return None
99
 
# Transcript line layout: "HH:MM:SS,mmm - HH:MM:SS,mmm <text>".
_TRANSCRIPT_LINE_RE = re.compile(
    r'(\d{2}:\d{2}:\d{2},\d{3})\s+-\s+(\d{2}:\d{2}:\d{2},\d{3})\s+(.*)'
)
# The capturing group makes re.split() return the quote characters themselves
# as separate items, so the loop below can track quoted vs. unquoted text.
_QUOTE_SPLIT_RE = re.compile(r'([“”"])')
# A set (not a str) so that an empty part is never mistaken for a quote.
_QUOTE_CHARS = frozenset('“”"')


def _timestamp_to_ms(timestamp):
    """Convert an ``HH:MM:SS,mmm`` timestamp string to integer milliseconds."""
    hours, minutes, seconds_ms = timestamp.split(':')
    seconds, millis = seconds_ms.split(',')
    return (
        int(hours) * 3600000
        + int(minutes) * 60000
        + int(seconds) * 1000
        + int(millis)
    )


async def process_transcript_line(line, default_voice, rate, pitch, speed_adjustment_factor):
    """Generate audio for one timestamped transcript line.

    The text after the timestamps is split on straight and curly double
    quotes. Every non-blank piece is synthesised separately; quoted pieces
    receive ``speed_adjustment_factor`` while unquoted pieces use ``1.0``.

    Args:
        line: One transcript line, ``HH:MM:SS,mmm - HH:MM:SS,mmm text``.
        default_voice: Voice forwarded to ``generate_audio_with_voice_prefix``.
        rate: Speech rate forwarded to the generator.
        pitch: Pitch forwarded to the generator.
        speed_adjustment_factor: Factor applied to quoted pieces only.

    Returns:
        ``(start_time_ms, audio_paths, duration_ms)`` when the line matches
        the timestamp layout, otherwise ``(None, None, None)``.
    """
    match = _TRANSCRIPT_LINE_RE.match(line)
    if not match:
        return None, None, None

    start_time_str, end_time_str, text_parts = match.groups()
    start_time_ms = _timestamp_to_ms(start_time_str)
    end_time_ms = _timestamp_to_ms(end_time_str)
    duration_ms = end_time_ms - start_time_ms

    audio_segments = []
    in_quote = False
    for part in _QUOTE_SPLIT_RE.split(text_parts):
        if part in _QUOTE_CHARS:
            # Any of the three quote characters toggles the quoted state and
            # is never sent to the speech engine as text.
            in_quote = not in_quote
            continue
        if not part.strip():
            continue
        factor = speed_adjustment_factor if in_quote else 1.0
        audio_path = await generate_audio_with_voice_prefix(
            part, default_voice, rate, pitch, duration_ms, factor
        )
        if audio_path:
            audio_segments.append(audio_path)
    return start_time_ms, audio_segments, duration_ms
128
 
async def transcript_to_speech(transcript_text, voice, rate, pitch, speed_adjustment_factor):
    """Convert a timestamped transcript into a single MP3 file.

    Each line is handed to ``process_transcript_line``; the audio files it
    returns are concatenated per line, deleted, and the per-line audio is
    overlaid at the line's start time on a silent 24 kHz base track.

    Args:
        transcript_text: Transcript, one timestamped line per row.
        voice: Default voice selection; must be non-empty.
        rate: Speech rate forwarded to the line processor.
        pitch: Pitch forwarded to the line processor.
        speed_adjustment_factor: Speed factor forwarded to the line processor.

    Returns:
        ``(path, None)`` with the path of the exported MP3 on success,
        ``(None, gr.Warning(...))`` when the transcript or voice is missing,
        or ``(None, "No processable audio segments found.")`` when no line
        produced audio.
    """
    if not transcript_text.strip():
        return None, gr.Warning("Please enter transcript text.")
    if not voice:
        return None, gr.Warning("Please select a voice.")

    timed_audio_segments = []
    max_end_time_ms = 0

    for line in transcript_text.strip().split('\n'):
        start_time, audio_paths, _duration = await process_transcript_line(
            line, voice, rate, pitch, speed_adjustment_factor
        )
        if start_time is None or not audio_paths:
            continue

        combined_line_audio = AudioSegment.empty()
        for path in audio_paths:
            if not path or not os.path.exists(path):
                continue
            try:
                combined_line_audio += AudioSegment.from_mp3(path)
            except FileNotFoundError:
                print(f"Warning: Audio file not found: {path}")
            finally:
                # The per-segment file is no longer needed once it is loaded.
                try:
                    os.remove(path)
                except OSError:
                    print(f"Warning: Could not remove temporary file: {path}")

        if combined_line_audio:
            timed_audio_segments.append({'start': start_time, 'audio': combined_line_audio})
            max_end_time_ms = max(max_end_time_ms, start_time + len(combined_line_audio))

    if not timed_audio_segments:
        return None, "No processable audio segments found."

    final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000)
    for segment in timed_audio_segments:
        final_audio = final_audio.overlay(segment['audio'], position=segment['start'])

    # The result must outlive this function, so it cannot live inside a
    # TemporaryDirectory (which is removed on exit). Reserve a persistent
    # temp file name and close the handle before exporting to it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        combined_audio_path = tmp_file.name
    final_audio.export(combined_audio_path, format="mp3")
    return combined_audio_path, None
178
 
@spaces.GPU
def tts_interface(transcript, voice, rate, pitch, speed_adjustment_factor):
    """Synchronous Gradio entry point that runs ``transcript_to_speech``.

    Returns the ``(audio, warning)`` pair produced by the coroutine.
    """
    conversion = transcript_to_speech(transcript, voice, rate, pitch, speed_adjustment_factor)
    result_audio, result_warning = asyncio.run(conversion)
    return result_audio, result_warning
184
 
185
  async def create_demo():
186
+ """Creates the Gradio demo interface."""
187
  voices = await get_voices()
188
  default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)"
189
  description = """