Spaces:
Sleeping
Sleeping
from langchain_chroma import Chroma | |
from langchain_huggingface import HuggingFaceEmbeddings | |
from sentence_transformers import SentenceTransformer | |
from sklearn.metrics.pairwise import cosine_similarity | |
from openpyxl import load_workbook | |
import pandas as pd | |
import gradio as gr | |
import numpy as np | |
import re | |
# Load the persisted Chroma vector stores, one embedding model per store
# family.  Each store's collection size is printed right after it is opened.

# --- Baseline model (BGE-M3): KPI pool and NCS store ---
model_huggingface_ori = HuggingFaceEmbeddings(model_name="BAAI/bge-m3")

persist_directory_ori = "./chroma_kpi_250416"
kpi_pool_ori = Chroma(
    embedding_function=model_huggingface_ori,
    persist_directory=persist_directory_ori,
)
print(kpi_pool_ori._collection.count())

persist_directory_ori2 = "./chroma_ncs_241230"
ncs_db_ori = Chroma(
    embedding_function=model_huggingface_ori,
    persist_directory=persist_directory_ori2,
)
print(ncs_db_ori._collection.count())

# --- KR-SBERT (snunlp): KPI pool ---
model_huggingface = HuggingFaceEmbeddings(
    model_name="snunlp/KR-SBERT-V40K-klueNLI-augSTS"
)
persist_directory1 = "./chroma_kpi_250528_SBERT"
kpi_pool = Chroma(
    embedding_function=model_huggingface,
    persist_directory=persist_directory1,
)
print(kpi_pool._collection.count())

# --- ko-sbert-sts (jhgan): KPI pool ---
model_huggingface2 = HuggingFaceEmbeddings(model_name="jhgan/ko-sbert-sts")
persist_directory_jhgan1 = "./chroma_kpi_250528_SBERTjhgan"
kpi_pool2 = Chroma(
    embedding_function=model_huggingface2,
    persist_directory=persist_directory_jhgan1,
)
print(kpi_pool2._collection.count())
def search_unit(unit_query):
    """Search the NCS store and format the hits as Markdown/HTML text.

    Queries ``ncs_db_ori`` for the 7 best matches of ``unit_query`` and
    renders one numbered section per hit: the sub-category name from the
    metadata, the major > middle > minor category path, the relevance score
    rounded to 3 decimals, and the document text with ", " replaced by " / ".

    Returns:
        str: the concatenated sections; an empty string when there are no hits.
    """
    hits = ncs_db_ori.similarity_search_with_relevance_scores(unit_query, k=7)

    sections = []
    for rank, (document, score) in enumerate(hits, start=1):
        meta = document.metadata
        unit = document.page_content.replace(", ", " / ")
        job_name = meta['세분류코드명']
        # Category hierarchy, broadest level first.
        code_str = " > ".join(
            meta[level] for level in ('대분류코드명', '중분류코드명', '소분류코드명')
        )
        similarity = round(score, 3)
        sections.append(f"""
<span style="font-size: 18px;">**[{rank}] {job_name}**</span>
<span style="font-size: 13px;"> | {code_str} | similarity {similarity} </span>
<br> {unit} <br>
""")
    return "".join(sections)
def search_unit_all(unit_query):
    """Return the NCS search text for the two result panes.

    ``search_unit`` accepts only the query string, so passing a model name as
    a second positional argument raises ``TypeError``; it is called with the
    query alone here.

    Returns:
        tuple[str, str]: ``(text_bge, text_snu)``.
    """
    text_bge = search_unit(unit_query)
    # NOTE(review): only a BGE-backed NCS store (ncs_db_ori) is loaded in this
    # file, so the second slot reuses the same text until an SBERT-backed NCS
    # store exists — confirm this is the intended fallback.
    text_snu = text_bge
    return text_bge, text_snu
