|
print("NLTK")
|
|
import nltk
|
|
nltk.download('punkt')
|
|
print("SCIPY")
|
|
from scipy.io.wavfile import write
|
|
print("TORCH STUFF")
|
|
import torch
|
|
print("START")
|
|
torch.manual_seed(0)
|
|
torch.backends.cudnn.benchmark = False
|
|
torch.backends.cudnn.deterministic = True
|
|
|
|
|
|
|
|
|
|
|
|
import IPython.display as ipd
|
|
import os
|
|
os.environ['CUDA_HOME'] = '/home/ubuntu/miniconda3/envs/respair/lib/python3.11/site-packages/torch/lib/include/cuda'
|
|
import torch
|
|
torch.manual_seed(0)
|
|
torch.backends.cudnn.benchmark = False
|
|
torch.backends.cudnn.deterministic = True
|
|
|
|
import random
|
|
random.seed(0)
|
|
|
|
import numpy as np
|
|
np.random.seed(0)
|
|
|
|
|
|
from text_utils import TextCleaner
|
|
textclenaer = TextCleaner()
|
|
|
|
|
|
def length_to_mask(lengths):
    """Build a boolean padding mask from a 1-D tensor of sequence lengths.

    The result has one row per entry of ``lengths`` and ``lengths.max()``
    columns; an element is True where the 1-based position exceeds that
    row's length (i.e. padding) and False on valid positions.
    """
    longest = lengths.max()
    # One row of 0-based positions, repeated for every sequence in the batch.
    positions = torch.arange(longest).unsqueeze(0).expand(lengths.shape[0], -1)
    positions = positions.type_as(lengths)
    # Compare 1-based positions against each row's own length.
    return torch.gt(positions + 1, lengths.unsqueeze(1))
|
|
|
|
|
|
|
|
|
|
import time
|
|
import random
|
|
import yaml
|
|
from munch import Munch
|
|
import numpy as np
|
|
import torch
|
|
from torch import nn
|
|
import torch.nn.functional as F
|
|
import torchaudio
|
|
import librosa
|
|
from nltk.tokenize import word_tokenize
|
|
|
|
from models import *
|
|
from Modules.KotoDama_sampler import tokenizer_koto_prompt, tokenizer_koto_text
|
|
from utils import *
|
|
|
|
import nltk
|
|
nltk.download('punkt_tab')
|
|
|
|
from nltk.tokenize import sent_tokenize
|
|
|
|
from konoha import SentenceTokenizer
|
|
|
|
|
|
sent_tokenizer = SentenceTokenizer()
|
|
|
|
|
|
# Mel-spectrogram transform shared by every call to preprocess().
to_mel = torchaudio.transforms.MelSpectrogram(
    n_mels=80,
    n_fft=2048,
    win_length=1200,
    hop_length=300,
)

# Offset and scale applied to the log-mel values in preprocess().
mean = -4
std = 4
|
|
|
|
|
|
def preprocess(wave):
    """Turn a NumPy waveform into a normalised log-mel tensor.

    The waveform is converted to a float tensor, passed through the
    module-level ``to_mel`` transform, given a leading singleton dimension,
    log-compressed (with a 1e-5 floor) and normalised with the module-level
    ``mean`` / ``std``.
    """
    samples = torch.from_numpy(wave).float()
    log_mel = torch.log(1e-5 + to_mel(samples).unsqueeze(0))
    return (log_mel - mean) / std
|
|
|
|
def compute_style_through_clip(path):
    """Compute a reference style vector from an audio clip on disk.

    The clip is loaded at 24 kHz, leading/trailing audio quieter than 30 dB
    below peak is trimmed, and the result is converted to a log-mel tensor
    with ``preprocess``. The mel is then fed to ``model.style_encoder`` and
    ``model.predictor_encoder`` and both outputs are concatenated along
    dim 1 (style-encoder output first).

    Args:
        path: Path of the reference audio file, as accepted by ``librosa.load``.

    Returns:
        The concatenated encoder outputs as a tensor on ``device``.
    """
    target_sr = 24000
    wave, sr = librosa.load(path, sr=target_sr)
    audio, _ = librosa.effects.trim(wave, top_db=30)

    # librosa.load already resamples to target_sr, so this is only a safety
    # net. The sample rates must be passed by keyword: newer librosa releases
    # reject them as positional arguments.
    if sr != target_sr:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)

    mel_tensor = preprocess(audio).to(device)

    with torch.no_grad():
        encoder_input = mel_tensor.unsqueeze(1)
        ref_s = model.style_encoder(encoder_input)
        ref_p = model.predictor_encoder(encoder_input)

    return torch.cat([ref_s, ref_p], dim=1)
