CC-KuLLM3-LoRA / ai_hub_CC_QA_data_preprocessing_multithread_for_hf.py
song9's picture
data preprocessing codes
4fd2346 verified
from tqdm import tqdm
import os
import pandas as pd
import json
import time
import re
import threading
from concurrent.futures import ThreadPoolExecutor
from transformers import AutoTokenizer
import torch
from enum import Enum
import traceback
# ignore warning
import warnings
warnings.filterwarnings("ignore")
os.environ['HF_TOKEN'] = "YOUR HUGGINGFACE ACCESS TOKEN"
os.environ["HUGGINGFACEHUB_API_TOKEN"] = "YOUR HUGGINGFACE ACCESS TOKEN"
''' directory tree
.root
├── Training
│   ├── cs
│   ├── dasan
│   ├── finance
│   └── health
└── Validation
├── cs
├── dasan
├── finance
└── health
'''
root_dir = os.path.join('SET YOUR DATA DIR PATH TO root')
is_train = "Training" # Set to Validation if you want to preprocess validation data
## 데이터 종류 설정
# cs (고객 상담), dasan (다산콜센터), finance (금융/보험), health (질병관리본부)
category = ['dasan', 'finance', 'health', 'cs']
filedirpaths = []
for c in category:
filedirpaths.append(os.path.join(root_dir, is_train, c))
filepaths = []
for fdp in filedirpaths:
filenames = os.listdir(fdp)
for filename in filenames:
filepaths.append(os.path.join(fdp,filename))
filepaths = list(filter(lambda x:'.json' in x, filepaths)) # json 파일만 추출
# 전처리된 파일들을 chat-template-formatting하여 데이터프레임으로 저장
ddf_columns = ['category','seq_id','dialogue']
dialogue_df = pd.DataFrame(
data=[['' for _ in ddf_columns]],
columns=ddf_columns
)
dialogue_df_lock = threading.Lock() # 멀티 스레딩 시, 해당 객체로의 동시 접근을 방지
def data_preprocessing(filepath):
ddf_columns = ['category','seq_id','dialogue']
with open(os.path.join('SET PATH of short_sentences.json'), 'r', encoding='utf-8') as f:
shorts_to_filter = json.load(f)
def set_private_token(text):
# 정규 표현식 패턴: 2개 이상 반복되는 'x', 'X', 'o', 'O', '0', '-', 'ㅇ'
pattern = r"(x{2,}|X{2,}|o{2,}|O{2,}|0{2,}|\-{2,}|ㅇ{2,})"
# 해당 패턴을 '<|private|>'로 대체
return re.sub(pattern, '<|private|>', text)
def merge_continuous_role(conversation):
merged_conversation = []
current_entry = None
for entry in conversation:
# 현재 entry가 비어있으면 새로운 entry로 초기화
if current_entry is None:
current_entry = entry
# 이전 entry와 role이 같으면 content를 합침
elif current_entry['role'] == entry['role']:
current_entry['content'] += ' ' + entry['content']
# role이 다르면, 현재까지의 entry를 결과에 추가하고 새로운 entry로 초기화
else:
merged_conversation.append(current_entry)
current_entry = entry
# 마지막 entry 추가
if current_entry is not None:
merged_conversation.append(current_entry)
return merged_conversation
try:
# print(filepath)
global dialogue_df
tokenizer = AutoTokenizer.from_pretrained("song9/CC-KuLLM3-LoRA")
savepath = os.path.join(f"{filepath.split('.json')[0]}.tsv")
columns = ['domain','category','id','speaker','sequence','cust_intent','couns_intent','QA','cust_q','couns_q','cust_a','couns_a','entities','dictionary','knowlegde_base']
### 아래 if문에서 else문의 과정(json파일들을 읽어서 하나의 dataframe으로 결합하는 과정)이 가장 시간이 오래 걸린다.
# # else문은 대략 8시간이 소요된다. 한 번만 dataframe을 저장하면 아래 로직들은 금방 끝난다.
if os.path.exists(savepath): # 이미 dialogue json을 tsv파일로 변환한게 있으면 그 파일을 사용
df = pd.read_csv(savepath, sep='\t')
# print(f"✅ {savepath} tsv file read")
else: # json 파일을 dataFrame으로 파싱하는 과정
df = pd.DataFrame(data=[['0' for _ in range(len(columns))]],columns=columns)
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
for d in tqdm(data):
df = pd.concat([df, pd.DataFrame(data=[d.values()], columns=columns)], axis=0)
df = df.iloc[1:]
df.reset_index(inplace=True, drop=True)
df.to_csv(savepath, sep='\t', index=False)
# print(f"✅ {savepath} tsv file saved")
# Preprocessing
# sequence가 1인 row는 제거
# sequence 컬럼을 int로 변환
df['sequence'] = df['sequence'].astype(int)
# sequence가 1인 대화 id -> 아래서 확인하는 걸로 변경
# one_seq_id = pd.DataFrame(df.groupby('id')['sequence'].max()).query('id == 1').reset_index()['id'].tolist()
# df = df[~df['id'].isin(one_seq_id)]
temp_dialogue_df = pd.DataFrame(
data=[['' for _ in ddf_columns]],
columns=ddf_columns
)
# system_prompt = '당신은 고객의 전화에 응대하는 전화 상담원입니다.\n 당신은 비도덕적이거나, 성적이거나, 불법적이거나 또는 사회 통념적으로 허용되지 않는 발언은 하지 않습니다.\n 고객에게 친절하게 대화하며, 고객의 응답에 가능한 정확하고 예의 바르게 응답함으로써 최대한 도와주려고 노력합니다.\n 고객의 질문을 이해하지 못했다면, 어떤 부분을 이해하지 못했는지 설명하고 고객에게 구체적인 질문을 요구합니다.\n 당신은 고객과 전화로 소통하기 때문에 답변이 간결해야 합니다. 거짓 정보를 발언하지 않도록 주의합니다.'
# sys_pattern = r'<<SYS>>\n.*\n<</SYS>>'
# 여러 대화들이 하나의 dataframe에 섞여있는데 이를 분리하여 각각의 json파일로 저장
# 모든 대화들을 chat dataset formatting하여 json 파일로 저장.
# 같은 시퀀스 id별로 df_prep를 나눠 하나의 대화.json파일로 저장
for seq_id in tqdm(df['id'].unique()):
# print(seq_id)
sequence_df = df[df['id']==seq_id] # 같은 seq_id의 대화들만 있는 dataframe
if len(sequence_df) < 9: # 고객-상담원 turn이 5번 이상인 대화들만 처리
continue
# print(sequence_df[['cust_q','couns_q','cust_a','couns_a']])
seq_category = os.path.dirname(filepath).split('/')[-1] # 해당 대화의 카테고리 (cs, finance, health, dasan)
category_dirpath = os.path.join(root_dir,is_train,f'{seq_category}-preprocessed')
if not os.path.exists(category_dirpath):
os.mkdir(category_dirpath)
savepath_processed_json = os.path.join(category_dirpath,f'{seq_category}_{seq_id}.json') # 대화 1개별로 1개의 json파일에 저장할 예정
# if os.path.exists(savepath_processed_json):
# with open(savepath_processed_json, 'r', encoding='utf-8') as f:
# dialogue = json.load(f)
# # print(f"✅ {savepath_processed_json} json file read")
# else:
# 혹시 모르니 sequence_df를 'sequence'컬럼을 기준으로 오름차순 정렬해줌.
sequence_df = sequence_df.sort_values('sequence', ascending=True)
sequence_df.reset_index(inplace=True, drop=True)
dialogue = []
past_role = ''
# dialogue = dialogue + [{'role':'system', 'content':'당신은 "위드마인드"에서 만든 "전화 상담원"입니다.\n 당신은 비도덕적이거나, 성적이거나, 불법적이거나 또는 사회 통념적으로 허용되지 않는 발언은 하지 않습니다.\n 고객에게 친절하게 대화하며, 고객의 응답에 가능한 정확하고 예의 바르게 응답함으로써 최대한 도와주려고 노력합니다.\n 고객의 질문을 이해하지 못했다면, 어떤 부분을 이해하지 못했는지 설명하고 고객에게 구체적인 질문을 요구합니다.\n 당신은 고객과 전화로 소통하기 때문에 답변이 간결해야 합니다. 거짓 정보를 발언하지 않도록 주의합니다.'}]
for i, row in sequence_df.iterrows():
# print(f"{i}행, {list(filter(lambda x: not pd.isna(x), sequence_df.loc[i,['cust_q','cust_a','couns_q','couns_a']].tolist()))[0][:10]}")
if row['speaker'] == '상담사':
role = 'assistant'
else:
role = 'user'
# print(f"role = {role}, past_role = {past_role}")
c = ''.join(filter(lambda x: not pd.isna(x), sequence_df.loc[i,['cust_q','cust_a','couns_q','couns_a']].tolist())).strip()
c = set_private_token(c)
if past_role == role: # 이전 발화자와 현재 발화자가 같으면 하나의 발화로 연결
# print(f"seq_id={i} : {role}이 2번 말함. <|interjection|> 토큰 추가")
# dialogue = dialogue + [{'role':"assistant" if role=="user" else "user", 'content':"<|interjection|>"}]
# 같은 발화자가 2번 연속 말하면 이전의 발화에 이어붙임
dialogue[-1]['content'] = dialogue[-1]['content'] + ' ' + c
past_role = role
continue
past_role = role
dialogue = dialogue + [{'role':role, 'content':c}]
# 만약 대화가 gpt(counselor)부터 시작한다면 human부터 시작하도록 해야 한다.
if dialogue[0].get('role') == 'assistant':
dialogue.insert(0,{'role':'user', 'content':'<|startofcall|>'})
else: # 고객이 대화 시작이어도 startofcall 토큰 추가
dialogue[0]['content'] = '<|startofcall|>' + dialogue[0]['content']
# 만약 마지막 대화가 고객이라면 고객의 발화를 제거한다.
if dialogue[-1].get('role') == 'user':
dialogue = dialogue[:-1]
# 고객, 상담원의 대화에서 길이가 9 미만이고 & short_sentence 구문이 포함되어 있으면 해당 대화는 제거함.
is_short = False
new_dialogue = []
dialogue_copy = dialogue.copy()
for i, turn in enumerate(dialogue_copy):
if i < 2:
new_dialogue.append(turn)
continue
cur_role = turn['role']
content = turn['content']
if i == len(dialogue_copy)-2:
new_dialogue.append(turn)
continue
if sum([x in content for x in shorts_to_filter]) > 0 and len(content)<9:
# print(content , '- is short')
is_short = True
continue
if is_short:
is_short = False
if cur_role != new_dialogue[-1]['role']:
new_dialogue.append(turn)
else:
new_dialogue[-1]['content'] += " " + content
else:
if cur_role != new_dialogue[-1]['role']:
new_dialogue.append(turn)
else:
new_dialogue[-1]['content'] += " " + content
new_dialogue = merge_continuous_role(new_dialogue) # 연속되는 role이 있는지 재확인한다.
dialogue = [
{
'dialogue':new_dialogue
}
]
with open(savepath_processed_json, 'w', encoding='utf-8') as f:
json.dump(dialogue, f, ensure_ascii=False)
# print(f"✅ {savepath_processed_json} json file saved")
# dialogue를 chat-template formatting 하여 dialogue_df에 저장
chat_template_formatted_dialogue = tokenizer.apply_chat_template(dialogue[0]['dialogue'], tokenize=False, )
# 맨 앞의 bos 토큰은 제거해준다.
# chat_template_formatted_dialogue = chat_template_formatted_dialogue[3:]
# chat_template_formatted_dialogue = re.sub(sys_pattern, f'<<SYS>>\n{system_prompt}\n<</SYS>>', chat_template_formatted_dialogue)
temp_dialogue_df = pd.concat([temp_dialogue_df, pd.DataFrame(
data=[[seq_category, seq_id, chat_template_formatted_dialogue]],
columns=ddf_columns
)])
# print(f"✅ seq_id별 json file saved")
temp_dialogue_df = temp_dialogue_df.iloc[1:]
if not os.path.exists(os.path.join(root_dir,is_train,'temp_tsvs')):
os.mkdir(os.path.join(root_dir,is_train,'temp_tsvs'))
tddf_savepath = os.path.join(root_dir,is_train,'temp_tsvs',os.path.basename(filepath).replace('.json','.tsv'))
temp_dialogue_df.to_csv(tddf_savepath, sep='\t', index=False)
# print(f"✅ {tddf_savepath} tsv file saved")
# Lock을 사용하여 dialogue_df에 안전하게 접근
with dialogue_df_lock:
dialogue_df = pd.concat([dialogue_df, temp_dialogue_df], axis=0)
except Exception as e:
print(f"에러 타입: {type(e).__name__}") # 예외의 타입
print(f"에러 메시지: {e}") # 예외 메시지
traceback.print_exc() # 전체 스택 트레이스 출력
# 멀티 쓰레딩을 사용하여 파일 처리
with ThreadPoolExecutor(max_workers=15) as executor: # 최대 스레드 수를 적절히 설정
executor.map(data_preprocessing, filepaths)
dialogue_df = dialogue_df.iloc[1:]
savepath_dialogue_df = os.path.join(root_dir, is_train, 'train_no_short.tsv')
dialogue_df.to_csv(savepath_dialogue_df, sep='\t', index=False)
print('✅dialogue_df 저장 완료')