# audio-classifier/modules/feature_extraction.py
# import torch
import librosa
import numpy as np
import parselmouth
from transformers import Wav2Vec2Model, Wav2Vec2Processor
from config import FEATURES_CACHE
from pathlib import Path
from typing import Tuple, Optional
# === Feature Extraction ===
class FeatureExtractor:
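    """Extract acoustic features from raw audio for classification.

    `traditional` computes hand-crafted librosa/parselmouth features;
    wav2vec2 embedding extraction is currently disabled (see the
    commented-out code below).
    """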
def __init__(self) -> None:
# self.wav2vec_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base-960h")
# self.wav2vec_proc = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
pass
def traditional(self, y: np.ndarray, sr: int = 16000, n_mfcc: int = 13) -> np.ndarray:
        # MFCCs (13 coefficients is a common baseline for voice tasks)
mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
# delta = librosa.feature.delta(mfcc)
# delta2 = librosa.feature.delta(mfcc, order=2)
# Chroma
chroma = librosa.feature.chroma_stft(y=y, sr=sr)
# Spectral Contrast
contrast = librosa.feature.spectral_contrast(y=y, sr=sr)
# # Tonnetz
# tonnetz = librosa.feature.tonnetz(y=librosa.effects.harmonic(y), sr=sr)
# RMS Energy & ZCR
rmse = librosa.feature.rms(y=y)
zcr = librosa.feature.zero_crossing_rate(y)
# Spectral Centroid
centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
#* PROSODIC FEATURES
# Fundamental frequency (F0) using YIN
try:
f0 = librosa.yin(y, fmin=50, fmax=500, sr=sr)
f0_mean = np.nanmean(f0)
f0_std = np.nanstd(f0)
f0_max = np.nanmax(f0)
        except Exception:
            # Fall back to zeros if pitch tracking fails (e.g. clip too short)
            f0_mean = f0_std = f0_max = 0.0
# Loudness (Log energy)
loudness = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)
loudness_mean = np.mean(loudness)
loudness_std = np.std(loudness)
# Rhythm / Duration
intervals = librosa.effects.split(y, top_db=30)
durations = [(e - s) / sr for s, e in intervals]
if durations:
dur_mean = np.mean(durations)
dur_std = np.std(durations)
dur_count = len(durations)
else:
dur_mean = dur_std = dur_count = 0
# Formant Features
formants = self.extract_formants(y, sr)
f1_mean = formants["f1_mean"]
f1_std = formants["f1_std"]
f2_mean = formants["f2_mean"]
f2_std = formants["f2_std"]
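        # Resulting vector: n_mfcc + 12 (chroma) + 7 (contrast) + 3 (RMS/ZCR/centroid)
        # + 3 (F0) + 2 (loudness) + 3 (duration) + 4 (formants) = n_mfcc + 34 values
        # (47 with the default n_mfcc=13).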
return np.concatenate([
mfcc.mean(axis=1),
# delta.mean(axis=1),
# delta2.mean(axis=1),
chroma.mean(axis=1),
contrast.mean(axis=1),
# tonnetz.mean(axis=1),
[rmse.mean()],
[zcr.mean()],
[centroid.mean()],
[f0_mean, f0_std, f0_max],
[loudness_mean, loudness_std],
[dur_mean, dur_std, dur_count],
[f1_mean, f1_std, f2_mean, f2_std],
])
def extract_formants(self, audio: np.ndarray, sr: int = 16000) -> dict:
try:
sound = parselmouth.Sound(audio, sampling_frequency=sr)
formant = sound.to_formant_burg()
duration = sound.duration
times = np.linspace(0.01, duration - 0.01, 100)
f1_list, f2_list = [], []
for t in times:
f1 = formant.get_value_at_time(1, t)
f2 = formant.get_value_at_time(2, t)
                # Praat returns NaN where a formant is undefined; NaN is truthy
                # in Python, so filter explicitly instead of using `if f1:`.
                if not np.isnan(f1): f1_list.append(f1)
                if not np.isnan(f2): f2_list.append(f2)
return {
"f1_mean": np.nanmean(f1_list) if f1_list else 0,
"f1_std": np.nanstd(f1_list) if f1_list else 0,
"f2_mean": np.nanmean(f2_list) if f2_list else 0,
"f2_std": np.nanstd(f2_list) if f2_list else 0,
}
except Exception as e:
print(f"[Formant Error] {e}")
return {
"f1_mean": 0, "f1_std": 0,
"f2_mean": 0, "f2_std": 0,
}
# def wav2vec(self, y: np.ndarray, sr: int = 16000) -> np.ndarray:
# if sr != 16000:
# y = librosa.resample(y, orig_sr=sr, target_sr=16000)
# input_values: torch.Tensor = self.wav2vec_proc(y, return_tensors="pt", sampling_rate=16000).input_values
# with torch.no_grad():
# embeddings: torch.Tensor = self.wav2vec_model(input_values).last_hidden_state
# return embeddings.mean(dim=1).squeeze().numpy()
    def extract(self, y: np.ndarray, sr: int = 16000, mode: str = "traditional", n_mfcc: int = 40) -> np.ndarray:
        if mode == "traditional":
            return self.traditional(y, sr, n_mfcc=n_mfcc)
        # wav2vec extraction is commented out above; fail loudly here instead
        # of raising an AttributeError on the missing method.
        raise NotImplementedError(f"Unsupported feature mode: {mode!r}")
    def _cache_paths(self, mode: str, version: Optional[int] = None) -> Tuple[Path, Path]:
        # Resolve the (X, y) cache file pair for a mode and optional version.
        suffix = f"{mode}.npy" if version is None else f"{mode}_v{version}.npy"
        return FEATURES_CACHE / f"X_{suffix}", FEATURES_CACHE / f"y_{suffix}"
    def cache_features(self, X: np.ndarray, y: np.ndarray, mode: str, version: Optional[int] = None, force_update: bool = False) -> None:
        X_path, y_path = self._cache_paths(mode, version)
        if force_update or not X_path.exists() or not y_path.exists():
            np.save(X_path, X)
            np.save(y_path, y)
def load_cached_features(self, mode: str, version: Optional[int] = None) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        X_path, y_path = self._cache_paths(mode, version)
if X_path.exists() and y_path.exists():
return np.load(X_path), np.load(y_path)
return None, None
    def remove_cached_features(self, mode: str, version: Optional[int] = None) -> None:
        X_path, y_path = self._cache_paths(mode, version)
        if X_path.exists(): X_path.unlink()
        if y_path.exists(): y_path.unlink()
    def merge_features(self, mode: str) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        # Concatenate every versioned cache for this mode into a single pair.
        X, y = [], []
        for file in sorted(FEATURES_CACHE.glob(f"X_{mode}_*.npy")):
            X.append(np.load(file))
            y.append(np.load(file.with_name(file.name.replace("X_", "y_", 1))))
        if not X:
            return None, None
        return np.concatenate(X), np.concatenate(y)
def get_latest_version(self, mode: str) -> int:
versions = [
int(file.stem.split("_v")[-1])
for file in FEATURES_CACHE.glob(f"X_{mode}_*.npy")
if "_v" in file.stem and file.stem.split("_v")[-1].isdigit()
]
return max(versions) if versions else 0
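
# A minimal usage sketch (illustrative only): "sample.wav" is a hypothetical
# input path, and FEATURES_CACHE is assumed to point at an existing directory.
if __name__ == "__main__":
    extractor = FeatureExtractor()
    audio, sr = librosa.load("sample.wav", sr=16000)  # hypothetical input file
    feats = extractor.extract(audio, sr, mode="traditional", n_mfcc=40)
    print(f"Feature vector shape: {feats.shape}")  # (74,) for n_mfcc=40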