| from .config import settings | |
class TextChunker:
    """Split incoming text into speakable chunks for voice generation.

    Chunk boundaries are chosen from two tables: connective words
    (``semantic_breaks``) and trailing punctuation
    (``punctuation_priorities``). A higher number means a stronger
    preference for breaking at that position.
    """

    def __init__(self):
        """Initialize the TextChunker with break points and priorities."""
        # Not read by any method in this class.
        # NOTE(review): presumably a buffer for callers -- confirm usage.
        self.current_text = []
        # Flipped to True by process() once the first chunk has been queued;
        # selects between the FIRST_SENTENCE_SIZE and TARGET_SIZE settings.
        self.found_first_sentence = False
        # Connective words (matched case-insensitively against a whole token)
        # mapped to break priority.
        self.semantic_breaks = {
            "however": 4,
            "therefore": 4,
            "furthermore": 4,
            "moreover": 4,
            "nevertheless": 4,
            "while": 3,
            "although": 3,
            "unless": 3,
            "since": 3,
            "and": 2,
            "but": 2,
            "because": 2,
            "then": 2,
        }
        # Trailing punctuation characters mapped to break priority.
        self.punctuation_priorities = {
            ".": 5,
            "!": 5,
            "?": 5,
            ";": 4,
            ":": 4,
            ",": 3,
            "-": 2,
        }

    def _target_size(self) -> int:
        """Return the chunk size (in words) that currently applies."""
        if self.found_first_sentence:
            return settings.TARGET_SIZE
        return settings.FIRST_SENTENCE_SIZE

    def should_process(self, text: str) -> bool:
        """Determines if text should be processed based on length or punctuation.

        Args:
            text (str): The text to check.

        Returns:
            bool: True if the text ends with one of the known punctuation
                characters, or if its word count has reached the current
                target size; False otherwise.
        """
        # str.endswith accepts a tuple, so one call covers every character.
        if text.endswith(tuple(self.punctuation_priorities)):
            return True
        return len(text.split()) >= self._target_size()

    def find_break_point(self, words: list, target_size: int) -> int:
        """Finds optimal break point in text.

        Only the first ``target_size + 3`` words are considered. Each word
        that is a known connective or ends with known punctuation is a
        candidate; the break is placed *after* the candidate word. The
        candidate with the highest priority wins; among equal priorities the
        one whose resulting chunk length is closest to ``target_size`` wins,
        and the earliest one wins any remaining tie.

        Args:
            words (list): The list of words to find a break point in.
            target_size (int): The target size of the chunk, in words.
                Values below 1 are treated as 1 so that a non-empty input
                always yields a non-empty chunk.

        Returns:
            int: The number of leading words that form the chunk (i.e. the
                index at which the remainder starts).
        """
        # A non-positive target would otherwise produce a split point of 0,
        # which callers interpret as "nothing to emit".
        target_size = max(target_size, 1)

        if len(words) <= target_size:
            return len(words)

        candidates = []
        for index, word in enumerate(words[: target_size + 3]):
            priority = max(
                self.semantic_breaks.get(word.lower(), 0),
                # A word can only end with one character, so a direct lookup
                # on the last character replaces the loop over the table.
                self.punctuation_priorities.get(word[-1:], 0),
            )
            if priority > 0:
                chunk_len = index + 1
                # Closeness is measured on the resulting chunk length, not on
                # the raw index, so a chunk of exactly target_size words is
                # the best possible match.
                candidates.append((chunk_len, priority, -abs(chunk_len - target_size)))

        if not candidates:
            return target_size

        # max() returns the first maximal item, so earlier candidates win ties.
        best = max(candidates, key=lambda c: (c[1], c[2]))
        return best[0]

    def process(self, text: str, audio_queue) -> str:
        """Process text chunk and return remaining text.

        Splits off one chunk from the front of ``text``. If the chunk
        contains at least one alphanumeric character it is queued (with
        trailing commas removed) and ``found_first_sentence`` is set.

        Args:
            text (str): The text to process.
            audio_queue: The audio queue to add sentences to. Must provide
                ``add_sentences(list_of_str)``.

        Returns:
            str: The remaining text after processing, with words re-joined
                by single spaces; empty if nothing is left.
        """
        if not text:
            return ""

        words = text.split()
        if not words:
            return ""

        split_point = self.find_break_point(words, self._target_size())
        if not split_point:
            # Defensive: no chunk could be determined.
            return ""

        chunk = " ".join(words[:split_point]).strip()
        # Skip chunks that are only punctuation/symbols; there is nothing
        # to speak, but the words after them must still be returned.
        if chunk and any(c.isalnum() for c in chunk):
            audio_queue.add_sentences([chunk.rstrip(",")])
            self.found_first_sentence = True

        # Slicing past the end yields [], so this is "" when nothing remains.
        return " ".join(words[split_point:])