Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| # Copyright 2024 Google LLC | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # | |
| """Google Cloud Text-To-Speech API streaming sample with input/output streams.""" | |
| from google.cloud import texttospeech | |
| import itertools | |
| import queue | |
| import threading | |
| class TTSStreamer: | |
| def __init__(self): | |
| self.client = texttospeech.TextToSpeechClient() | |
| self.text_queue = queue.Queue() | |
| self.audio_queue = queue.Queue() | |
| def start_stream(self): | |
| streaming_config = texttospeech.StreamingSynthesizeConfig( | |
| voice=texttospeech.VoiceSelectionParams( | |
| name="en-US-Journey-D", | |
| language_code="en-US" | |
| ) | |
| ) | |
| config_request = texttospeech.StreamingSynthesizeRequest( | |
| streaming_config=streaming_config | |
| ) | |
| def request_generator(): | |
| while True: | |
| try: | |
| text = self.text_queue.get() | |
| if text is None: # Poison pill to stop | |
| break | |
| yield texttospeech.StreamingSynthesizeRequest( | |
| input=texttospeech.StreamingSynthesisInput(text=text) | |
| ) | |
| except queue.Empty: | |
| continue | |
| def audio_processor(): | |
| responses = self.client.streaming_synthesize( | |
| itertools.chain([config_request], request_generator()) | |
| ) | |
| for response in responses: | |
| self.audio_queue.put(response.audio_content) | |
| self.processor_thread = threading.Thread(target=audio_processor) | |
| self.processor_thread.start() | |
| def send_text(self, text: str): | |
| """Send text to be synthesized.""" | |
| self.text_queue.put(text) | |
| def get_audio(self): | |
| """Get the next chunk of audio bytes.""" | |
| try: | |
| return self.audio_queue.get_nowait() | |
| except queue.Empty: | |
| return None | |
| def stop(self): | |
| """Stop the streaming synthesis.""" | |
| self.text_queue.put(None) # Send poison pill | |
| if self.processor_thread: | |
| self.processor_thread.join() | |
| def main(): | |
| tts = TTSStreamer() | |
| tts.start_stream() | |
| # Example usage | |
| try: | |
| while True: | |
| text = input("Enter text (or 'q' to quit): ") | |
| if text.lower() == 'q': | |
| break | |
| tts.send_text(text) | |
| # Get and print audio bytes | |
| while True: | |
| audio_chunk = tts.get_audio() | |
| if audio_chunk is None: | |
| break | |
| print(f"Received audio chunk of {len(audio_chunk)} bytes") | |
| finally: | |
| tts.stop() | |
| if __name__ == "__main__": | |
| main() |