File size: 6,436 Bytes
78cb487
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import logging
import threading
import time
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from typing import List, Optional, Tuple

import numpy as np

from .audio_io import save_audio_file

# Raise the threshold of these two named loggers to ERROR so that their
# lower-severity records are dropped, then configure the root logger to
# print bare messages at INFO and above.
for _logger_name in ("phonemizer", "speechbrain.utils.quirks"):
    logging.getLogger(_logger_name).setLevel(logging.ERROR)
del _logger_name  # keep the module namespace free of the loop variable
logging.basicConfig(level=logging.INFO, format="%(message)s")


class AudioGenerationQueue:
    """
    A queue system for managing asynchronous audio generation from text input.

    Sentences are pushed onto an input queue and consumed by a single
    background daemon thread, which calls ``generator.generate`` for each
    sentence, saves the result through ``save_audio_file`` and pushes the
    ``(audio_data, output_path)`` pair onto an output queue for the caller to
    poll with :meth:`get_next_audio`.

    Attributes:
        generator: Audio generator instance used for text-to-speech conversion.
            Must expose ``generate(sentence, speed=...)`` returning a 2-tuple
            whose first element is the audio data.
        speed (float): Speed multiplier passed to ``generator.generate``
        lock (threading.Lock): Lock created for callers; it is not acquired by
            any method of this class.
        output_dir (Path): Directory where generated audio files are saved
        sentence_queue (Queue): Pending sentences awaiting generation
        audio_queue (Queue): Generated ``(audio_data, output_path)`` pairs
        is_running (bool): Whether the worker should keep waiting for new input
        generation_thread (threading.Thread | None): The worker thread, if any
        sentences_processed (int): Count of sentences taken off the input queue
        audio_generated (int): Count of successfully generated audio files
        failed_sentences (list): List of ``(sentence, error message)`` tuples
    """

    # Maximum time (seconds) the worker blocks waiting for a sentence before
    # re-checking whether it has been asked to stop.
    _POLL_INTERVAL = 0.1

    def __init__(
        self, generator, speed: float = 1.0, output_dir: Optional[Path] = None
    ):
        """
        Initialize the audio generation queue system.

        Args:
            generator: Audio generator instance for text-to-speech conversion
            speed: Speed multiplier for audio generation (default: 1.0)
            output_dir: Directory for saving generated audio files; a ``Path``
                or a path string (default: "generated_audio"). The directory
                and any missing parents are created.
        """
        self.generator = generator
        self.speed = speed
        self.lock = threading.Lock()
        # Coerce so that plain strings are accepted as well as Path objects.
        self.output_dir = Path(output_dir or "generated_audio")
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.sentence_queue = Queue()
        self.audio_queue = Queue()
        self.is_running = False
        self.generation_thread = None
        self.sentences_processed = 0
        self.audio_generated = 0
        self.failed_sentences = []

    def start(self):
        """
        Start the audio generation thread if not already running.

        The thread is a daemon and processes sentences from the queue until
        :meth:`stop` is called.
        """
        if self.is_running:
            return
        self.is_running = True
        self.generation_thread = threading.Thread(
            target=self._generation_worker, daemon=True
        )
        self.generation_thread.start()

    def stop(self):
        """
        Stop the audio generation thread gracefully.

        Sentences already queued are still processed: the worker keeps
        draining the queue after ``is_running`` is cleared and only then
        exits, so joining the thread is sufficient. Logs final processing
        statistics. Does nothing if no worker thread exists.
        """
        worker = self.generation_thread
        if worker is None:
            return

        self.is_running = False
        worker.join()
        self.generation_thread = None

        logging.info(
            "\nAudio Generation Complete - Processed: %s, Generated: %s, Failed: %s",
            self.sentences_processed,
            self.audio_generated,
            len(self.failed_sentences),
        )

    def add_sentences(self, sentences: List[str]):
        """
        Add a list of sentences to the generation queue.

        Each sentence is stripped of surrounding whitespace; sentences that are
        empty after stripping are skipped. Starts the worker thread if it is
        not already running.

        Args:
            sentences: List of text strings to be converted to audio
        """
        for sentence in sentences:
            text = sentence.strip()
            if text:
                self.sentence_queue.put(text)

        if not self.is_running:
            self.start()

    def get_next_audio(self) -> Tuple[Optional[np.ndarray], Optional[Path]]:
        """
        Retrieve the next generated audio segment from the queue without blocking.

        Returns:
            Tuple containing:
                - audio data (or None if queue is empty)
                - the value returned by ``save_audio_file`` for that audio,
                  used as its output path (or None if queue is empty)
        """
        try:
            return self.audio_queue.get_nowait()
        except Empty:
            return None, None

    def clear_queues(self):
        """
        Clear both sentence and audio queues, removing all pending items.

        Returns immediately without waiting for queue processing. A sentence
        the worker has already taken off the queue is not affected.
        """
        for pending in (self.sentence_queue, self.audio_queue):
            while True:
                try:
                    pending.get_nowait()
                except Empty:
                    break

    def _generation_worker(self):
        """
        Internal worker method that runs in a separate thread.

        Loops while the queue is running or sentences remain, so that a stop
        request still lets queued sentences finish. Any exception raised while
        generating or saving a sentence is recorded in ``failed_sentences`` as
        ``(sentence, str(error))`` and the worker moves on to the next one.
        """
        while self.is_running or not self.sentence_queue.empty():
            try:
                # Block briefly instead of spinning; the timeout bounds how
                # long a stop request can go unnoticed on an empty queue.
                sentence = self.sentence_queue.get(timeout=self._POLL_INTERVAL)
            except Empty:
                continue

            self.sentences_processed += 1

            try:
                audio_data, _phonemes = self.generator.generate(
                    sentence, speed=self.speed
                )
                if audio_data is None or len(audio_data) == 0:
                    raise ValueError("Generated audio data is empty")
                output_path = save_audio_file(audio_data, self.output_dir)
            except Exception as exc:
                # Worker boundary: one bad sentence must not kill the thread.
                self.failed_sentences.append((sentence, str(exc)))
                continue

            self.audio_generated += 1
            self.audio_queue.put((audio_data, output_path))