|
|
|
|
|
|
def Kotodama_Prompter(model, text, device):
    """Return the ``'logits'`` output of ``model.KotoDama_Prompt`` for ``text``.

    ``text`` is tokenised with ``tokenizer_koto_prompt`` (PyTorch tensors),
    moved to ``device`` and run through the model without gradient tracking.
    """
    with torch.no_grad():
        encoded = tokenizer_koto_prompt(text, return_tensors="pt").to(device)
        output = model.KotoDama_Prompt(**encoded)
        style = output['logits']
    return style
|
|
|
|
def Kotodama_Sampler(model, text, device):
    """Return the ``'logits'`` output of ``model.KotoDama_Text`` for ``text``.

    ``text`` is tokenised with ``tokenizer_koto_text`` (PyTorch tensors),
    moved to ``device`` and run through the model without gradient tracking.
    """
    with torch.no_grad():
        encoded = tokenizer_koto_text(text, return_tensors="pt").to(device)
        output = model.KotoDama_Text(**encoded)
        style = output['logits']
    return style
|
|
|
|
|
|
# Run on the GPU when torch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Parse the experiment config; the context manager closes the file handle
# as soon as parsing is done instead of leaking it.
with open("Configs/config_kanade.yml") as config_file:
    config = yaml.safe_load(config_file)

# Text aligner (ASR) model.
ASR_config = config.get('ASR_config', False)
ASR_path = config.get('ASR_path', False)
text_aligner = load_ASR_models(ASR_path, ASR_config)

# KotoDama prompt / text encoders, loaded from fixed checkpoint directories.
KotoDama_Prompter = load_KotoDama_Prompter(path="Utils/KTD/prompt_enc/checkpoint-73285")
KotoDama_TextSampler = load_KotoDama_TextSampler(path="Utils/KTD/text_enc/checkpoint-22680")

# Pitch (F0) extractor.
F0_path = config.get('F0_path', False)
pitch_extractor = load_F0_models(F0_path)

# PL-BERT text encoder.
from Utils.PLBERT.util import load_plbert
BERT_path = config.get('PLBERT_dir', False)
plbert = load_plbert(BERT_path)

# Assemble the full model from its components.
model_params = recursive_munch(config['model_params'])
model = build_model(model_params, text_aligner, pitch_extractor, plbert, KotoDama_Prompter, KotoDama_TextSampler)

# Put every sub-module in eval mode and move it to the selected device.
for key in model:
    model[key].eval()
    model[key].to(device)
|
|
|
|
# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code. Only load checkpoints from a trusted source (or switch to
# weights_only=True once the checkpoint contents are confirmed compatible).
params_whole = torch.load("Models/Style_Tsukasa_v02/Top_ckpt_24khz.pth", map_location='cpu')
params = params_whole['net']

for key in model:
    if key in params:
        print('%s loaded' % key)
        try:
            model[key].load_state_dict(params[key])
        except Exception:
            # Strict loading failed. Retry non-strictly after removing a
            # leading "module." from the parameter names (presumably left by
            # a DataParallel-style wrapper at training time). Only keys that
            # actually carry the prefix are rewritten, so un-prefixed keys
            # are no longer truncated into names that match nothing.
            from collections import OrderedDict
            state_dict = params[key]
            new_state_dict = OrderedDict()
            for k, v in state_dict.items():
                name = k[7:] if k.startswith("module.") else k
                new_state_dict[name] = v

            model[key].load_state_dict(new_state_dict, strict=False)

# Make sure every sub-module is in eval mode after the weights are loaded.
for key in model:
    model[key].eval()