# Sentence-embedding models used by the semantic de-duplication filter below,
# in the order: BGE, SBERT-snunlp, SBERT-jhgan (the fallback).
downsample_model_ori, downsample_model, downsample_model_2 = (
    SentenceTransformer(model_id)
    for model_id in (
        'BAAI/bge-m3',
        'snunlp/KR-SBERT-V40K-klueNLI-augSTS',
        'jhgan/ko-sbert-sts',
    )
)
def filter_semantically_similar_texts_by_embedding(df, mode, embedding_field='embedding_input', similarity_threshold=0.8):
    """Drop rows whose text is semantically too close to an earlier kept row.

    The texts in ``df[embedding_field]`` are embedded with the model selected
    by ``mode`` and compared pairwise by cosine similarity.  Rows are visited
    in order; a row that has not been excluded is kept, and every row whose
    similarity to it is strictly greater than ``similarity_threshold`` is
    excluded.

    Args:
        df: DataFrame containing the column named by ``embedding_field``.
        mode: "BGE" or "SBERT-snunlp"; any other value selects the jhgan model.
        embedding_field: name of the column holding the texts to embed.
        similarity_threshold: cosine similarity above which a row counts as
            a duplicate of a kept row.

    Returns:
        DataFrame with the kept rows and a fresh 0..n-1 index.
    """
    texts = df[embedding_field].tolist()
    if not texts:
        # cosine_similarity rejects an empty sample matrix; with no rows there
        # is nothing to filter.
        return df.reset_index(drop=True)

    # Embed the texts with the model matching the requested mode.
    encoders = {"BGE": downsample_model_ori, "SBERT-snunlp": downsample_model}
    embeddings = encoders.get(mode, downsample_model_2).encode(texts)

    # Pairwise cosine similarity; zero the diagonal so a row never matches itself.
    similarity_matrix = cosine_similarity(embeddings)
    np.fill_diagonal(similarity_matrix, 0)

    # NOTE(review): the source lost its indentation; the exclusion update is
    # assumed to run only for rows that are kept (the usual greedy filter).
    kept_indices = []
    excluded_indices = set()
    for i, row in enumerate(similarity_matrix):
        if i in excluded_indices:
            continue
        kept_indices.append(i)
        excluded_indices.update(np.flatnonzero(row > similarity_threshold).tolist())

    return df.iloc[kept_indices].reset_index(drop=True)
def search_kpi(kpi_query, kpi_count, mode):
    """Search one KPI store and return the Gradio updates for a single result view.

    Args:
        kpi_query: free-text query.
        kpi_count: maximum number of KPIs to return (results are additionally
            capped at 15 after de-duplication).
        mode: "BGE" or "SBERT-snunlp"; any other value searches the jhgan store.

    Returns:
        A 6-tuple: update showing the output area, update setting the checkbox
        choices, the table shown to the user (name, formula, note, category),
        the full KPI table kept for export, the list of choice numbers
        (1..n), and an update hiding the guide text.
    """
    if mode == "BGE":
        print("BGE 검색 시작")
        pool = kpi_pool_ori
    elif mode == "SBERT-snunlp":
        print("SBERT-snunlp 검색 시작")
        pool = kpi_pool
    else:
        print("SBERT-jhgan 검색 시작")
        pool = kpi_pool2
    results = pool.similarity_search_with_relevance_scores(kpi_query, k=50)

    # Metadata of each hit plus its relevance score.
    records = [{**doc.metadata, '유사도점수': score} for doc, score in results]
    if records:
        df = pd.DataFrame(records)
    else:
        # No hits: build an empty frame that still has the columns selected
        # below, instead of failing with KeyError on a column-less frame.
        df = pd.DataFrame(columns=[
            '지표명', '정의', '산식', '유형', '비고',
            'BSC 관점', '전략방향', '전략과제', '유사도점수',
        ])

    # De-duplicate on definition + formula, then apply the caps.
    df = df.drop_duplicates(subset=['정의', '산식']).head(15)
    df = df.iloc[:kpi_count].reset_index(drop=True)

    # Category = BSC perspective + strategic direction (computed once, on the
    # rows that are actually returned).
    df['카테고리'] = df['BSC 관점'] + " > " + df['전략방향']

    visible_df = df[['지표명', '산식', '비고', '카테고리']].copy()
    kpi_list = list(range(1, len(visible_df) + 1))
    kpi_df = df[['지표명', '정의', '산식', '유형', '비고', 'BSC 관점', '전략방향', '전략과제']].copy()
    return gr.update(visible=True), gr.update(choices=kpi_list), visible_df, kpi_df, kpi_list, gr.update(visible=False)
