# gpt2chatbotenglish / model.py (Görkem Göknar)
from transformers import AutoConfig
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from utils import SPECIAL_TOKENS, build_input_from_segments, add_special_tokens_
from utils import get_dataset, download_pretrained_model
import timeit
import logging
logging.basicConfig(format='%(asctime)s: %(message)s',level=logging.INFO)
logger = logging.getLogger(__file__)
import random
from itertools import chain
from pprint import pformat
#import warnings
import torch
import torch.nn.functional as F
import boto3
import os
import tarfile
import io
import base64
import json
import re
from types import SimpleNamespace
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
print("Loading Model.py module...")
s3 = boto3.client('s3')
def is_list_of_strings(lst):
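    """Return True only if lst is a non-empty list whose elements are all strings."""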
if lst and isinstance(lst, list):
return all(isinstance(elem, str) for elem in lst)
else:
return False
class ServerlessModel:
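    """Persona-based GPT-2 chatbot wrapper.

    Loads the model, tokenizer and persona dataset from a local path, from EFS,
    or from an S3 tar.gz archive, and exposes select_personality()/get_answer()
    for chat turns.

    Illustrative usage (the model directory and character name below are
    assumptions, not part of this module):

        bot = ServerlessModel(model_path="./models/gpt2-persona")
        personality = bot.select_personality("Gandalf") or bot.select_personality(None, select_random=True)
        reply, history, params = bot.get_answer("Hello, who are you?", personality, [])
    """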
def __init__(self, model_path=None, s3_bucket=None, file_prefix=None, efs_path=None):
#logging.basicConfig(level=logging.INFO)
#logger = logging.getLogger(__file__)
print("Trying to init model")
self.model = None
self.tokenizer = None
self.dataset = None
if s3_bucket is None:
            if model_path is not None and efs_path is None:
print("Loading model from local..")
self.model, self.tokenizer, self.dataset = self.from_pretrained_local_path(model_path, file_prefix)
logging.debug("Done loading")
else:
##Load model from EFS, with config and tokenizer from local lambda space
if model_path is not None and efs_path is not None:
print("loading model from EFS")
self.model, self.tokenizer, self.dataset = self.from_pretrained(model_path, s3_bucket, file_prefix, efs_path=efs_path)
logging.debug("Done loading")
else:
#no bucket no path fail
print("ERROR: Model path not found")
raise Exception("No model path found")
else:
print("Loading model from s3 path..")
print(s3_bucket)
self.model, self.tokenizer, self.dataset = self.from_pretrained(
model_path, s3_bucket, file_prefix)
logging.debug("Done loading")
self.parameters = {
'max_length' : 25, #60
'min_length' : 1,
'device' : 'cpu',
'temperature' : 1.0, #1.5
'dynamic_temperature' : True,
'dynamic_temperature_range' : 0.15,
'top_k' : 50, #50
'top_p' : 0.9, #0.9
'no_sample' : False,
'max_history' : 2,
}
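        # Default generation settings; get_answer(..., params={...}) overrides (and persists) individual keys.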
print("Done initializing model")
def from_pretrained(self, model_path: str, s3_bucket: str, file_prefix: str , efs_path = None ):
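        """Load the model from S3 (or from EFS when efs_path is given), plus the tokenizer and persona dataset from model_path."""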
if efs_path is None:
model = self.load_model_from_s3(model_path, s3_bucket, file_prefix)
else:
model = self.load_model_from_efs(model_path,efs_path)
print("Model loaded.")
print("loading tokenizer from path: ", model_path)
tokenizer = self.load_tokenizer(model_path)
# Get sequence length max of 1024
tokenizer.model_max_length = 1024
print("tokenizer loaded")
self.model = model
self.tokenizer = tokenizer
add_special_tokens_(self.model, self.tokenizer)
        # The raw persona file is only read if the tokenized cache below is missing.
        DATASET_PATH = model_path + '/personafile.json'
        # Tokenized dataset cache (persona_good_gpt2_cache, stored without a zip extension); preferred over DATASET_PATH.
        DATASET_CACHE = model_path + '/persona_good'
dataset = self.load_dataset(DATASET_PATH, DATASET_CACHE)
self.dataset = dataset
print("dataset loaded")
model.eval()
print("Model in eval mode, dataset and tokenizer also loaded")
return model, tokenizer, dataset
def load_model_from_path(self, model_path:str):
print("Loading model from path:",model_path)
model = GPT2LMHeadModel.from_pretrained(model_path)
model.eval()
self.model = model
return model
def from_pretrained_local_path(self, model_path: str, file_prefix: str):
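        """Load model, tokenizer and persona dataset entirely from a local directory."""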
print("Local model loading...")
model = GPT2LMHeadModel.from_pretrained(model_path)
tokenizer = self.load_tokenizer(model_path)
self.model = model
self.tokenizer = tokenizer
# Get sequence length max of 1024
tokenizer.model_max_length = 1024
add_special_tokens_(model, tokenizer)
        # The raw persona file is only read if the tokenized cache below is missing.
        DATASET_PATH = model_path + '/personafile.json'
        # Tokenized dataset cache (persona_good_gpt2_cache, stored without a zip extension); preferred over DATASET_PATH.
        DATASET_CACHE = model_path + '/persona_good'
dataset = self.load_dataset(DATASET_PATH, DATASET_CACHE)
self.dataset = dataset
model.eval()
print("Model in eval mode, dataset and tokenizer also loaded")
return model, tokenizer, dataset
def load_model_from_efs(self, model_path: str, efs_path: str):
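        """Build a GPT2LMHeadModel from the config under model_path and a state-dict binary stored at efs_path."""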
if model_path and efs_path:
config = AutoConfig.from_pretrained(f'{model_path}/config.json')
with open(efs_path, 'rb') as f:
                # load the state dict onto CPU regardless of where it was saved
state = torch.load(io.BytesIO(
f.read()), map_location=lambda storage, loc: storage)
'''alt
with open(efs_path, 'rb') as f:
state = pickle.load(f, encoding='latin1')
'''
model = GPT2LMHeadModel.from_pretrained(
pretrained_model_name_or_path=None, state_dict=state, config=config)
return model
else:
raise KeyError('No model config path or EFS bin path')
def load_model_from_s3(self, model_path: str, s3_bucket: str, file_prefix: str):
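        """Stream a tar.gz archive from S3, extract the .bin state dict, and build a GPT2LMHeadModel with the config under model_path."""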
if model_path and s3_bucket and file_prefix:
obj = s3.get_object(Bucket=s3_bucket, Key=file_prefix)
bytestream = io.BytesIO(obj['Body'].read())
tar = tarfile.open(fileobj=bytestream, mode="r:gz")
config = AutoConfig.from_pretrained(f'{model_path}/config.json')
for member in tar.getmembers():
if member.name.startswith("./._"):
                    # macOS tar adds ./._XXX metadata entries; skip them
continue
if member.name.endswith(".bin"):
f = tar.extractfile(member)
print("Model file extracted: " + member.name)
                    # load the extracted state dict onto CPU
state = torch.load(io.BytesIO(
f.read()), map_location=lambda storage, loc: storage)
model = GPT2LMHeadModel.from_pretrained(
pretrained_model_name_or_path=None, state_dict=state, config=config)
#model = AutoModelWithLMHead.from_pretrained("./", config=config)
return model
else:
raise KeyError('No S3 Bucket and Key Prefix provided')
def load_tokenizer(self, model_path: str):
print("loading tokenizer")
tokenizer = GPT2Tokenizer.from_pretrained(model_path)
return tokenizer
def load_dataset(self, DATASET_PATH: str, DATASET_CACHE: str, use_efs= False):
print("loading dataset")
dataset = get_dataset(self.tokenizer, DATASET_PATH, DATASET_CACHE)
return dataset
def encode(self, question, context):
encoded = self.tokenizer.encode_plus(question, context)
return encoded["input_ids"], encoded["attention_mask"]
def decode(self, token):
answer_tokens = self.tokenizer.convert_ids_to_tokens(
token, skip_special_tokens=True)
return self.tokenizer.convert_tokens_to_string(answer_tokens)
def generate_word(self, text, model=None, tokenizer=None, noprint=False):
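        """Return the single most likely next token for `text` (greedy argmax over the final-position logits)."""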
if model is None or tokenizer is None:
print("ERROR: No model or tokenizer")
return None
inputs = tokenizer(text, return_tensors="pt")
# model output
outputs = model(**inputs, labels=inputs["input_ids"])
loss, logits = outputs[:2]
predicted_index = torch.argmax(logits[0, -1, :]).item()
predicted_text = tokenizer.decode([predicted_index])
# results
if not noprint:
print('input text:', text)
print('predicted text:', predicted_text)
return predicted_text
def top_filtering(self,logits, top_k=0., top_p=0.9, threshold=-float('Inf'), filter_value=-float('Inf')):
""" Filter a distribution of logits using top-k, top-p (nucleus) and/or threshold filtering
Args:
logits: logits distribution shape (vocabulary size)
top_k: <=0: no filtering, >0: keep only top k tokens with highest probability.
top_p: <=0.0: no filtering, >0.0: keep only a subset S of candidates, where S is the smallest subset
whose total probability mass is greater than or equal to the threshold top_p.
In practice, we select the highest probability tokens whose cumulative probability mass exceeds
the threshold top_p.
threshold: a minimal threshold to keep logits
"""
assert logits.dim() == 1 # Only work for batch size 1 for now - could update but it would obfuscate a bit the code
top_k = min(top_k, logits.size(-1))
if top_k > 0:
# Remove all tokens with a probability less than the last token in the top-k tokens
indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None]
logits[indices_to_remove] = filter_value
if top_p > 0.0:
# Compute cumulative probabilities of sorted tokens
sorted_logits, sorted_indices = torch.sort(logits, descending=True)
cumulative_probabilities = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
# Remove tokens with cumulative probability above the threshold
sorted_indices_to_remove = cumulative_probabilities > top_p
# Shift the indices to the right to keep also the first token above the threshold
sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
sorted_indices_to_remove[..., 0] = 0
# Back to unsorted indices and set them to -infinity
indices_to_remove = sorted_indices[sorted_indices_to_remove]
logits[indices_to_remove] = filter_value
indices_to_remove = logits < threshold
logits[indices_to_remove] = filter_value
return logits
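    # Note on top_filtering (illustrative, assumed values): with logits = torch.tensor([4.0, 3.0, 0.1, 0.05])
    # and top_k=2 (top_p left at its 0.9 default), every logit smaller than the second-largest value is set
    # to -inf, so a later softmax puts all probability mass on the two surviving tokens.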
def sample_sequence(self,personality, history, tokenizer, model, params=None, current_output=None):
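        """Autoregressively sample a reply (as a list of token ids) for the given personality and history, using self.parameters."""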
start = timeit.default_timer()
if params is not None:
for k,v in params.items():
self.parameters[k] = v
##to access as dot notation
##param = SimpleNamespace(**parameters)
special_tokens_ids = tokenizer.convert_tokens_to_ids(SPECIAL_TOKENS)
if current_output is None:
current_output = []
for i in range(self.parameters['max_length']):
#print(">: {}/{} ".format(i, self.parameters['max_length'] ) ,end='\r', flush=True)
instance = build_input_from_segments(personality, history, current_output, tokenizer, with_eos=False)
input_ids = torch.tensor(instance["input_ids"], device=self.parameters['device']).unsqueeze(0)
token_type_ids = torch.tensor(instance["token_type_ids"], device=self.parameters['device']).unsqueeze(0)
logits = model(input_ids, token_type_ids=token_type_ids)
if isinstance(logits, tuple): # for gpt2 and maybe others
logits = logits[0]
#SPECIAL Dynamic Temperature mode
if self.parameters['dynamic_temperature']:
                # jitter the temperature within +/- 'dynamic_temperature_range' around the base value
rand_range = random.uniform(-1 * self.parameters['dynamic_temperature_range'] , self.parameters['dynamic_temperature_range'])
temperature = self.parameters['temperature'] + rand_range
else:
temperature = self.parameters['temperature']
logits = logits[0, -1, :] / temperature
logits = self.top_filtering(logits, top_k=self.parameters['top_k'], top_p=self.parameters['top_p'])
probs = F.softmax(logits, dim=-1)
prev = torch.topk(probs, 1)[1] if self.parameters['no_sample'] else torch.multinomial(probs, 1)
if i < self.parameters['min_length'] and prev.item() in special_tokens_ids:
while prev.item() in special_tokens_ids:
if probs.max().item() == 1:
warnings.warn("Warning: model generating special token with probability 1.")
break # avoid infinitely looping over special token
prev = torch.multinomial(probs, num_samples=1)
if prev.item() in special_tokens_ids:
                ## stop generation once an end-of-answer special token is produced
break
current_output.append(prev.item())
stop = timeit.default_timer()
#print(f"\nPredict in {stop - start} seconds\n")
return current_output
def dump_personalities_with_movies(self):
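        """Return [character name, movie name(s)] pairs decoded from the training split of the persona dataset."""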
personalities = [ [dialog["name"], dialog["moviename"]] for dialog in self.dataset["train"]]
name_list = []
for person in personalities:
try:
name_tokenized = person[0]
name = self.tokenizer.decode(name_tokenized)
movies_tokenized = person[1]
movienames= ""
##check type of first element
##if int , only 1 movie
if isinstance(movies_tokenized[0], int):
movienames = self.tokenizer.decode(movies_tokenized)
movienames = movienames.replace(".txt", "")
else:
for movie in movies_tokenized:
moviename = self.tokenizer.decode(movie)
moviename = moviename.replace(".txt", "")
movienames = movienames + " / " + moviename
name_list.append([name,movienames])
            except Exception:
                print("Could not decode name:", self.tokenizer.decode(person[0]))
return name_list
def dump_personalities(self,as_list=False):
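        """Return all character names in the dataset, either as a list or as a ' | '-joined string."""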
personalities = [dialog["personality"] for dialog in self.dataset["train"]]
name_list = []
for person in personalities:
name_tokenized = person[-1]
name = self.tokenizer.decode(name_tokenized)
name = name.replace("My name is ", "")[:-1]
name_list.append(name)
#print(name)
if as_list:
return name_list
else:
return " | ".join(name_list)
def get_personalities(self):
##THIS FUNCTION IS NOW LEGACY, USE dump_personalities
personalities = [dialog["personality"] for dialog in self.dataset["train"]]
people = [item[-1][-10:-1] for item in personalities]
##will get My Name is Something
people_list = self.tokenizer.decode(chain(*people))
#print( " | ".join( people_list.split(" ") ) )
text_to_remove = "My name is "
people_list = people_list.replace(text_to_remove, " | ")
#characters = " | ".join( people_list.split(" ") )
return people_list
def select_personality(self,characters,select_random=False):
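        """Return the tokenized personality whose last line ends with 'My name is <characters>', or a random one if select_random is True."""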
##FIND people list
##this is for debug, usually has " is Name"
#people = [item[-1][-3:-1] for item in personalities]
personalities = [dialog["personality"] for dialog in self.dataset["train"]]
if select_random : return random.choice(personalities)
#people = [item[-1][-2:-1] for item in personalities]
#people_list = self.tokenizer.decode(chain(*people))
#print( " | ".join( people_list.split(" ") ) )
personality = None
name = "My name is " + str(characters)
name_token = self.tokenizer.encode(name)
#print(name_token)
index_start = len(name_token)+1
try:
index_of_name = [ item[-1][-1*index_start: -1]== name_token for item in personalities].index(True)
#print("Selected {} is at: {}".format(characters, str(index_of_name) ) )
personality = personalities[index_of_name]
        except ValueError:
print("Not found ... Select again")
return None
##TALK TO HAL
#personality_hal = ["that's true. My name is Hal"]
#personality = tokenize(personality_hal)
#print(personality)
print("Selected personality: %s", self.tokenizer.decode(chain(*personality)))
return personality
def get_answer(self, input_text, personality, history, params=None):
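        """Generate a reply to input_text for a tokenized personality and plain-text history; returns (reply, decoded history, parameters)."""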
        ## Encode the history only when it is non-empty (it is usually an empty list)
        if len(history) > 0:
            # history is expected to be a list of plain strings, one per previous turn
            new_hist = []
for ele in history:
new_hist.append( self.tokenizer.encode(ele) )
history = new_hist.copy()
history.append(self.tokenizer.encode(input_text))
with torch.no_grad():
out_ids = self.sample_sequence(personality, history, self.tokenizer, self.model, params=params)
history.append(out_ids)
history = history[-(2*self.parameters['max_history']+1):]
out_text = self.tokenizer.decode(out_ids, skip_special_tokens=True)
#print(out_text)
history_decoded = []
for ele in history:
history_decoded.append(self.tokenizer.decode(ele))
return out_text, history_decoded, self.parameters
def predict(self, question, parameter_dict):
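        """Legacy QA-style entry point; delegates to self.generate_text, which is not defined in this module."""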
try:
answer = self.generate_text(question, model=self.model,
tokenizer=self.tokenizer,
parameter_dict=parameter_dict,
)
return answer
        except Exception as e:
            raise Exception(
                "Runtime error, see CloudWatch logs: {}".format(repr(e)))