|
|
|
|
|
|
from Modules.diffusion.sampler import DiffusionSampler, ADPM2Sampler, KarrasSchedule
|
|
# Sampler used by inference() and Longform() to draw a 256-dim style vector
# from the model's diffusion network.
diffusion_sampler = DiffusionSampler(
    model.diffusion.diffusion,
    sampler=ADPM2Sampler(),
    sigma_schedule=KarrasSchedule(
        sigma_min=0.0001,
        sigma_max=3.0,
        rho=9.0,
    ),
    clamp=False,
)
|
|
|
|
def inference(text=None, ref_s=None, alpha = 0.3, beta = 0.7, diffusion_steps=5, embedding_scale=1, rate_of_speech=1.):
    """Synthesise a waveform for ``text`` conditioned on a reference style.

    Args:
        text: Input text, passed to the module-level text cleaner to obtain
            token ids.
        ref_s: Reference style tensor, indexed as ``ref_s[:, :128]`` and
            ``ref_s[:, 128:]`` (e.g. the output of
            ``compute_style_through_clip``). Required despite the ``None``
            default.
        alpha: Weight of the sampled style versus ``ref_s`` for the first
            128 columns (the part handed to the decoder).
        beta: Weight of the sampled style versus ``ref_s`` for the remaining
            columns (the part handed to the predictor).
        diffusion_steps: Number of steps for the diffusion sampler.
        embedding_scale: Embedding scale forwarded to the diffusion sampler.
        rate_of_speech: Divisor applied to the predicted durations; larger
            values give shorter durations.

    Returns:
        The decoder output as a NumPy array with the last 50 samples removed.

    Raises:
        ValueError: If ``ref_s`` is None.
    """
    if ref_s is None:
        # The style mix below always indexes ref_s; fail early and clearly
        # instead of with a TypeError after the diffusion pass has run.
        raise ValueError("inference() requires a reference style tensor `ref_s`")

    tokens = textclenaer(text)
    tokens.insert(0, 0)  # prepend token id 0
    tokens = torch.LongTensor(tokens).to(device).unsqueeze(0)
    num_tokens = tokens.shape[-1]

    with torch.no_grad():
        input_lengths = torch.LongTensor([num_tokens]).to(device)
        text_mask = length_to_mask(input_lengths).to(device)

        t_en = model.text_encoder(tokens, input_lengths, text_mask)
        bert_dur = model.bert(tokens, attention_mask=(~text_mask).int())
        d_en = model.bert_encoder(bert_dur).transpose(-1, -2)

        # Sample a 256-dim style vector from the diffusion model.
        s_pred = diffusion_sampler(noise = torch.randn((1, 256)).unsqueeze(1).to(device),
                                   embedding=bert_dur,
                                   embedding_scale=embedding_scale,
                                   features=ref_s,
                                   num_steps=diffusion_steps).squeeze(1)

        s = s_pred[:, 128:]
        ref = s_pred[:, :128]

        # Blend the sampled style with the reference style.
        ref = alpha * ref + (1 - alpha) * ref_s[:, :128]
        s = beta * s + (1 - beta) * ref_s[:, 128:]

        d = model.predictor.text_encoder(d_en, s, input_lengths, text_mask)

        x = model.predictor.lstm(d)
        x_mod = model.predictor.prepare_projection(x)
        duration = model.predictor.duration_proj(x_mod)

        duration = torch.sigmoid(duration).sum(axis=-1) / rate_of_speech

        # Flatten to one duration per token. reshape(-1) (rather than
        # squeeze()) keeps a 1-D tensor even for a single-token input, so the
        # per-token indexing below keeps working.
        pred_dur = torch.round(duration.reshape(-1)).clamp(min=1)
        frames_per_token = [int(frames) for frames in pred_dur.tolist()]

        # Hard alignment matrix: row i is 1 over the frames assigned to token i.
        pred_aln_trg = torch.zeros(num_tokens, sum(frames_per_token))
        c_frame = 0
        for i in range(pred_aln_trg.size(0)):
            pred_aln_trg[i, c_frame:c_frame + frames_per_token[i]] = 1
            c_frame += frames_per_token[i]
        alignment = pred_aln_trg.unsqueeze(0).to(device)

        # Expand the token-level features to frame level using the alignment.
        en = d.transpose(-1, -2) @ alignment
        F0_pred, N_pred = model.predictor.F0Ntrain(en, s)

        asr = t_en @ alignment

        out = model.decoder(asr, F0_pred, N_pred, ref.squeeze().unsqueeze(0))

    # Drop the last 50 samples (presumably a trailing artifact; confirm).
    return out.squeeze().cpu().numpy()[..., :-50]
