|
import os |
|
import chardet |
|
import gradio as gr |
|
import fitz |
|
import faiss |
|
import numpy as np |
|
from sentence_transformers import SentenceTransformer |
|
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM |
|
|
|
|
|
INDEX_PATH = "faiss_index.idx" |
|
CHUNKS_PATH = "chunks.txt" |
|
|
|
|
|
def extract_text_from_pdf(pdf_path):
    """Extract the plain text of every page of a PDF, joined by newlines.

    Args:
        pdf_path: Filesystem path to the PDF file.

    Returns:
        One string containing each page's text, pages separated by "\n".
    """
    # Fix: the original never closed the document; PyMuPDF documents hold an
    # OS file handle, so use the context-manager form to release it promptly.
    with fitz.open(pdf_path) as doc:
        return "\n".join(page.get_text() for page in doc)
|
|
|
def load_documents(folder_path):
    """Load the text of every .pdf and .txt file in *folder_path*.

    PDFs are extracted with PyMuPDF; .txt files are decoded using the
    encoding guessed by chardet (falling back to UTF-8).

    Args:
        folder_path: Directory scanned (non-recursively) for documents.

    Returns:
        (texts, filenames): parallel lists of file contents and the
        corresponding file names. Files that fail to decode are skipped
        with a printed warning.
    """
    texts, filenames = [], []
    # sorted(): os.listdir order is platform-dependent; make it deterministic
    # so the chunk/index order is reproducible across runs.
    for filename in sorted(os.listdir(folder_path)):
        path = os.path.join(folder_path, filename)
        lower_name = filename.lower()  # fix: also match .PDF / .TXT
        if lower_name.endswith(".pdf"):
            texts.append(extract_text_from_pdf(path))
            filenames.append(filename)
        elif lower_name.endswith(".txt"):
            with open(path, "rb") as f:
                raw_data = f.read()
            result = chardet.detect(raw_data)
            encoding = result["encoding"] or "utf-8"
            try:
                texts.append(raw_data.decode(encoding))
                filenames.append(filename)
            except (UnicodeDecodeError, LookupError) as e:
                # Fix: the original message printed the literal "(unknown)"
                # instead of the name of the file that failed to decode.
                # Narrowed from `except Exception`: decode() raises
                # UnicodeDecodeError; an unknown codec name raises LookupError.
                print(f"Erreur lors du décodage de {filename} : {e}")
    return texts, filenames
|
|
|
|
|
def chunk_text(text, max_len=500):
    """Split *text* into paragraph-aligned chunks of roughly *max_len* chars.

    Paragraphs (separated by blank lines) are greedily packed into a buffer;
    the buffer is flushed as soon as adding the next paragraph would reach
    *max_len*. A single paragraph longer than *max_len* still becomes its
    own (oversized) chunk.

    Returns:
        A list of non-empty, stripped chunk strings.
    """
    pieces = []
    buffer = ""
    for paragraph in text.split("\n\n"):
        if len(buffer) + len(paragraph) >= max_len:
            # Flush whatever the buffer holds and start a fresh chunk.
            flushed = buffer.strip()
            if flushed:
                pieces.append(flushed)
            buffer = paragraph + "\n\n"
        else:
            buffer += paragraph + "\n\n"
    tail = buffer.strip()
    if tail:
        pieces.append(tail)
    return pieces
|
|
|
|
|
def build_and_save_faiss_index(chunks, model):
    """Embed *chunks*, build a flat-L2 FAISS index, and persist both.

    The index is written to INDEX_PATH; the chunks are written to
    CHUNKS_PATH joined by the "\n<>\n" sentinel that load_faiss_index
    splits on.

    Args:
        chunks: Non-empty list of text chunks to index.
        model: Sentence-embedding model exposing encode(..., convert_to_numpy=True).

    Returns:
        (index, chunks): the in-memory FAISS index and the chunk list.

    Raises:
        ValueError: if *chunks* is empty.
    """
    if not chunks:
        raise ValueError("Aucun chunk fourni pour l'indexation.")

    vectors = model.encode(chunks, convert_to_numpy=True)
    # A single chunk may come back as a 1-D vector; FAISS requires 2-D input.
    if vectors.ndim == 1:
        vectors = vectors.reshape(1, -1)
    vectors = vectors.astype("float32")

    index = faiss.IndexFlatL2(vectors.shape[1])
    index.add(vectors)
    faiss.write_index(index, INDEX_PATH)

    with open(CHUNKS_PATH, "w", encoding="utf-8") as f:
        f.write("\n<>\n".join(chunks))
    return index, chunks
