# ----- Imports -----
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
import pyarrow as pa
import librosa
import subprocess
import torchaudio
import pykakasi
import difflib
from datasets import Dataset
from transformers import HubertForCTC, Wav2Vec2Processor, AutoTokenizer
from reazonspeech.nemo.asr import transcribe, audio_from_path, load_model
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
# ----- Model Loading -----
kks = pykakasi.kakasi()
model = load_model()  # ReazonSpeech NeMo ASR model
processor = Wav2Vec2Processor.from_pretrained('TKU410410103/uniTKU-hubert-japanese-asr')
hubert = HubertForCTC.from_pretrained('TKU410410103/uniTKU-hubert-japanese-asr')
hubert.config.output_hidden_states = True
tokenizer = AutoTokenizer.from_pretrained("cl-tohoku/bert-base-japanese-char")
# ----- Function Definitions -----
def convert_to_wav(input_path, output_path):
    """Convert any ffmpeg-readable audio file to 16 kHz mono WAV."""
    command = [
        'ffmpeg',
        '-i', input_path,
        '-ar', '16000',
        '-ac', '1',
        '-f', 'wav',
        output_path
    ]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        print("FFmpeg failed:")
        print(e.stderr.decode('utf-8'))
        raise

def modified_filename(file_path):
    """Append '1' to the file stem, e.g. 'a/b.wav' -> 'a/b1.wav'."""
    base_name, extension = file_path.rsplit('.', 1)
    return f"{base_name}1.{extension}"
def acoustic_noise_suppression(input_wav_path, output_wav_path):
    ans = pipeline(
        Tasks.acoustic_noise_suppression,
        model='damo/speech_frcrn_ans_cirm_16k')
    result = ans(
        input_wav_path,
        output_path=output_wav_path)
    return result
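# Note: the pipeline above is re-instantiated on every call, which reloads the
# ANS model each time. A minimal caching sketch (an optional optimization, not
# part of the original flow; it assumes the ModelScope pipeline object is
# reusable across calls):
#
#   _ans = None
#   def get_ans_pipeline():
#       global _ans
#       if _ans is None:
#           _ans = pipeline(Tasks.acoustic_noise_suppression,
#                           model='damo/speech_frcrn_ans_cirm_16k')
#       return _ans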
def detect_audio_features(audio_file, energy_threshold=0.1, amplitude_threshold=0.1):
    # Load the audio file once
    y, sr = librosa.load(audio_file, sr=None)
    # Calculate per-frame energy
    frame_length = int(0.025 * sr)  # 25 ms frame length
    hop_length = int(0.010 * sr)    # 10 ms hop length
    frames = librosa.util.frame(y, frame_length=frame_length, hop_length=hop_length)
    energy = np.sum(np.square(frames), axis=0)
    avg_energy = np.mean(energy)
    # Calculate maximum amplitude
    max_amplitude = np.max(np.abs(y))
    print("Average Energy:", avg_energy)
    print("Maximum Amplitude:", max_amplitude)
    # True only if both thresholds are exceeded, i.e. the clip contains speech-like audio
    return (avg_energy > energy_threshold) and (max_amplitude > amplitude_threshold)
def process_waveforms(batch):
    waveform, sample_rate = torchaudio.load(batch['audio_path'])
    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resampler(waveform)
    # Downmix stereo to mono
    if waveform.size(0) > 1:
        waveform = waveform.mean(dim=0)
    # Ensure the waveform is one-dimensional
    if waveform.ndim > 1:
        waveform = waveform.squeeze()
    batch["speech_array"] = waveform
    return batch
def asr(path):
    """Transcribe with ReazonSpeech, then convert the transcript to hiragana."""
    audio = audio_from_path(path)
    ret = transcribe(model, audio)
    result = kks.convert(ret.text)
    return result[0]['hira']
def get_most_similar(predicted_word):
    correct_words = ['わたし', 'わたしたち', 'あなた', 'あのひと', 'あのかた', 'みなさん',
                     'せんせい', 'きょうし', 'がくせい', 'かいしゃいん', 'しゃいん',
                     'ぎんこういん', 'いしゃ', 'けんきゅうしゃ', 'エンジニア', 'だいがく',
                     'びょういん', 'でんき', 'だれ', 'どなた', '~さい', 'なんさい', 'おいくつ']
    # Track the best similarity seen so far and the corresponding word
    highest_similarity = 0.0
    most_similar_word = predicted_word
    # Compare against every word in the reference list
    for word in correct_words:
        # SequenceMatcher.ratio() gives a similarity in [0, 1]
        similarity = difflib.SequenceMatcher(None, predicted_word, word).ratio()
        if similarity > highest_similarity:
            highest_similarity = similarity
            most_similar_word = word
    print(f'most similar word: {most_similar_word}, highest similarity: {highest_similarity}')
    if highest_similarity < 0.2:
        return 'word not in vocabulary'
    return most_similar_word
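# Illustration (hypothetical input): a near-miss ASR output resolves to the
# closest vocabulary entry, e.g. get_most_similar('わたしは') -> 'わたし'
# (SequenceMatcher ratio = 2*3 / (4+3) ≈ 0.86, well above the 0.2 cutoff).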
class BLSTMSpeechScoring(nn.Module):
    def __init__(self, input_size=768, hidden_size=128, num_layers=1, output_size=1, embedding_dim=64, vocab_size=4000):
        super(BLSTMSpeechScoring, self).__init__()
        # BLSTM over acoustic features (HuBERT hidden states)
        self.acoustic_blstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                                      num_layers=num_layers, batch_first=True, bidirectional=True)
        # BLSTM over linguistic (character) features
        self.linguistic_blstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_size,
                                        num_layers=num_layers, batch_first=True, bidirectional=True)
        # Character embedding layer
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        # Linear layers projecting the BLSTM outputs to a common dimension
        self.acoustic_linear = nn.Linear(hidden_size * 2, hidden_size)
        self.linguistic_linear = nn.Linear(hidden_size * 2, hidden_size)
        # Final linear layer applied after concatenation
        self.final_linear = nn.Linear(hidden_size * 2, output_size)

    def forward(self, acoustic_input, linguistic_input):
        # Run the acoustic input through its BLSTM
        acoustic_output, _ = self.acoustic_blstm(acoustic_input)
        # Embed the characters and run them through the linguistic BLSTM
        embedded_chars = self.embedding(linguistic_input)
        linguistic_output, _ = self.linguistic_blstm(embedded_chars)
        # Project both outputs to the same dimension
        acoustic_features = self.acoustic_linear(acoustic_output)
        linguistic_features = self.linguistic_linear(linguistic_output)
        # Global average pooling (GAP) over the time dimension
        gap_acoustic = torch.mean(acoustic_features, dim=1)
        gap_linguistic = torch.mean(linguistic_features, dim=1)
        # Concatenate the pooled features and compute the final score
        concatenated_features = torch.cat((gap_acoustic, gap_linguistic), dim=1)
        score = self.final_linear(concatenated_features)
        return score
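# Shape sanity check (a commented-out sketch with dummy tensors; the sizes
# below are illustrative, not taken from the original code):
#   acoustic input:   (batch, frames, 768) averaged HuBERT hidden states
#   linguistic input: (batch, chars) token ids below vocab_size=4000
#   output:           (batch, 1) raw score
#
#   _m = BLSTMSpeechScoring()
#   _out = _m(torch.randn(2, 50, 768), torch.randint(0, 4000, (2, 10)))
#   assert _out.shape == (2, 1)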
judge = 0.65  # score threshold above which a pronunciation is treated as fully correct
class Trainer:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def pred(self, acoustic_input, text):
        self.model.eval()
        with torch.no_grad():
            encoded_input = self.tokenizer(text, padding=True, truncation=True, return_tensors="pt", max_length=100)
            linguistic_input = encoded_input['input_ids']
            outputs = self.model(acoustic_input, linguistic_input)
            return outputs
def make_dataframe(audio_path):
    rows = []
    text = asr(audio_path)
    print(text)
    rows.append({'audio_path': audio_path, 'text': text})
    return pd.DataFrame(rows)
def get_acoustic_feature(batch):
    with torch.no_grad():
        processed_audios = processor(batch['speech_array'],
                                     sampling_rate=16000,
                                     return_tensors="pt",
                                     padding=True,
                                     truncation=True,
                                     max_length=160000)
        outputs = hubert(**processed_audios)
    transformer_hidden_states = outputs.hidden_states
    # Stack the per-layer hidden states along a new leading (layer) dimension
    stacked_hidden_states = torch.stack(transformer_hidden_states)
    # Average across the layer dimension (0), keeping the sequence length
    overall_avg_hidden_state = torch.mean(stacked_hidden_states, dim=0)
    return overall_avg_hidden_state
# ----- Model Loading and Initialization -----
blstm = BLSTMSpeechScoring()
model_save_path = "./BLSTMSpeechScoring_TKU.pth"
blstm.load_state_dict(torch.load(model_save_path))
blstm.eval()
trainer = Trainer(blstm, tokenizer)
def scoring(file_path):
    converted_file_path = modified_filename(file_path)
    convert_to_wav(input_path=file_path, output_path=converted_file_path)
    output_file = modified_filename(converted_file_path)
    acoustic_noise_suppression(input_wav_path=converted_file_path, output_wav_path=output_file)  # output_file holds the denoised audio
    if detect_audio_features(output_file):
        df = make_dataframe(output_file)
        dataset = Dataset.from_pandas(df)
        dataset_array = dataset.map(process_waveforms, remove_columns=['audio_path'])
        acoustic_input = get_acoustic_feature(dataset_array)
        text = list(df['text'])
        similar_text = get_most_similar(text[0][:-1])  # drop the final character (e.g. the ASR's trailing '。')
        if similar_text == 'word not in vocabulary':
            return 0, similar_text
        score = trainer.pred(acoustic_input, text[0])
        score = 1 if score > judge else float(score)
        print('score: ', score * 100)
        return score * 100, similar_text
    # No speech-like audio detected
    return -1, None
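
# Example entry point (a hedged sketch; 'sample.webm' is a hypothetical path,
# and ffmpeg plus the modelscope/reazonspeech models must be available):
if __name__ == "__main__":
    final_score, matched_word = scoring("sample.webm")
    if matched_word is None:
        print("No speech detected in the recording.")
    else:
        print(f"word: {matched_word}, score: {final_score}")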