Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import torch | |
| from sentence_transformers import SentenceTransformer, util | |
| from transformers import T5ForConditionalGeneration, T5Tokenizer | |
| from pypdf import PdfReader | |
| import os | |
| # --- 1. Carregamento dos Modelos (faça isso apenas uma vez) --- | |
| # Esta parte não muda. Usaremos os mesmos modelos eficientes. | |
| print("Carregando o modelo de recuperação (Sentence Transformer)...") | |
| retriever_model = SentenceTransformer('all-MiniLM-L6-v2') | |
| print("Carregando o modelo de geração (Flan-T5)...") | |
| generator_tokenizer = T5Tokenizer.from_pretrained('google/flan-t5-base') | |
| generator_model = T5ForConditionalGeneration.from_pretrained('google/flan-t5-base') | |
| print("Modelos carregados com sucesso!") | |
| # --- 2. Função para Processar Arquivos Enviados --- | |
| def process_files(files): | |
| """ | |
| Lê arquivos .pdf e .txt, extrai o texto e o divide em blocos. | |
| Retorna os blocos de texto e seus embeddings correspondentes. | |
| """ | |
| if not files: | |
| return None, "Por favor, envie um ou mais arquivos." | |
| knowledge_text = "" | |
| for file in files: | |
| file_path = file.name | |
| # Extrai texto de arquivos PDF | |
| if file_path.endswith(".pdf"): | |
| try: | |
| reader = PdfReader(file_path) | |
| for page in reader.pages: | |
| page_text = page.extract_text() | |
| if page_text: | |
| knowledge_text += page_text + "\n" | |
| except Exception as e: | |
| return None, f"Erro ao ler o arquivo PDF {os.path.basename(file_path)}: {e}" | |
| # Extrai texto de arquivos TXT | |
| elif file_path.endswith(".txt"): | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| knowledge_text += f.read() + "\n" | |
| except Exception as e: | |
| return None, f"Erro ao ler o arquivo TXT {os.path.basename(file_path)}: {e}" | |
| if not knowledge_text.strip(): | |
| return None, "Não foi possível extrair texto dos arquivos fornecidos." | |
| # Divide o texto em blocos menores (parágrafos) para uma melhor recuperação | |
| text_chunks = [chunk.strip() for chunk in knowledge_text.split('\n\n') if chunk.strip()] | |
| if not text_chunks: | |
| return None, "O texto extraído não continha blocos de texto válidos para processamento." | |
| # Cria os embeddings para a base de conhecimento extraída | |
| print(f"Processando {len(text_chunks)} blocos de texto dos arquivos...") | |
| knowledge_base_embeddings = retriever_model.encode(text_chunks, convert_to_tensor=True, show_progress_bar=True) | |
| print("Base de conhecimento criada a partir dos arquivos.") | |
| # Retorna a base de conhecimento (blocos e embeddings) e uma mensagem de sucesso | |
| return (text_chunks, knowledge_base_embeddings), f"✅ Sucesso! {len(files)} arquivo(s) processado(s), gerando {len(text_chunks)} blocos de texto." | |
| # --- 3. A Função Principal do RAG --- | |
| def answer_question(question, knowledge_state): | |
| """ | |
| Recebe uma pergunta e o estado da base de conhecimento (texto e embeddings) | |
| e gera uma resposta. | |
| """ | |
| if not question: | |
| return "Por favor, insira uma pergunta." | |
| if not knowledge_state or not knowledge_state[0] or knowledge_state[1] is None: | |
| return "⚠️ A base de conhecimento está vazia. Por favor, processe alguns arquivos primeiro." | |
| knowledge_base, knowledge_base_embeddings = knowledge_state | |
| # Etapa de Recuperação (Retrieval) | |
| question_embedding = retriever_model.encode(question, convert_to_tensor=True) | |
| cosine_scores = util.cos_sim(question_embedding, knowledge_base_embeddings) | |
| # Encontra o bloco de texto mais relevante | |
| best_doc_index = torch.argmax(cosine_scores) | |
| retrieved_context = knowledge_base[best_doc_index] | |
| print(f"\n--- Nova Pergunta de Auditoria ---") | |
| print(f"Pergunta: {question}") | |
| print(f"Contexto Recuperado: {retrieved_context}") | |
| # Etapa de Geração (Generation) | |
| prompt = f""" | |
| Contexto: {retrieved_context} | |
| Pergunta: {question} | |
| Com base estritamente no contexto do documento fornecido, responda à pergunta do auditor. | |
| Resposta: | |
| """ | |
| input_ids = generator_tokenizer(prompt, return_tensors="pt").input_ids | |
| outputs = generator_model.generate( | |
| input_ids, | |
| max_length=256, # Aumentado para respostas potencialmente mais longas | |
| num_beams=5, | |
| early_stopping=True | |
| ) | |
| answer = generator_tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| return answer | |
| # --- 4. Criação da Interface com Gradio Blocks --- | |
| with gr.Blocks(theme=gr.themes.Soft()) as interface: | |
| # Estado para armazenar a base de conhecimento processada | |
| knowledge_state = gr.State() | |
| gr.Markdown( | |
| """ | |
| # 🤖 RAG - Auditor de Documentos | |
| **1. Carregue seus arquivos**: Envie um ou mais certificados ou documentos nos formatos `.pdf` ou `.txt`. | |
| **2. Processe os arquivos**: Clique no botão para criar a base de conhecimento. | |
| **3. Faça perguntas**: Após o processamento, faça perguntas sobre o conteúdo dos documentos. | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| file_uploader = gr.File( | |
| label="Carregar Certificados (.pdf, .txt)", | |
| file_count="multiple", | |
| file_types=[".pdf", ".txt"] | |
| ) | |
| process_button = gr.Button("Processar Arquivos", variant="primary") | |
| status_box = gr.Textbox(label="Status do Processamento", interactive=False) | |
| with gr.Column(scale=2): | |
| question_box = gr.Textbox(label="Faça sua pergunta aqui", placeholder="Ex: Qual o resultado da calibração do instrumento PI-101?") | |
| submit_button = gr.Button("Obter Resposta", variant="primary") | |
| answer_box = gr.Textbox(label="Resposta Baseada nos Documentos", interactive=False, lines=5) | |
| # Conecta os componentes às funções | |
| process_button.click( | |
| fn=process_files, | |
| inputs=[file_uploader], | |
| outputs=[knowledge_state, status_box] | |
| ) | |
| submit_button.click( | |
| fn=answer_question, | |
| inputs=[question_box, knowledge_state], | |
| outputs=[answer_box] | |
| ) | |
| # --- 5. Lançamento do App --- | |
| if __name__ == "__main__": | |
| interface.launch() | |