|
|
|
|
|
def load_faiss_index():
    """Load the persisted FAISS index and chunk list, if both files exist.

    Returns:
        (index, chunks) on success, or (None, None) when either the index
        file or the chunks file is missing.
    """
    if not (os.path.exists(INDEX_PATH) and os.path.exists(CHUNKS_PATH)):
        return None, None
    loaded_index = faiss.read_index(INDEX_PATH)
    with open(CHUNKS_PATH, "r", encoding="utf-8") as f:
        # Split on the same sentinel written by build_and_save_faiss_index.
        loaded_chunks = f.read().split("\n<>\n")
    return loaded_index, loaded_chunks
|
|
|
|
|
# ---------------------------------------------------------------------------
# Module-level initialization. NOTE: this runs at import time and may
# download model weights from the Hugging Face Hub on first run.
# ---------------------------------------------------------------------------

# French emotion classifier; its labels include "joy"/"sad"/"anger"
# (see the suggestion map in handle_sentiment).
emotion_model_name = "astrosbd/french_emotion_camembert"
emotion_tokenizer = AutoTokenizer.from_pretrained(emotion_model_name)
emotion_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_name)
emotion_pipe = pipeline("text-classification", model=emotion_model, tokenizer=emotion_tokenizer)

# French seq2seq model (a summarization fine-tune, repurposed here for
# answer generation in handle_question).
gen_model_name = "plguillou/t5-base-fr-sum-cnndm"
gen_tokenizer = AutoTokenizer.from_pretrained(gen_model_name)
gen_model = AutoModelForSeq2SeqLM.from_pretrained(gen_model_name)

# Multilingual sentence-embedding model used for retrieval.
emb_model = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')

# Reuse a previously saved FAISS index when available; otherwise rebuild it
# from the documents in ./happython and persist it for the next run.
index, chunks = load_faiss_index()
if index is None or chunks is None:
    print("Reconstruction de l'index FAISS...")
    folder_path = "./happython"
    raw_texts, filenames = load_documents(folder_path)
    all_chunks = []
    for text in raw_texts:
        all_chunks.extend(chunk_text(text))
    # Fail fast at import time rather than serving an empty index.
    if not all_chunks:
        raise ValueError("Aucun texte trouvé dans les documents.")
    index, chunks = build_and_save_faiss_index(all_chunks, emb_model)
|
|
|
|
|
|
|
def accueil():
    """Return the home-screen state: greeting plus only the menu row visible.

    Returns:
        A 5-tuple: (message, update for the témoignage button row, updates
        hiding the three input zones).
    """
    greeting = "Je suis NEOLA, le Médiateur virtuel. Que souhaitez-vous faire aujourd'hui ?"
    # Visibility of: menu button, témoignage zone, sentiment zone, question zone.
    visibilities = (True, False, False, False)
    return (greeting, *(gr.update(visible=flag) for flag in visibilities))
|
|
|
def choix_temoignage():
    """Switch the UI to testimony mode: show only the témoignage text box."""
    # Visibility of: menu button, témoignage zone, sentiment zone, question zone.
    shown = (False, True, False, False)
    return (
        "Merci ! Écris ici ton témoignage heureux puis valide.",
        *(gr.update(visible=flag) for flag in shown),
    )
|
|
|
def choix_sentiment():
    """Switch the UI to mood mode: show only the sentiment text box."""
    # Visibility of: menu button, témoignage zone, sentiment zone, question zone.
    shown = (False, False, True, False)
    return (
        "Décris-moi en quelques mots comment tu te sens aujourd'hui.",
        *(gr.update(visible=flag) for flag in shown),
    )
