import streamlit as st
import pandas as pd
import numpy as np
import re
import random
import time
import os

from dotenv import load_dotenv
from langchain_experimental.text_splitter import SemanticChunker
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.chat_models import ChatOpenAI
from langchain import hub
from langchain_core.runnables import RunnablePassthrough
from langchain_community.document_loaders import WebBaseLoader, FireCrawlLoader, PyPDFLoader
from langchain_core.prompts.prompt import PromptTemplate

from high_chart import test_chart
from chat_with_pps import get_response
from ecologits.tracers.utils import compute_llm_impacts
from codecarbon import EmissionsTracker

load_dotenv()


def get_docs_from_website(urls):
    # Fast scraping: load the pages' text with a browser-like User-Agent.
    loader = WebBaseLoader(urls, header_template={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36',
    })
    try:
        docs = loader.load()
        return docs
    except Exception:
        return None


def get_docs_from_website_fc(urls, firecrawl_api_key):
    # Deep scraping with FireCrawl (texts, images and links).
    docs = []
    try:
        for url in urls:
            loader = FireCrawlLoader(api_key=firecrawl_api_key, url=url, mode="scrape")
            docs += loader.load()
        return docs
    except Exception:
        return None


def get_doc_chunks(docs):
    # Split the loaded documents
    # text_splitter = RecursiveCharacterTextSplitter(
    #     chunk_size=500,
    #     chunk_overlap=100)
    text_splitter = SemanticChunker(OpenAIEmbeddings(model="text-embedding-3-small"))
    docs = text_splitter.split_documents(docs)
    return docs


def get_doc_chunks_fc(docs):
    # Split the loaded texts
    # text_splitter = RecursiveCharacterTextSplitter(
    #     chunk_size=500,
    #     chunk_overlap=100)
    text_splitter = SemanticChunker(OpenAIEmbeddings(model="text-embedding-3-small"))
    docs_splitted = []
    for text in docs:
        text_splitted = text_splitter.split_text(text)
        docs_splitted += text_splitted
    return docs_splitted


def get_vectorstore_from_docs(doc_chunks):
    embedding = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = FAISS.from_documents(documents=doc_chunks, embedding=embedding)
    return vectorstore


def get_vectorstore_from_text(texts):
    embedding = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = FAISS.from_texts(texts=texts, embedding=embedding)
    return vectorstore


def get_conversation_chain(vectorstore):
    llm = ChatOpenAI(model="gpt-4o", temperature=0.5, max_tokens=2048)
    retriever = vectorstore.as_retriever()
    prompt = hub.pull("rlm/rag-prompt")
    # RAG chain: retrieve context, fill the prompt, call the LLM.
    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | llm
    )
    return rag_chain


# Fill the question template that is passed to the RAG chain;
# not to be confused with the RAG prompt pulled from the hub.
def fill_promptQ_template(input_variables, template):
    prompt = PromptTemplate(input_variables=["BRAND_NAME", "BRAND_DESCRIPTION"], template=template)
    return prompt.format(BRAND_NAME=input_variables["BRAND_NAME"],
                         BRAND_DESCRIPTION=input_variables["BRAND_DESCRIPTION"])


def text_to_list(text):
    lines = text.replace("- ", "").split('\n')
    lines = [line.split() for line in lines]
    items = [[' '.join(line[:-1]), line[-1]] for line in lines]
    # Keep only the digits of the last field (e.g. a similarity percentage).
    for item in items:
        item[1] = re.sub(r'\D', '', item[1])
    return items


def delete_pp(pps):
    for pp in pps:
        for i in range(len(st.session_state['pp_grouped'])):
            if st.session_state['pp_grouped'][i]['name'] == pp:
                del st.session_state['pp_grouped'][i]
                break

def display_list_urls():
    for index, item in enumerate(st.session_state["urls"]):
        emp = st.empty()  # Create an empty placeholder
        col1, col2 = emp.columns([7, 3])  # Divide the space into two columns

        # Button to delete the entry, placed in the second column
        if col2.button("❌", key=f"but{index}"):
            temp = st.session_state['parties_prenantes'][index]
            delete_pp(temp)
            del st.session_state.urls[index]
            del st.session_state["parties_prenantes"][index]
            st.rerun()  # Rerun the app to update the display

        if len(st.session_state.urls) > index:
            # Instead of using markdown, use an expander in the first column
            with col1.expander(f"Source {index+1}: {item}"):
                pp = st.session_state["parties_prenantes"][index]
                st.write(pd.DataFrame(pp, columns=["Partie prenante"]))
        else:
            emp.empty()  # Clear the placeholder if the index exceeds the list


def colored_circle(color):
    # Small colored dot rendered as inline HTML (shown with unsafe_allow_html=True).
    return (
        f'<span style="display:inline-block; width:0.75em; height:0.75em; '
        f'border-radius:50%; background-color:{color};"></span>'
    )


def display_list_pps():
    for index, item in enumerate(st.session_state["pp_grouped"]):
        emp = st.empty()
        col1, col2 = emp.columns([7, 3])
        if col2.button("❌", key=f"butp{index}"):
            del st.session_state["pp_grouped"][index]
            st.rerun()
        if len(st.session_state["pp_grouped"]) > index:
            name = st.session_state["pp_grouped"][index]["name"]
            color = st.session_state["pp_grouped"][index]["color"]
            col1.markdown(f'{colored_circle(color)} {name}', unsafe_allow_html=True)
        else:
            emp.empty()

def extract_pp(docs, input_variables):
    template_extraction_PP = """
    Objectif : Identifiez toutes les parties prenantes de la marque suivante :

    Le nom de la marque de référence est le suivant : {BRAND_NAME}

    TA RÉPONSE DOIT ÊTRE SOUS FORME DE LISTE DE NOMS DE MARQUES, CHAQUE NOM SUR UNE LIGNE SÉPARÉE.
    """
    # Don't forget to add the input variables from the main function

    if docs is None:
        return "445"

    # Get text chunks
    text_chunks = get_doc_chunks(docs)

    # Create vectorstore
    vectorstore = get_vectorstore_from_docs(text_chunks)

    chain = get_conversation_chain(vectorstore)
    question = fill_promptQ_template(input_variables, template_extraction_PP)

    start = time.perf_counter()
    response = chain.invoke(question)
    response_latency = time.perf_counter() - start

    # A more advanced version to consider: each item in the list is a pair
    # [brand name, similarity percentage].
    # partie_prenante = text_to_list(response.content)

    if "ne sais pas" in response.content:
        return "444"

    # Compute the environmental impact of the LLM call
    nbre_out_tokens = response.response_metadata["token_usage"]["completion_tokens"]
    provider = "openai"
    model = "gpt-4o"
    impact = compute_llm_impacts(
        provider=provider,
        model_name=model,
        output_token_count=nbre_out_tokens,
        request_latency=response_latency,
    )
    st.session_state["partial_emissions"]["extraction_pp"]["el"] += impact.gwp.value

    # Simple version
    partie_prenante = response.content.replace("- ", "").split('\n')
    partie_prenante = [item.strip() for item in partie_prenante]
    return partie_prenante


def generate_random_color():
    # Generate random RGB values
    r = random.randint(0, 255)
    g = random.randint(0, 255)
    b = random.randint(0, 255)
    # Convert RGB to hexadecimal
    color_hex = '#{:02x}{:02x}{:02x}'.format(r, g, b)
    return color_hex


def format_pp_add_viz(pp):
    # Place the stakeholder on the map, shifting the position if it is already taken;
    # do nothing if the stakeholder is already present.
    y = 50
    x = 50
    for i in range(len(st.session_state['pp_grouped'])):
        if st.session_state['pp_grouped'][i]['y'] == y and st.session_state['pp_grouped'][i]['x'] == x:
            y += 5
            if y > 95:
                y = 50
                x += 5
        if st.session_state['pp_grouped'][i]['name'] == pp:
            return None
    else:
        st.session_state['pp_grouped'].append({'name': pp, 'x': x, 'y': y, 'color': generate_random_color()})


def add_pp(new_pp, default_value=50):
    new_pp = sorted(new_pp)
    new_pp = [item.lower().capitalize().strip() for item in new_pp]
    st.session_state['parties_prenantes'].append(new_pp)
    for pp in new_pp:
        format_pp_add_viz(pp)


def add_existing_pps(pp, pouvoir, influence):
    for i in range(len(st.session_state['pp_grouped'])):
        if st.session_state['pp_grouped'][i]['name'] == pp:
            st.session_state['pp_grouped'][i]['x'] = influence
            st.session_state['pp_grouped'][i]['y'] = pouvoir
            return None
    st.session_state['pp_grouped'].append({'name': pp, 'x': influence, 'y': pouvoir, 'color': generate_random_color()})


def load_csv(file):
    df = pd.read_csv(file)
    for index, row in df.iterrows():
        add_existing_pps(row['parties prenantes'], row['pouvoir'], row['influence'])


def add_pp_input_text():
    new_pp = st.text_input("Ajouter une partie prenante")
    if st.button("Ajouter", key="add_single_pp"):
        format_pp_add_viz(new_pp)

def complete_and_verify_url(partial_url):
    # Regex pattern for validating a URL
    regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,8}\.?|'  # domain name
        r'localhost|'  # or localhost
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # or IPv4 address
        r'(?::\d+)?'  # optional port
        r'(?:[/?#][^\s]*)?$',  # optional path, query, or fragment
        re.IGNORECASE)

    # Complete the URL if it doesn't start with http:// or https://
    if not partial_url.startswith(('http://', 'https://', 'www.')):
        if not partial_url.startswith('www.'):
            complete_url = 'https://www.' + partial_url
        else:
            complete_url = 'https://' + partial_url
    elif partial_url.startswith('www.'):
        complete_url = 'https://' + partial_url
    else:
        complete_url = partial_url

    # Check if the URL is valid
    if re.match(regex, complete_url):
        return (True, complete_url)
    else:
        return (False, complete_url)


@st.dialog("Conseil IA", width="large")
def show_conseil_ia():
    prompt = "Prenant compte les données de l'entreprise (activité, produits, services ...), quelles sont les principales parties prenantes à animer pour une démarche RSE réussie ?"
    st.markdown(f"**{prompt}**")
    response = st.write_stream(get_response(prompt, "", st.session_state["latest_doc"][0].page_content))
    st.warning("Quittez et saisissez une autre URL")


def display_pp():
    if "emission" not in st.session_state:
        tracker = EmissionsTracker()
        tracker.start()
        st.session_state["emission"] = tracker

    load_dotenv()
    fire_crawl_api_key = os.getenv("FIRECRAWL_API_KEY")

    # Check if brand name and description are already set
    if "Nom de la marque" not in st.session_state:
        st.session_state["Nom de la marque"] = ""

    # Check if URLs and parties prenantes are already set
    if "urls" not in st.session_state:
        st.session_state["urls"] = []
    if "parties_prenantes" not in st.session_state:
        st.session_state['parties_prenantes'] = []
    if "pp_grouped" not in st.session_state:
        # Used for the plot and the stakeholder map: grouped, without duplicates
        st.session_state['pp_grouped'] = []
    if "latest_doc" not in st.session_state:
        st.session_state['latest_doc'] = ""
    if "not_pp" not in st.session_state:
        st.session_state["not_pp"] = ""

    st.title("IDENTIFIER ET ANIMER VOS PARTIES PRENANTES")

    # Set brand name and description
    brand_name = st.text_input("Nom de la marque", st.session_state["Nom de la marque"])
    st.session_state["Nom de la marque"] = brand_name

    option = st.radio("Source", ("A partir de votre site web",
                                 "A partir de vos documents entreprise",
                                 "A partir de cartographie existante"))

    # If the user chooses to extract from a website
    if option == "A partir de votre site web":
        url = st.text_input("Ajouter une URL")
        captions = ["L’IA prend en compte uniquement les textes contenus dans les pages web analysées",
                    "L’IA prend en compte les textes, les images et les liens URL contenus dans les pages web analysées"]
        scraping_option = st.radio("Mode", ("Analyse rapide", "Analyse profonde"), horizontal=True, captions=captions)

        # If the user clicks on the button
        if st.button("ajouter", key="add_pp"):
            st.session_state["not_pp"] = ""
            # Complete and verify the URL
            is_valid, url = complete_and_verify_url(url)
            if not is_valid:
                st.error("URL invalide")
            elif url in st.session_state["urls"]:
                st.error("URL déjà ajoutée")
            else:
                if scraping_option == "Analyse profonde":
                    with st.spinner("Collecte des données..."):
                        docs = get_docs_from_website_fc([url], fire_crawl_api_key)
                        if docs is None:
                            st.warning("Erreur lors de la collecte des données, 2eme essai avec collecte rapide...")
                            with st.spinner("2eme essai, collecte rapide..."):
                                docs = get_docs_from_website([url])
                if scraping_option == "Analyse rapide":
                    with st.spinner("Collecte des données..."):
                        docs = get_docs_from_website([url])

                if docs is None:
                    st.error("Erreur lors de la collecte des données, URL invalide")
                    st.session_state["latest_doc"] = ""
                else:
                    # Record scraping emissions and keep the collected documents
                    st.session_state["partial_emissions"]["Scrapping"]["cc"] = st.session_state["emission"].stop()
                    st.session_state["latest_doc"] = docs
                    with st.spinner("Processing..."):
                        # Handle the extraction
                        input_variables = {"BRAND_NAME": brand_name, "BRAND_DESCRIPTION": ""}
                        partie_prenante = extract_pp(docs, input_variables)
                        if "444" in partie_prenante:  # 444 is the code for "no stakeholder found"
                            st.session_state["not_pp"] = "444"
                        elif "445" in partie_prenante:  # 445 is the code for "no website found at the given URL"
                            st.error("Aucun site web trouvé avec l'url donnée")
                            st.session_state["not_pp"] = ""
                        else:
                            st.session_state["not_pp"] = ""
                            partie_prenante = sorted(partie_prenante)
                            st.session_state["urls"].append(url)
                            add_pp(partie_prenante)
                            st.session_state["partial_emissions"]["extraction_pp"]["cc"] = st.session_state["emission"].stop()

                    # alphabet = [pp[0] for pp in partie_prenante]
                    # pouvoir = [50 for _ in range(len(partie_prenante))]
                    # df = pd.DataFrame({'partie_prenante': partie_prenante, 'pouvoir': pouvoir, 'code couleur': partie_prenante})
                    # st.write(df)
                    # c = (
                    #     alt.Chart(df)
                    #     .mark_circle(size=300)
                    #     .encode(x="partie_prenante", y=alt.Y("pouvoir", scale=alt.Scale(domain=[0, 100])), color="code couleur")
                    # )
                    # st.subheader("Vertical Slider")
                    # age = st.slider("How old are you?", 0, 130, 25)
                    # st.write("I'm ", age, "years old")
                    # disp_vertical_slider(partie_prenante)
                    # st.altair_chart(c, use_container_width=True)

    if option == "A partir de vos documents entreprise":
        uploaded_file = st.file_uploader("Télécharger le fichier PDF", type="pdf")
        if uploaded_file is not None:
            if st.button("ajouter", key="add_pp_pdf"):
                st.session_state["not_pp"] = ""
                with st.spinner("Processing..."):
                    file_name = uploaded_file.name
                    with open(file_name, mode='wb') as w:
                        w.write(uploaded_file.getvalue())
                    pdf = PyPDFLoader(file_name)
                    text = pdf.load()
                    st.session_state["latest_doc"] = text
                    input_variables = {"BRAND_NAME": brand_name, "BRAND_DESCRIPTION": ""}
                    partie_prenante = extract_pp(text, input_variables)
                    if "444" in partie_prenante:  # 444 is the code for "no stakeholder found"
                        st.session_state["not_pp"] = "444"
                    elif "445" in partie_prenante:  # 445 is the code for "no website found at the given URL"
                        st.error("Aucun site web trouvé avec l'url donnée")
                        st.session_state["not_pp"] = ""
                    else:
                        st.session_state["not_pp"] = ""
                        partie_prenante = sorted(partie_prenante)
                        st.session_state["urls"].append(file_name)
                        add_pp(partie_prenante)

    if option == "A partir de cartographie existante":
        uploaded_file = st.file_uploader("Télécharger le fichier CSV", type="csv")
        if uploaded_file is not None:
            if st.button("ajouter", key="add_pp_csv"):
                file_name = uploaded_file.name
                with open(file_name, mode='wb') as w:
                    w.write(uploaded_file.getvalue())
                try:
                    load_csv(file_name)
                    brand_name_from_csv = file_name.split("-")[1]
                    st.session_state["Nom de la marque"] = brand_name_from_csv
                except Exception:
                    st.error("Erreur lors de la lecture du fichier")

    if st.session_state["not_pp"] == "444":
        st.warning("Aucune partie prenante n'est identifiable sur l'URL fournie. "
                   "Fournissez une autre URL ou bien cliquez sur le bouton ci-dessous pour un Conseil IA")
        if st.button("Conseil IA"):
            show_conseil_ia()

    # Display the latest collected document
    if st.session_state["latest_doc"] != "":
        with st.expander("Cliquez ici pour éditer et voir le document"):
            docs = st.session_state["latest_doc"]
            cleaned_text = re.sub(r'\n\n+', '\n\n', docs[0].page_content.strip())
            text_value = st.text_area("Modifier le texte ci-dessous:", value=cleaned_text, height=300)
            if st.button('Sauvegarder', key="save_doc_fake"):
                st.success("Texte sauvegardé avec succès!")

    display_list_urls()

    with st.expander("Liste des parties prenantes"):
        add_pp_input_text()
        display_list_pps()
        test_chart()
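

# Minimal entry-point sketch, assuming this module is run directly as a
# Streamlit page whose body is display_pp(); the real app may instead import
# display_pp() from its own page router, in which case this block stays inert.
if __name__ == "__main__":
    display_pp()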