import numpy as np import matplotlib.pyplot as plt import matplotlib import librosa from scipy.io.wavfile import write import torch k = 1e-16 def np_log10(x): """Safe log function with base 10.""" numerator = np.log(x + 1e-16) denominator = np.log(10) return numerator / denominator def sigmoid(x): """Safe log function with base 10.""" s = 1 / (1 + np.exp(-x)) return s def inv_sigmoid(s): """Safe inverse sigmoid function.""" x = np.log((s / (1 - s)) + 1e-16) return x def spc_to_VAE_input(spc): """Restrict value range from [0, infinite] to [0, 1]. (deprecated )""" return spc / (1 + spc) def VAE_out_put_to_spc(o): """Inverse transform of function 'spc_to_VAE_input'. (deprecated )""" return o / (1 - o + k) def np_power_to_db(S, amin=1e-16, top_db=80.0): """Helper method for numpy data scaling. (deprecated )""" ref = S.max() log_spec = 10.0 * np_log10(np.maximum(amin, S)) log_spec -= 10.0 * np_log10(np.maximum(amin, ref)) log_spec = np.maximum(log_spec, log_spec.max() - top_db) return log_spec def show_spc(spc): """Show a spectrogram. (deprecated )""" s = np.shape(spc) spc = np.reshape(spc, (s[0], s[1])) magnitude_spectrum = np.abs(spc) log_spectrum = np_power_to_db(magnitude_spectrum) plt.imshow(np.flipud(log_spectrum)) plt.show() def save_results(spectrogram, spectrogram_image_path, waveform_path): """Save the input 'spectrogram' and its waveform (reconstructed by Griffin Lim) to path provided by 'spectrogram_image_path' and 'waveform_path'.""" magnitude_spectrum = np.abs(spectrogram) log_spc = np_power_to_db(magnitude_spectrum) log_spc = np.reshape(log_spc, (512, 256)) matplotlib.pyplot.imsave(spectrogram_image_path, log_spc, vmin=-100, vmax=0, origin='lower') # save waveform abs_spec = np.zeros((513, 256)) abs_spec[:512, :] = abs_spec[:512, :] + np.sqrt(np.reshape(spectrogram, (512, 256))) rec_signal = librosa.griffinlim(abs_spec, n_iter=32, hop_length=256, win_length=1024) write(waveform_path, 16000, rec_signal) def plot_log_spectrogram(signal: np.ndarray, path: str, n_fft=2048, frame_length=1024, frame_step=256): """Save spectrogram.""" stft = librosa.stft(signal, n_fft=n_fft, hop_length=frame_step, win_length=frame_length) amp = np.square(np.real(stft)) + np.square(np.imag(stft)) magnitude_spectrum = np.abs(amp) log_mel = np_power_to_db(magnitude_spectrum) matplotlib.pyplot.imsave(path, log_mel, vmin=-100, vmax=0, origin='lower') def visualize_feature_maps(device, model, inputs, channel_indices=[0, 3,]): """ Visualize feature maps before and after quantization for given input. Parameters: - model: Your VQ-VAE model. - inputs: A batch of input data. - channel_indices: Indices of feature map channels to visualize. """ model.eval() inputs = inputs.to(device) with torch.no_grad(): z_e = model._encoder(inputs) z_q, loss, (perplexity, min_encodings, min_encoding_indices) = model._vq_vae(z_e) # Assuming inputs have shape [batch_size, channels, height, width] batch_size = z_e.size(0) for idx in range(batch_size): fig, axs = plt.subplots(1, len(channel_indices)*2, figsize=(15, 5)) for i, channel_idx in enumerate(channel_indices): # Plot encoder output axs[2*i].imshow(z_e[idx][channel_idx].cpu().numpy(), cmap='viridis') axs[2*i].set_title(f"Encoder Output - Channel {channel_idx}") # Plot quantized output axs[2*i+1].imshow(z_q[idx][channel_idx].cpu().numpy(), cmap='viridis') axs[2*i+1].set_title(f"Quantized Output - Channel {channel_idx}") plt.show() def adjust_audio_length(audio, desired_length, original_sample_rate, target_sample_rate): """ Adjust the audio length to the desired length and resample to target sample rate. Parameters: - audio (np.array): The input audio signal - desired_length (int): The desired length of the output audio - original_sample_rate (int): The original sample rate of the audio - target_sample_rate (int): The target sample rate for the output audio Returns: - np.array: The adjusted and resampled audio """ if not (original_sample_rate == target_sample_rate): audio = librosa.core.resample(audio, orig_sr=original_sample_rate, target_sr=target_sample_rate) if len(audio) > desired_length: return audio[:desired_length] elif len(audio) < desired_length: padded_audio = np.zeros(desired_length) padded_audio[:len(audio)] = audio return padded_audio else: return audio def safe_int(s, default=0): try: return int(s) except ValueError: return default def pad_spectrogram(D): """Resize spectrogram to (512, 256). (deprecated )""" D = D[1:, :] padding_length = 256 - D.shape[1] D_padded = np.pad(D, ((0, 0), (0, padding_length)), 'constant') return D_padded def pad_STFT(D, time_resolution=256): """Resize spectral matrix by padding and cropping""" D = D[1:, :] if time_resolution is None: return D padding_length = time_resolution - D.shape[1] if padding_length > 0: D_padded = np.pad(D, ((0, 0), (0, padding_length)), 'constant') return D_padded else: return D def depad_STFT(D_padded): """Inverse function of 'pad_STFT'""" zero_row = np.zeros((1, D_padded.shape[1])) D_restored = np.concatenate([zero_row, D_padded], axis=0) return D_restored def nnData2Audio(spectrogram_batch, resolution=(512, 256), squared=False): """Transform batch of numpy spectrogram into signals and encodings.""" # Todo: remove resolution hard-coding frequency_resolution, time_resolution = resolution if isinstance(spectrogram_batch, torch.Tensor): spectrogram_batch = spectrogram_batch.to("cpu").detach().numpy() origin_signals = [] for spectrogram in spectrogram_batch: spc = VAE_out_put_to_spc(spectrogram) # get_audio abs_spec = np.zeros((frequency_resolution+1, time_resolution)) if squared: abs_spec[1:, :] = abs_spec[1:, :] + np.sqrt(np.reshape(spc, (frequency_resolution, time_resolution))) else: abs_spec[1:, :] = abs_spec[1:, :] + np.reshape(spc, (frequency_resolution, time_resolution)) origin_signal = librosa.griffinlim(abs_spec, n_iter=32, hop_length=256, win_length=1024) origin_signals.append(origin_signal) return origin_signals def amp_to_audio(amp, n_iter=50): """The Griffin-Lim algorithm.""" y_reconstructed = librosa.griffinlim(amp, n_iter=n_iter, hop_length=256, win_length=1024) return y_reconstructed def rescale(amp, method="log1p"): """Rescale function.""" if method == "log1p": return np.log1p(amp) elif method == "NormalizedLogisticCompression": return amp / (1.0 + amp) else: raise NotImplementedError() def unrescale(scaled_amp, method="NormalizedLogisticCompression"): """Inverse function of 'rescale'""" if method == "log1p": return np.expm1(scaled_amp) elif method == "NormalizedLogisticCompression": return scaled_amp / (1.0 - scaled_amp + 1e-10) else: raise NotImplementedError() def create_key(attributes): """Create unique key for each multi-label.""" qualities_str = ''.join(map(str, attributes["qualities"])) instrument_source_str = attributes["instrument_source_str"] instrument_family = attributes["instrument_family_str"] key = f"{instrument_source_str}_{instrument_family}_{qualities_str}" return key def merge_dictionaries(dicts): """Merge dictionaries.""" merged_dict = {} for dictionary in dicts: for key, value in dictionary.items(): if key in merged_dict: merged_dict[key] += value else: merged_dict[key] = value return merged_dict def adsr_envelope(signal, sample_rate, duration, attack_time, decay_time, sustain_level, release_time): """ Apply an ADSR envelope to an audio signal. :param signal: The original audio signal (numpy array). :param sample_rate: The sample rate of the audio signal. :param attack_time: Attack time in seconds. :param decay_time: Decay time in seconds. :param sustain_level: Sustain level as a fraction of the peak (0 to 1). :param release_time: Release time in seconds. :return: The audio signal with the ADSR envelope applied. """ # Calculate the number of samples for each ADSR phase duration_samples = int(duration * sample_rate) # assert (duration_samples + int(1.0 * sample_rate)) <= len(signal), "(duration_samples + sample_rate) > len(signal)" assert release_time <= 1.0, "release_time > 1.0" attack_samples = int(attack_time * sample_rate) decay_samples = int(decay_time * sample_rate) release_samples = int(release_time * sample_rate) sustain_samples = max(0, duration_samples - attack_samples - decay_samples) # Create ADSR envelope attack_env = np.linspace(0, 1, attack_samples) decay_env = np.linspace(1, sustain_level, decay_samples) sustain_env = np.full(sustain_samples, sustain_level) release_env = np.linspace(sustain_level, 0, release_samples) release_env_expand = np.zeros(int(1.0 * sample_rate)) release_env_expand[:len(release_env)] = release_env # Concatenate all phases to create the complete envelope envelope = np.concatenate([attack_env, decay_env, sustain_env, release_env_expand]) # Apply the envelope to the signal if len(envelope) <= len(signal): applied_signal = signal[:len(envelope)] * envelope else: signal_expanded = np.zeros(len(envelope)) signal_expanded[:len(signal)] = signal applied_signal = signal_expanded * envelope return applied_signal def rms_normalize(audio, target_rms=0.1): """Normalize the RMS value.""" current_rms = np.sqrt(np.mean(audio**2)) scaling_factor = target_rms / current_rms normalized_audio = audio * scaling_factor return normalized_audio def encode_stft(D): """'STFT+' function that transform spectral matrix into spectral representation.""" magnitude = np.abs(D) phase = np.angle(D) log_magnitude = np.log1p(magnitude) cos_phase = np.cos(phase) sin_phase = np.sin(phase) encoded_D = np.stack([log_magnitude, cos_phase, sin_phase], axis=0) return encoded_D def decode_stft(encoded_D): """'ISTFT+' function that reconstructs spectral matrix from spectral representation.""" log_magnitude = encoded_D[0, ...] cos_phase = encoded_D[1, ...] sin_phase = encoded_D[2, ...] magnitude = np.expm1(log_magnitude) phase = np.arctan2(sin_phase, cos_phase) D = magnitude * (np.cos(phase) + 1j * np.sin(phase)) return D