|
|
|
def choix_question():
    """Switch the UI to Q&A mode: show only the question text box."""
    # Visibility of: menu button, témoignage zone, sentiment zone, question zone.
    shown = (False, False, False, True)
    return (
        "Quelle est ta question ?",
        *(gr.update(visible=flag) for flag in shown),
    )
|
|
|
def handle_temoignage(temoignage):
    """Acknowledge a submitted testimony and reset the UI to the home state.

    Args:
        temoignage: The submitted testimony text. NOTE(review): nothing in
            this module stores it — confirm whether persistence is intended.

    Returns:
        (message, None, *visibility updates from accueil()).
    """
    # Fix: the original used an f-string with no placeholders (lint F541).
    return "Merci pour ton témoignage ! Il a bien été reçu. 😊", None, *accueil()[1:]
|
|
|
def handle_sentiment(sentiment):
    """Classify the user's mood and suggest a next step, then reset the UI.

    Args:
        sentiment: Free-text description of how the user feels.

    Returns:
        (message, detected emotion label, *visibility updates from accueil()).
    """
    detected = emotion_pipe(sentiment)[0]['label']
    suggestions = {
        "joy": "Tu sembles joyeux·se ! Veux-tu partager cette joie dans un témoignage ou essayer un atelier créatif ?",
        "sad": "Je ressens de la tristesse. Tu peux écrire ce que tu ressens ou consulter la boîte à outils 'réconfort'.",
        "anger": "Exprimer sa colère est important. Souhaites-tu en parler ou découvrir des exercices de gestion ?",
    }
    fallback = "Merci pour ce partage. Veux-tu explorer un atelier ou écrire un témoignage ?"
    advice = suggestions.get(detected, fallback)
    return f"Émotion détectée : {detected}. {advice}", detected, *accueil()[1:]
|
|
|
def handle_question(question):
    """Answer a question via retrieval-augmented generation, then reset the UI.

    Embeds the question, retrieves the 3 nearest chunks from the FAISS index,
    builds a French prompt with that context, and generates an answer with the
    seq2seq model. The question's emotion is also classified.

    Args:
        question: The user's question in free text.

    Returns:
        (answer, detected emotion label, *visibility updates from accueil()),
        or an apology message with emotion None on an embedding-dimension
        mismatch.
    """
    query_vec = emb_model.encode([question], convert_to_numpy=True).astype("float32")
    if query_vec.shape[1] != index.d:
        return "Désolé, problème technique avec la recherche.", None, *accueil()[1:]

    D, I = index.search(query_vec, k=3)
    # Fix: FAISS pads missing neighbours with -1 when the index holds fewer
    # than k vectors; the original `chunks[i]` then silently grabbed the LAST
    # chunk via negative indexing. Skip out-of-range indices explicitly.
    relevant_chunks = [
        chunks[i] for i in I[0] if 0 <= i < len(chunks) and chunks[i].strip()
    ]
    context = "\n".join(relevant_chunks)
    prompt = (
        "Tu es NEOLA, le médiateur bienveillant du Happython Village.\n"
        "Réponds en français de façon claire, concise et bienveillante à la question suivante, en t'appuyant sur les souvenirs du village si utile.\n\n"
        f"Question du villageois : \"{question}\"\n"
        f"Contexte :\n{context}\n"
        "Réponse :"
    )
    # Truncate the prompt to the model's input budget; cap the answer length.
    input_ids = gen_tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True).input_ids
    output_ids = gen_model.generate(input_ids, max_new_tokens=120)
    response = gen_tokenizer.decode(output_ids[0], skip_special_tokens=True)

    emotion = emotion_pipe(question)[0]['label']
    return response, emotion, *accueil()[1:]
