import streamlit as st
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import time
import os
import base64


def find_best_keyword(title_idx, similarity_matrix, used_keywords, keywords):
    """Return the index of the most similar unused keyword for a title, or -1."""
    best_index = -1
    max_similarity = -1
    for keyword_idx in range(len(keywords)):
        similarity = similarity_matrix[title_idx, keyword_idx]
        if keywords[keyword_idx] not in used_keywords and similarity > max_similarity:
            max_similarity = similarity
            best_index = keyword_idx
    return best_index


def find_best_keyword_for_all_titles(similarity_matrix, used_keywords, keywords, titles):
    """Greedily match every title to its best unused keyword, then hand any
    leftover keywords to the titles that received none."""
    best_matches = []
    for title_idx in range(similarity_matrix.shape[0]):
        best_index, max_similarity = -1, -1
        for keyword_idx in range(similarity_matrix.shape[1]):
            if (keywords[keyword_idx] not in used_keywords
                    and similarity_matrix[title_idx, keyword_idx] > max_similarity):
                best_index = keyword_idx
                max_similarity = similarity_matrix[title_idx, keyword_idx]
        if best_index != -1:
            used_keywords.add(keywords[best_index])
        best_matches.append((title_idx, best_index, max_similarity))

    unused_keywords = set(keywords) - used_keywords
    for title_idx, title in enumerate(titles):
        if not unused_keywords:
            break
        if best_matches[title_idx][1] == -1:
            best_keyword = unused_keywords.pop()
            best_matches[title_idx] = (title_idx, keywords.index(best_keyword), 0)
            used_keywords.add(best_keyword)
    return best_matches


def update_generic_kw_sheet(xls, all_used_keywords):
    xls["Generic KW"]['Used'] = xls["Generic KW"]['Keyword'].apply(lambda x: x in all_used_keywords)
    return xls


def remove_duplicate_keywords(xls):
    """Drop duplicate keywords within each keyword sheet and across tabs,
    skipping the "Generic KW" sheet and any sheet without a 'Keyword' column."""
    unique_keywords = set()
    for sheet_name in xls.keys():
        if sheet_name == "Generic KW":
            continue
        df_keywords = xls[sheet_name]
        if 'Keyword' not in df_keywords.columns:
            continue
        df_keywords = df_keywords.drop_duplicates(subset=['Keyword'], keep='first')
        # Remove keywords already seen on an earlier tab
        df_keywords = df_keywords[~df_keywords['Keyword'].isin(unique_keywords)]
        unique_keywords.update(df_keywords['Keyword'].tolist())
        xls[sheet_name] = df_keywords
    return xls


def find_best_product(keyword_idx, similarity_matrix, used_products, products):
    """Return the index of the most similar unused product title for a keyword, or -1."""
    best_index = -1
    max_similarity = -1
    for product_idx in range(len(products)):
        similarity = similarity_matrix[product_idx, keyword_idx]
        if product_idx not in used_products and similarity > max_similarity:
            max_similarity = similarity
            best_index = product_idx
    return best_index


def match_keywords_and_titles_case1(file_path):
    # Read the excel file (one DataFrame per sheet)
    xls = pd.read_excel(file_path, sheet_name=None)

    # Remove duplicate keywords
    xls = remove_duplicate_keywords(xls)

    total_tabs = len(xls.keys()) // 2
    processed_tabs = 0

    # Initialize the timer
    start_time = time.time()

    # Sheets come in (keyword sheet, title sheet) pairs; skip the "Generic KW" sheet
    sheet_names = [name for name in xls.keys() if not name.startswith("Generic KW")]
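    # Weight each (keyword sheet, title sheet) pair by max(#unique keywords,
    # #titles) as a rough proxy for its processing cost; these weights drive
    # the remaining-time estimate printed after each pair below.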
    weights = [
        max(len(xls[sheet_names[idx]]['Keyword'].unique()), len(xls[sheet_names[idx + 1]]['Title']))
        if idx + 1 < len(sheet_names) and 'Title' in xls[sheet_names[idx + 1]].columns
        else len(xls[sheet_names[idx]]['Keyword'].unique())
        for idx in range(0, len(sheet_names), 2)
    ]

    model = SentenceTransformer('sentence-transformers/distiluse-base-multilingual-cased-v2')
    all_used_keywords = set()

    for idx in range(0, len(sheet_names), 2):
        kw_sheet_name = sheet_names[idx]
        if idx + 1 < len(sheet_names):
            title_sheet_name = sheet_names[idx + 1]
        else:
            break
        st.write(f"Processing sheets: {kw_sheet_name}, {title_sheet_name}")

        # Load keywords and create embeddings
        df_keywords = xls[kw_sheet_name]
        keywords = df_keywords['Keyword'].tolist()
        keyword_embeddings = model.encode(keywords)

        # Load titles and create embeddings
        df_titles = xls[title_sheet_name]
        title_embeddings = model.encode(df_titles['Title'].tolist())

        # Calculate the similarity matrix (titles as rows, keywords as columns)
        similarity_matrix = cosine_similarity(title_embeddings, keyword_embeddings)

        # Find the best keyword for each title
        best_keywords = []
        used_keywords = set()

        if len(keywords) < len(title_embeddings):
            # Fewer keywords than titles: assign each keyword to its best
            # unused title instead, leaving the remaining titles unmatched
            used_products = set()
            best_products = []
            for keyword_idx in range(len(keywords)):
                best_product_idx = find_best_product(keyword_idx, similarity_matrix, used_products, title_embeddings)
                if best_product_idx != -1:
                    used_products.add(best_product_idx)
                    best_products.append((keyword_idx, best_product_idx))
                    used_keywords.add(keywords[keyword_idx])
            best_keywords = [""] * len(title_embeddings)
            for keyword_idx, best_product_idx in best_products:
                best_keywords[best_product_idx] = keywords[keyword_idx]
        else:
            for title_idx in range(len(title_embeddings)):
                best_index = find_best_keyword(title_idx, similarity_matrix, used_keywords, keywords)
                if best_index != -1:
                    best_keyword = keywords[best_index]
                    used_keywords.add(best_keyword)
                else:
                    best_keyword = ""
                best_keywords.append(best_keyword)

        # Add best keywords to the DataFrame
        df_titles['Best_Keyword'] = best_keywords

        # Update the keyword DataFrame with usage information
        df_keywords['Used'] = df_keywords['Keyword'].apply(lambda x: x in used_keywords)
        all_used_keywords.update(used_keywords)

        # Save the updated DataFrames back to the xls dictionary
        xls[kw_sheet_name] = df_keywords
        xls[title_sheet_name] = df_titles

        processed_tabs += 1
        progress = (processed_tabs / total_tabs) * 100
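        # Remaining-time estimate: if the pairs processed so far carry weight
        # w_done out of w_total and took `elapsed` seconds, the projected total
        # is elapsed * (w_total / w_done). For example, 30 s spent on the first
        # third of the total weight projects to ~90 s overall, i.e. ~60 s left.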
        elapsed_time = time.time() - start_time
        completed_weight = sum(weights[:processed_tabs])
        total_time_estimate = elapsed_time * (sum(weights) / completed_weight)
        remaining_time = total_time_estimate - elapsed_time
        st.write(f"Progress: {progress:.2f}% completed", "----",
                 f"Estimated time remaining: {remaining_time:.2f} seconds")

    # Update the "Generic KW" sheet with all used keywords
    xls = update_generic_kw_sheet(xls, all_used_keywords)

    # Write the updated dataframes to a new excel file
    output_file_path = f'output_{os.path.basename(file_path)}'
    with pd.ExcelWriter(output_file_path) as writer:
        for sheet_name, df in xls.items():
            df.to_excel(writer, sheet_name=sheet_name, index=False)

    return output_file_path


def match_keywords_and_titles_case2(file_path):
    xls = pd.read_excel(file_path, sheet_name=None)

    # Load keywords and create embeddings
    df_keywords = xls['Generic KW']
    keywords = df_keywords['Keyword'].tolist()
    model = SentenceTransformer('sentence-transformers/distiluse-base-multilingual-cased-v2')
    sentences = keywords
    embeddings = model.encode(sentences)

    # Initialize keyword usage tracking
    keyword_used = {keyword: False for keyword in keywords}

    # Concatenate every title sheet (everything after "Generic KW") into one DataFrame
    sheet_names = list(xls.keys())[1:]
    df_all_titles = pd.concat([xls[sheet_name] for sheet_name in sheet_names], keys=sheet_names)

    # Calculate title embeddings for the concatenated DataFrame
    title_embeddings = model.encode(df_all_titles['Title'].tolist())

    # Calculate the similarity matrix
    similarity_matrix = cosine_similarity(title_embeddings, embeddings)

    # Find the initial best keyword for each title
    best_keywords = []
    for title_idx in range(len(title_embeddings)):
        best_index = -1
        max_similarity = -1
        for keyword_idx in range(len(embeddings)):
            similarity = similarity_matrix[title_idx, keyword_idx]
            if not keyword_used[sentences[keyword_idx]] and similarity > max_similarity:
                max_similarity = similarity
                best_index = keyword_idx
        if best_index != -1:
            best_keyword = sentences[best_index]
            keyword_used[best_keyword] = True
        else:
            best_keyword = ""  # more titles than keywords: leave this title unmatched
        best_keywords.append(best_keyword)

    # Perform additional rounds of matching with a limited number of rounds
    max_rounds = 3
    rounds = 0
    while rounds < max_rounds:
        improvement_found = False
        for title_idx in range(len(title_embeddings)):
            current_keyword = best_keywords[title_idx]
            if not current_keyword:
                continue
            current_keyword_idx = sentences.index(current_keyword)
            for keyword_idx in range(len(embeddings)):
                if not keyword_used[sentences[keyword_idx]]:
                    continue
                other_title_idx = best_keywords.index(sentences[keyword_idx])
                current_similarity = similarity_matrix[title_idx, current_keyword_idx]
                new_similarity = similarity_matrix[title_idx, keyword_idx]
                if new_similarity > current_similarity:
                    other_similarity = similarity_matrix[other_title_idx, keyword_idx]
                    other_new_similarity = similarity_matrix[other_title_idx, current_keyword_idx]
                    if other_new_similarity > other_similarity:
                        # Swap the keywords: both titles end up more similar
                        best_keywords[title_idx] = sentences[keyword_idx]
                        best_keywords[other_title_idx] = current_keyword
                        # Keep the "current" pointers in sync after the swap
                        current_keyword = best_keywords[title_idx]
                        current_keyword_idx = keyword_idx
                        improvement_found = True
        if not improvement_found:
            break
        rounds += 1
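    # This refinement is a 2-opt-style local search over the greedy assignment:
    # a swap is accepted only when it strictly improves the similarity of both
    # titles involved, so total similarity rises with every accepted swap and
    # the search stops early once a full pass finds no improvement.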
    # Add best keywords to the concatenated DataFrame
    df_all_titles['Best_Keyword'] = best_keywords

    # Split the concatenated DataFrame back into individual sheets
    for sheet_name in sheet_names:
        xls[sheet_name] = df_all_titles.loc[sheet_name].reset_index(drop=True)

    # Update the keyword sheet with usage information
    used_keywords = set(best_keywords)
    df_keywords['Used'] = df_keywords['Keyword'].apply(lambda x: x in used_keywords)
    xls['Generic KW'] = df_keywords

    # Write the updated dataframes to a new excel file
    output_file_path = f'output_{os.path.basename(file_path)}'
    with pd.ExcelWriter(output_file_path) as writer:
        for sheet_name, df in xls.items():
            df.to_excel(writer, sheet_name=sheet_name, index=False)

    return output_file_path


def get_binary_file_downloader_html(bin_file, file_label='File'):
    with open(bin_file, 'rb') as f:
        data = f.read()
    bin_str = base64.b64encode(data).decode()
    # Embed the file as a base64 data URI so Streamlit can render a download link
    href = (f'<a href="data:application/octet-stream;base64,{bin_str}" '
            f'download="{os.path.basename(bin_file)}">Download {file_label}</a>')
    return href


st.set_page_config(page_title="Title and keywords matching app", page_icon=":rocket:", layout="wide")
st.title("Title and keywords matching app")

uploaded_file = st.file_uploader("Choose an Excel file", type=['xlsx'])

if uploaded_file is not None:
    file_details = {"FileName": uploaded_file.name, "FileType": uploaded_file.type, "FileSize": uploaded_file.size}
    # st.write(file_details)

    # Persist the upload so pd.read_excel can open it by path
    file_path = f"uploaded_{uploaded_file.name}"
    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())

    tool_choice = st.radio("Choose a tool:", ("Tool 1", "Tool 2"))

    if st.button("Get matches in Excel file"):
        with st.spinner("Processing..."):
            if tool_choice == "Tool 1":
                output_file_path = match_keywords_and_titles_case1(file_path)
            else:
                output_file_path = match_keywords_and_titles_case2(file_path)
        st.success("Done!")
        st.markdown(get_binary_file_downloader_html(output_file_path, f"Output_{uploaded_file.name}"),
                    unsafe_allow_html=True)
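# Usage sketch (the sheet names below are illustrative assumptions, not
# enforced beyond the column checks in the code): Tool 1 expects alternating
# per-category pairs such as "KW Shoes" (column 'Keyword') followed by
# "Titles Shoes" (column 'Title'), plus a "Generic KW" sheet (column 'Keyword')
# whose 'Used' column is updated at the end. Tool 2 expects "Generic KW" as
# the first sheet, followed by one or more title sheets. Launch the app with:
#   streamlit run <this_script>.py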