Spaces:
Sleeping
Sleeping
| import argparse | |
| import csv | |
| import docker | |
| import ebooklib | |
| import fnmatch | |
| import gradio as gr | |
| import hashlib | |
| import json | |
| import numpy as np | |
| import os | |
| import regex as re | |
| import requests | |
| import shutil | |
| import socket | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import torch | |
| import torchaudio | |
| import urllib.request | |
| import uuid | |
| import zipfile | |
| import traceback | |
| from bs4 import BeautifulSoup | |
| from collections import Counter | |
| from collections.abc import MutableMapping | |
| from datetime import datetime | |
| from ebooklib import epub | |
| from glob import glob | |
| from huggingface_hub import hf_hub_download | |
| from iso639 import languages | |
| from multiprocessing import Manager, Event | |
| from pydub import AudioSegment | |
| from tqdm import tqdm | |
| from translate import Translator | |
| from TTS.api import TTS as XTTS | |
| from TTS.tts.configs.xtts_config import XttsConfig | |
| from TTS.tts.models.xtts import Xtts | |
| from urllib.parse import urlparse | |
| import lib.conf as conf | |
| import lib.lang as lang | |
def inject_configs(target_namespace):
    """Copy the settings of ``lib.conf`` and ``lib.lang`` into *target_namespace*.

    Every module attribute whose name does not start with a double underscore
    is copied. ``lang`` is processed after ``conf``, so on a name clash the
    value from ``lang`` is the one that remains.

    Args:
        target_namespace: Mutable mapping that receives the settings.
    """
    for settings_module in (conf, lang):
        for attr_name, attr_value in vars(settings_module).items():
            if attr_name.startswith('__'):
                continue
            target_namespace[attr_name] = attr_value


# Expose every configuration value as a global of this module.
inject_configs(globals())
def recursive_proxy(data, manager=None):
    """Recursively convert nested dicts and lists into Manager proxies.

    Args:
        data: A ``dict``, a ``list`` or a scalar (``str``, ``int``, ``float``,
            ``bool`` or ``None``). Containers may be nested arbitrarily.
        manager: Object providing ``dict()`` and ``list()`` factories, normally
            a ``multiprocessing.Manager``. When omitted, a new ``Manager`` is
            started, but only once a container actually has to be proxied.

    Returns:
        The scalar itself, or a proxy dict/list whose values were converted
        the same way.

    Raises:
        TypeError: If *data* (or a nested value) is of an unsupported type.
    """
    if isinstance(data, dict):
        if manager is None:
            # Started lazily so scalars never cost a manager server process.
            manager = Manager()
        proxy_dict = manager.dict()
        for key, value in data.items():
            proxy_dict[key] = recursive_proxy(value, manager)
        return proxy_dict
    if isinstance(data, list):
        if manager is None:
            manager = Manager()
        proxy_list = manager.list()
        for item in data:
            proxy_list.append(recursive_proxy(item, manager))
        return proxy_list
    if isinstance(data, (str, int, float, bool, type(None))):  # Scalars
        return data
    raise TypeError(f"Unsupported data type: {type(data)}")
class ConversionContext:
    """Keeps the state of every conversion session in Manager-backed proxies."""

    # Metadata entries created (all unset) for each new session.
    _METADATA_KEYS = (
        "title",
        "creator",
        "contributor",
        "language",
        "language_iso1",
        "identifier",
        "publisher",
        "date",
        "description",
        "subject",
        "rights",
        "format",
        "type",
        "coverage",
        "relation",
        "Source",
        "Modified",
    )

    def __init__(self):
        self.manager = Manager()
        # session_id -> proxied session state
        self.sessions = self.manager.dict()
        # Reserved for one cancellation event per session; this class never fills it.
        self.cancellation_events = {}

    def _default_session(self, session_id):
        """Build the plain-dict template used for a brand-new session."""
        return {
            "script_mode": NATIVE,
            "client": None,
            "language": default_language_code,
            "audiobooks_dir": None,
            "tmp_dir": None,
            "src": None,
            "id": session_id,
            "chapters_dir": None,
            "chapters_dir_sentences": None,
            "epub": None,
            "epub_path": None,
            "filename_noext": None,
            "fine_tuned": None,
            "voice_file": None,
            "custom_model": None,
            "custom_model_dir": None,
            "chapters": None,
            "cover": None,
            "metadata": dict.fromkeys(self._METADATA_KEYS),
            "status": "Idle",
            "progress": 0,
            "cancellation_requested": False,
        }

    def get_session(self, session_id):
        """Return the session stored under *session_id*, creating it on first use."""
        if session_id in self.sessions:
            return self.sessions[session_id]
        self.sessions[session_id] = recursive_proxy(
            self._default_session(session_id), manager=self.manager
        )
        return self.sessions[session_id]
# Module-level ConversionContext instance: the single registry of sessions for this module.
context = ConversionContext()
# Flag read by DependencyError (the process only exits when it is False) and by the
# conversion functions to decide whether to create a gr.Progress tracker.
is_gui_process = False
class DependencyError(Exception):
    """Exception that reports itself as soon as it is instantiated.

    Creating an instance prints the traceback currently being handled and the
    error message. Unless ``is_gui_process`` is true, it then terminates the
    process with exit status 1.
    """

    def __init__(self, message=None):
        super().__init__(message)
        # Reporting happens at construction time, before any ``raise``.
        self.handle_exception()

    def handle_exception(self):
        """Print the active traceback and message; exit when not in GUI mode."""
        traceback.print_exc()
        print(f'Caught DependencyError: {self}')
        if is_gui_process:
            return
        sys.exit(1)
def prepare_dirs(src, session):
    """Create the working directories of a session and stage the source file.

    The source is copied to ``session['tmp_dir']`` and its staged path stored in
    ``session['src']``. When a staged copy with the same hash already exists,
    the chapters directory is kept; otherwise it is wiped first.

    Args:
        src: Path of the ebook to convert.
        session: Session mapping providing ``tmp_dir``, ``custom_model_dir``,
            ``audiobooks_dir``, ``chapters_dir`` and ``chapters_dir_sentences``.

    Returns:
        True on success.

    Raises:
        DependencyError: Wrapping any exception raised along the way.
    """
    try:
        os.makedirs(os.path.join(models_dir, 'tts'), exist_ok=True)
        for dir_key in ('tmp_dir', 'custom_model_dir', 'audiobooks_dir'):
            os.makedirs(session[dir_key], exist_ok=True)
        session['src'] = os.path.join(session['tmp_dir'], os.path.basename(src))
        # An identical staged copy means an earlier run can be resumed.
        can_resume = os.path.exists(session['src']) and compare_files_by_hash(session['src'], src)
        if not can_resume:
            shutil.rmtree(session['chapters_dir'], ignore_errors=True)
        for dir_key in ('chapters_dir', 'chapters_dir_sentences'):
            os.makedirs(session[dir_key], exist_ok=True)
        shutil.copy(src, session['src'])
        return True
    except Exception as err:
        raise DependencyError(err)