|
|
|
|
|
def neola_api(message, user_info=None):
    """Single-endpoint API: dispatch *message* by prefix, return (reply, emotion).

    Recognized prefixes (case-insensitive): "témoignage:", "sentiment:",
    "question:". Any other message gets a welcome reply plus the emotion
    detected in the raw message.

    Args:
        message: User message, optionally prefixed to select an action.
        user_info: Extra payload from the JS widget; currently unused.

    Returns:
        (response text, emotion label or None).
    """
    lowered = message.lower()
    # Fix: the slice offsets were hard-coded (11/10/9); derive them from the
    # prefix strings so they cannot drift out of sync with the prefixes.
    dispatch = (
        ("témoignage:", handle_temoignage),
        ("sentiment:", handle_sentiment),
        ("question:", handle_question),
    )
    for prefix, handler in dispatch:
        if lowered.startswith(prefix):
            rep, emotion, *_ = handler(message[len(prefix):].strip())
            return rep, emotion

    emotion = emotion_pipe(message)[0]['label']
    response = (
        f"Bienvenue ! (Émotion détectée : {emotion}) Que veux-tu faire aujourd'hui ? "
        "(Déposer un témoignage, exprimer un sentiment, poser une question...)"
    )
    return response, emotion
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: a read-only chat textbox plus three action buttons; each button
# reveals one input zone, and submitting a zone answers then resets the view.
# ---------------------------------------------------------------------------
with gr.Blocks(theme="soft") as demo:
    gr.Markdown("## 👋 Bienvenue au Happython Village !")
    # NEOLA's "voice": every handler writes its reply into this textbox.
    chatbot = gr.Textbox(
        label="NEOLA",
        value="Je suis NEOLA, le Médiateur virtuel. Que souhaitez-vous faire aujourd'hui ?",
        interactive=False
    )
    with gr.Row():
        btn_temoignage = gr.Button("Déposer un témoignage heureux")
        btn_sentiment = gr.Button("Je ne sais pas quoi faire")
        btn_question = gr.Button("J'ai des questions")
    # Input zones, hidden until the matching button reveals one of them.
    zone_temoignage = gr.Textbox(label="Écris ton témoignage ici", visible=False)
    zone_sentiment = gr.Textbox(label="Décris tes sentiments", visible=False)
    zone_question = gr.Textbox(label="Pose ta question", visible=False)

    # Buttons toggle visibility; each choix_* helper returns a 5-tuple that
    # maps onto these five output components in order.
    btn_temoignage.click(choix_temoignage, None, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])
    btn_sentiment.click(choix_sentiment, None, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])
    btn_question.click(choix_question, None, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])

    # Pressing Enter in a zone runs the matching handler, then resets the view.
    # NOTE(review): the handle_* functions return 6 values
    # (message, emotion, *4 updates) but only 5 outputs are listed here — the
    # emotion value lands on btn_temoignage. Confirm whether Gradio tolerates
    # this or whether emotion should feed a dedicated component.
    zone_temoignage.submit(handle_temoignage, zone_temoignage, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])
    zone_sentiment.submit(handle_sentiment, zone_sentiment, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])
    zone_question.submit(handle_question, zone_question, [chatbot, btn_temoignage, zone_temoignage, zone_sentiment, zone_question])
|
|
|
|
|
# API surface intended for the external JS widget:
# POST /predict/ with {"data": [message, user_info]} -> [réponse, émotion].
# NOTE(review): this gr.Interface is constructed but never assigned, launched,
# or mounted — only `demo` is launched below, so as written it appears to have
# no runtime effect. Confirm whether it should be combined with `demo`
# (e.g. via gr.TabbedInterface) or removed.
gr.Interface(
    fn=neola_api,
    inputs=[gr.Textbox(label="Message"), gr.JSON(label="Infos utilisateur")],
    outputs=[
        gr.Textbox(label="Réponse NEOLA"),
        gr.Textbox(label="Émotion détectée")
    ],
    title="API NEOLA (pour widget JS)",
    description="Appelle ce Space en POST sur /predict/ avec {'data': [message, user_info]}. La réponse contient [réponse, émotion]."
)

# Launch the Blocks UI only when run as a script (not when imported).
if __name__ == "__main__":
    demo.launch()