def search_kpi_one(kpi_query, kpi_count, mode):
    """Search one KPI store and return its result tables.

    Args:
        kpi_query: free-text query.
        kpi_count: maximum number of KPIs to return (results are additionally
            capped at 15 after de-duplication).
        mode: "BGE" or "SBERT-snunlp"; any other value searches the jhgan store.

    Returns:
        A 3-tuple ``(visible_df, kpi_df, kpi_list)``: the table shown to the
        user (name, formula, note), the full KPI table kept for export, and
        the list of choice numbers 1..n.
    """
    if mode == "BGE":
        print("BGE 검색 시작")
        pool = kpi_pool_ori
    elif mode == "SBERT-snunlp":
        print("SBERT-snunlp 검색 시작")
        pool = kpi_pool
    else:
        print("SBERT-jhgan 검색 시작")
        pool = kpi_pool2
    results = pool.similarity_search_with_relevance_scores(kpi_query, k=50)

    # Metadata of each hit plus its relevance score.
    records = [{**doc.metadata, '유사도점수': score} for doc, score in results]
    if records:
        df = pd.DataFrame(records)
    else:
        # No hits: build an empty frame that still has the columns selected
        # below, instead of failing with KeyError on a column-less frame.
        df = pd.DataFrame(columns=[
            '지표명', '정의', '산식', '유형', '비고',
            'BSC 관점', '전략방향', '전략과제', '유사도점수',
        ])

    # De-duplicate on definition + formula, then apply the caps.  The category
    # column is not built here because neither returned table includes it.
    df = df.drop_duplicates(subset=['정의', '산식']).head(15)
    df = df.iloc[:kpi_count].reset_index(drop=True)

    visible_df = df[['지표명', '산식', '비고']].copy()
    kpi_list = list(range(1, len(visible_df) + 1))
    kpi_df = df[['지표명', '정의', '산식', '유형', '비고', 'BSC 관점', '전략방향', '전략과제']].copy()
    return visible_df, kpi_df, kpi_list
def format_df_html(df):
    """Render every row of ``df`` as an HTML card and return the joined markup.

    Each card shows ``[index + 1]`` with the KPI name, then the note in gray,
    then the formula.  The number comes from the row's index label, so the
    frame is expected to carry a 0-based integer index.

    Returns:
        str: the concatenated cards; an empty string for an empty frame.
    """
    cards = []
    for idx, record in df.iterrows():
        name = record['지표명']
        note = record['비고']
        formula = record['산식']
        cards.append(f"""
<div style="margin-bottom: 5px;">
<span style="font-size: 18px; font-weight: bold;">[{idx+1}] {name}</span><br>
<span style="font-size: 13px; color: gray;">{note}</span><br>
<div style="margin-top: 5px; font-size: 14px; color: #333;">{formula}
</div>
<div style="height: 8px;"></div>
</div>
""")
    return "".join(cards)
def search_kpi_all_models(kpi_query, kpi_count):
    """Run the KPI search on all three stores and build the Gradio outputs.

    Returns a 14-tuple in this order: update showing the output area, three
    checkbox-choice updates, three HTML result strings, three full KPI tables,
    three choice lists, and an update hiding the guide text.  Within each
    group of three the order is BGE, SBERT-snunlp, SBERT-jhgan.
    """
    print("함수 호출, 테이블 생성 시작")
    # One (visible table, full table, choice list) triple per model.
    per_model = [
        search_kpi_one(kpi_query, kpi_count, model_key)
        for model_key in ("BGE", "SBERT-snunlp", "SBERT-jhgan")
    ]
    print("함수 종료")

    visible_tables, kpi_tables, choice_lists = zip(*per_model)
    rendered = [format_df_html(table) for table in visible_tables]

    return (
        gr.update(visible=True),
        *(gr.update(choices=choices) for choices in choice_lists),  # checkbox groups
        *rendered,       # kpi_table1..3
        *kpi_tables,
        *choice_lists,
        gr.update(visible=False),
    )
# Build a dictionary mapping spreadsheet cell addresses to values.
def _column_letters_to_index(letters):
    """Convert column letters ("A", "Z", "AA", ...) to a 1-based column number."""
    index = 0
    for ch in letters.upper():
        index = index * 26 + (ord(ch) - ord('A') + 1)
    return index


def _column_index_to_letters(index):
    """Convert a 1-based column number to column letters (27 -> "AA")."""
    letters = []
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        letters.append(chr(ord('A') + remainder))
    return "".join(reversed(letters))


