|
|
import torch |
|
|
from transformers import ( |
|
|
LlamaForCausalLM, |
|
|
LlamaTokenizer, |
|
|
StoppingCriteria, |
|
|
) |
|
|
import os |
|
|
# Configure CUDA through environment variables for this process:
#   CUDA_LAUNCH_BLOCKING=1  -> ask CUDA for synchronous kernel launches.
#   CUDA_VISIBLE_DEVICES=0  -> expose only the device with index 0.
# NOTE(review): these assignments run after `import torch`; presumably they
# still take effect because no CUDA call has been made yet -- confirm.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
os.environ["CUDA_VISIBLE_DEVICES"] = '0'
|
|
|
|
|
class Llama2Chinese:
    """Chat wrapper around a ``LlamaForCausalLM`` model and its tokenizer.

    Prompts are rendered in a ``### Instruction:`` / ``### Response:`` layout.
    Every turn stored in ``self.history`` is replayed in front of the new
    message, and ``self.prefix_prompt`` is prepended to the system prompt.
    """

    def __init__(self, model_path, mode='offline'):
        """Load the model and tokenizer and start with an empty history.

        The model is loaded regardless of ``mode``.

        Args:
            model_path (str): Model name or path, forwarded to
                ``from_pretrained``.
            mode (str, optional): ``'api'`` makes ``generate`` delegate to
                ``predict_api``; any other value uses the locally loaded
                model. Defaults to ``'offline'``.
        """
        self.mode = mode
        self.load_in_8bit = True
        # Chinese instruction: "answer the following question in fewer than
        # 25 characters". Prepended to every system prompt.
        self.prefix_prompt = '''请用少于25个字回答以下问题 '''
        # List of (user_message, model_response) pairs, oldest first.
        self.history = []
        self.model, self.tokenizer = self.init_model(model_path)
        self.model.eval()

    def init_model(self, model_path):
        """Load the tokenizer and the causal language model.

        The model is requested in float16 on ``cuda:0`` (8-bit loading when
        ``self.load_in_8bit`` is true). When the tokenizer holds more entries
        than the model's input embedding matrix has rows, the embeddings are
        resized to the tokenizer size.

        Args:
            model_path (str): Model name or path, forwarded to
                ``from_pretrained``.

        Returns:
            tuple: ``(model, tokenizer)``.

        Raises:
            ValueError: If the tokenizer has fewer entries than the model's
                input embedding matrix.
        """
        tokenizer = LlamaTokenizer.from_pretrained(model_path)

        base_model = LlamaForCausalLM.from_pretrained(
            model_path,
            load_in_8bit=self.load_in_8bit,
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True,
            device_map='cuda:0',
        )
        model_vocab_size = base_model.get_input_embeddings().weight.size(0)
        tokenizer_vocab_size = len(tokenizer)
        print(f"Vocab of the base model: {model_vocab_size}")
        print(f"Vocab of the tokenizer: {tokenizer_vocab_size}")
        if model_vocab_size != tokenizer_vocab_size:
            # Explicit raise instead of `assert`, so the check also runs
            # when Python is started with -O.
            if tokenizer_vocab_size < model_vocab_size:
                raise ValueError(
                    f"Tokenizer vocab size ({tokenizer_vocab_size}) is smaller "
                    f"than the model vocab size ({model_vocab_size})"
                )
            print("Resize model embeddings to fit tokenizer")
            base_model.resize_token_embeddings(tokenizer_vocab_size)
        return base_model, tokenizer

    def generate(self, prompt, system_prompt="Below is an instruction that describes a task. Write a response that appropriately completes the request."):
        """Generate a reply to ``prompt``.

        In ``'api'`` mode the call is delegated to ``predict_api``. Otherwise
        the prompt is rendered with ``message_to_prompt``, sampled from the
        local model, and the text after the last ``### Response:`` marker is
        returned. This method does not modify ``self.history``.

        Args:
            prompt (str): The user message.
            system_prompt (str, optional): System prompt placed after
                ``self.prefix_prompt``. Defaults to a generic English
                instruction preamble.

        Returns:
            str: The model reply, or a fixed bilingual error message if
            anything raises during local generation. In ``'api'`` mode,
            whatever ``predict_api`` returns.
        """
        if self.mode == 'api':
            return self.predict_api(prompt)

        try:
            question = self.message_to_prompt(prompt, system_prompt)
            inputs = self.tokenizer(question, return_tensors="pt")

            # Send the inputs to wherever the model actually lives instead
            # of assuming CUDA device index 0.
            device = self.model.device

            # Fall back to EOS for padding when the tokenizer defines no
            # pad token, rather than passing None through.
            pad_token_id = self.tokenizer.pad_token_id
            if pad_token_id is None:
                pad_token_id = self.tokenizer.eos_token_id

            generation_config = dict(
                temperature=0.5,
                top_k=40,
                top_p=0.9,
                do_sample=True,
                num_beams=1,
                repetition_penalty=1.1,
                max_new_tokens=512,
            )
            generate_ids = self.model.generate(
                input_ids=inputs["input_ids"].to(device),
                attention_mask=inputs['attention_mask'].to(device),
                eos_token_id=self.tokenizer.eos_token_id,
                pad_token_id=pad_token_id,
                **generation_config
            )
            response = self.tokenizer.batch_decode(
                generate_ids,
                skip_special_tokens=True,
                clean_up_tokenization_spaces=False,
            )[0]
            # Keep only what follows the last response marker; the decoded
            # sequence is expected to still contain the rendered prompt.
            return response.split("### Response:")[-1].strip()
        except Exception as e:
            # Deliberate best-effort boundary: report the error and hand a
            # user-facing message back instead of propagating.
            print(e)
            return "对不起,你的请求出错了,请再次尝试。\nSorry, your request has encountered an error. Please try again.\n"

    def message_to_prompt(self, message, system_prompt=""):
        """Render the system prompt, the stored history and ``message``.

        Args:
            message (str): The new user message; surrounding whitespace is
                stripped.
            system_prompt (str, optional): Text appended to
                ``self.prefix_prompt``. Defaults to ``""``.

        Returns:
            str: The full prompt, ending with an open ``### Response: ``
            marker for the model to complete.
        """
        pieces = [self.prefix_prompt + system_prompt]
        for interaction in self.history:
            # Only spaces are stripped from stored turns (not newlines).
            user_prompt = str(interaction[0]).strip(' ')
            bot_prompt = str(interaction[1]).strip(' ')
            pieces.append(
                f" ### Instruction:\n{user_prompt}\n\n### Response: {bot_prompt}\n\n"
            )
        pieces.append(f" ### Instruction:\n{message.strip()}\n\n### Response: ")
        # Join once instead of re-building the growing string on every turn.
        return "".join(pieces)

    def predict_api(self, prompt):
        """Placeholder for an API-backed prediction.

        Args:
            prompt (str): The user message.

        Returns:
            None: The API variant is not implemented.
        """
        # The API version is intentionally not written yet; it would be
        # similar to Linly-api. Feel free to implement it.
        return None

    def chat(self, system_prompt, message):
        """Generate a reply and record the turn in the history.

        Note the argument order: ``system_prompt`` comes first here, unlike
        in ``generate``.

        Args:
            system_prompt (str): System prompt forwarded to ``generate``.
            message (str): The user message.

        Returns:
            tuple: ``(response, history)`` where ``history`` is the updated
            ``self.history`` list (the same object, not a copy).
        """
        response = self.generate(message, system_prompt)
        self.history.append((message, response))
        return response, self.history

    def clear_history(self):
        """Forget all stored conversation turns."""
        self.history = []
|
|
|
|
|
def test():
    """Manual smoke test: load a local checkpoint and print one answer.

    Builds ``Llama2Chinese`` from ``./Llama2-chat-13B-Chinese-50W`` and asks
    a single question (Chinese for "how to cope with stress").
    """
    llm = Llama2Chinese("./Llama2-chat-13B-Chinese-50W")
    answer = llm.generate("如何应对压力")
    print(answer)
|
|
|
|
|
if __name__ == '__main__':
    # Run the smoke test only when this file is executed as a script.
    test()