import os
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_io as tfio
import csv
from scipy.io import wavfile
import scipy
import librosa
import soundfile as sf
import time
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from transformers import AutoProcessor
from transformers import BarkModel
from optimum.bettertransformer import BetterTransformer
import torch
from nemo.collections.tts.models import FastPitchModel
from nemo.collections.tts.models import HifiGanModel
from deep_translator import GoogleTranslator
from haystack.document_stores import InMemoryDocumentStore
from haystack.nodes import EmbeddingRetriever
# --- Load models ---

# Load a model from TensorFlow Hub
def load_model_hub(model_url):
    model = hub.load(model_url)
    return model

# Load a TFLite model from the project folder
def load_model_file(model_path):
    interpreter = tf.lite.Interpreter(model_path)
    interpreter.allocate_tensors()
    return interpreter

# --- Initialize models ---
def initialize_text_to_speech_model():
    # Load spectrogram generator
    spec_generator = FastPitchModel.from_pretrained("nvidia/tts_en_fastpitch")
    # Load vocoder
    model = HifiGanModel.from_pretrained(model_name="nvidia/tts_hifigan")
    return spec_generator, model
def initialize_tt5_model():
    from transformers import SpeechT5ForTextToSpeech, SpeechT5Processor, SpeechT5HifiGan
    from datasets import load_dataset

    dataset = load_dataset("pedropauletti/librispeech-portuguese")
    model = SpeechT5ForTextToSpeech.from_pretrained("pedropauletti/speecht5_finetuned_librispeech_pt")
    processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
    vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")

    example = dataset["test"][100]
    speaker_embeddings = torch.tensor(example["speaker_embeddings"]).unsqueeze(0)

    return model, processor, vocoder, speaker_embeddings
def load_qa_model():
    document_store = InMemoryDocumentStore()
    retriever = EmbeddingRetriever(
        document_store=document_store,
        embedding_model="sentence-transformers/all-MiniLM-L6-v2",
        use_gpu=False,
        scale_score=False,
    )

    # Get dataframe with columns "question", "answer" and some custom metadata
    df = pd.read_csv('content/social-faq.csv', on_bad_lines='skip', delimiter=';')
    # Minimal cleaning
    df.fillna(value="", inplace=True)
    df["question"] = df["question"].apply(lambda x: x.strip())

    questions = list(df["question"].values)
    df["embedding"] = retriever.embed_queries(queries=questions).tolist()
    df = df.rename(columns={"question": "content"})

    # Convert DataFrame to list of dicts and index them in our DocumentStore
    docs_to_index = df.to_dict(orient="records")
    document_store.write_documents(docs_to_index)

    return retriever
# --- Audio pre-processing ---
# Utility functions for loading audio files and making sure the sample rate is correct.

def load_wav_16k_mono(filename):
    """Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio."""
    file_contents = tf.io.read_file(filename)
    wav, sample_rate = tf.audio.decode_wav(
        file_contents,
        desired_channels=1)
    wav = tf.squeeze(wav, axis=-1)
    sample_rate = tf.cast(sample_rate, dtype=tf.int64)
    wav = tfio.audio.resample(wav, rate_in=sample_rate, rate_out=16000)
    return wav

def load_wav_16k_mono_librosa(filename):
    """Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio using librosa."""
    wav, sample_rate = librosa.load(filename, sr=16000, mono=True)
    return wav

def load_wav_16k_mono_soundfile(filename):
    """Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio using soundfile."""
    wav, sample_rate = sf.read(filename, dtype='float32')
    # Resample to 16 kHz if necessary
    if sample_rate != 16000:
        wav = librosa.resample(wav, orig_sr=sample_rate, target_sr=16000)
    return wav
# --- History ---
def updateHistory():
    global history
    return history

def clearHistory():
    global history
    history = ""
    return history

def clear():
    return None
# --- Output Format ---
def format_dictionary(dictionary):
    result = []
    for key, value in dictionary.items():
        percentage = int(value * 100)
        result.append(f"{key}: {percentage}%")
    return ', '.join(result)

def format_json(json_data):
    confidence_strings = [f"{item['label']}: {round(item['confidence']*100)}%" for item in json_data['confidences']]
    result_string = f"{', '.join(confidence_strings)}"
    return result_string

def format_json_pt(json_data):
    from unidecode import unidecode

    confidence_strings = [f"{item['label']}... " for item in json_data['confidences']]
    result_string = f"{', '.join(confidence_strings)}"
    return unidecode(result_string)
# --- Classification ---
def load_label_mapping(csv_path):
    label_mapping = {}
    with open(csv_path, newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            label_mapping[int(row['index'])] = row['display_name']
    return label_mapping
def predict_yamnet(interpreter, waveform, input_details, output_details, label_mapping):
    # Pre-process the waveform to match the model's input requirements
    input_shape = input_details[0]['shape']
    input_data = np.array(waveform, dtype=np.float32)
    if input_data.shape != input_shape:
        # Pad or truncate the waveform to match the expected size
        if input_data.shape[0] < input_shape[0]:
            # Pad the waveform with zeros
            padding = np.zeros((input_shape[0] - input_data.shape[0],))
            input_data = np.concatenate((input_data, padding))
        elif input_data.shape[0] > input_shape[0]:
            # Truncate the waveform
            input_data = input_data[:input_shape[0]]
        input_data = np.reshape(input_data, input_shape)

    # Run inference
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()

    # Get the inference results
    output_data = interpreter.get_tensor(output_details[0]['index'])

    # Process the results and map the top indices to label names
    top_labels_indices = np.argsort(output_data[0])[::-1][:3]
    results = []
    for i in top_labels_indices:
        label_name = label_mapping.get(i, "Unknown Label")
        probability = float(output_data[0][i])  # Convert to float
        results.append({'label': label_name, 'probability': str(probability)})
    return results  # Return a list containing the top-3 results
def classify(audio, language="en-us"):
    # Preprocess audio
    wav_data = load_wav_16k_mono_librosa(audio)

    # Label mapping
    if language == "pt-br":
        label_mapping = load_label_mapping('content/yamnet_class_map_ptbr.csv')
    else:
        label_mapping = load_label_mapping('content/yamnet_class_map.csv')

    # Load model from file
    model = load_model_file('content/yamnet_classification.tflite')
    input_details = model.get_input_details()
    output_details = model.get_output_details()

    # Classification
    result = predict_yamnet(model, wav_data, input_details, output_details, label_mapping)

    return result
def classify_realtime(language, audio, state):
    # Preprocess audio
    wav_data = load_wav_16k_mono_librosa(audio)

    # Label mapping
    if language == "pt-br":
        label_mapping = load_label_mapping('content/yamnet_class_map_ptbr.csv')
    else:
        label_mapping = load_label_mapping('content/yamnet_class_map.csv')

    # Load model from file
    model = load_model_file('content/yamnet_classification.tflite')
    input_details = model.get_input_details()
    output_details = model.get_output_details()

    # Classification
    result = predict_yamnet(model, wav_data, input_details, output_details, label_mapping)

    # `result` is a list of dicts, so accumulate only the top label in the running state string
    state += result[0]['label'] + " "
    return result, state
# --- TTS ---
def generate_audio(spec_generator, model, input_text):
    parsed = spec_generator.parse(input_text)
    spectrogram = spec_generator.generate_spectrogram(tokens=parsed)
    audio = model.convert_spectrogram_to_audio(spec=spectrogram)
    return 22050, audio.cpu().detach().numpy().squeeze()

def generate_audio_tt5(model, processor, vocoder, speaker_embeddings, text):
    inputs = processor(text=text, return_tensors="pt")
    audio = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)
    return 16000, audio.cpu().detach().numpy().squeeze()

def TTS(json_input, language):
    global spec_generator, model_nvidia, history
    global model_tt5, processor, vocoder, speaker_embeddings

    if language == 'en-us':
        sr, generatedAudio = generate_audio(spec_generator, model_nvidia, format_json(json_input))
    else:
        sr, generatedAudio = generate_audio_tt5(model_tt5, processor, vocoder, speaker_embeddings, format_json_pt(json_input))
    return (sr, generatedAudio)

def TTS_ASR(json_input, language):
    global spec_generator, model_nvidia, history
    global model_tt5, processor, vocoder, speaker_embeddings

    if language == 'en-us':
        sr, generatedAudio = generate_audio(spec_generator, model_nvidia, json_input['label'])
    else:
        sr, generatedAudio = generate_audio_tt5(model_tt5, processor, vocoder, speaker_embeddings, json_input['label'])
    return (sr, generatedAudio)

def TTS_chatbot(language):
    global spec_generator, model_nvidia, history
    global model_tt5, processor, vocoder, speaker_embeddings
    global last_answer

    if language == 'en-us':
        sr, generatedAudio = generate_audio(spec_generator, model_nvidia, last_answer)
    else:
        sr, generatedAudio = generate_audio_tt5(model_tt5, processor, vocoder, speaker_embeddings, last_answer)
    return (sr, generatedAudio)
# --- ASR ---
def transcribe_speech(filepath, language):
    print(filepath)
    if language == "pt-br":
        output = pipe(
            filepath,
            max_new_tokens=256,
            generate_kwargs={
                "task": "transcribe",
                "language": "portuguese",
            },
            chunk_length_s=30,
            batch_size=8,
        )
    else:
        output = pipe_en(
            filepath,
            max_new_tokens=256,
            generate_kwargs={
                "task": "transcribe",
                "language": "english",
            },
            chunk_length_s=30,
            batch_size=8,
        )
    return output["text"]

def transcribe_speech_realtime(filepath, state):
    output = pipe(
        filepath,
        max_new_tokens=256,
        generate_kwargs={
            "task": "transcribe",
            "language": "english",
        },
        chunk_length_s=30,
        batch_size=8,
    )
    state += output["text"] + " "
    return output["text"], state

def transcribe_realtime(new_chunk, stream):
    sr, y = new_chunk
    y = y.astype(np.float32)
    y /= np.max(np.abs(y))

    if stream is not None:
        stream = np.concatenate([stream, y])
    else:
        stream = y
    return stream, pipe_en({"sampling_rate": sr, "raw": stream})["text"]
# --- Translation ---
def translate_enpt(text):
    # Note: `enpt_pipeline` is expected to be a global text2text pipeline; it is not initialized in this snippet.
    global enpt_pipeline
    translation = enpt_pipeline(f"translate English to Portuguese: {text}")
    return translation[0]['generated_text']
# --- Gradio Interface ---
def interface(language, audio):
    global classificationResult

    result = classify(audio, language)
    dic = {result[0]['label']: float(result[0]['probability']),
           result[1]['label']: float(result[1]['probability']),
           result[2]['label']: float(result[2]['probability'])
           }
    # history += result[0]['label'] + '\n'
    classificationResult = dic
    return dic

def interface_realtime(language, audio):
    global history

    result = classify(audio, language)
    dic = {result[0]['label']: float(result[0]['probability']),
           result[1]['label']: float(result[1]['probability']),
           result[2]['label']: float(result[2]['probability'])
           }
    history = result[0]['label'] + '\n' + history
    return dic
# --- QA Model ---
def get_answers(retriever, query):
    from haystack.pipelines import FAQPipeline
    from haystack.utils import print_answers

    pipe = FAQPipeline(retriever=retriever)

    # Run any question and change top_k to see more or fewer answers
    prediction = pipe.run(query=query, params={"Retriever": {"top_k": 1}})
    answers = prediction['answers']
    if answers:
        return answers[0].answer
    else:
        return "I don't have an answer to that question"

def add_text(chat_history, text):
    chat_history = chat_history + [(text, None)]
    return chat_history, gr.Textbox(value="", interactive=False)

def chatbot_response(chat_history, language):
    chat_history[-1][1] = ""
    global retriever
    global last_answer

    if language == 'pt-br':
        response = get_answers(retriever, GoogleTranslator(source='pt', target='en').translate(chat_history[-1][0]))
        response = GoogleTranslator(source='en', target='pt').translate(response)
    else:
        response = get_answers(retriever, chat_history[-1][0])

    last_answer = response

    for character in response:
        chat_history[-1][1] += character
        time.sleep(0.01)
        yield chat_history
# --- Global state and model initialization ---
# Simple globals used by the handlers above; initialized here so the first calls don't fail.
history = ""
last_answer = ""
classificationResult = {}

retriever = load_qa_model()
spec_generator, model_nvidia = initialize_text_to_speech_model()
model_tt5, processor, vocoder, speaker_embeddings = initialize_tt5_model()
pipe = pipeline("automatic-speech-recognition", model="pedropauletti/whisper-small-pt")
pipe_en = pipeline("automatic-speech-recognition", model="openai/whisper-small")
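
# --- Example wiring (sketch) ---
# A minimal, hypothetical example of exposing `classify` through a Gradio Interface.
# The Space's actual UI layout is not shown in this snippet, so the components and
# event wiring below are assumptions, not the original implementation.
if __name__ == "__main__":
    demo = gr.Interface(
        fn=classify,
        inputs=[
            gr.Audio(type="filepath", label="Audio input"),
            gr.Radio(choices=["en-us", "pt-br"], value="en-us", label="Language"),
        ],
        outputs=gr.JSON(label="Top-3 predicted labels"),
    )
    demo.launch()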