def make_excel_table(dataframe, start_cell):
    """Lay ``dataframe`` out as a cell-address -> value dict anchored at ``start_cell``.

    The first row's first value lands on ``start_cell``; later columns go to
    the right and later rows go down.  Column letters are handled in full, so
    multi-letter start cells ("AA4") and tables that run past column Z get
    valid addresses.

    Args:
        dataframe: DataFrame whose values are written row by row (index excluded).
        start_cell: address of the top-left cell, e.g. "B4" (case-insensitive).

    Returns:
        dict: ``{"B4": value, "C4": value, ...}``; empty for an empty frame.

    Raises:
        ValueError: if ``start_cell`` is not letters followed by digits.
    """
    match = re.fullmatch(r"\s*([A-Za-z]+)([0-9]+)\s*", start_cell)
    if match is None:
        raise ValueError(f"invalid start cell: {start_cell!r}")
    start_col = _column_letters_to_index(match.group(1))
    start_row = int(match.group(2))

    table_dict = {}
    for row_number, row in enumerate(dataframe.itertuples(index=False), start=start_row):
        for col_number, value in enumerate(row, start=start_col):
            table_dict[f"{_column_index_to_letters(col_number)}{row_number}"] = value
    return table_dict
# Build the downloadable Excel file.
def generate_excel(df1, df2, df3, kpi_list1, kpi_list2, kpi_list3, kpi_query):
    """Write the selected KPIs of all three models into the Excel template.

    Args:
        df1, df2, df3: full KPI tables per model (BGE, snunlp, jhgan); may be
            ``None`` when no search result is stored.
        kpi_list1, kpi_list2, kpi_list3: selected 1-based row numbers per model.
        kpi_query: the search text, used in the output file name.

    Returns:
        A Gradio update that sets the saved file as the download value and
        makes the download component visible.
    """
    def get_filtered(df, kpi_list, model_name):
        """Return the selected rows of ``df`` tagged with their source model."""
        if df is None or not kpi_list:
            # Nothing stored or nothing selected: empty frame with the same columns.
            columns = [] if df is None else list(df.columns)
            return pd.DataFrame(columns=columns + ["출처"])
        indices = [int(i) - 1 for i in kpi_list]  # 1-based choice -> 0-based row
        filtered = df.iloc[indices].copy()
        filtered["출처"] = model_name
        return filtered

    selections = [
        get_filtered(df1, kpi_list1, "BGE3"),
        get_filtered(df2, kpi_list2, "snunlp"),
        get_filtered(df3, kpi_list3, "jhgan"),
    ]
    # Merge only the non-empty selections, then de-duplicate on the formula.
    non_empty = [frame for frame in selections if not frame.empty]
    if non_empty:
        filtered_df = pd.concat(non_empty, ignore_index=True)
        filtered_df = filtered_df.drop_duplicates(subset='산식')
    else:
        filtered_df = selections[0]

    # Open the template and write the table starting at B4.
    file_path = "./template.xlsx"
    workbook = load_workbook(file_path)
    sheet = workbook.active
    for cell, value in make_excel_table(filtered_df, 'B4').items():
        sheet[cell].value = value

    # Default zoom level of the worksheet (%).
    sheet.sheet_view.zoomScaleNormal = 85

    # The query is user input: replace path separators and other characters
    # that are unsafe in file names, then collapse runs of whitespace.
    filename = f"KPI_POOL_{kpi_query}.xlsx"
    safe_filename = re.sub(r'\s*[\\/:*?"<>|]\s*', '_', filename)
    safe_filename = re.sub(r'\s+', ' ', safe_filename)
    output_file = safe_filename.strip()
    workbook.save(output_file)
    return gr.update(value=output_file, visible=True)
def toggle_selection(current_selection, kpi_list):
    """Toggle between "select all" and "select none".

    Returns an empty list when ``current_selection`` already contains exactly
    the items of ``kpi_list`` (compared as sets); otherwise returns ``kpi_list``.
    """
    everything_selected = set(current_selection) == set(kpi_list)
    return [] if everything_selected else kpi_list
def toggle_all_selections(sel1, list1, sel2, list2, sel3, list3):
    """Apply the select-all / select-none toggle to three checkbox groups.

    For each (selection, full list) pair the result is ``[]`` when the
    selection already equals the full list as a set, otherwise the full list.

    Returns:
        tuple: the three new selections, in argument order.
    """
    pairs = ((sel1, list1), (sel2, list2), (sel3, list3))
    return tuple(
        [] if set(chosen) == set(available) else available
        for chosen, available in pairs
    )
