cnph001 committed
Commit 0a995d3 · verified · 1 Parent(s): c9c3247

Update app.py


Add error handling - skip if text is not valid
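In miniature, the pattern this commit applies: wrap the TTS call in try/except, log the failure, and return None so callers drop just that segment instead of the whole request failing. A minimal, self-contained sketch of that shape (fake_tts and synthesize_or_skip are illustrative stand-ins, not helpers in app.py):

import asyncio

async def fake_tts(text: str) -> str:
    # Stand-in for edge_tts.Communicate(...).save(); raises on input it cannot speak.
    if not text.strip():
        raise ValueError("no valid text to synthesize")
    return f"/tmp/{abs(hash(text))}.mp3"  # pretend an MP3 was written here

async def synthesize_or_skip(text: str) -> str | None:
    try:
        return await fake_tts(text)
    except Exception as e:
        print(f"TTS error, skipping segment: {e}")
        return None  # caller treats None as "skip this segment"

async def main() -> None:
    segments = ["Hello there", "   ", "Second line"]
    paths = [p for p in [await synthesize_or_skip(s) for s in segments] if p]
    print(paths)  # the blank segment is dropped, the other two survive

asyncio.run(main())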

Files changed (1)
  1. app.py +42 -48
app.py CHANGED
@@ -17,11 +17,9 @@ def get_silence(duration_ms=1000):
         duration=duration_ms,
         frame_rate=24000 # 24kHz sampling rate
     )
-
     # Set audio parameters
     silent_audio = silent_audio.set_channels(1) # Mono
     silent_audio = silent_audio.set_sample_width(4) # 32-bit (4 bytes per sample)
-
     with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
         # Export with specific bitrate and codec parameters
         silent_audio.export(
@@ -39,8 +37,12 @@ def get_silence(duration_ms=1000):

 # Get all available voices
 async def get_voices():
-    voices = await edge_tts.list_voices()
-    return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
+    try:
+        voices = await edge_tts.list_voices()
+        return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
+    except Exception as e:
+        print(f"Error listing voices: {e}")
+        return {}

 async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch, target_duration_ms=None, speed_adjustment_factor=1.0):
     """Generates audio for a text segment, handling voice prefixes and adjusting rate for duration."""
@@ -78,7 +80,6 @@ async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pi
             detect = 1
             processed_text = processed_text[len(prefix):].strip()
             break
-
     match = re.search(r'([A-Za-z]+)-?(\d+)', processed_text)
     if match:
         prefix_pitch = match.group(1)
@@ -88,36 +89,35 @@ async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pi
         processed_text = re.sub(r'[A-Za-z]+-?\d+', '', processed_text, count=1).strip()
     elif detect:
         processed_text = processed_text.lstrip('-0123456789').strip() # Remove potential leftover numbers
-
     elif detect:
         processed_text = processed_text[2:].strip()
-
     if processed_text:
         rate_str = f"{current_rate:+d}%"
         pitch_str = f"{current_pitch:+d}Hz"
-        communicate = edge_tts.Communicate(processed_text, current_voice_short, rate=rate_str, pitch=pitch_str)
-        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
-            audio_path = tmp_file.name
-            await communicate.save(audio_path)
-
-        if target_duration_ms is not None and os.path.exists(audio_path):
-            audio = AudioSegment.from_mp3(audio_path)
-            audio_duration_ms = len(audio)
-            #print(f"Generated audio duration: {audio_duration_ms}ms, Target duration: {target_duration_ms}ms") # Debug
-
-            if audio_duration_ms > target_duration_ms and target_duration_ms > 0:
-                speed_factor = (audio_duration_ms / target_duration_ms) * speed_adjustment_factor
-                #print(f"Speed factor (after user adjustment): {speed_factor}") # Debug
-                if speed_factor > 0:
-                    if speed_factor <1.0:
-                        speed_factor = 1.0
-                    y, sr = librosa.load(audio_path, sr=None)
-                    y_stretched = librosa.effects.time_stretch(y, rate=speed_factor)
-                    sf.write(audio_path, y_stretched, sr)
-            else:
-                print("Generated audio is not longer than target duration, no speed adjustment.") # Debug
-
-        return audio_path
+        try:
+            communicate = edge_tts.Communicate(processed_text, current_voice_short, rate=rate_str, pitch=pitch_str)
+            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
+                audio_path = tmp_file.name
+                await communicate.save(audio_path)
+            if target_duration_ms is not None and os.path.exists(audio_path):
+                audio = AudioSegment.from_mp3(audio_path)
+                audio_duration_ms = len(audio)
+                #print(f"Generated audio duration: {audio_duration_ms}ms, Target duration: {target_duration_ms}ms") # Debug
+                if audio_duration_ms > target_duration_ms and target_duration_ms > 0:
+                    speed_factor = (audio_duration_ms / target_duration_ms) * speed_adjustment_factor
+                    #print(f"Speed factor (after user adjustment): {speed_factor}") # Debug
+                    if speed_factor > 0:
+                        if speed_factor < 1.0:
+                            speed_factor = 1.0
+                        y, sr = librosa.load(audio_path, sr=None)
+                        y_stretched = librosa.effects.time_stretch(y, rate=speed_factor)
+                        sf.write(audio_path, y_stretched, sr)
+                else:
+                    print("Generated audio is not longer than target duration, no speed adjustment.") # Debug
+            return audio_path
+        except Exception as e:
+            print(f"Edge TTS error processing '{processed_text}': {e}")
+            return None
     return None

 async def process_transcript_line(line, default_voice, rate, pitch, speed_adjustment_factor):
@@ -153,7 +153,6 @@ async def process_transcript_line(line, default_voice, rate, pitch, speed_adjust
             audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch, duration_ms, speed_adjustment_factor)
             if audio_path:
                 audio_segments.append(audio_path)
-
         return start_time_ms, audio_segments, duration_ms
     return None, None, None

@@ -162,43 +161,38 @@ async def transcript_to_speech(transcript_text, voice, rate, pitch, speed_adjust
         return None, gr.Warning("Please enter transcript text.")
     if not voice:
         return None, gr.Warning("Please select a voice.")
-
     lines = transcript_text.strip().split('\n')
     timed_audio_segments = []
     max_end_time_ms = 0
-
     for line in lines:
         start_time, audio_paths, duration = await process_transcript_line(line, voice, rate, pitch, speed_adjustment_factor)
         if start_time is not None and audio_paths:
             combined_line_audio = AudioSegment.empty()
             current_time_ms = start_time
             segment_duration = duration / len(audio_paths) if audio_paths else 0
-
             for path in audio_paths:
-                try:
-                    audio = AudioSegment.from_mp3(path)
-                    combined_line_audio += audio
-                    os.remove(path)
-                except FileNotFoundError:
-                    print(f"Warning: Audio file not found: {path}")
-
+                if path: # Only process if audio_path is not None (meaning TTS was successful)
+                    try:
+                        audio = AudioSegment.from_mp3(path)
+                        combined_line_audio += audio
+                        os.remove(path)
+                    except FileNotFoundError:
+                        print(f"Warning: Audio file not found: {path}")
             if combined_line_audio:
                 timed_audio_segments.append({'start': start_time, 'audio': combined_line_audio})
                 max_end_time_ms = max(max_end_time_ms, start_time + len(combined_line_audio))
         elif audio_paths:
             for path in audio_paths:
-                try:
-                    os.remove(path)
-                except FileNotFoundError:
-                    pass # Clean up even if no timestamp
-
+                if path:
+                    try:
+                        os.remove(path)
+                    except FileNotFoundError:
+                        pass # Clean up even if no timestamp
     if not timed_audio_segments:
         return None, "No processable audio segments found."
-
     final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000)
     for segment in timed_audio_segments:
         final_audio = final_audio.overlay(segment['audio'], position=segment['start'])
-
     combined_audio_path = tempfile.mktemp(suffix=".mp3")
     final_audio.export(combined_audio_path, format="mp3")
     return combined_audio_path, None
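
A note on the duration-fitting step that now sits inside the new try block: the ratio of generated length to target length (multiplied by the user's speed_adjustment_factor) becomes the librosa time-stretch rate, clamped to a minimum of 1.0 so audio is only ever sped up to fit its time slot, never slowed down. A standalone sketch of that calculation, using a synthetic tone instead of the app's temporary MP3 files (the durations below are made up for illustration):

import numpy as np
import librosa

sr = 24000                                   # matches the app's 24 kHz frame rate
generated_ms, target_ms = 3200, 2500         # generated audio overruns its time slot
t = np.arange(int(sr * generated_ms / 1000)) / sr
y = 0.1 * np.sin(2 * np.pi * 220 * t)        # placeholder audio signal

speed_factor = generated_ms / target_ms      # > 1.0 means "too long, speed up"
speed_factor = max(speed_factor, 1.0)        # same clamp as app.py: never slow down

y_stretched = librosa.effects.time_stretch(y, rate=speed_factor)
print(f"{len(y) / sr * 1000:.0f} ms -> {len(y_stretched) / sr * 1000:.0f} ms (target {target_ms} ms)")
# In app.py the stretched signal is written back over the temp file via sf.write(audio_path, y_stretched, sr).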