|
|
|
|
|
|
def Longform(text, s_prev, ref_s, alpha = 0.3, beta = 0.7, t = 0.7, diffusion_steps=5, embedding_scale=1, rate_of_speech=1.0):
    """Synthesise one segment of a long text, carrying style across segments.

    Works like ``inference`` but, when ``s_prev`` is given, first blends the
    freshly sampled style with the style returned for the previous segment.

    Args:
        text: Text of the current segment, passed to the module-level text
            cleaner to obtain token ids.
        s_prev: Style tensor returned by the previous call, or None for the
            first segment.
        ref_s: Reference style tensor, indexed as ``ref_s[:, :128]`` and
            ``ref_s[:, 128:]``.
        alpha: Weight of the sampled style versus ``ref_s`` for the first
            128 columns (the part handed to the decoder).
        beta: Weight of the sampled style versus ``ref_s`` for the remaining
            columns (the part handed to the predictor).
        t: Weight of ``s_prev`` versus the newly sampled style.
        diffusion_steps: Number of steps for the diffusion sampler.
        embedding_scale: Embedding scale forwarded to the diffusion sampler.
        rate_of_speech: Divisor applied to the predicted durations; larger
            values give shorter durations.

    Returns:
        A tuple ``(wave, s_pred)``: the decoder output as a NumPy array with
        the last 100 samples removed, and the blended style tensor to pass as
        ``s_prev`` for the next segment.
    """
    tokens = textclenaer(text)
    tokens.insert(0, 0)  # prepend token id 0
    tokens = torch.LongTensor(tokens).to(device).unsqueeze(0)
    num_tokens = tokens.shape[-1]

    with torch.no_grad():
        input_lengths = torch.LongTensor([num_tokens]).to(device)
        text_mask = length_to_mask(input_lengths).to(device)

        t_en = model.text_encoder(tokens, input_lengths, text_mask)
        bert_dur = model.bert(tokens, attention_mask=(~text_mask).int())
        d_en = model.bert_encoder(bert_dur).transpose(-1, -2)

        # Sample a 256-dim style vector from the diffusion model.
        s_pred = diffusion_sampler(noise = torch.randn((1, 256)).unsqueeze(1).to(device),
                                   embedding=bert_dur,
                                   embedding_scale=embedding_scale,
                                   features=ref_s,
                                   num_steps=diffusion_steps).squeeze(1)

        # Pull the new style towards the previous segment's style.
        if s_prev is not None:
            s_pred = t * s_prev + (1 - t) * s_pred

        s = s_pred[:, 128:]
        ref = s_pred[:, :128]

        # Blend the sampled style with the reference style.
        ref = alpha * ref + (1 - alpha) * ref_s[:, :128]
        s = beta * s + (1 - beta) * ref_s[:, 128:]

        # Style actually used for this segment; returned to the caller.
        s_pred = torch.cat([ref, s], dim=-1)

        d = model.predictor.text_encoder(d_en, s, input_lengths, text_mask)

        x = model.predictor.lstm(d)
        x_mod = model.predictor.prepare_projection(x)
        duration = model.predictor.duration_proj(x_mod)

        duration = torch.sigmoid(duration).sum(axis=-1) / rate_of_speech

        # Flatten to one duration per token. reshape(-1) (rather than
        # squeeze()) keeps a 1-D tensor even for a single-token input, so the
        # per-token indexing below keeps working.
        pred_dur = torch.round(duration.reshape(-1)).clamp(min=1)
        frames_per_token = [int(frames) for frames in pred_dur.tolist()]

        # Hard alignment matrix: row i is 1 over the frames assigned to token i.
        pred_aln_trg = torch.zeros(num_tokens, sum(frames_per_token))
        c_frame = 0
        for i in range(pred_aln_trg.size(0)):
            pred_aln_trg[i, c_frame:c_frame + frames_per_token[i]] = 1
            c_frame += frames_per_token[i]
        alignment = pred_aln_trg.unsqueeze(0).to(device)

        # Expand the token-level features to frame level using the alignment.
        en = d.transpose(-1, -2) @ alignment
        F0_pred, N_pred = model.predictor.F0Ntrain(en, s)

        asr = t_en @ alignment

        out = model.decoder(asr, F0_pred, N_pred, ref.squeeze().unsqueeze(0))

    # Drop the last 100 samples (presumably a trailing artifact; confirm).
    return out.squeeze().cpu().numpy()[..., :-100], s_pred