# Custom stylesheet passed to gr.Blocks(css=...).
# The stray " | |" table-cell residue that trailed every line of this literal
# has been removed: inside the string it was part of the CSS text itself.
# NOTE(review): "div.svelte-1nguped" looks like a build-generated class name —
# confirm it still matches after a Gradio upgrade.
css = """
/* 데이터프레임 스타일 */
.gradio-container table {
table-layout: fixed;
width: 100%;
}
.gradio-container td{
white-space: nowrap !important;
overflow-x: auto !important;
text-align: left;
font-size: 13px;
letter-spacing: -1px !important;
}
/* 헤더 기본 스타일 */
.gradio-container th[aria-sort]::after {
visibility: hidden !important; /* 아이콘만 감춤 */
}
.gradio-container th .header-content {
justify-content: center !important;
text-align: center;
font-size: 13px;
letter-spacing: -1px !important;
}
.gradio-container th span {
text-align: center !important;
display: block !important;
width: 100%;
}
.v_check { padding-top: 39px !important;
margin-right: 0px !important;
padding-right: 0px !important;
margin-left: 0px !important;
padding-left: 0px !important;
}
.v_check div { display: block !important; }
.v_check label {
max-width: 80%; /* 전체 너비 유지 */
padding: 2px; /* 내부 여백 조정 */
margin-bottom: 52px; /* 라벨 간 간격 설정 */
border: 1px solid transparent !important;
letter-spacing: -1px !important; /* 자간 좁게 설정*/
justify-content: center;
}
div.svelte-1nguped {
background: transparent !important;
border: none !important;
}
.left-padding { padding-left: 43px !important; /* 왼쪽 패딩 추가 */ }
.custom-markdown h3 {
font-size: 18px; /* 본문 및 목록 글자 크기 */
}
.custom-markdown blockquote {
margin-bottom: 8px !important;
}
.custom-markdown p, .custom-markdown li {
margin-top: 8px !important;
font-size: 15px; /* 본문 및 목록 글자 크기 */
line-height: 1.5;
}
.custom-markdown a {
font-size: 15px;
color: #000000;
}
.no-margine {
margin-bottom: 0px !important;
padding-bottom: 0px !important;
margin-top: 0px !important;
padding-top: 0px !important;
gap: 0px !important;
}
"""
# Markdown shown under the search box: copyright / usage notice and contact
# details.  The stray " | |" residue that trailed every line of this literal
# has been removed; the " | " separators in the contact line are content.
# NOTE(review): "[email protected]" looks like an address redacted by the
# page scraper — restore the real contact e-mail.
guide = """> ### 저작권 및 유의사항 안내
- 본 앱은 시앤피컨설팅이 개발한 KPI POOL 검색 도구로, AI 기반 추천 결과는 참고용으로 제공됩니다.
- AI 기반 추천 알고리즘은 전문 컨설팅을 대체할 수 없으며, 반드시 조직의 전략, 평가 목적, 데이터 수집 가능성 등과의 적합성 검토가 필요합니다.
- KPI 설정과 적용에 대한 개별 맞춤 검토는 시앤피컨설팅의 전문 컨설턴트에게 문의해 주세요.
<br><br>
> ### Contact Us
시앤피컨설팅그룹 일터혁신본부 | Tel. 02-6257-1448 | http://www.cnp.re.kr | [email protected]
"""

# Empty result table with the columns of the user-facing KPI table.
empty_df = pd.DataFrame(columns=["지표명", "산식", "비고", "카테고리"])
# NOTE(review): the source lost its indentation, so the nesting of the layout
# containers below is reconstructed from the order of the statements — confirm
# against the running app (in particular where the guide text, the button row
# and the download component sit).
def _reset_outputs():
    """Return the values that put every search output back to its initial state."""
    cleared_checkboxes = [gr.update(choices=[], value=[]) for _ in range(3)]
    cleared_tables = [gr.update(value="") for _ in range(3)]
    return (
        None, None, None,                      # df_state1..3
        None,                                  # kpi_query
        gr.update(visible=False),              # output_area
        *cleared_checkboxes,                   # kpi_checkbox1..3
        *cleared_tables,                       # kpi_table1..3
        gr.update(value=None, visible=False),  # file_download
        gr.update(visible=True),               # copyright
    )


