import os

import librosa
import numpy as np
import torch
from tqdm import tqdm

from metrics.pipelines import sample_pipeline, inpaint_pipeline, sample_pipeline_GAN
from metrics.pipelines_STFT import sample_pipeline_STFT, sample_pipeline_GAN_STFT
from tools import rms_normalize, pad_STFT, encode_stft
from webUI.natural_language_guided.utils import InputBatch2Encode_STFT


def get_inception_score_for_AudioLDM(device, timbre_encoder, VAE, AudioLDM_signals_directory_path):
    VAE_encoder, VAE_quantizer, VAE_decoder = VAE._encoder, VAE._vq_vae, VAE._decoder

    diffuSynth_probabilities = []

    # Step 1: Load all wav files in AudioLDM_signals_directory_path
    AudioLDM_signals = []
    signal_lengths = set()
    target_length = 4 * 16000  # 4 seconds at 16000 samples per second

    for file_name in os.listdir(AudioLDM_signals_directory_path):
        if file_name.endswith('.wav') and not file_name.startswith('._'):
            file_path = os.path.join(AudioLDM_signals_directory_path, file_name)
            signal, sr = librosa.load(file_path, sr=16000)  # Load audio at a 16 kHz sampling rate
            if len(signal) >= target_length:
                signal = signal[:target_length]  # Keep only the first 4 seconds
            else:
                raise ValueError(f"The file {file_name} is shorter than 4 seconds.")
            # Normalize
            AudioLDM_signals.append(rms_normalize(signal))
            signal_lengths.add(len(signal))

    # Step 2: Check that all signals have the same length
    if len(signal_lengths) != 1:
        raise ValueError("Not all signals have the same length. Please ensure all audio files are of the same length.")

    # Encode each signal as a padded STFT representation
    encoded_audios = []
    for origin_audio in AudioLDM_signals:
        D = librosa.stft(origin_audio, n_fft=1024, hop_length=256, win_length=1024)
        padded_D = pad_STFT(D)
        encoded_D = encode_stft(padded_D)
        encoded_audios.append(encoded_D)
    encoded_audios_np = np.array(encoded_audios)
    origin_spectrogram_batch_tensor = torch.from_numpy(encoded_audios_np).float().to(device)

    # Step 3: Split the spectrograms into batches of size 8
    batch_size = 8
    num_batches = int(np.ceil(origin_spectrogram_batch_tensor.shape[0] / batch_size))
    spectrogram_batches = []
    for i in range(num_batches):
        batch = origin_spectrogram_batch_tensor[i * batch_size:(i + 1) * batch_size]
        spectrogram_batches.append(batch)

    # Step 4: Encode each batch with the VAE and classify the quantized latents with the timbre encoder
    for spectrogram_batch in tqdm(spectrogram_batches):
        spectrogram_batch = spectrogram_batch.to(device)
        _, _, _, _, quantized_latent_representations = InputBatch2Encode_STFT(VAE_encoder, spectrogram_batch, quantizer=VAE_quantizer, squared=False)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)

        diffuSynth_probabilities.extend(probabilities.to("cpu").detach().numpy())

    return inception_score(np.array(diffuSynth_probabilities))
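

# Example call (illustrative sketch, not part of the evaluation code): assumes a
# trained VAE and timbre_encoder are already loaded, and that the directory path
# below (hypothetical) contains 4-second, 16 kHz AudioLDM-generated wav files.
# score = get_inception_score_for_AudioLDM("cuda", timbre_encoder, VAE,
#                                          "samples/audioldm_generated/")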


def get_inception_score(device, uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder, num_batches, positive_prompts, negative_prompts="", CFG=1, sample_steps=10, task="spectrograms"):
    diffuSynth_probabilities = []

    # Select the sampling pipeline that matches the requested representation
    if task == "spectrograms":
        pipe = sample_pipeline
    elif task == "STFT":
        pipe = sample_pipeline_STFT
    else:
        raise NotImplementedError

    for _ in tqdm(range(num_batches)):
        quantized_latent_representations = pipe(device, uNet, VAE, MMM, CLAP_tokenizer,
                                                positive_prompts=positive_prompts, negative_prompts=negative_prompts,
                                                batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None)
        quantized_latent_representations = quantized_latent_representations.to(device)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)

        diffuSynth_probabilities.extend(probabilities.to("cpu").detach().numpy())

    return inception_score(np.array(diffuSynth_probabilities))
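

# Example call (illustrative sketch): assumes the trained uNet, VAE, MMM,
# CLAP_tokenizer and timbre_encoder from this project are already loaded.
# With num_batches=25 and a batch size of 8 this scores 200 generated samples.
# score = get_inception_score("cuda", uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder,
#                             num_batches=25, positive_prompts="string", CFG=6,
#                             sample_steps=10, task="STFT")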


def get_inception_score_GAN(device, gan_generator, VAE, MMM, CLAP_tokenizer, timbre_encoder, num_batches, positive_prompts, negative_prompts="", CFG=1, sample_steps=10, task="spectrograms"):
    diffuSynth_probabilities = []

    if task == "spectrograms":
        pipe = sample_pipeline_GAN
    elif task == "STFT":
        pipe = sample_pipeline_GAN_STFT
    else:
        raise NotImplementedError

    for _ in tqdm(range(num_batches)):
        quantized_latent_representations = pipe(device, gan_generator, VAE, MMM, CLAP_tokenizer,
                                                positive_prompts=positive_prompts, negative_prompts=negative_prompts,
                                                batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None)
        quantized_latent_representations = quantized_latent_representations.to(device)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)

        diffuSynth_probabilities.extend(probabilities.to("cpu").detach().numpy())

    return inception_score(np.array(diffuSynth_probabilities))


def predict_qualities_with_diffuSynth_sample(device, uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder, num_batches, positive_prompts, negative_prompts="", CFG=6, sample_steps=10):
    diffuSynth_qualities = []

    for _ in tqdm(range(num_batches)):
        quantized_latent_representations = sample_pipeline(device, uNet, VAE, MMM, CLAP_tokenizer,
                                                           positive_prompts=positive_prompts, negative_prompts=negative_prompts,
                                                           batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None)
        quantized_latent_representations = quantized_latent_representations.to(device)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        qualities = qualities.to("cpu").detach().numpy()
        # qualities = np.where(qualities > 0.5, 1, 0)

        diffuSynth_qualities.extend(qualities)

    return np.mean(diffuSynth_qualities, axis=0)
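

# Example call (illustrative sketch): the returned vector holds the mean
# predicted probability for each quality tag over all generated samples
# (presumably the quality labels the timbre encoder was trained on).
# mean_qualities = predict_qualities_with_diffuSynth_sample(
#     "cuda", uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder,
#     num_batches=10, positive_prompts="bright acoustic guitar")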


def generate_probabilities_with_diffuSynth_inpaint(device, uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder, num_batches, guidance, duration, use_dynamic_mask, noising_strength, positive_prompts, negative_prompts="", CFG=6, sample_steps=10):
    inpaint_probabilities, signals = [], []

    for _ in tqdm(range(num_batches)):
        quantized_latent_representations, _, rec_signals = inpaint_pipeline(device, uNet, VAE, MMM, CLAP_tokenizer,
                                                                            use_dynamic_mask=use_dynamic_mask, noising_strength=noising_strength, guidance=guidance,
                                                                            positive_prompts=positive_prompts, negative_prompts=negative_prompts,
                                                                            batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None,
                                                                            duration=duration, mask_flexivity=0.999,
                                                                            return_latent=False)
        quantized_latent_representations = quantized_latent_representations.to(device)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)

        inpaint_probabilities.extend(probabilities.to("cpu").detach().numpy())
        signals.extend(rec_signals)

    return np.array(inpaint_probabilities), signals
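

# Example follow-up (illustrative sketch): the inception score of the inpainted
# samples can be computed from the returned class probabilities.
# probs, signals = generate_probabilities_with_diffuSynth_inpaint(
#     "cuda", uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder,
#     num_batches=10, guidance=guidance, duration=3.0, use_dynamic_mask=True,
#     noising_strength=0.7, positive_prompts="organ")
# score = inception_score(probs)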


def inception_score(pred):
    # Conditional class distribution P(y|x) for each sample (re-normalized per row)
    pyx = pred / np.sum(pred, axis=1, keepdims=True)
    # Marginal class distribution P(y) over the whole set of samples
    py = np.mean(pyx, axis=0, keepdims=True)
    # KL divergence between P(y|x) and P(y), with a small epsilon for numerical stability
    kl_div = pyx * (np.log(pyx + 1e-11) - np.log(py + 1e-11))
    # Sum over classes, average over samples, then exponentiate
    kl_div_sum = np.sum(kl_div, axis=1)
    score = np.exp(np.mean(kl_div_sum))
    return score
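

# Illustrative sanity check (not used by the evaluation code above): perfectly
# confident and perfectly diverse predictions over 4 classes score close to 4,
# while uniform predictions score close to 1.
if __name__ == "__main__":
    confident = np.eye(4).repeat(25, axis=0)  # 100 one-hot predictions, 4 classes
    uniform = np.full((100, 4), 0.25)         # 100 uniform predictions
    print(inception_score(confident))         # ≈ 4.0
    print(inception_score(uniform))           # ≈ 1.0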