|
|
|
|
|
|
|
|
def trim_long_silences(wav_data, sample_rate=24000, silence_threshold=0.01, min_silence_duration=0.8):
    """Shorten every over-long silent stretch of a waveform.

    A sample counts as silent when its absolute value is below
    ``silence_threshold``. Each run of silent samples longer than
    ``min_silence_duration`` seconds is replaced by exactly
    ``int(min_silence_duration * sample_rate)`` zero samples; shorter runs
    and all non-silent audio are kept unchanged.

    Args:
        wav_data: 1-D NumPy array of audio samples.
        sample_rate: Sampling rate in Hz, used to convert the duration into
            a sample count.
        silence_threshold: Absolute amplitude below which a sample is silent.
        min_silence_duration: Maximum silence length to keep, in seconds.

    Returns:
        The processed waveform. The input is returned unchanged when it is
        empty or contains no silent run; otherwise a new array with the same
        dtype as the input is returned.
    """
    # Nothing to analyse; also avoids indexing an empty mask below.
    if len(wav_data) == 0:
        return wav_data

    min_silence_samples = int(min_silence_duration * sample_rate)
    silence_mask = np.abs(wav_data) < silence_threshold

    # +1 marks a sound->silence transition, -1 a silence->sound transition.
    silence_changes = np.diff(silence_mask.astype(int))
    silence_starts = np.where(silence_changes == 1)[0] + 1
    silence_ends = np.where(silence_changes == -1)[0] + 1

    # Account for silence touching either edge of the signal so that starts
    # and ends pair up one-to-one.
    if silence_mask[0]:
        silence_starts = np.concatenate(([0], silence_starts))
    if silence_mask[-1]:
        silence_ends = np.concatenate((silence_ends, [len(wav_data)]))

    if len(silence_starts) == 0 or len(silence_ends) == 0:
        return wav_data

    # Replacement silence keeps the input dtype so float32 audio is not
    # silently upcast to float64 by the final concatenation.
    out_dtype = np.asarray(wav_data).dtype

    processed_segments = []
    last_end = 0
    for start, end in zip(silence_starts, silence_ends):
        # Audio between the previous silent run and this one.
        processed_segments.append(wav_data[last_end:start])

        if end - start > min_silence_samples:
            # Plain zeros: fading an all-zero buffer has no effect, so no
            # fade ramp is applied.
            processed_segments.append(np.zeros(min_silence_samples, dtype=out_dtype))
        else:
            processed_segments.append(wav_data[start:end])

        last_end = end

    if last_end < len(wav_data):
        processed_segments.append(wav_data[last_end:])

    return np.concatenate(processed_segments)
|
|
|
|
|
|
def merge_short_elements(lst):
    """Fold every element shorter than 10 characters into its predecessor.

    Short elements are appended to the previous kept element, separated by a
    single space. The first element is always kept, whatever its length.
    The list is modified in place and the same list object is returned.
    """
    merged = []
    for piece in lst:
        if merged and len(piece) < 10:
            merged[-1] += ' ' + piece
        else:
            merged.append(piece)
    # Write back into the caller's list so in-place semantics are preserved.
    lst[:] = merged
    return lst
|
|
|
|
|
|
def merge_three(text_list, maxim=2):
    """Join consecutive groups of up to ``maxim`` strings with single spaces.

    Despite the name, the default group size is 2. The last group may hold
    fewer than ``maxim`` items.
    """
    return [
        ' '.join(text_list[offset:offset + maxim])
        for offset in range(0, len(text_list), maxim)
    ]
|
|
|
|
|
|
def merging_sentences(lst):
    """Merge short sentences into their predecessors, then group the result.

    Applies ``merge_short_elements`` (which modifies ``lst`` in place)
    followed by ``merge_three`` with its default group size.
    """
    without_short = merge_short_elements(lst)
    grouped = merge_three(without_short)
    return grouped
|
|
|
|
|
|
import os
|
|
|
|
|
|
from openai import OpenAI
|
|
|
|
|
|
# OpenAI-compatible endpoint served on localhost; the key is a placeholder.
openai_api_key = "EMPTY"
openai_api_base = "http://localhost:8000/v1"

client = OpenAI(api_key=openai_api_key, base_url=openai_api_base)

# Model requested from the endpoint by p2g().
model_name = "Respair/Japanese_Phoneme_to_Grapheme_LLM"
|
|
|
|
|
|
def p2g(param):
    """Ask the chat model to turn a pronunciation string back into Japanese text.

    Sends a single user message built from ``param`` to the module-level
    ``client`` using ``model_name`` and returns the first choice's message
    content with leading whitespace removed.
    """
    prompt = f"convert this pronunciation back to normal japanese if you see one, otherwise copy the same thing: {param}"
    request = {
        "model": model_name,
        "max_tokens": 512,
        "temperature": 0.1,
        "messages": [{"role": "user", "content": prompt}],
    }
    completion = client.chat.completions.create(**request)
    reply = completion.choices[0].message.content
    return reply.lstrip()