with gr.Blocks(css=css, fill_width=True) as demo:
    # Per-model full KPI tables and checkbox choice lists kept between events.
    df_state1, df_state2, df_state3 = (gr.State() for _ in range(3))
    check_state1, check_state2, check_state3 = (gr.State() for _ in range(3))

    with gr.Row():
        gr.Markdown(" ")

    with gr.Tab("KPI Pool 검색"):
        with gr.Column(elem_classes="left-padding"):
            with gr.Row(equal_height=True):
                kpi_query = gr.Textbox(
                    scale=30,
                    submit_btn=True,
                    label="성과평가를 진행할 [핵심업무 or 핵심성공요인]을 입력해주세요😊! (검색 키워드는 직무기술서 또는 NCS 능력단위 참고)",
                    placeholder="예: 자금 → 자금조달 / 재무위험관리 / 자금운용",
                )
                kpi_count = gr.Slider(
                    label="KPI 출력 개수", value=7, minimum=5, maximum=10, step=1, scale=7
                )
            copyright = gr.Markdown(guide, visible=True, elem_classes="custom-markdown")

        with gr.Column(visible=False) as output_area:
            with gr.Group():
                with gr.Row():
                    # One tab per embedding model: checkbox column + HTML results.
                    checkbox_groups = []
                    result_tables = []
                    for tab_title, table_label in (
                        ("BAAI", "BGE 결과"),
                        ("snunlp", "SBERT-snunlp 결과"),
                        ("jhgan", "SBERT-jhgan 결과"),
                    ):
                        with gr.Group():
                            with gr.Tab(tab_title):
                                with gr.Row():
                                    checkbox_groups.append(
                                        gr.CheckboxGroup(
                                            choices=[],
                                            interactive=True,
                                            elem_classes="v_check",
                                            container=False,
                                            min_width=5,
                                            scale=1,
                                        )
                                    )
                                    with gr.Column(scale=11):
                                        result_tables.append(gr.HTML(label=table_label))
                    kpi_checkbox1, kpi_checkbox2, kpi_checkbox3 = checkbox_groups
                    kpi_table1, kpi_table2, kpi_table3 = result_tables

            with gr.Row():
                gr.Column(scale=2)
                select_button = gr.Button("All", scale=1)
                download_button = gr.Button("Download", scale=1)
                clear_button = gr.Button("Clear", scale=1)
                gr.Column(scale=2)
            file_download = gr.Files(label="Download", interactive=False, visible=False)

        # Search with all three models when the query is submitted.
        kpi_query.submit(
            search_kpi_all_models,
            inputs=[kpi_query, kpi_count],
            outputs=[
                output_area,
                kpi_checkbox1, kpi_checkbox2, kpi_checkbox3,
                kpi_table1, kpi_table2, kpi_table3,
                df_state1, df_state2, df_state3,
                check_state1, check_state2, check_state3,
                copyright,
            ],
        )

        # "All" toggles every checkbox group between all-selected and none.
        select_button.click(
            fn=toggle_all_selections,
            inputs=[
                kpi_checkbox1, check_state1,
                kpi_checkbox2, check_state2,
                kpi_checkbox3, check_state3,
            ],
            outputs=[kpi_checkbox1, kpi_checkbox2, kpi_checkbox3],
            show_progress='hidden',
        )

        download_button.click(
            generate_excel,
            inputs=[
                df_state1, df_state2, df_state3,
                kpi_checkbox1, kpi_checkbox2, kpi_checkbox3,
                kpi_query,
            ],
            outputs=[file_download],
        )

        clear_button.click(
            fn=_reset_outputs,
            outputs=[
                df_state1, df_state2, df_state3,
                kpi_query, output_area,
                kpi_checkbox1, kpi_checkbox2, kpi_checkbox3,
                kpi_table1, kpi_table2, kpi_table3,
                file_download, copyright,
            ],
            show_progress='hidden',
        )

    with gr.Tab("[참고] NCS 능력단위"):
        unit_query = gr.Textbox(
            label="업종 or 직종 + 직무명을 입력하세요😊",
            scale=1,
            submit_btn=True,
            placeholder="예: 의약품 법률자문, 공공행정 경영기획, 재무회계 자금",
        )
        with gr.Row():
            with gr.Group():
                unit_result = gr.Markdown()
        unit_query.submit(search_unit, inputs=[unit_query], outputs=[unit_result])

demo.launch(debug=True)