# import matplotlib.pyplot as plt
# %matplotlib inline
# import seaborn as sns
import pickle
import pandas as pd
import re
import os
import tensorflow as tf
from tensorflow.keras.layers import Embedding, LSTM, Dense, Bidirectional
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras import backend as K
import numpy as np
import string
from string import digits
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import nltk
from nltk.tokenize import word_tokenize
from tqdm import tqdm
from Data import Dataset, Dataloder

"""########################################------MODEL------########################################"""


########################################------Encoder model------########################################
class Encoder(tf.keras.Model):

    def __init__(self, inp_vocab_size, embedding_size, lstm_size, input_length):
        super().__init__()
        self.inp_vocab_size = inp_vocab_size
        self.embedding_size = embedding_size
        self.lstm_size = lstm_size
        self.input_length = input_length

    def build(self, input_shape):
        # Initialize the embedding layer
        self.embedding = Embedding(input_dim=self.inp_vocab_size,
                                   output_dim=self.embedding_size,
                                   input_length=self.input_length,
                                   trainable=True,
                                   name="encoder_embed")
        # Initialize the encoder bidirectional LSTM layer
        self.bilstm = tf.keras.layers.Bidirectional(
            LSTM(units=self.lstm_size, return_sequences=True, return_state=True),
            merge_mode='sum')

    def call(self, input_sequence, initial_state):
        '''
        Input : input_sequence  [batch_size, input_length]
                initial_state   4 x [batch_size, encoder_units]
        Output: lstm_enc_output [batch_size, input_length, encoder_units]
                forward_h/c and backward_h/c, each [batch_size, encoder_units]
        '''
        input_embd = self.embedding(input_sequence)
        lstm_enc_output, forward_h, forward_c, backward_h, backward_c = self.bilstm(input_embd, initial_state)
        return lstm_enc_output, forward_h, forward_c, backward_h, backward_c

    def initialize_states(self, batch_size):
        '''
        Given a batch size, return an initial hidden state and an initial cell state,
        each of shape [batch_size, lstm_units] (e.g. [32, lstm_units] for a batch size of 32).
        '''
        self.lstm_state_h = tf.random.uniform(shape=[batch_size, self.lstm_size], dtype=tf.float32)
        self.lstm_state_c = tf.random.uniform(shape=[batch_size, self.lstm_size], dtype=tf.float32)
        return self.lstm_state_h, self.lstm_state_c

    def initialize_states_bidirectional(self, batch_size):
        # Zero initial states for the forward and backward LSTMs: [h_fwd, c_fwd, h_bwd, c_bwd]
        states = [tf.zeros((batch_size, self.lstm_size)) for i in range(4)]
        return states
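
# Illustrative shape check for the Encoder (a minimal sketch, not part of the training
# pipeline). The vocabulary size, batch size and dimensions below are made-up values
# chosen only to show the expected tensor shapes.
def _encoder_shape_example():
    enc = Encoder(inp_vocab_size=1000, embedding_size=64, lstm_size=128, input_length=30)
    dummy_batch = tf.zeros((8, 30), dtype=tf.int32)        # [batch_size, input_length]
    init_states = enc.initialize_states_bidirectional(8)   # [h_fwd, c_fwd, h_bwd, c_bwd]
    enc_out, f_h, f_c, b_h, b_c = enc(dummy_batch, init_states)
    # enc_out: (8, 30, 128) because merge_mode='sum' keeps the output width at lstm_size;
    # each returned state: (8, 128)
    return enc_out.shape, f_h.shape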
########################################------Attention model------########################################
class Attention(tf.keras.layers.Layer):

    def __init__(self, scoring_function, att_units):
        super().__init__()
        self.att_units = att_units
        self.scoring_function = scoring_function
        # Set up the layers needed for the chosen scoring function ('dot', 'general' or 'concat')
        if self.scoring_function == 'dot':
            pass
        elif scoring_function == 'general':
            self.dense = Dense(self.att_units)
        elif scoring_function == 'concat':
            self.dense = tf.keras.layers.Dense(att_units, activation='tanh')
            self.dense1 = tf.keras.layers.Dense(1)

    def call(self, decoder_hidden_state, encoder_output):
        if self.scoring_function == 'dot':
            # score = encoder_output . decoder_hidden_state
            decoder_hidden_state = tf.expand_dims(decoder_hidden_state, axis=2)
            similarity = tf.matmul(encoder_output, decoder_hidden_state)
            weights = tf.nn.softmax(similarity, axis=1)
            context_vector = tf.matmul(weights, encoder_output, transpose_a=True)
            context_vector = tf.squeeze(context_vector, axis=1)
            return context_vector, weights

        elif self.scoring_function == 'general':
            # score = decoder_hidden_state . W . encoder_output
            decoder_hidden_state = tf.expand_dims(decoder_hidden_state, 1)
            score = tf.matmul(decoder_hidden_state, self.dense(encoder_output), transpose_b=True)
            attention_weights = tf.keras.activations.softmax(score, axis=-1)
            context_vector = tf.matmul(attention_weights, encoder_output)
            context_vector = tf.reduce_sum(context_vector, axis=1)
            attention_weights = tf.reduce_sum(attention_weights, axis=1)
            attention_weights = tf.expand_dims(attention_weights, 2)
            return context_vector, attention_weights

        elif self.scoring_function == 'concat':
            # score = v . tanh(W [decoder_hidden_state ; encoder_output])
            decoder_hidden_state = tf.expand_dims(decoder_hidden_state, 1)
            # The tile factor must match the encoder input length (30 here)
            decoder_hidden_state = tf.tile(decoder_hidden_state, [1, 30, 1])
            score = self.dense1(self.dense(tf.concat((decoder_hidden_state, encoder_output), axis=-1)))
            score = tf.transpose(score, [0, 2, 1])
            attention_weights = tf.keras.activations.softmax(score, axis=-1)
            context_vector = tf.matmul(attention_weights, encoder_output)
            context_vector = tf.reduce_sum(context_vector, axis=1)
            attention_weights = tf.reduce_sum(attention_weights, axis=1)
            attention_weights = tf.expand_dims(attention_weights, 2)
            return context_vector, attention_weights


########################################------OneStepDecoder model------########################################
class OneStepDecoder(tf.keras.Model):

    def __init__(self, tar_vocab_size, embedding_dim, input_length, dec_units, score_fun, att_units):
        # Initialize decoder embedding layer, LSTM and any other objects needed
        super().__init__()
        self.tar_vocab_size = tar_vocab_size
        self.embedding_dim = embedding_dim
        self.input_length = input_length
        self.dec_units = dec_units
        self.score_fun = score_fun
        self.att_units = att_units

    def build(self, input_shape):
        self.attention = Attention(self.score_fun, self.att_units)
        self.embedding = Embedding(input_dim=self.tar_vocab_size,
                                   output_dim=self.embedding_dim,
                                   input_length=self.input_length,
                                   mask_zero=True,
                                   trainable=True,
                                   name="Decoder_Embed")
        self.bilstm = tf.keras.layers.Bidirectional(
            LSTM(units=self.dec_units, return_sequences=True, return_state=True),
            merge_mode='sum')
        self.dense = Dense(self.tar_vocab_size)

    def call(self, input_to_decoder, encoder_output, f_state_h, f_state_c, b_state_h, b_state_c):
        # Embed the single decoder token, attend over the encoder outputs using the
        # forward hidden state, and feed [embedding ; context] to the decoder LSTM.
        dec_embd = self.embedding(input_to_decoder)
        context_vectors, attention_weights = self.attention(f_state_h, encoder_output)
        context_vectors_ = tf.expand_dims(context_vectors, axis=1)
        concat_vector = tf.concat([dec_embd, context_vectors_], axis=2)
        states = [f_state_h, f_state_c, b_state_h, b_state_c]
        decoder_outputs, dec_f_state_h, dec_f_state_c, dec_b_state_h, dec_b_state_c = self.bilstm(concat_vector, states)
        decoder_outputs = tf.squeeze(decoder_outputs, axis=1)
        dense_output = self.dense(decoder_outputs)
        return dense_output, dec_f_state_h, dec_f_state_c, attention_weights, context_vectors
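
# Illustrative use of the Attention layer on its own (a minimal sketch, not used by the
# training pipeline). The tensor sizes below are made up; the encoder sequence length
# must be 30 because the 'concat' branch tiles the decoder state 30 times.
def _attention_shape_example():
    attn = Attention('concat', att_units=256)
    dec_state = tf.zeros((8, 128))        # [batch_size, dec_units]
    enc_out = tf.zeros((8, 30, 128))      # [batch_size, input_length, encoder_units]
    context, weights = attn(dec_state, enc_out)
    # context: (8, 128) summarises the encoder outputs for this decoding step;
    # weights: (8, 30, 1) is the attention distribution over source positions.
    return context.shape, weights.shape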
########################################------Decoder model------########################################
class Decoder(tf.keras.Model):

    def __init__(self, out_vocab_size, embedding_dim, input_length, dec_units, score_fun, att_units):
        # Initialize necessary variables; the OneStepDecoder object is created in build()
        super().__init__()
        self.out_vocab_size = out_vocab_size
        self.embedding_dim = embedding_dim
        self.input_length = input_length
        self.dec_units = dec_units
        self.score_fun = score_fun
        self.att_units = att_units

    def build(self, input_shape):
        self.onestep_decoder = OneStepDecoder(self.out_vocab_size, self.embedding_dim,
                                              self.input_length, self.dec_units,
                                              self.score_fun, self.att_units)

    def call(self, input_to_decoder, encoder_output,
             f_decoder_hidden_state, f_decoder_cell_state,
             b_decoder_hidden_state, b_decoder_cell_state):
        # Run the one-step decoder once per target timestep and collect the logits.
        # Note: every timestep re-uses the encoder's final states; the states returned
        # by the one-step decoder are not fed back into the next step.
        all_outputs = tf.TensorArray(tf.float32, size=self.input_length, name="output_array")
        for timestep in range(self.input_length):
            output, state_h, state_c, attention_weights, context_vector = self.onestep_decoder(
                input_to_decoder[:, timestep:timestep + 1], encoder_output,
                f_decoder_hidden_state, f_decoder_cell_state,
                b_decoder_hidden_state, b_decoder_cell_state)
            all_outputs = all_outputs.write(timestep, output)
        all_outputs = tf.transpose(all_outputs.stack(), [1, 0, 2])
        return all_outputs


########################################------encoder_decoder model------########################################
class encoder_decoder(tf.keras.Model):

    def __init__(self, out_vocab_size, inp_vocab_size, embedding_dim, embedding_size,
                 in_input_length, tar_input_length, dec_units, lstm_size, att_units, batch_size):
        # Store hyper-parameters; the Encoder and Decoder objects are created in build()
        super().__init__()
        self.out_vocab_size = out_vocab_size
        self.inp_vocab_size = inp_vocab_size
        self.embedding_dim_target = embedding_dim
        self.embedding_dim_input = embedding_size
        self.in_input_length = in_input_length
        self.tar_input_length = tar_input_length
        self.dec_lstm_size = dec_units
        self.enc_lstm_size = lstm_size
        self.att_units = att_units
        self.batch_size = batch_size

    def build(self, input_shape):
        self.encoder = Encoder(self.inp_vocab_size, self.embedding_dim_input,
                               self.enc_lstm_size, self.in_input_length)
        # 'concat' scoring matches the trained checkpoint
        # (bi_directional_concat_256_..._weights.h5)
        self.decoder = Decoder(self.out_vocab_size, self.embedding_dim_target,
                               self.tar_input_length, self.dec_lstm_size,
                               'concat', self.att_units)

    def call(self, data):
        # Teacher forcing: the decoder receives the target sequence as its input.
        input_sequence, target_sequence = data[0], data[1]
        encoder_initial_state = self.encoder.initialize_states_bidirectional(self.batch_size)
        encoder_output, f_encoder_state_h, f_encoder_state_c, b_encoder_state_h, b_encoder_state_c = self.encoder(input_sequence, encoder_initial_state)
        decoder_output = self.decoder(target_sequence, encoder_output,
                                      f_encoder_state_h, f_encoder_state_c,
                                      b_encoder_state_h, b_encoder_state_c)
        return decoder_output
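
# Minimal forward-pass sketch for the full model (illustrative only; the vocabulary
# sizes and batch size are made-up numbers, not the real dataset dimensions). The
# decoder is teacher-forced, so the call takes an (encoder_input, decoder_input) pair.
def _encoder_decoder_forward_example():
    model = encoder_decoder(out_vocab_size=1000, inp_vocab_size=1200,
                            embedding_dim=300, embedding_size=300,
                            in_input_length=30, tar_input_length=30,
                            dec_units=128, lstm_size=128, att_units=256, batch_size=8)
    enc_in = tf.zeros((8, 30), dtype=tf.int32)   # source token ids
    dec_in = tf.zeros((8, 30), dtype=tf.int32)   # target token ids (teacher forcing)
    logits = model((enc_in, dec_in))
    # logits: (8, 30, 1000) -> one distribution over the target vocabulary per timestep
    return logits.shape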
def loss_function(real, pred):
    # Sparse categorical cross-entropy over logits, masking out padding (id 0)
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)


def accuracy(real, pred):
    # Token-level accuracy, ignoring padding positions
    pred_val = K.cast(K.argmax(pred, axis=-1), dtype='float32')
    real_val = K.cast(K.equal(real, pred_val), dtype='float32')
    mask = K.cast(K.greater(real, 0), dtype='float32')
    n_correct = K.sum(mask * real_val)
    n_total = K.sum(mask)
    return n_correct / n_total


def load_weights():
    """======================================================LOADING======================================================"""
    # Loads the pickled datasets and tokenizers (not the model weights)
    # Dataset
    with open('dataset/30_length/train.pickle', 'rb') as handle:
        train = pickle.load(handle)
    with open('dataset/30_length/validation.pickle', 'rb') as handle:
        validation = pickle.load(handle)
    # Tokenizers
    with open('tokenizer/30_tokenizer_eng.pickle', 'rb') as handle:
        tokenizer_eng = pickle.load(handle)
    with open('tokenizer/30_tokenizer_ass.pickle', 'rb') as handle:
        tokenizer_ass = pickle.load(handle)
    # Vocabulary sizes
    vocab_size_ass = len(tokenizer_ass.word_index.keys())
    vocab_size_eng = len(tokenizer_eng.word_index.keys())
    return train, validation, tokenizer_eng, tokenizer_ass, vocab_size_ass, vocab_size_eng


def main():
    train, validation, tokenizer_eng, tokenizer_ass, vocab_size_ass, vocab_size_eng = load_weights()

    in_input_length = 30
    tar_input_length = 30
    inp_vocab_size = vocab_size_ass
    out_vocab_size = vocab_size_eng
    dec_units = 128
    lstm_size = 128
    att_units = 256
    batch_size = 32
    embedding_dim = 300
    embedding_size = 300

    train_dataset = Dataset(train, tokenizer_ass, tokenizer_eng, in_input_length)
    test_dataset = Dataset(validation, tokenizer_ass, tokenizer_eng, in_input_length)
    train_dataloader = Dataloder(train_dataset, batch_size)
    test_dataloader = Dataloder(test_dataset, batch_size)
    print(train_dataloader[0][0][0].shape, train_dataloader[0][0][1].shape, train_dataloader[0][1].shape)

    model = encoder_decoder(out_vocab_size, inp_vocab_size, embedding_dim, embedding_size,
                            in_input_length, tar_input_length, dec_units, lstm_size,
                            att_units, batch_size)
    optimizer = tf.keras.optimizers.Adam()
    model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])

    # train_steps = train.shape[0] // 32
    # valid_steps = validation.shape[0] // 32

    # A short fit first builds the subclassed model's variables so the pretrained
    # checkpoint can be restored; a second short fit then continues from those weights.
    model.fit(train_dataloader, steps_per_epoch=10, epochs=1, verbose=1,
              validation_data=train_dataloader, validation_steps=1)
    model.load_weights('models/bi_directional_concat_256_batch_160_epoch_30_length_ass_eng_nmt_weights.h5')
    model.fit(train_dataloader, steps_per_epoch=10, epochs=1, verbose=1,
              validation_data=train_dataloader, validation_steps=1)
    model.summary()
    return model, tokenizer_eng, tokenizer_ass, in_input_length


# if __name__ == "__main__":
#     main()
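
# A minimal greedy-decoding sketch using the model and tokenizers returned by main()
# (illustrative only). The '<start>'/'<end>' token names are assumptions -- substitute
# whatever markers the tokenizers were actually fitted with. States are held fixed at
# the encoder's final states, mirroring the training loop in Decoder.call.
def translate_greedy(sentence, model, tokenizer_inp, tokenizer_tar, max_length=30,
                     start_token='<start>', end_token='<end>'):
    # Encode the source sentence into a padded id sequence
    seq = tokenizer_inp.texts_to_sequences([sentence])
    seq = tf.convert_to_tensor(pad_sequences(seq, maxlen=max_length, padding='post'))

    # Run the encoder once with zero initial states (batch of one)
    init_states = model.encoder.initialize_states_bidirectional(1)
    enc_out, f_h, f_c, b_h, b_c = model.encoder(seq, init_states)

    # Feed the decoder its own previous prediction, one step at a time
    dec_input = tf.constant([[tokenizer_tar.word_index[start_token]]])  # assumed start marker
    result = []
    for _ in range(max_length):
        logits, _, _, _, _ = model.decoder.onestep_decoder(
            dec_input, enc_out, f_h, f_c, b_h, b_c)
        predicted_id = int(tf.argmax(logits, axis=-1).numpy()[0])
        word = tokenizer_tar.index_word.get(predicted_id, '')
        if word == end_token or predicted_id == 0:  # stop on assumed end marker or padding
            break
        result.append(word)
        dec_input = tf.constant([[predicted_id]])
    return ' '.join(result)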