import torch import whisperx import json import sys import os from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline import warnings import concurrent.futures from functools import lru_cache import numpy as np warnings.filterwarnings("ignore") # Configurações otimizadas BATCH_SIZE = 8 COMPUTE_TYPE = "int8" # Mudado para int8 para maior compatibilidade DEVICE = "cuda" if torch.cuda.is_available() else "cpu" MAX_WORKERS = 2 CHUNK_SIZE = 30 MIN_WORD_DURATION = 0.02 OVERLAP = 2 SCORE_MINIMO = 0.5 TERMO_FIXO = ["CETOX", "CETOX31", "WhisperX", "VSL"] MODEL_NAME = "unicamp-dl/ptt5-base-portuguese-vocab" # Configurar ambiente os.environ["HF_HOME"] = "/app/.cache/huggingface" # Usar HF_HOME em vez de TRANSFORMERS_CACHE if DEVICE == "cuda": torch.backends.cudnn.benchmark = True torch.set_float32_matmul_precision('high') else: torch.set_num_threads(2) def corrigir_palavra(corretor, palavra, contexto=""): """Corrige uma única palavra mantendo maiúsculas/minúsculas""" if not palavra.strip() or palavra.upper() in [t.upper() for t in TERMO_FIXO] or palavra.isnumeric(): return palavra try: # Preservar maiúsculas/minúsculas originais is_upper = palavra.isupper() is_title = palavra.istitle() # Usar contexto para melhor correção entrada = f"corrigir gramática no contexto '{contexto}': {palavra}" corrigida = corretor(entrada, max_length=40, do_sample=False, num_beams=1)[0]["generated_text"] # Limpar resultado corrigida = corrigida.replace("corrigir gramática no contexto", "").replace("'", "").strip(": .") # Restaurar maiúsculas/minúsculas if is_upper: return corrigida.upper() elif is_title: return corrigida.title() return corrigida.capitalize() except Exception as e: print(f"[AVISO] Erro ao corrigir palavra '{palavra}': {e}") return palavra.capitalize() def process_chunk(audio_chunk, model, align_model, metadata): """Processa um chunk de áudio""" try: # Transcrever chunk result = model.transcribe( audio_chunk, batch_size=BATCH_SIZE, language="pt", beam_size=5 ) # Alinhar palavras aligned = whisperx.align( result["segments"], align_model, metadata, audio_chunk, DEVICE, return_char_alignments=False ) return aligned.get("word_segments", []) except Exception as e: print(f"[ERRO] Falha ao processar chunk: {e}") return [] def merge_words(words): """Mescla palavras próximas""" if not words: return words merged = [] current_word = words[0] for next_word in words[1:]: # Se intervalo muito pequeno, mesclar if next_word["start"] - current_word["end"] < MIN_WORD_DURATION: current_word = { "word": f"{current_word['word']} {next_word['word']}", "start": current_word["start"], "end": next_word["end"], "score": (current_word["score"] + next_word["score"]) / 2 } else: if current_word["score"] >= SCORE_MINIMO: merged.append(current_word) current_word = next_word if current_word["score"] >= SCORE_MINIMO: merged.append(current_word) return merged def main(): # === VERIFICAR ARGUMENTOS === if len(sys.argv) != 2: print("Uso: python transcriptor.py ") sys.exit(1) AUDIO_PATH = sys.argv[1] # Verificar se arquivo existe if not os.path.exists(AUDIO_PATH): print(f"[ERRO] Arquivo não encontrado: {AUDIO_PATH}") sys.exit(1) # Gerar nome do JSON baseado no áudio base_name = os.path.splitext(os.path.basename(AUDIO_PATH))[0] OUTPUT_JSON = f"{base_name}_transcricao.json" # === CONFIGURAÇÕES === LANGUAGE = "pt" print(f"[INFO] Usando dispositivo: {DEVICE.upper()}") # === CARREGAR MODELOS === print("[INFO] Carregando modelo WhisperX...") try: model = whisperx.load_model( "medium", # Usar modelo medium para melhor equilíbrio DEVICE, compute_type=COMPUTE_TYPE, language=LANGUAGE, asr_options={"beam_size": 5} ) except Exception as e: print(f"[ERRO] Falha ao carregar modelo: {e}") sys.exit(1) print("[INFO] Carregando modelo de alinhamento...") align_model, metadata = whisperx.load_align_model( language_code=LANGUAGE, device=DEVICE ) # === CARREGAR ÁUDIO === print("[INFO] Carregando áudio...") audio = whisperx.load_audio(AUDIO_PATH) # === PROCESSAR EM CHUNKS === print("[INFO] Processando áudio em chunks...") audio_duration = len(audio) / model.feature_extractor.sampling_rate chunk_samples = int(CHUNK_SIZE * model.feature_extractor.sampling_rate) overlap_samples = int(OVERLAP * model.feature_extractor.sampling_rate) chunks = [] for start in range(0, len(audio), chunk_samples - overlap_samples): end = min(start + chunk_samples, len(audio)) chunks.append(audio[start:end]) # Processar chunks em paralelo all_words = [] with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = [ executor.submit(process_chunk, chunk, model, align_model, metadata) for chunk in chunks ] for future in concurrent.futures.as_completed(futures): try: words = future.result() all_words.extend(words) except Exception as e: print(f"[ERRO] Falha ao processar chunk: {e}") # Ordenar palavras por tempo all_words.sort(key=lambda x: x["start"]) # Mesclar palavras próximas all_words = merge_words(all_words) # === CORREÇÃO GRAMATICAL === print("[INFO] Aplicando correção gramatical...") try: tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) model_corr = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME) corretor = pipeline( "text2text-generation", model=model_corr, tokenizer=tokenizer, device=0 if DEVICE == "cuda" else -1, batch_size=BATCH_SIZE ) corretor_disponivel = True # Corrigir palavras em lotes for i in range(0, len(all_words), BATCH_SIZE): batch = all_words[i:i + BATCH_SIZE] for word in batch: # Criar contexto com palavras vizinhas start_idx = max(0, i - 2) end_idx = min(len(all_words), i + 3) contexto = " ".join(w["word"] for w in all_words[start_idx:end_idx]) word["word"] = corrigir_palavra(corretor, word["word"].strip(), contexto) except Exception as e: print(f"[AVISO] Correção desativada: {e}") corretor_disponivel = False # Capitalizar palavras mesmo sem correção for word in all_words: word["word"] = word["word"].capitalize() # === SALVAR RESULTADO === output = { "metadata": { "total_words": len(all_words), "arquivo": AUDIO_PATH, "modelo": "WhisperX medium", "correcao_gramatical": corretor_disponivel, "duracao": audio_duration, "chunks": len(chunks) }, "words": [{ "word": w["word"], "start": round(w["start"], 3), "end": round(w["end"], 3), "score": round(w["score"], 3) } for w in all_words] } with open(OUTPUT_JSON, "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) print(f"[SUCESSO] Transcrição salva em {OUTPUT_JSON} com {len(all_words)} palavras!") return OUTPUT_JSON if __name__ == "__main__": main()