Spaces:
Running
Running
| """ | |
| Utility functions for the TTSFM package. | |
| This module provides common utility functions used throughout the package, | |
| including HTTP helpers, validation utilities, and configuration management. | |
| """ | |
| import os | |
| import re | |
| import time | |
| import random | |
| import logging | |
| from typing import Dict, Any, Optional, Union, List | |
| from urllib.parse import urljoin, urlparse | |
| # Configure logging | |
| logger = logging.getLogger(__name__) | |
| def get_user_agent() -> str: | |
| """ | |
| Generate a realistic User-Agent string. | |
| Returns: | |
| str: User-Agent string for HTTP requests | |
| """ | |
| try: | |
| from fake_useragent import UserAgent | |
| ua = UserAgent() | |
| return ua.random | |
| except ImportError: | |
| # Fallback if fake_useragent is not available | |
| return "TTSFM-Client/3.0.0 (Python)" | |
| def get_realistic_headers() -> Dict[str, str]: | |
| """ | |
| Generate realistic HTTP headers for requests. | |
| Returns: | |
| Dict[str, str]: HTTP headers dictionary | |
| """ | |
| user_agent = get_user_agent() | |
| headers = { | |
| "Accept": "application/json, audio/*", | |
| "Accept-Encoding": "gzip, deflate, br", | |
| "Accept-Language": random.choice(["en-US,en;q=0.9", "en-GB,en;q=0.8", "en-CA,en;q=0.7"]), | |
| "Cache-Control": "no-cache", | |
| "DNT": "1", | |
| "Pragma": "no-cache", | |
| "User-Agent": user_agent, | |
| "X-Requested-With": "XMLHttpRequest", | |
| } | |
| # Add browser-specific headers for Chromium-based browsers | |
| if any(browser in user_agent.lower() for browser in ['chrome', 'edge', 'chromium']): | |
| version_match = re.search(r'(?:Chrome|Edge|Chromium)/(\d+)', user_agent) | |
| major_version = version_match.group(1) if version_match else "121" | |
| brands = [] | |
| if 'google chrome' in user_agent.lower(): | |
| brands.extend([ | |
| f'"Google Chrome";v="{major_version}"', | |
| f'"Chromium";v="{major_version}"', | |
| '"Not A(Brand";v="99"' | |
| ]) | |
| elif 'microsoft edge' in user_agent.lower(): | |
| brands.extend([ | |
| f'"Microsoft Edge";v="{major_version}"', | |
| f'"Chromium";v="{major_version}"', | |
| '"Not A(Brand";v="99"' | |
| ]) | |
| else: | |
| brands.extend([ | |
| f'"Chromium";v="{major_version}"', | |
| '"Not A(Brand";v="8"' | |
| ]) | |
| headers.update({ | |
| "Sec-Ch-Ua": ", ".join(brands), | |
| "Sec-Ch-Ua-Mobile": "?0", | |
| "Sec-Ch-Ua-Platform": random.choice(['"Windows"', '"macOS"', '"Linux"']), | |
| "Sec-Fetch-Dest": "empty", | |
| "Sec-Fetch-Mode": "cors", | |
| "Sec-Fetch-Site": "same-origin" | |
| }) | |
| # Randomly add some optional headers | |
| if random.random() < 0.5: | |
| headers["Upgrade-Insecure-Requests"] = "1" | |
| return headers | |
| def validate_text_length(text: str, max_length: int = 4096, raise_error: bool = True) -> bool: | |
| """ | |
| Validate text length against maximum allowed characters. | |
| Args: | |
| text: Text to validate | |
| max_length: Maximum allowed length in characters | |
| raise_error: Whether to raise an exception if validation fails | |
| Returns: | |
| bool: True if text is within limits, False otherwise | |
| Raises: | |
| ValueError: If text exceeds max_length and raise_error is True | |
| """ | |
| if not text: | |
| return True | |
| text_length = len(text) | |
| if text_length > max_length: | |
| if raise_error: | |
| raise ValueError( | |
| f"Text is too long ({text_length} characters). " | |
| f"Maximum allowed length is {max_length} characters. " | |
| f"TTS models typically support up to 4096 characters per request." | |
| ) | |
| return False | |
| return True | |
| def split_text_by_length(text: str, max_length: int = 4096, preserve_words: bool = True) -> List[str]: | |
| """ | |
| Split text into chunks that don't exceed the maximum length. | |
| Args: | |
| text: Text to split | |
| max_length: Maximum length per chunk | |
| preserve_words: Whether to avoid splitting words | |
| Returns: | |
| List[str]: List of text chunks | |
| """ | |
| if not text: | |
| return [] | |
| if len(text) <= max_length: | |
| return [text] | |
| chunks = [] | |
| if preserve_words: | |
| # Split by sentences first, then by words if needed | |
| sentences = re.split(r'[.!?]+', text) | |
| current_chunk = "" | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if not sentence: | |
| continue | |
| # Add sentence ending punctuation back | |
| if not sentence.endswith(('.', '!', '?')): | |
| sentence += '.' | |
| # Check if adding this sentence would exceed the limit | |
| test_chunk = current_chunk + (" " if current_chunk else "") + sentence | |
| if len(test_chunk) <= max_length: | |
| current_chunk = test_chunk | |
| else: | |
| # Save current chunk if it has content | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| # If single sentence is too long, split by words | |
| if len(sentence) > max_length: | |
| word_chunks = _split_by_words(sentence, max_length) | |
| chunks.extend(word_chunks) | |
| current_chunk = "" | |
| else: | |
| current_chunk = sentence | |
| # Add remaining chunk | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| else: | |
| # Simple character-based splitting | |
| for i in range(0, len(text), max_length): | |
| chunks.append(text[i:i + max_length]) | |
| return [chunk for chunk in chunks if chunk.strip()] | |
| def _split_by_words(text: str, max_length: int) -> List[str]: | |
| """ | |
| Split text by words when sentences are too long. | |
| Args: | |
| text: Text to split | |
| max_length: Maximum length per chunk | |
| Returns: | |
| List[str]: List of word-based chunks | |
| """ | |
| words = text.split() | |
| chunks = [] | |
| current_chunk = "" | |
| for word in words: | |
| test_chunk = current_chunk + (" " if current_chunk else "") + word | |
| if len(test_chunk) <= max_length: | |
| current_chunk = test_chunk | |
| else: | |
| if current_chunk: | |
| chunks.append(current_chunk) | |
| # If single word is too long, split it | |
| if len(word) > max_length: | |
| for i in range(0, len(word), max_length): | |
| chunks.append(word[i:i + max_length]) | |
| current_chunk = "" | |
| else: | |
| current_chunk = word | |
| if current_chunk: | |
| chunks.append(current_chunk) | |
| return chunks | |
| def sanitize_text(text: str) -> str: | |
| """ | |
| Sanitize input text for TTS processing. | |
| Args: | |
| text: Input text to sanitize | |
| Returns: | |
| str: Sanitized text | |
| """ | |
| if not text: | |
| return "" | |
| # Remove HTML tags | |
| text = re.sub(r'<[^>]+>', '', text) | |
| # Remove script tags and content | |
| text = re.sub(r'<script.*?</script>', '', text, flags=re.DOTALL | re.IGNORECASE) | |
| # Remove potentially dangerous characters | |
| text = re.sub(r'[<>"\']', '', text) | |
| # Normalize whitespace | |
| text = re.sub(r'\s+', ' ', text) | |
| return text.strip() | |
| def validate_url(url: str) -> bool: | |
| """ | |
| Validate if a URL is properly formatted. | |
| Args: | |
| url: URL to validate | |
| Returns: | |
| bool: True if URL is valid, False otherwise | |
| """ | |
| try: | |
| result = urlparse(url) | |
| return all([result.scheme, result.netloc]) | |
| except Exception: | |
| return False | |
| def build_url(base_url: str, path: str) -> str: | |
| """ | |
| Build a complete URL from base URL and path. | |
| Args: | |
| base_url: Base URL | |
| path: Path to append | |
| Returns: | |
| str: Complete URL | |
| """ | |
| # Ensure base_url ends with / | |
| if not base_url.endswith('/'): | |
| base_url += '/' | |
| # Ensure path doesn't start with / | |
| if path.startswith('/'): | |
| path = path[1:] | |
| return urljoin(base_url, path) | |
| def get_random_delay(min_delay: float = 1.0, max_delay: float = 5.0) -> float: | |
| """ | |
| Get a random delay with jitter for rate limiting. | |
| Args: | |
| min_delay: Minimum delay in seconds | |
| max_delay: Maximum delay in seconds | |
| Returns: | |
| float: Random delay in seconds | |
| """ | |
| base_delay = random.uniform(min_delay, max_delay) | |
| jitter = random.uniform(0.1, 0.5) | |
| return base_delay + jitter | |
| def exponential_backoff(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float: | |
| """ | |
| Calculate exponential backoff delay. | |
| Args: | |
| attempt: Attempt number (0-based) | |
| base_delay: Base delay in seconds | |
| max_delay: Maximum delay in seconds | |
| Returns: | |
| float: Delay in seconds | |
| """ | |
| delay = base_delay * (2 ** attempt) | |
| jitter = random.uniform(0.1, 0.3) * delay | |
| return min(delay + jitter, max_delay) | |
| def load_config_from_env(prefix: str = "TTSFM_") -> Dict[str, Any]: | |
| """ | |
| Load configuration from environment variables. | |
| Args: | |
| prefix: Prefix for environment variables | |
| Returns: | |
| Dict[str, Any]: Configuration dictionary | |
| """ | |
| config = {} | |
| for key, value in os.environ.items(): | |
| if key.startswith(prefix): | |
| config_key = key[len(prefix):].lower() | |
| # Try to convert to appropriate type | |
| if value.lower() in ('true', 'false'): | |
| config[config_key] = value.lower() == 'true' | |
| elif value.isdigit(): | |
| config[config_key] = int(value) | |
| elif '.' in value and value.replace('.', '').isdigit(): | |
| config[config_key] = float(value) | |
| else: | |
| config[config_key] = value | |
| return config | |
| def setup_logging(level: Union[str, int] = logging.INFO, format_string: Optional[str] = None) -> None: | |
| """ | |
| Setup logging configuration for the package. | |
| Args: | |
| level: Logging level | |
| format_string: Custom format string | |
| """ | |
| if format_string is None: | |
| format_string = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| logging.basicConfig( | |
| level=level, | |
| format=format_string, | |
| handlers=[logging.StreamHandler()] | |
| ) | |
| def estimate_audio_duration(text: str, words_per_minute: float = 150.0) -> float: | |
| """ | |
| Estimate audio duration based on text length. | |
| Args: | |
| text: Input text | |
| words_per_minute: Average speaking rate | |
| Returns: | |
| float: Estimated duration in seconds | |
| """ | |
| if not text: | |
| return 0.0 | |
| # Count words (simple whitespace split) | |
| word_count = len(text.split()) | |
| # Calculate duration in seconds | |
| duration = (word_count / words_per_minute) * 60.0 | |
| # Add some buffer for pauses and processing | |
| return duration * 1.1 | |
| def format_file_size(size_bytes: int) -> str: | |
| """ | |
| Format file size in human-readable format. | |
| Args: | |
| size_bytes: Size in bytes | |
| Returns: | |
| str: Formatted size string | |
| """ | |
| if size_bytes == 0: | |
| return "0 B" | |
| size_names = ["B", "KB", "MB", "GB"] | |
| i = 0 | |
| while size_bytes >= 1024 and i < len(size_names) - 1: | |
| size_bytes /= 1024.0 | |
| i += 1 | |
| return f"{size_bytes:.1f} {size_names[i]}" | |