def check_programs(prog_name, command, options):
    """Verify that an external program can be launched.

    Args:
        prog_name: Human-readable program name used in error messages.
        command: Executable to run.
        options: Single argument passed to the executable.

    Returns:
        The tuple ``(True, None)`` when the command exits successfully.

    Raises:
        DependencyError: When the executable is missing or exits with an error.
    """
    try:
        subprocess.run(
            [command, options],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except FileNotFoundError:
        message = (
            f'********** Error: {prog_name} is not installed! if your OS calibre package version\n'
            'is not compatible you still can run ebook2audiobook.sh (linux/mac) or ebook2audiobook.cmd (windows) **********'
        )
        raise DependencyError(message)
    except subprocess.CalledProcessError:
        raise DependencyError(f'Error: There was an issue running {prog_name}.')
    return True, None
def check_fine_tuned(fine_tuned, language):
    """Find the TTS engine that offers *fine_tuned* for *language*.

    The engine expected for the language is ``'xtts'`` when
    ``language_xtts.get(language)`` is truthy and ``'fairseq'`` otherwise.

    Returns:
        The matching engine key of ``models``, or False when none matches.

    Raises:
        RuntimeError: Wrapping any exception raised during the lookup.
    """
    try:
        for engine, presets in models.items():
            if fine_tuned not in presets:
                continue
            expected_engine = 'xtts' if language_xtts.get(language) else 'fairseq'
            if engine == expected_engine:
                return engine
        return False
    except Exception as err:
        raise RuntimeError(err)
def analyze_uploaded_file(zip_path, required_files=None):
    """Check that an uploaded archive is complete and carries no executables.

    Files are matched by base name, so their folder inside the archive does
    not matter. Extensions are compared case-insensitively.

    Args:
        zip_path: Path of the ZIP archive to inspect.
        required_files: Base names that must all be present in the archive.
            Defaults to ``default_model_files``.

    Returns:
        bool: True when every required file is present and no entry has an
        executable extension, False otherwise. (This used to be a one-element
        tuple, which is always truthy and therefore defeated the check.)

    Raises:
        ValueError: If *zip_path* is not a valid ZIP archive.
        RuntimeError: Wrapping any other failure.
    """
    if required_files is None:
        required_files = default_model_files
    executable_extensions = {'.exe', '.bat', '.cmd', '.bash', '.bin', '.sh', '.msi', '.dll', '.com'}
    try:
        with zipfile.ZipFile(zip_path, 'r') as zf:
            files_in_zip = set()
            for file_info in zf.infolist():
                if file_info.is_dir():
                    continue  # Skip directories
                base_name = os.path.basename(file_info.filename)
                _, ext = os.path.splitext(base_name.lower())
                if ext in executable_extensions:
                    # One executable is enough to reject the archive.
                    return False
                files_in_zip.add(base_name)
            return all(name in files_in_zip for name in required_files)
    except zipfile.BadZipFile:
        raise ValueError("error: The file is not a valid ZIP archive.")
    except Exception as e:
        raise RuntimeError(f'analyze_uploaded_file(): {e}') from e
async def extract_custom_model(file_src, dest=None, session=None, required_files=None):
    """Unpack the required files of an uploaded model archive.

    This is an async generator. While extracting in GUI mode it yields
    ``(dir_name, progress_bar)`` after each archive entry; it always yields
    one final ``(dir_name, progress_bar)`` once the archive has been removed.

    The files land in ``<dest>/<engine>/<dir_name>``, where the engine is
    ``'xtts'`` when the archive's ``config.json`` declares ``"model": "xtts"``
    and ``'fairseq'`` otherwise.

    Args:
        file_src: Path of the ZIP archive; it is deleted after extraction.
        dest: Target root directory. When None, a per-session directory under
            ``models_dir`` is used and stored in ``session['custom_model_dir']``.
        session: Session mapping; required when *dest* is None.
        required_files: Archive member names to extract. Defaults to
            ``default_model_files``.

    Raises:
        DependencyError: Wrapping any exception raised along the way.
    """
    try:
        progress_bar = None
        if is_gui_process:
            progress_bar = gr.Progress(track_tqdm=True)
        if dest is None:
            dest = session['custom_model_dir'] = os.path.join(models_dir, '__sessions', f"model-{session['id']}")
        os.makedirs(dest, exist_ok=True)
        if required_files is None:
            required_files = default_model_files
        dir_name = os.path.basename(file_src).replace('.zip', '')
        with zipfile.ZipFile(file_src, 'r') as zip_ref:
            files = zip_ref.namelist()
            files_length = len(files)
            dir_tts = 'fairseq'
            xtts_config = 'config.json'
            # Check the model type
            config_data = {}
            if xtts_config in files:
                with zip_ref.open(xtts_config) as file:
                    config_data = json.load(file)
            if config_data.get('model') == 'xtts':
                dir_tts = 'xtts'
            dir_dest = os.path.join(dest, dir_tts, dir_name)
            os.makedirs(dir_dest, exist_ok=True)
            # Track progress as a percentage of archive entries visited
            with tqdm(total=100, unit='%') as t:
                for i, file in enumerate(files):
                    if file in required_files:
                        zip_ref.extract(file, dir_dest)
                    fraction_done = (i + 1) / files_length
                    t.n = int(fraction_done * 100)
                    t.refresh()
                    if progress_bar is not None:
                        # Was ``downloaded / total_size``: two undefined names.
                        progress_bar(fraction_done)
                        yield dir_name, progress_bar
        os.remove(file_src)
        print(f'Extracted files to {dir_dest}')
        yield dir_name, progress_bar
        return
    except Exception as e:
        raise DependencyError(e)
def calculate_hash(filepath, hash_algorithm='sha256'):
    """Return the hexadecimal digest of the file at *filepath*.

    The file is read in 8192-byte blocks so large files never have to fit in
    memory.

    Args:
        filepath: Path of the file to hash.
        hash_algorithm: Any algorithm name accepted by ``hashlib.new``.
    """
    digest = hashlib.new(hash_algorithm)
    with open(filepath, 'rb') as stream:
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()
def compare_files_by_hash(file1, file2, hash_algorithm='sha256'):
    """Tell whether two files have the same digest under *hash_algorithm*."""
    first_digest = calculate_hash(file1, hash_algorithm)
    second_digest = calculate_hash(file2, hash_algorithm)
    return first_digest == second_digest
def has_metadata(f):
    """Report whether the EPUB at *f* yields a non-empty ``get_metadata('DC', '')``.

    Any failure while reading the file is treated as "no metadata".

    Returns:
        True when the metadata lookup is non-empty, False otherwise.
    """
    try:
        book = epub.read_epub(f)
        return bool(book.get_metadata('DC', ''))
    except Exception:
        return False
def convert_to_epub(session):
    """Convert ``session['src']`` to EPUB at ``session['epub_path']``.

    In ``DOCKER_UTILS`` mode an input that is already an EPUB is simply
    copied; anything else is converted by ``ebook-convert`` inside the utils
    Docker image. In every other mode the local ``ebook-convert`` executable
    is run directly.

    Args:
        session: Session mapping providing ``cancellation_requested``,
            ``script_mode``, ``src``, ``epub_path`` and, for Docker mode,
            ``tmp_dir`` and ``client``.

    Returns:
        True on success, False when cancellation was requested.

    Raises:
        DependencyError: When Docker or ``ebook-convert`` fails, or when the
            ``ebook-convert`` executable cannot be found on PATH.
    """
    if session['cancellation_requested']:
        print('Cancel requested')
        return False
    if session['script_mode'] == DOCKER_UTILS:
        try:
            docker_dir = os.path.basename(session['tmp_dir'])
            docker_file_in = os.path.basename(session['src'])
            docker_file_out = os.path.basename(session['epub_path'])
            # Check if the input file is already an EPUB
            if docker_file_in.lower().endswith('.epub'):
                shutil.copy(session['src'], session['epub_path'])
                return True
            # Convert the ebook to EPUB format using utils Docker image
            container = session['client'].containers.run(
                docker_utils_image,
                command=f'ebook-convert /files/{docker_dir}/{docker_file_in} /files/{docker_dir}/{docker_file_out}',
                volumes={session['tmp_dir']: {'bind': f'/files/{docker_dir}', 'mode': 'rw'}},
                remove=True,
                detach=False,
                stdout=True,
                stderr=True
            )
            print(container.decode('utf-8'))
            return True
        except (docker.errors.ContainerError, docker.errors.ImageNotFound, docker.errors.APIError) as e:
            raise DependencyError(e)
    util_app = shutil.which('ebook-convert')
    if util_app is None:
        # Without this guard subprocess.run([None, ...]) raises a bare TypeError.
        raise DependencyError('ebook-convert was not found on PATH')
    try:
        subprocess.run([util_app, session['src'], session['epub_path']], check=True)
        return True
    except subprocess.CalledProcessError as e:
        raise DependencyError(e)
def get_cover(session):
    """Extract the cover image of ``session['epub']`` into the temp directory.

    The first ``ITEM_COVER`` item is preferred. Failing that, the first image
    whose file name or id contains "cover" is used.

    Returns:
        The path of the written ``.jpg`` file when a cover was found, True
        when the book has no cover, False when cancellation was requested.

    Raises:
        DependencyError: Wrapping any exception raised along the way.
    """
    try:
        if session['cancellation_requested']:
            print('Cancel requested')
            return False
        cover_path = os.path.join(session['tmp_dir'], session['filename_noext'] + '.jpg')
        declared_covers = session['epub'].get_items_of_type(ebooklib.ITEM_COVER)
        image_bytes = next((item.get_content() for item in declared_covers), False)
        if not image_bytes:
            for candidate in session['epub'].get_items_of_type(ebooklib.ITEM_IMAGE):
                if 'cover' in candidate.file_name.lower() or 'cover' in candidate.get_id().lower():
                    image_bytes = candidate.get_content()
                    break
        if not image_bytes:
            return True
        with open(cover_path, 'wb') as cover_file:
            cover_file.write(image_bytes)
        return cover_path
    except Exception as err:
        raise DependencyError(err)
def get_chapters(language, session):
    """Split the main documents of ``session['epub']`` into sentence lists.

    The first document is dropped. Of the rest, only those whose identifier
    pattern (see ``filter_pattern``) is the most common one are kept. When the
    metadata has a creator, an "<creator>, <title>" intro is inserted at the
    start of the first chapter.

    Returns:
        A list with one sentence list per chapter, or False when cancellation
        was requested or the book has no document items.

    Raises:
        DependencyError: Wrapping any exception raised along the way.
    """
    try:
        if session['cancellation_requested']:
            print('Cancel requested')
            return False
        documents = list(session['epub'].get_items_of_type(ebooklib.ITEM_DOCUMENT))
        if not documents:
            return False
        documents = documents[1:]
        patterns = []
        for document in documents:
            pattern = filter_pattern(str(document))
            if pattern:
                patterns.append(pattern)
        dominant_pattern = filter_doc(patterns)
        chapters = [
            filter_chapter(document, language)
            for document in documents
            if filter_pattern(str(document)) == dominant_pattern
        ]
        if session['metadata'].get('creator'):
            chapters[0].insert(0, f"{session['metadata']['creator']}, {session['metadata']['title']};\n ")
        return chapters
    except Exception as err:
        raise DependencyError(f'Error extracting main content pages: {err}')
def filter_doc(doc_patterns):
    """Return the most frequent entry of *doc_patterns*, or None when it is empty."""
    ranking = Counter(doc_patterns).most_common(1)
    if not ranking:
        return None
    top_pattern, _occurrences = ranking[0]
    return top_pattern
def filter_pattern(doc_identifier):
    """Classify a document identifier by its second ``:``-separated field.

    Returns:
        * the letters of the field when it mixes ASCII letters and digits,
        * the field itself when it consists of ASCII letters only,
        * ``'numbers'`` when it consists of digits only,
        * None otherwise, or when there are fewer than three fields.
    """
    fields = doc_identifier.split(':')
    if len(fields) <= 2:
        return None
    segment = fields[1]
    if re.search(r'[a-zA-Z]', segment) and re.search(r'\d', segment):
        return ''.join(filter(str.isalpha, segment))
    if re.match(r'^[a-zA-Z]+$', segment):
        return segment
    if re.match(r'^\d+$', segment):
        return 'numbers'
    return None
def filter_chapter(doc, language):
    """Turn one EPUB document item into a list of sentence chunks.

    The HTML body is stripped of ``<script>``/``<style>`` elements, reduced to
    text, normalised, and finally split by ``get_sentences``.

    Args:
        doc: Item exposing ``get_body_content()``, parsed here as HTML.
        language: Language key forwarded to ``get_sentences``.

    Returns:
        Whatever ``get_sentences`` returns for the cleaned text.
    """
    soup = BeautifulSoup(doc.get_body_content(), 'html.parser')
    # Remove scripts and styles
    for script in soup(["script", "style"]):
        script.decompose()
    # Collapse runs of three or more line breaks into a single one
    text = re.sub(r'(\r\n|\r|\n){3,}', '\r\n', soup.get_text().strip())
    text = replace_roman_numbers(text)
    lines = (line.strip() for line in text.splitlines())
    # NOTE(review): splitting on a single space puts every word on its own line
    # below; the separator may have been a double space before this listing
    # was extracted. Confirm against the repository.
    chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
    text = '\n'.join(chunk for chunk in chunks if chunk)
    # Replace guillemets with straight double quotes
    text = text.replace('»', '"').replace('«', '"')
    # Pattern 1: Add a space between letters and digits (\p{L} needs the
    # third-party ``regex`` module, which this file imports as ``re``)
    text = re.sub(r'(?<=[\p{L}])(?=\d)|(?<=\d)(?=[\p{L}])', ' ', text)
    # Pattern 2: Split numbers into groups of 4
    text = re.sub(r'(\d{4})(?=\d)', r'\1 ', text)
    chapter_sentences = get_sentences(text, language)
    return chapter_sentences
def get_sentences(sentence, language, max_pauses=9):
    """Cut *sentence* into chunks acceptable to the TTS engine.

    Every "." is first replaced by ";\\n". The text is then cut repeatedly
    while it is longer than the language's ``char_limit`` or contains more
    than *max_pauses* punctuation marks. Each cut is made inside the first
    ``char_limit`` characters: after the last ".", else after the last ",",
    else after the last punctuation mark, else after the last space, else at
    ``char_limit`` exactly.

    Args:
        sentence: Text to split.
        language: Key into ``language_mapping``.
        max_pauses: Maximum number of punctuation marks tolerated per chunk
            before another cut is attempted.

    Returns:
        List of chunks, each stripped and terminated by one space.
    """
    max_length = language_mapping[language]['char_limit']
    punctuation = language_mapping[language]['punctuation']
    remaining = sentence.replace(".", ";\n")
    parts = []

    def too_many_pauses(text):
        return sum(text.count(mark) for mark in punctuation) > max_pauses

    def last_position(window, accepts):
        # Index of the last character satisfying *accepts*, or None.
        for position in range(len(window) - 1, -1, -1):
            if accepts(window[position]):
                return position
        return None

    # Candidate cut characters, in order of preference.
    preferences = (
        lambda char: char == '.',
        lambda char: char == ',',
        lambda char: char in punctuation,
    )

    while len(remaining) > max_length or too_many_pauses(remaining):
        window = remaining[:max_length]
        split_at = None
        for accepts in preferences:
            found = last_position(window, accepts)
            if found is not None:
                split_at = found + 1
                break
        if split_at is None:
            # No punctuation in the window: fall back to the last space,
            # or force a cut at the length limit.
            last_space = remaining.rfind(' ', 0, max_length)
            split_at = max_length if last_space == -1 else last_space + 1
        parts.append(remaining[:split_at].strip() + ' ')
        remaining = remaining[split_at:].strip()
    if remaining:
        parts.append(remaining.strip() + ' ')
    return parts
def convert_chapters_to_audio(session):
    """Synthesize every sentence of ``session['chapters']`` and build chapter files.

    A TTS model is loaded first: 'xtts' when the metadata language is in
    ``language_xtts`` (custom model, fine-tuned model or standard model),
    'fairseq' otherwise. Each sentence is then rendered to its own file in
    ``session['chapters_dir_sentences']`` and, chapter by chapter, the
    sentence files are merged through ``combine_audio_sentences``. Files left
    by an earlier run are counted in order to resume.

    Returns:
        True when the loop completes; False when cancellation was requested,
        a sentence could not be converted, or a chapter could not be combined.

    Raises:
        DependencyError: Wrapping any exception raised along the way.
    """
    try:
        if session['cancellation_requested']:
            #stop_and_detach_tts()
            print('Cancel requested')
            return False
        progress_bar = None
        # Shared bag of state handed to convert_sentence_to_audio()
        params = {}
        if is_gui_process:
            progress_bar = gr.Progress(track_tqdm=True)
        params['tts_model'] = None
        '''
        # List available TTS base models
        print("Available Models:")
        print("=================")
        for index, model in enumerate(XTTS().list_models(), 1):
            print(f"{index}. {model}")
        '''
        if session['metadata']['language'] in language_xtts:
            params['tts_model'] = 'xtts'
            if session['custom_model'] is not None:
                # Custom model: checkpoint, config and vocab come from the model directory
                print(f"Loading TTS {params['tts_model']} model from {session['custom_model']}...")
                model_path = os.path.join(session['custom_model'], 'model.pth')
                config_path = os.path.join(session['custom_model'],'config.json')
                vocab_path = os.path.join(session['custom_model'],'vocab.json')
                voice_path = os.path.join(session['custom_model'],'ref.wav')
                config = XttsConfig()
                config.models_dir = os.path.join(models_dir,'tts')
                config.load_json(config_path)
                params['tts'] = Xtts.init_from_config(config)
                params['tts'].load_checkpoint(config, checkpoint_path=model_path, vocab_path=vocab_path, eval=True)
                print('Computing speaker latents...')
                # The session's voice file wins over the model's bundled ref.wav
                params['voice_file'] = session['voice_file'] if session['voice_file'] is not None else voice_path
                params['gpt_cond_latent'], params['speaker_embedding'] = params['tts'].get_conditioning_latents(audio_path=[params['voice_file']])
            elif session['fine_tuned'] != 'std':
                # Fine-tuned preset: files are downloaded from the Hugging Face hub
                print(f"Loading TTS {params['tts_model']} model from {session['fine_tuned']}...")
                hf_repo = models[params['tts_model']][session['fine_tuned']]['repo']
                hf_sub = models[params['tts_model']][session['fine_tuned']]['sub']
                cache_dir = os.path.join(models_dir,'tts')
                model_path = hf_hub_download(repo_id=hf_repo, filename=f"{hf_sub}/model.pth", cache_dir=cache_dir)
                config_path = hf_hub_download(repo_id=hf_repo, filename=f"{hf_sub}/config.json", cache_dir=cache_dir)
                vocab_path = hf_hub_download(repo_id=hf_repo, filename=f"{hf_sub}/vocab.json", cache_dir=cache_dir)
                config = XttsConfig()
                config.models_dir = cache_dir
                config.load_json(config_path)
                params['tts'] = Xtts.init_from_config(config)
                params['tts'].load_checkpoint(config, checkpoint_path=model_path, vocab_path=vocab_path, eval=True)
                print('Computing speaker latents...')
                params['voice_file'] = session['voice_file'] if session['voice_file'] is not None else models[params['tts_model']][session['fine_tuned']]['voice']
                params['gpt_cond_latent'], params['speaker_embedding'] = params['tts'].get_conditioning_latents(audio_path=[params['voice_file']])
            else:
                # Standard model, loaded by name through the high-level TTS API
                print(f"Loading TTS {params['tts_model']} model from {models[params['tts_model']][session['fine_tuned']]['repo']}...")
                params['tts'] = XTTS(model_name=models[params['tts_model']][session['fine_tuned']]['repo'])
                params['voice_file'] = session['voice_file'] if session['voice_file'] is not None else models[params['tts_model']][session['fine_tuned']]['voice']
            # NOTE(review): indentation was lost in this listing; this call is assumed
            # to apply to all three xtts branches rather than only the last one. Confirm.
            params['tts'].to(session['device'])
        else:
            params['tts_model'] = 'fairseq'
            # The repo name carries a "[lang]" placeholder replaced by the book language
            model_repo = models[params['tts_model']][session['fine_tuned']]['repo'].replace("[lang]", session['metadata']['language'])
            print(f"Loading TTS {model_repo} model from {model_repo}...")
            params['tts'] = XTTS(model_repo)
            params['voice_file'] = session['voice_file'] if session['voice_file'] is not None else models[params['tts_model']][session['fine_tuned']]['voice']
            params['tts'].to(session['device'])
        resume_chapter = 0
        resume_sentence = 0
        # Check existing files to resume the process if it was interrupted
        existing_chapters = sorted([f for f in os.listdir(session['chapters_dir']) if f.endswith(f'.{audioproc_format}')])
        existing_sentences = sorted([f for f in os.listdir(session['chapters_dir_sentences']) if f.endswith(f'.{audioproc_format}')])
        if existing_chapters:
            count_chapter_files = len(existing_chapters)
            # Restart at the last chapter file found, so that chapter is processed again
            resume_chapter = count_chapter_files - 1 if count_chapter_files > 0 else 0
            print(f'Resuming from chapter {count_chapter_files}')
        if existing_sentences:
            resume_sentence = len(existing_sentences)
            print(f'Resuming from sentence {resume_sentence}')
        total_chapters = len(session['chapters'])
        total_sentences = sum(len(array) for array in session['chapters'])
        # NOTE(review): the counter starts at 0 even when the loop below starts at
        # resume_chapter > 0, so the sentences of skipped chapters are not counted
        # and the comparisons with resume_sentence look misaligned on resume. Confirm.
        current_sentence = 0
        with tqdm(total=total_sentences, desc='convert_chapters_to_audio 0.00%', bar_format='{desc}: {n_fmt}/{total_fmt} ', unit='step', initial=resume_sentence) as t:
            t.n = resume_sentence
            t.refresh()
            for x in range(resume_chapter, total_chapters):
                chapter_num = x + 1
                chapter_audio_file = f'chapter_{chapter_num}.{audioproc_format}'
                sentences = session['chapters'][x]
                sentences_count = len(sentences)
                start = current_sentence # Mark the starting sentence of the chapter
                print(f"\nChapter {chapter_num} containing {sentences_count} sentences...")
                for i, sentence in enumerate(sentences):
                    # Sentences numbered below resume_sentence are skipped
                    if current_sentence >= resume_sentence:
                        params['sentence_audio_file'] = os.path.join(session['chapters_dir_sentences'], f'{current_sentence}.{audioproc_format}')
                        params['sentence'] = sentence
                        if convert_sentence_to_audio(params, session):
                            t.update(1)
                            percentage = (current_sentence / total_sentences) * 100
                            t.set_description(f'Processing {percentage:.2f}%')
                            print(f'Sentence: {sentence}')
                            t.refresh()
                            if progress_bar is not None:
                                progress_bar(current_sentence / total_sentences)
                        else:
                            return False
                    current_sentence += 1
                end = current_sentence - 1
                print(f"\nEnd of Chapter {chapter_num}")
                # A chapter is only combined when it starts at or after the resume point
                if start >= resume_sentence:
                    if combine_audio_sentences(chapter_audio_file, start, end, session):
                        print(f'Combining chapter {chapter_num} to audio, sentence {start} to {end}')
                    else:
                        print('combine_audio_sentences() failed!')
                        return False
        return True
    except Exception as e:
        raise DependencyError(e)
def convert_sentence_to_audio(params, session):
    """Render ``params['sentence']`` to ``params['sentence_audio_file']``.

    With the 'xtts' engine, custom and fine-tuned models go through
    ``inference`` and ``torchaudio.save``, while the standard model uses
    ``tts_to_file``. The 'fairseq' engine uses ``tts_with_vc_to_file``.

    Args:
        params: Dict holding ``tts``, ``tts_model``, ``sentence``,
            ``sentence_audio_file``, ``voice_file`` and, for custom or
            fine-tuned xtts models, the speaker latents.
        session: Session mapping providing the generation settings.

    Returns:
        True when the output file exists afterwards; False when it does not
        or when cancellation was requested.

    Raises:
        DependencyError: Wrapping any exception raised along the way.
    """
    try:
        if session['cancellation_requested']:
            print('Cancel requested')
            return False
        generation_params = dict(
            temperature=session['temperature'],
            length_penalty=session["length_penalty"],
            repetition_penalty=session['repetition_penalty'],
            num_beams=(int(session['length_penalty']) + 1 if session["length_penalty"] > 1 else 1),
            top_k=session['top_k'],
            top_p=session['top_p'],
            speed=session['speed'],
            enable_text_splitting=session['enable_text_splitting'],
        )
        engine = params['tts_model']
        if engine == 'xtts':
            is_standard_model = session['custom_model'] is None and session['fine_tuned'] == 'std'
            if is_standard_model:
                params['tts'].tts_to_file(
                    text=params['sentence'],
                    language=session['metadata']['language_iso1'],
                    file_path=params['sentence_audio_file'],
                    speaker_wav=params['voice_file'],
                    **generation_params
                )
            else:
                rendered = params['tts'].inference(
                    text=params['sentence'],
                    language=session['metadata']['language_iso1'],
                    gpt_cond_latent=params['gpt_cond_latent'],
                    speaker_embedding=params['speaker_embedding'],
                    **generation_params
                )
                waveform = torch.tensor(rendered[audioproc_format]).unsqueeze(0)
                torchaudio.save(params['sentence_audio_file'], waveform, sample_rate=24000)
        elif engine == 'fairseq':
            params['tts'].tts_with_vc_to_file(
                text=params['sentence'],
                file_path=params['sentence_audio_file'],
                speaker_wav=params['voice_file'].replace('_24khz','_16khz'),
                split_sentences=session['enable_text_splitting']
            )
        if not os.path.exists(params['sentence_audio_file']):
            print(f"Cannot create {params['sentence_audio_file']}")
            return False
        return True
    except Exception as err:
        raise DependencyError(err)
def combine_audio_sentences(chapter_audio_file, start, end, session):
    """Concatenate the sentence clips numbered *start*..*end* into one chapter file.

    Clips are taken from ``session['chapters_dir_sentences']``: only files
    ending in ``.<audioproc_format>`` are considered, and each clip's number
    is the first run of digits in its file name. The result is written to
    ``session['chapters_dir']``.

    Args:
        chapter_audio_file: File name (not a path) of the chapter to create.
        start: First sentence number to include.
        end: Last sentence number to include.
        session: Session mapping providing the two directories and the
            ``cancellation_requested`` flag.

    Returns:
        True once the chapter file is exported, False when cancellation was
        requested while combining.

    Raises:
        DependencyError: Wrapping any exception raised along the way.
    """
    try:
        chapter_audio_file = os.path.join(session['chapters_dir'], chapter_audio_file)
        sentences_dir = session['chapters_dir_sentences']
        # Same extension as the one used to read and export below, instead of
        # a hard-coded ".wav".
        suffix = f'.{audioproc_format}'
        selected_files = []
        for name in os.listdir(sentences_dir):
            if not name.endswith(suffix):
                continue
            number = re.search(r'\d+', name)
            if number is None:
                continue  # Not a numbered sentence clip
            index = int(number.group())
            if start <= index <= end:
                selected_files.append((index, name))
        selected_files.sort(key=lambda pair: pair[0])
        combined_audio = AudioSegment.empty()
        for _, name in selected_files:
            if session['cancellation_requested']:
                print('Cancel requested')
                return False
            audio_segment = AudioSegment.from_file(os.path.join(sentences_dir, name), format=audioproc_format)
            combined_audio += audio_segment
        combined_audio.export(chapter_audio_file, format=audioproc_format)
        print(f'Combined audio saved to {chapter_audio_file}')
        return True
    except Exception as e:
        raise DependencyError(e)
def combine_audio_chapters(session):
    """Assemble all chapter files into the final audiobook.

    Three nested steps run in sequence, each only if the previous one
    succeeded: concatenate the chapter audio, write an FFMETADATA file with
    book tags and chapter marks, then run ffmpeg (locally or in the utils
    Docker image) to encode the final file into ``session['audiobooks_dir']``.

    Returns:
        The path of the final audiobook file, or None when a step returned a
        falsy value (for instance after a cancellation request).

    Raises:
        DependencyError: Wrapping any exception raised along the way.
    """
    def sort_key(chapter_file):
        # First number found in the file name, 0 when there is none.
        # NOTE(review): nothing in this function calls sort_key.
        numbers = re.findall(r'\d+', chapter_file)
        return int(numbers[0]) if numbers else 0
    def assemble_audio():
        # Concatenate every chapter file into ``assembled_audio``.
        try:
            combined_audio = AudioSegment.empty()
            batch_size = 256
            # Process the chapter files in batches
            for i in range(0, len(chapter_files), batch_size):
                batch_files = chapter_files[i:i + batch_size]
                batch_audio = AudioSegment.empty() # Initialize an empty AudioSegment for the batch
                # Sequentially append each file in the current batch to the batch_audio
                for chapter_file in batch_files:
                    if session['cancellation_requested']:
                        print('Cancel requested')
                        return False
                    audio_segment = AudioSegment.from_wav(os.path.join(session['chapters_dir'],chapter_file))
                    batch_audio += audio_segment
                combined_audio += batch_audio
            combined_audio.export(assembled_audio, format=audioproc_format)
            print(f'Combined audio saved to {assembled_audio}')
            return True
        except Exception as e:
            raise DependencyError(e)
    def generate_ffmpeg_metadata():
        # Write ``metadata_file``: global tags followed by one [CHAPTER] entry per file.
        try:
            if session['cancellation_requested']:
                print('Cancel requested')
                return False
            ffmpeg_metadata = ';FFMETADATA1\n'
            if session['metadata'].get('title'):
                ffmpeg_metadata += f"title={session['metadata']['title']}\n"
            if session['metadata'].get('creator'):
                ffmpeg_metadata += f"artist={session['metadata']['creator']}\n"
            if session['metadata'].get('language'):
                # This line ends with two newlines, unlike the other tags
                ffmpeg_metadata += f"language={session['metadata']['language']}\n\n"
            if session['metadata'].get('publisher'):
                ffmpeg_metadata += f"publisher={session['metadata']['publisher']}\n"
            if session['metadata'].get('description'):
                ffmpeg_metadata += f"description={session['metadata']['description']}\n"
            # NOTE(review): 'published' (and 'identifiers' below) are not among the
            # metadata keys created by ConversionContext.get_session; presumably
            # they are filled in elsewhere. Verify.
            if session['metadata'].get('published'):
                # Check if the timestamp contains fractional seconds
                if '.' in session['metadata']['published']:
                    # Parse with fractional seconds
                    year = datetime.strptime(session['metadata']['published'], '%Y-%m-%dT%H:%M:%S.%f%z').year
                else:
                    # Parse without fractional seconds
                    year = datetime.strptime(session['metadata']['published'], '%Y-%m-%dT%H:%M:%S%z').year
            else:
                # If published is not provided, use the current year
                year = datetime.now().year
            ffmpeg_metadata += f'year={year}\n'
            if session['metadata'].get('identifiers') and isinstance(session['metadata'].get('identifiers'), dict):
                isbn = session['metadata']['identifiers'].get('isbn', None)
                if isbn:
                    ffmpeg_metadata += f'isbn={isbn}\n' # ISBN
                mobi_asin = session['metadata']['identifiers'].get('mobi-asin', None)
                if mobi_asin:
                    ffmpeg_metadata += f'asin={mobi_asin}\n' # ASIN
            # Chapter marks: each chapter starts where the previous one ended (milliseconds)
            start_time = 0
            for index, chapter_file in enumerate(chapter_files):
                if session['cancellation_requested']:
                    msg = 'Cancel requested'
                    raise ValueError(msg)
                duration_ms = len(AudioSegment.from_wav(os.path.join(session['chapters_dir'],chapter_file)))
                ffmpeg_metadata += f'[CHAPTER]\nTIMEBASE=1/1000\nSTART={start_time}\n'
                ffmpeg_metadata += f'END={start_time + duration_ms}\ntitle=Chapter {index + 1}\n'
                start_time += duration_ms
            # Write the metadata to the file
            with open(metadata_file, 'w', encoding='utf-8') as file:
                file.write(ffmpeg_metadata)
            return True
        except Exception as e:
            raise DependencyError(e)
    def export_audio():
        # Build the ffmpeg command line and run it, in Docker or locally.
        try:
            if session['cancellation_requested']:
                print('Cancel requested')
                return False
            ffmpeg_cover = None
            if session['script_mode'] == DOCKER_UTILS:
                # Inside the container the temp directory is mounted under /files/<dir>
                docker_dir = os.path.basename(session['tmp_dir'])
                ffmpeg_combined_audio = f'/files/{docker_dir}/' + os.path.basename(assembled_audio)
                ffmpeg_metadata_file = f'/files/{docker_dir}/' + os.path.basename(metadata_file)
                ffmpeg_final_file = f'/files/{docker_dir}/' + os.path.basename(docker_final_file)
                if session['cover'] is not None:
                    ffmpeg_cover = f'/files/{docker_dir}/' + os.path.basename(session['cover'])
                ffmpeg_cmd = ['ffmpeg', '-i', ffmpeg_combined_audio, '-i', ffmpeg_metadata_file]
            else:
                ffmpeg_combined_audio = assembled_audio
                ffmpeg_metadata_file = metadata_file
                ffmpeg_final_file = final_file
                if session['cover'] is not None:
                    ffmpeg_cover = session['cover']
                ffmpeg_cmd = [shutil.which('ffmpeg'), '-i', ffmpeg_combined_audio, '-i', ffmpeg_metadata_file]
            # Input 0 is the audio, input 1 the metadata, input 2 (optional) the cover
            if ffmpeg_cover is not None:
                ffmpeg_cmd += ['-i', ffmpeg_cover, '-map', '0:a', '-map', '2:v']
            else:
                ffmpeg_cmd += ['-map', '0:a']
            ffmpeg_cmd += ['-map_metadata', '1', '-c:a', 'aac', '-b:a', '128k', '-ar', '44100']
            if ffmpeg_cover is not None:
                if ffmpeg_cover.endswith('.png'):
                    ffmpeg_cmd += ['-c:v', 'png', '-disposition:v', 'attached_pic'] # PNG cover
                else:
                    ffmpeg_cmd += ['-c:v', 'copy', '-disposition:v', 'attached_pic'] # JPEG cover (no re-encoding needed)
            if ffmpeg_cover is not None and ffmpeg_cover.endswith('.png'):
                ffmpeg_cmd += ['-pix_fmt', 'yuv420p']
            # Audio filter chain, fast-start flag, overwrite, output path
            ffmpeg_cmd += [
                '-af',
                'agate=threshold=-35dB:ratio=1.5:attack=10:release=200,acompressor=threshold=-20dB:ratio=2:attack=80:release=200:makeup=1dB,loudnorm=I=-19:TP=-3:LRA=7:linear=true,afftdn=nf=-50,equalizer=f=150:t=q:w=2:g=2,equalizer=f=250:t=q:w=2:g=-2,equalizer=f=12000:t=q:w=2:g=2',
                '-movflags', '+faststart', '-y', ffmpeg_final_file
            ]
            if session['script_mode'] == DOCKER_UTILS:
                try:
                    container = session['client'].containers.run(
                        docker_utils_image,
                        command=ffmpeg_cmd,
                        volumes={session['tmp_dir']: {'bind': f'/files/{docker_dir}', 'mode': 'rw'}},
                        remove=True,
                        detach=False,
                        stdout=True,
                        stderr=True
                    )
                    print(container.decode('utf-8'))
                    # The container wrote into the temp directory; copy the result out
                    if shutil.copy(docker_final_file, final_file):
                        return True
                    return False
                except docker.errors.ContainerError as e:
                    raise DependencyError(e)
                except docker.errors.ImageNotFound as e:
                    raise DependencyError(e)
                except docker.errors.APIError as e:
                    raise DependencyError(e)
            else:
                try:
                    # NOTE(review): ffmpeg is started with an empty environment (env={});
                    # confirm this is intended.
                    subprocess.run(ffmpeg_cmd, env={}, check=True)
                    return True
                except subprocess.CalledProcessError as e:
                    raise DependencyError(e)
        except Exception as e:
            raise DependencyError(e)
    try:
        # The names assigned below are read by the nested helpers above as closure variables.
        # NOTE(review): chapter files are selected by a hard-coded ".wav" suffix and read
        # with from_wav, while exports use audioproc_format; confirm they always agree.
        chapter_files = [f for f in os.listdir(session['chapters_dir']) if f.endswith(".wav")]
        # Order the chapters by the first number in their file name
        chapter_files = sorted(chapter_files, key=lambda x: int(re.search(r'\d+', x).group()))
        assembled_audio = os.path.join(session['tmp_dir'], session['metadata']['title'] + '.' + audioproc_format)
        metadata_file = os.path.join(session['tmp_dir'], 'metadata.txt')
        if assemble_audio():
            if generate_ffmpeg_metadata():
                final_name = session['metadata']['title'] + '.' + audiobook_format
                docker_final_file = os.path.join(session['tmp_dir'], final_name)
                final_file = os.path.join(session['audiobooks_dir'], final_name)
                if export_audio():
                    return final_file
        return None
    except Exception as e:
        raise DependencyError(e)
def replace_roman_numbers(text):
    """Rewrite Roman numerals in *text* as Arabic numbers.

    Two forms are converted:

    * a numeral that follows a chapter/volume keyword (matched
      case-insensitively), e.g. ``chapter IV`` -> ``Chapter 4``;
    * an upper-case numeral at the very start of *text* followed by one or
      more periods, e.g. ``XII. Title`` -> ``12. Title``.

    Text that contains neither form is returned unchanged.
    """
    roman_values = {
        'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000,
        'IV': 4, 'IX': 9, 'XL': 40, 'XC': 90, 'CD': 400, 'CM': 900,
    }

    def roman_to_int(s):
        """Convert an upper-case numeral left to right; return *s* itself on an unknown symbol."""
        total = 0
        i = 0
        try:
            while i < len(s):
                pair = s[i:i + 2]
                # Subtractive pairs (IV, IX, ...) take priority over single symbols.
                if len(pair) == 2 and pair in roman_values:
                    total += roman_values[pair]
                    i += 2
                else:
                    total += roman_values[s[i]]
                    i += 1
        except KeyError:
            return s
        return total

    # The numeral group must consume at least one character. The previous
    # strict-form alternative could match the empty string, which turned
    # "chapter one" into "Chapter 0one". Because the match must end on a word
    # boundary, every non-empty strict-form match was also the full run of
    # numeral letters, so `[IVXLCDM]+` accepts exactly the same non-empty inputs.
    # NOTE: with IGNORECASE, ordinary words made only of these letters
    # (e.g. "chapter mix") are still treated as numerals, as before.
    roman_chapter_pattern = re.compile(
        r'\b(chapter|volume|chapitre|tome|capitolo|capítulo|volumen|Kapitel|глава|том|κεφάλαιο|τόμος|capitul|poglavlje)\s'
        r'([IVXLCDM]+)\b',
        re.IGNORECASE
    )
    # The lookahead rejects the empty numeral, which previously rewrote a
    # leading "." as "0.".
    roman_numerals_with_period = re.compile(
        r'^(?=[IVXLCDM])(M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})|[IVXLCDM])\.+'
    )

    def replace_chapter_match(match):
        chapter_word = match.group(1)
        integer_value = roman_to_int(match.group(2).upper())
        return f'{chapter_word.capitalize()} {integer_value}'

    def replace_numeral_with_period(match):
        integer_value = roman_to_int(match.group(1))
        return f'{integer_value}.'

    text = roman_chapter_pattern.sub(replace_chapter_match, text)
    text = roman_numerals_with_period.sub(replace_numeral_with_period, text)
    return text
| ''' | |
| def stop_and_detach_tts(tts=None): | |
| if tts is not None: | |
| if next(tts.parameters()).is_cuda: | |
| tts.to('cpu') | |
| del tts | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| ''' | |
def delete_old_web_folders(root_dir):
    """Remove expired ``web-*`` folders found directly under *root_dir*.

    *root_dir* is created (and the fact printed) when it does not exist.
    A folder is removed when ``os.path.getctime`` reports a timestamp older
    than ``interface_shared_expire`` hours before now.

    Raises:
        DependencyError: wrapping any exception raised while scanning or deleting.
    """
    try:
        if not os.path.exists(root_dir):
            os.makedirs(root_dir)
            print(f'Created missing directory: {root_dir}')
        # Expiry window is configured in hours; convert it to seconds.
        cutoff = time.time() - interface_shared_expire * 60 * 60
        for entry in os.listdir(root_dir):
            if not entry.startswith('web-'):
                continue
            candidate = os.path.join(root_dir, entry)
            if os.path.isdir(candidate) and os.path.getctime(candidate) < cutoff:
                shutil.rmtree(candidate)
    except Exception as e:
        raise DependencyError(e)
def compare_file_metadata(f1, f2):
    """Return True when *f1* and *f2* have the same size and the same modification time.

    Only filesystem metadata is compared; file contents are never read.
    The modification time is consulted only if the sizes are equal.
    """
    same_size = os.path.getsize(f1) == os.path.getsize(f2)
    return same_size and os.path.getmtime(f1) == os.path.getmtime(f2)
def _resolve_language_code(code):
    """Return the ISO 639-3 code for a 2- or 3-letter language code, or None if it cannot be resolved."""
    try:
        if len(code) == 2:
            entry = languages.get(alpha2=code)
            return entry.part3 if entry and entry.part3 else None
        entry = languages.get(part3=code)
        return code if entry else None
    except Exception:
        # Deliberate best-effort: any lookup failure means "unsupported language".
        return None


def _select_device(requested):
    """Return 'cuda' when CUDA is available and the CPU was not requested, else 'cpu'."""
    if not torch.cuda.is_available() or requested == 'cpu':
        if requested == 'gpu':
            print('GPU is not available on your device!')
        return 'cpu'
    return 'cuda'


def _merge_epub_metadata(session, language, has_src_metadata):
    """Copy Dublin Core values from session['epub'] into session['metadata']."""
    # Iterate over a plain-dict copy of the keys while writing to the session mapping.
    for key in dict(session['metadata']):
        entries = session['epub'].get_metadata('DC', key)
        if entries:
            for value, _attributes in entries:
                if key == 'language' and not has_src_metadata:
                    # The source carried no metadata: use the language chosen by the user.
                    session['metadata'][key] = language
                else:
                    session['metadata'][key] = value
    language_array = languages.get(part3=language)
    if language_array and language_array.part1:
        session['metadata']['language_iso1'] = language_array.part1


def _cleanup_after_export(session):
    """Delete the intermediate files of a finished conversion."""
    chapters_dirs = [
        dir_name for dir_name in os.listdir(session['tmp_dir'])
        if fnmatch.fnmatch(dir_name, "chapters_*") and os.path.isdir(os.path.join(session['tmp_dir'], dir_name))
    ]
    if len(chapters_dirs) > 1:
        # Several chapters_* directories live in tmp_dir (they are named after
        # a hash of the ebook path): remove only this conversion's artefacts.
        if os.path.exists(session['chapters_dir']):
            shutil.rmtree(session['chapters_dir'])
        if os.path.exists(session['epub_path']):
            os.remove(session['epub_path'])
        if os.path.exists(session['cover']):
            os.remove(session['cover'])
    elif os.path.exists(session['tmp_dir']):
        shutil.rmtree(session['tmp_dir'])


def _convert_session(args, session):
    """Run the conversion pipeline for an already-created session.

    Returns ``(progress_status, final_file)`` on success and
    ``(error_message, None)`` when a pipeline stage reports failure.
    Raises ValueError / DependencyError for invalid input or missing tools.
    """
    session['src'] = args['ebook']
    session['script_mode'] = args['script_mode'] if args['script_mode'] is not None else NATIVE
    session['audiobooks_dir'] = args['audiobooks_dir']
    device = args['device'].lower()
    language = args['language']
    enable_text_splitting = args['enable_text_splitting'] if args['enable_text_splitting'] is not None else True
    custom_model_file = args['custom_model'] if args['custom_model'] != 'none' and args['custom_model'] is not None else None
    fine_tuned = args['fine_tuned'] if check_fine_tuned(args['fine_tuned'], args['language']) else None
    if not fine_tuned:
        raise ValueError('The fine tuned model does not exist.')
    if not os.path.splitext(args['ebook'])[1]:
        raise ValueError('The selected ebook file has no extension. Please select a valid file.')
    if session['script_mode'] == NATIVE:
        for label, binary, version_flag in (('Calibre', 'calibre', '--version'), ('FFmpeg', 'ffmpeg', '-version')):
            found, problem = check_programs(label, binary, version_flag)
            if not found:
                raise DependencyError(problem)
    elif session['script_mode'] == DOCKER_UTILS:
        session['client'] = docker.from_env()
    session['tmp_dir'] = os.path.join(processes_dir, f"ebook-{session['id']}")
    session['chapters_dir'] = os.path.join(session['tmp_dir'], f"chapters_{hashlib.md5(args['ebook'].encode()).hexdigest()}")
    session['chapters_dir_sentences'] = os.path.join(session['chapters_dir'], 'sentences')
    if not is_gui_process:
        print(f"*********** Session: {session['id']}", '************* Store it in case of interruption or crash you can resume the conversion')
    session['custom_model_dir'] = os.path.join(models_dir, '__sessions', f"model-{session['id']}")
    if custom_model_file:
        session['custom_model'], _ = extract_custom_model(custom_model_file, session['custom_model_dir'])
        if not session['custom_model']:
            raise ValueError(f'{custom_model_file} could not be extracted or mandatory files are missing')

    if not prepare_dirs(args['ebook'], session):
        return f"Temporary directory {session['tmp_dir']} not removed due to failure.", None
    session['filename_noext'] = os.path.splitext(os.path.basename(session['src']))[0]
    device = _select_device(device)
    print(f'Available Processor Unit: {device}')
    session['epub_path'] = os.path.join(session['tmp_dir'], '__' + session['filename_noext'] + '.epub')
    has_src_metadata = has_metadata(session['src'])
    if not convert_to_epub(session):
        return 'convert_to_epub() failed!', None

    session['epub'] = epub.read_epub(session['epub_path'], {'ignore_ncx': True})
    _merge_epub_metadata(session, language, has_src_metadata)
    language_matches = (
        session['metadata']['language'] == language
        or (session['metadata']['language_iso1'] and session['metadata']['language'] == session['metadata']['language_iso1'])
    )
    if not language_matches:
        return f"WARNING: Ebook language: {session['metadata']['language']}, language selected: {language}", None

    if not session['metadata']['title']:
        # Fall back to the source file name when the ebook has no title.
        session['metadata']['title'] = os.path.splitext(os.path.basename(session['src']))[0]
    if not session['metadata']['creator']:
        session['metadata']['creator'] = False
    session['cover'] = get_cover(session)
    if not session['cover']:
        return 'get_cover() failed!', None
    session['chapters'] = get_chapters(language, session)
    if not session['chapters']:
        return 'get_chapters() failed!', None

    session['device'] = device
    session['temperature'] = args['temperature']
    session['length_penalty'] = args['length_penalty']
    session['repetition_penalty'] = args['repetition_penalty']
    session['top_k'] = args['top_k']
    session['top_p'] = args['top_p']
    session['speed'] = args['speed']
    session['enable_text_splitting'] = enable_text_splitting
    session['fine_tuned'] = fine_tuned
    session['voice_file'] = args['voice']
    session['language'] = language
    if not convert_chapters_to_audio(session):
        return 'convert_chapters_to_audio() failed!', None
    final_file = combine_audio_chapters(session)
    if final_file is None:
        return 'combine_audio_chapters() error: final_file not created!', None
    _cleanup_after_export(session)
    return f'Audiobook {os.path.basename(final_file)} created!', final_file


def convert_ebook(args):
    """Convert the ebook described by *args* into an audiobook.

    *args* is a dict; the keys read are ``language``, ``session``, ``ebook``,
    ``script_mode``, ``audiobooks_dir``, ``is_gui_process``, ``device``,
    ``voice``, ``temperature``, ``length_penalty``, ``repetition_penalty``,
    ``top_k``, ``top_p``, ``speed``, ``enable_text_splitting``,
    ``custom_model`` and ``fine_tuned``. ``args['language']`` is replaced in
    place by its resolved ISO 639-3 code (or None).

    Returns:
        ``(progress_status, final_file)`` on success;
        ``(error_message, None)`` when a stage fails, the language is not
        supported, or the session was cancelled (message ``'Cancelled'``);
        ``(exception, None)`` when an exception was raised along the way.
    """
    global is_gui_process
    global context
    # Stays None on the unsupported-language path; previously the final
    # cancellation check hit an unbound `session` there and the real error
    # message was lost to a NameError.
    session = None
    try:
        requested_language = args['language']
        args['language'] = _resolve_language_code(requested_language)
        if args['language'] is None or args['language'] not in language_mapping:
            error = f"Language {requested_language} is not supported."
        else:
            session_id = args['session'] if args['session'] is not None else str(uuid.uuid4())
            session = context.get_session(session_id)
            session['id'] = session_id
            is_gui_process = args['is_gui_process']
            outcome, final_file = _convert_session(args, session)
            if final_file is not None:
                return outcome, final_file
            error = outcome
        if session is not None and session['cancellation_requested']:
            error = 'Cancelled'
        print(error)
        return error, None
    except Exception as e:
        print(f'convert_ebook() Exception: {e}')
        return e, None
| def web_interface(args): | |
| script_mode = args['script_mode'] | |
| is_gui_process = args['is_gui_process'] | |
| is_gui_shared = args['share'] | |
| is_converting = False | |
| audiobooks_dir = None | |
| ebook_src = None | |
| audiobook_file = None | |
| language_options = [ | |
| ( | |
| f"{details['name']} - {details['native_name']}" if details['name'] != details['native_name'] else details['name'], | |
| lang | |
| ) | |
| for lang, details in language_mapping.items() | |
| ] | |
| custom_model_options = None | |
| fine_tuned_options = list(models['xtts'].keys()) | |
| default_language_name = next((name for name, key in language_options if key == default_language_code), None) | |
| theme = gr.themes.Origin( | |
| primary_hue='amber', | |
| secondary_hue='green', | |
| neutral_hue='gray', | |
| radius_size='lg', | |
| font_mono=['JetBrains Mono', 'monospace', 'Consolas', 'Menlo', 'Liberation Mono'] | |
| ) | |
| with gr.Blocks(theme=theme) as interface: | |
| gr.HTML( | |
| ''' | |
| <style> | |
| .svelte-1xyfx7i.center.boundedheight.flex{ | |
| height: 120px !important; | |
| } | |
| .block.svelte-5y6bt2 { | |
| padding: 10px !important; | |
| margin: 0 !important; | |
| height: auto !important; | |
| font-size: 16px !important; | |
| } | |
| .wrap.svelte-12ioyct { | |
| padding: 0 !important; | |
| margin: 0 !important; | |
| font-size: 12px !important; | |
| } | |
| .block.svelte-5y6bt2.padded { | |
| height: auto !important; | |
| padding: 10px !important; | |
| } | |
| .block.svelte-5y6bt2.padded.hide-container { | |
| height: auto !important; | |
| padding: 0 !important; | |
| } | |
| .waveform-container.svelte-19usgod { | |
| height: 58px !important; | |
| overflow: hidden !important; | |
| padding: 0 !important; | |
| margin: 0 !important; | |
| } | |
| .component-wrapper.svelte-19usgod { | |
| height: 110px !important; | |
| } | |
| .timestamps.svelte-19usgod { | |
| display: none !important; | |
| } | |
| .controls.svelte-ije4bl { | |
| padding: 0 !important; | |
| margin: 0 !important; | |
| } | |
| #component-7, #component-10, #component-20 { | |
| height: 140px !important; | |
| } | |
| #component-47, #component-51 { | |
| height: 100px !important; | |
| } | |
| </style> | |
| ''' | |
| ) | |
| gr.Markdown( | |
| f''' | |
| # Ebook2Audiobook v{version}<br/> | |
| https://github.com/DrewThomasson/ebook2audiobook<br/> | |
| Convert eBooks into immersive audiobooks with realistic voice TTS models.<br/> | |
| Multiuser, multiprocessing, multithread on a geo cluster to share the conversion to the Grid. | |
| This free space is very slow, you should just run it locally with docker, info in github | |
| [](https://github.com/DrewThomasson/ebook2audiobook) | |
| [](https://colab.research.google.com/github/DrewThomasson/ebook2audiobook/blob/main/Notebooks/colab_ebook2audiobook.ipynb) | |
| ''' | |
| ) | |
| with gr.Tabs(): | |
| gr_tab_main = gr.TabItem('Input Options') | |
| with gr_tab_main: | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| with gr.Group(): | |
| gr_ebook_file = gr.File(label='EBook File (.epub, .mobi, .azw3, fb2, lrf, rb, snb, tcr, .pdf, .txt, .rtf, doc, .docx, .html, .odt, .azw)', file_types=['.epub', '.mobi', '.azw3', 'fb2', 'lrf', 'rb', 'snb', 'tcr', '.pdf', '.txt', '.rtf', 'doc', '.docx', '.html', '.odt', '.azw']) | |
| with gr.Group(): | |
| gr_voice_file = gr.File(label='*Cloning Voice (a .wav 24khz for XTTS base model and 16khz for FAIRSEQ base model, no more than 6 sec)', file_types=['.wav'], visible=interface_component_options['gr_voice_file']) | |
| gr.Markdown('<p> * Optional</p>') | |
| with gr.Group(): | |
| gr_device = gr.Radio(label='Processor Unit', choices=['CPU', 'GPU'], value='CPU') | |
| with gr.Group(): | |
| gr_language = gr.Dropdown(label='Language', choices=[name for name, _ in language_options], value=default_language_name) | |
| with gr.Column(scale=3): | |
| gr_group_custom_model = gr.Group(visible=interface_component_options['gr_group_custom_model']) | |
| with gr_group_custom_model: | |
| gr_custom_model_file = gr.File(label='*Custom XTTS Model (a .zip containing config.json, vocab.json, model.pth, ref.wav)', file_types=['.zip']) | |
| gr_custom_model_list = gr.Dropdown(label='', choices=['none'], interactive=True) | |
| gr.Markdown('<p> * Optional</p>') | |
| with gr.Group(): | |
| gr_session_status = gr.Textbox(label='Session') | |
| with gr.Group(): | |
| gr_tts_engine = gr.Dropdown(label='TTS Base', choices=[default_tts_engine], value=default_tts_engine, interactive=True) | |
| gr_fine_tuned = gr.Dropdown(label='Fine Tuned Models', choices=fine_tuned_options, value=default_fine_tuned, interactive=True) | |
| gr_tab_preferences = gr.TabItem('Audio Generation Preferences', visible=interface_component_options['gr_tab_preferences']) | |
| with gr_tab_preferences: | |
| gr.Markdown( | |
| ''' | |
| ### Customize Audio Generation Parameters | |
| Adjust the settings below to influence how the audio is generated. You can control the creativity, speed, repetition, and more. | |
| ''' | |
| ) | |
| gr_temperature = gr.Slider( | |
| label='Temperature', | |
| minimum=0.1, | |
| maximum=10.0, | |
| step=0.1, | |
| value=0.65, | |
| info='Higher values lead to more creative, unpredictable outputs. Lower values make it more monotone.' | |
| ) | |
| gr_length_penalty = gr.Slider( | |
| label='Length Penalty', | |
| minimum=0.5, | |
| maximum=10.0, | |
| step=0.1, | |
| value=1.0, | |
| info='Penalize longer sequences. Higher values produce shorter outputs. Not applied to custom models.' | |
| ) | |
| gr_repetition_penalty = gr.Slider( | |
| label='Repetition Penalty', | |
| minimum=1.0, | |
| maximum=10.0, | |
| step=0.1, | |
| value=2.5, | |
| info='Penalizes repeated phrases. Higher values reduce repetition.' | |
| ) | |
| gr_top_k = gr.Slider( | |
| label='Top-k Sampling', | |
| minimum=10, | |
| maximum=100, | |
| step=1, | |
| value=50, | |
| info='Lower values restrict outputs to more likely words and increase speed at which audio generates.' | |
| ) | |
| gr_top_p = gr.Slider( | |
| label='Top-p Sampling', | |
| minimum=0.1, | |
| maximum=1.0, | |
| step=.01, | |
| value=0.8, | |
| info='Controls cumulative probability for word selection. Lower values make the output more predictable and increase speed at which audio generates.' | |
| ) | |
| gr_speed = gr.Slider( | |
| label='Speed', | |
| minimum=0.5, | |
| maximum=3.0, | |
| step=0.1, | |
| value=1.0, | |
| info='Adjusts how fast the narrator will speak.' | |
| ) | |
| gr_enable_text_splitting = gr.Checkbox( | |
| label='Enable Text Splitting', | |
| value=True, | |
| info='Splits long texts into sentences to generate audio in chunks. Useful for very long inputs.' | |
| ) | |
| gr_state = gr.State(value="") # Initialize state for each user session | |
| gr_session = gr.Textbox(label='Session', visible=False) | |
| gr_conversion_progress = gr.Textbox(label='Progress') | |
| gr_convert_btn = gr.Button('Convert', variant='primary', interactive=False) | |
| gr_audio_player = gr.Audio(label='Listen', type='filepath', show_download_button=False, container=True, visible=False) | |
| gr_audiobooks_ddn = gr.Dropdown(choices=[], label='Audiobooks') | |
| gr_audiobook_link = gr.File(label='Download') | |
| gr_write_data = gr.JSON(visible=False) | |
| gr_read_data = gr.JSON(visible=False) | |
| gr_data = gr.State({}) | |
| gr_modal_html = gr.HTML() | |
def show_modal(message):
    """Return the HTML/CSS markup of a full-screen modal showing *message* and a spinner.

    *message* is interpolated into the markup as-is (no HTML escaping).
    In the ``.modal`` rule the later ``display: flex`` declaration overrides
    the earlier ``display: none``.
    """
    return f'''
<style>
.modal {{
display: none; /* Hidden by default */
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background-color: rgba(0, 0, 0, 0.5);
z-index: 9999;
display: flex;
justify-content: center;
align-items: center;
}}
.modal-content {{
background-color: #333;
padding: 20px;
border-radius: 8px;
text-align: center;
max-width: 300px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.5);
border: 2px solid #FFA500;
color: white;
font-family: Arial, sans-serif;
position: relative;
}}
.modal-content p {{
margin: 10px 0;
}}
/* Spinner */
.spinner {{
margin: 15px auto;
border: 4px solid rgba(255, 255, 255, 0.2);
border-top: 4px solid #FFA500;
border-radius: 50%;
width: 30px;
height: 30px;
animation: spin 1s linear infinite;
}}
@keyframes spin {{
0% {{ transform: rotate(0deg); }}
100% {{ transform: rotate(360deg); }}
}}
</style>
<div id="custom-modal" class="modal">
<div class="modal-content">
<p>{message}</p>
<div class="spinner"></div> <!-- Spinner added here -->
</div>
</div>
'''
def hide_modal():
    """Return an empty string, i.e. the markup that replaces (and so hides) the modal."""
    return ''
def update_interface():
    """Reset the form once a conversion attempt has finished.

    Clears the enclosing ``is_converting`` flag and returns six values, in
    order: convert-button update, two ``value=None`` updates, an update
    carrying the enclosing ``audiobook_file``, the refreshed audiobook
    dropdown and the empty modal markup.
    """
    nonlocal is_converting
    is_converting = False
    # NOTE(review): 'Convert' is passed positionally to gr.update(); confirm which
    # parameter it binds to in the installed Gradio version (value= may be intended).
    return gr.update('Convert', variant='primary', interactive=False), gr.update(value=None), gr.update(value=None), gr.update(value=audiobook_file), update_audiobooks_ddn(), hide_modal()
def refresh_audiobook_list():
    """List the entries of the enclosing ``audiobooks_dir``, most recently modified first.

    Returns an empty list when ``audiobooks_dir`` is unset or does not exist.
    """
    if audiobooks_dir is None or not os.path.exists(audiobooks_dir):
        return []
    entries = os.listdir(audiobooks_dir)
    entries.sort(key=lambda name: os.path.getmtime(os.path.join(audiobooks_dir, name)), reverse=True)
    return entries
def change_gr_audiobooks_ddn(audiobook):
    """Resolve the selected audiobook name to its path inside ``audiobooks_dir``.

    Returns ``(path, path, visible-update)`` for a selection, or
    ``(None, None, hidden-update)`` when nothing is selected or no
    audiobook directory is set.
    """
    if audiobooks_dir is None or not audiobook:
        return None, None, gr.update(visible=False)
    link = os.path.join(audiobooks_dir, audiobook)
    return link, link, gr.update(visible=True)
def update_convert_btn(upload_file=None, custom_model_file=None, session_id=None):
    """Yield one button update enabling or disabling the convert button.

    The button is enabled only when a session id is given, *upload_file*
    has a ``name`` attribute and *custom_model_file* does not.
    """
    if session_id is None:
        yield gr.update(variant='primary', interactive=False)
        return
    # The looked-up session is not used here; the call is kept as in the
    # original because get_session() is defined elsewhere.
    context.get_session(session_id)
    ready = hasattr(upload_file, 'name') and not hasattr(custom_model_file, 'name')
    yield gr.update(variant='primary', interactive=ready)
def update_audiobooks_ddn():
    """Build the dropdown update listing the available audiobooks, pre-selecting the first one."""
    available = refresh_audiobook_list()
    selected = available[0] if available else None
    return gr.update(choices=available, label='Audiobooks', value=selected)
async def change_gr_ebook_file(f, session_id):
    """React to the ebook file widget changing.

    Clearing the file (``f is None``) while a conversion is running flags the
    session with ``cancellation_requested`` and yields the "please wait"
    modal; otherwise the flag is reset and the modal is hidden.
    """
    # NOTE(review): the nesting below was reconstructed from a de-indented
    # listing; confirm it against the repository before applying.
    nonlocal is_converting
    if context and session_id:
        session = context.get_session(session_id)
        if f is None:
            if is_converting:
                session['cancellation_requested'] = True
                yield show_modal('Cancellation requested, please wait...')
                return
        session['cancellation_requested'] = False
    yield hide_modal()
    return
| def change_gr_language(selected: str, session_id: str): | |
| nonlocal custom_model_options | |
| if selected == 'zzzz': | |
| new_language_name = default_language_name | |
| new_language_key = default_language_code | |
| else: | |
| new_language_name, new_language_key = next(((name, key) for name, key in language_options if key == selected), (None, None)) | |
| tts_engine_options = ['xtts'] if language_xtts.get(new_language_key, False) else ['fairseq'] | |
| fine_tuned_options = [ | |
| model_name | |
| for model_name, model_details in models.get(tts_engine_options[0], {}).items() | |
| if model_details.get('lang') == 'multi' or model_details.get('lang') == new_language_key | |
| ] | |
| custom_model_options = ['none'] | |
| if context and session_id: | |
| session = context.get_session(session_id) | |
| session['language'] = new_language_key | |
| custom_model_tts = check_custom_model_tts(session) | |
| custom_model_tts_dir = os.path.join(session['custom_model_dir'], custom_model_tts) | |
| if os.path.exists(custom_model_tts_dir): | |
| custom_model_options += os.listdir(custom_model_tts_dir) | |
| return ( | |
| gr.update(value=new_language_name), | |
| gr.update(choices=tts_engine_options, value=tts_engine_options[0]), | |
| gr.update(choices=fine_tuned_options, value=fine_tuned_options[0] if fine_tuned_options else 'none'), | |
| gr.update(choices=custom_model_options, value=custom_model_options[0]) | |
| ) | |
def check_custom_model_tts(session):
    """Return the engine name ('xtts' or 'fairseq') for the session language.

    The matching sub-directory of ``session['custom_model_dir']`` is created
    when it does not exist yet.
    """
    engine = 'xtts' if language_xtts.get(session['language']) else 'fairseq'
    engine_dir = os.path.join(session['custom_model_dir'], engine)
    if not os.path.isdir(engine_dir):
        os.makedirs(engine_dir, exist_ok=True)
    return engine
def change_gr_custom_model_list(custom_model_list):
    """Return a visibility update that is visible only while the selection is 'none'."""
    return gr.update(visible=(custom_model_list == 'none'))
async def change_gr_custom_model_file(custom_model_file, session_id):
    """Handle an uploaded custom-model archive.

    When the upload passes ``analyze_uploaded_file`` and extraction yields a
    model, the custom-model list is rebuilt and three updates are yielded
    (hide, new choices, status text). Otherwise an "Invalid file" status is
    yielded. Any exception is reported through the status text.
    """
    # NOTE(review): the nesting below was reconstructed from a de-indented
    # listing; confirm it against the repository before applying.
    try:
        nonlocal custom_model_options, gr_custom_model_file, gr_conversion_progress
        if context and session_id:
            session = context.get_session(session_id)
            if custom_model_file is not None:
                if analyze_uploaded_file(custom_model_file):
                    session['custom_model'], progress_status = extract_custom_model(custom_model_file, None, session)
                    if session['custom_model']:
                        # check_custom_model_tts() returns the engine name, used here as the sub-directory.
                        custom_model_tts_dir = check_custom_model_tts(session)
                        custom_model_options = ['none'] + os.listdir(os.path.join(session['custom_model_dir'], custom_model_tts_dir))
                        yield (
                            gr.update(visible=False),
                            gr.update(choices=custom_model_options, value=session['custom_model']),
                            gr.update(value=f"{session['custom_model']} added to the custom list")
                        )
                        # Rebinds the enclosing name to a fresh File component after the yield.
                        gr_custom_model_file = gr.File(label='*XTTS Model (a .zip containing config.json, vocab.json, model.pth, ref.wav)', value=None, file_types=['.zip'])
                        return
                yield gr.update(), gr.update(), gr.update(value='Invalid file! Please upload a valid ZIP.')
        return
    except Exception as e:
        yield gr.update(), gr.update(), gr.update(value=f'Error: {str(e)}')
        return
def change_gr_tts_engine(engine):
    """Return a visibility update that is visible only for the 'xtts' engine."""
    return gr.update(visible=(engine == 'xtts'))
def change_gr_fine_tuned(fine_tuned):
    """Return a visibility update that is visible only when *fine_tuned* is 'std'."""
    return gr.update(visible=(fine_tuned == 'std'))
def change_gr_data(data):
    """Tag *data* in place with ``event='change_data'`` and return the same dict."""
    data.update(event='change_data')
    return data
def change_gr_read_data(data):
    """Initialise the per-browser session from the data read back from the page.

    A missing payload gets a fresh ``session_id``. A payload whose ``event``
    is not ``'load'`` produces five no-op updates. Otherwise the session's
    custom-model directory is created, the enclosing ``audiobooks_dir`` and
    ``custom_model_options`` are rebound, and five values are returned:
    the data, the session status text, the session id, the audiobook
    dropdown update and the custom-model dropdown update.
    """
    # NOTE(review): the nesting below was reconstructed from a de-indented
    # listing; confirm it against the repository before applying.
    nonlocal audiobooks_dir
    nonlocal custom_model_options
    warning_text_extra = ''
    if not data:
        data = {'session_id': str(uuid.uuid4())}
        warning_text = f"Session: {data['session_id']}"
    else:
        if 'session_id' not in data:
            data['session_id'] = str(uuid.uuid4())
        warning_text = data['session_id']
        event = data.get('event', '')
        if event != 'load':
            return [gr.update(), gr.update(), gr.update(), gr.update(), gr.update()]
    session = context.get_session(data['session_id'])
    # presumably get_session() has already populated session['id'] — verify.
    session['custom_model_dir'] = os.path.join(models_dir,'__sessions',f"model-{session['id']}")
    os.makedirs(session['custom_model_dir'], exist_ok=True)
    # check_custom_model_tts() returns the engine name, used here as the sub-directory.
    custom_model_tts_dir = check_custom_model_tts(session)
    custom_model_options = ['none'] + os.listdir(os.path.join(session['custom_model_dir'],custom_model_tts_dir))
    if is_gui_shared:
        warning_text_extra = f' Note: access limit time: {interface_shared_expire} hours'
        audiobooks_dir = os.path.join(audiobooks_gradio_dir, f"web-{data['session_id']}")
        # Shared mode: purge expired web-* folders on every load.
        delete_old_web_folders(audiobooks_gradio_dir)
    else:
        audiobooks_dir = os.path.join(audiobooks_host_dir, f"web-{data['session_id']}")
    return [data, f'{warning_text}{warning_text_extra}', data['session_id'], update_audiobooks_ddn(), gr.update(choices=custom_model_options, value='none')]
| def submit_convert_btn( | |
| session, device, ebook_file, voice_file, language, | |
| custom_model_file, temperature, length_penalty, | |
| repetition_penalty, top_k, top_p, speed, enable_text_splitting, fine_tuned | |
| ): | |
| nonlocal is_converting | |
| args = { | |
| "is_gui_process": is_gui_process, | |
| "session": session, | |
| "script_mode": script_mode, | |
| "device": device.lower(), | |
| "ebook": ebook_file.name if ebook_file else None, | |
| "audiobooks_dir": audiobooks_dir, | |
| "voice": voice_file.name if voice_file else None, | |
| "language": next((key for name, key in language_options if name == language), None), | |
| "custom_model": next((key for name, key in language_options if name != 'none'), None), | |
| "temperature": float(temperature), | |
| "length_penalty": float(length_penalty), | |
| "repetition_penalty": float(repetition_penalty), | |
| "top_k": int(top_k), | |
| "top_p": float(top_p), | |
| "speed": float(speed), | |
| "enable_text_splitting": enable_text_splitting, | |
| "fine_tuned": fine_tuned | |
| } | |
| if args["ebook"] is None: | |
| yield gr.update(value='Error: a file is required.') | |
| return | |
| try: | |
| is_converting = True | |
| progress_status, audiobook_file = convert_ebook(args) | |
| if audiobook_file is None: | |
| if is_converting: | |
| yield gr.update(value='Conversion cancelled.') | |
| return | |
| else: | |
| yield gr.update(value='Conversion failed.') | |
| return | |
| else: | |
| yield progress_status | |
| return | |
| except Exception as e: | |
| yield DependencyError(e) | |
| return | |
| gr_ebook_file.change( | |
| fn=update_convert_btn, | |
| inputs=[gr_ebook_file, gr_custom_model_file, gr_session], | |
| outputs=gr_convert_btn | |
| ).then( | |
| fn=change_gr_ebook_file, | |
| inputs=[gr_ebook_file, gr_session], | |
| outputs=[gr_modal_html] | |
| ) | |
| gr_language.change( | |
| fn=lambda selected, session_id: change_gr_language(dict(language_options).get(selected, 'Unknown'), session_id), | |
| inputs=[gr_language, gr_session], | |
| outputs=[gr_language, gr_tts_engine, gr_fine_tuned, gr_custom_model_list] | |
| ) | |
| gr_audiobooks_ddn.change( | |
| fn=change_gr_audiobooks_ddn, | |
| inputs=gr_audiobooks_ddn, | |
| outputs=[gr_audiobook_link, gr_audio_player, gr_audio_player] | |
| ) | |
| gr_custom_model_file.change( | |
| fn=change_gr_custom_model_file, | |
| inputs=[gr_custom_model_file, gr_session], | |
| outputs=[gr_fine_tuned, gr_custom_model_list, gr_conversion_progress] | |
| ) | |
| gr_custom_model_list.change( | |
| fn=change_gr_custom_model_list, | |
| inputs=gr_custom_model_list, | |
| outputs=gr_fine_tuned | |
| ) | |
| gr_tts_engine.change( | |
| fn=change_gr_tts_engine, | |
| inputs=gr_tts_engine, | |
| outputs=gr_tab_preferences | |
| ) | |
| gr_fine_tuned.change( | |
| fn=change_gr_fine_tuned, | |
| inputs=gr_fine_tuned, | |
| outputs=gr_group_custom_model | |
| ) | |
| gr_session.change( | |
| fn=change_gr_data, | |
| inputs=gr_data, | |
| outputs=gr_write_data | |
| ) | |
| gr_write_data.change( | |
| fn=None, | |
| inputs=gr_write_data, | |
| js=''' | |
| (data) => { | |
| localStorage.clear(); | |
| console.log(data); | |
| window.localStorage.setItem('data', JSON.stringify(data)); | |
| } | |
| ''' | |
| ) | |
| gr_read_data.change( | |
| fn=change_gr_read_data, | |
| inputs=gr_read_data, | |
| outputs=[gr_data, gr_session_status, gr_session, gr_audiobooks_ddn, gr_custom_model_list] | |
| ) | |
| gr_convert_btn.click( | |
| fn=update_convert_btn, | |
| inputs=None, | |
| outputs=gr_convert_btn | |
| ).then( | |
| fn=submit_convert_btn, | |
| inputs=[ | |
| gr_session, gr_device, gr_ebook_file, gr_voice_file, gr_language, | |
| gr_custom_model_list, gr_temperature, gr_length_penalty, | |
| gr_repetition_penalty, gr_top_k, gr_top_p, gr_speed, gr_enable_text_splitting, gr_fine_tuned | |
| ], | |
| outputs=gr_conversion_progress | |
| ).then( | |
| fn=update_interface, | |
| inputs=None, | |
| outputs=[gr_convert_btn, gr_ebook_file, gr_voice_file, gr_audio_player, gr_audiobooks_ddn, gr_modal_html] | |
| ) | |
| interface.load( | |
| fn=None, | |
| js=''' | |
| () => { | |
| const dataStr = window.localStorage.getItem('data'); | |
| if (dataStr) { | |
| const obj = JSON.parse(dataStr); | |
| obj.event = 'load'; | |
| console.log(obj); | |
| return obj; | |
| } | |
| return null; | |
| } | |
| ''', | |
| outputs=gr_read_data | |
| ) | |
| try: | |
| interface.queue(default_concurrency_limit=interface_concurrency_limit).launch(server_name=interface_host, server_port=interface_port, share=is_gui_shared) | |
| except OSError as e: | |
| print(f'Connection error: {e}') | |
| except socket.error as e: | |
| print(f'Socket error: {e}') | |
| except KeyboardInterrupt: | |
| print('Server interrupted by user. Shutting down...') | |
| except Exception as e: | |
| print(f'An unexpected error occurred: {e}') | |