NoorAfaqi's picture
Update app.py
fea1e8b verified
# -*- coding: utf-8 -*-
"""HabibiTranslator.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1lYP3XxUCWdiihU0mIejW_KCqTvy7-tz6
"""
import torch
torch.cuda.is_available()
import torch
import torch.nn as nn
import torch.optim as optim
import math
from datasets import load_dataset
import numpy as np
from collections import Counter
import gradio as gr
# Seting random seed for reproducibility
torch.manual_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dataset = load_dataset('Helsinki-NLP/tatoeba_mt', 'ara-eng', trust_remote_code=True)
# tokenization (word-level)
def tokenize(text):
return text.split()
# Building vocabulary from dataset
def build_vocab(data, tokenizer, min_freq=2):
counter = Counter()
for example in data:
counter.update(tokenizer(example['sourceString']))
counter.update(tokenizer(example['targetString']))
# Adding special tokens
specials = ['<pad>', '<sos>', '<eos>', '<unk>']
vocab = specials + [word for word, freq in counter.items() if freq >= min_freq]
word2idx = {word: idx for idx, word in enumerate(vocab)}
idx2word = {idx: word for word, idx in word2idx.items()}
return word2idx, idx2word
# Converting text to tensor (adjusted to fit special tokens within max_len)
def text_to_tensor(text, vocab, tokenizer, max_len=52):
tokens = tokenizer(text)[:max_len - 2] # Reserving space for <sos> and <eos>
tokens = ['<sos>'] + tokens + ['<eos>']
tensor = [vocab.get(token, vocab['<unk>']) for token in tokens]
return torch.tensor(tensor, dtype=torch.long)
train_data = dataset['validation'] # Using validation as training data for demo
test_data = dataset['test']
# Building shared vocabulary (for simplicity, using both languages in one vocab)
word2idx, idx2word = build_vocab(train_data, tokenize)
# Hyperparameters for data
max_len = 52 # Increased to account for <sos> and <eos>
batch_size = 32
train_data_list = list(train_data) # Convert Dataset to list once
print(f"Length of train_data_list: {len(train_data_list)}")
def get_batches(data_list, batch_size, max_len=52):
total_batches = len(data_list) // batch_size + (1 if len(data_list) % batch_size else 0)
print(f"Total batches to process: {total_batches}")
for i in range(0, len(data_list), batch_size):
batch = data_list[i:i + batch_size]
src_batch = [text_to_tensor(example['sourceString'], word2idx, tokenize, max_len) for example in batch]
tgt_batch = [text_to_tensor(example['targetString'], word2idx, tokenize, max_len) for example in batch]
src_batch = nn.utils.rnn.pad_sequence(src_batch, padding_value=word2idx['<pad>'], batch_first=False).to(device)
tgt_batch = nn.utils.rnn.pad_sequence(tgt_batch, padding_value=word2idx['<pad>'], batch_first=False).to(device)
if src_batch.size(0) > max_len:
src_batch = src_batch[:max_len, :]
elif src_batch.size(0) < max_len:
padding = torch.full((max_len - src_batch.size(0), src_batch.size(1)), word2idx['<pad>'], dtype=torch.long).to(device)
src_batch = torch.cat([src_batch, padding], dim=0)
if tgt_batch.size(0) > max_len:
tgt_batch = tgt_batch[:max_len, :]
elif tgt_batch.size(0) < max_len:
padding = torch.full((max_len - tgt_batch.size(0), tgt_batch.size(1)), word2idx['<pad>'], dtype=torch.long).to(device)
tgt_batch = torch.cat([tgt_batch, padding], dim=0)
src_batch = src_batch.transpose(0, 1) # [batch_size, seq_len]
tgt_batch = tgt_batch.transpose(0, 1) # [batch_size, seq_len]
yield src_batch, tgt_batch
print("Revised Chunk 1 (Seventh Iteration) completed: Dataset loaded and preprocessing debugged.")
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=52):
super().__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0) # Shape: (1, max_len, d_model)
self.register_buffer('pe', pe)
def forward(self, x):
return x + self.pe[:, :x.size(1), :]
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super().__init__()
assert d_model % num_heads == 0
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
def scaled_dot_product_attention(self, Q, K, V, mask=None):
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
attn = torch.softmax(scores, dim=-1)
return torch.matmul(attn, V)
def forward(self, Q, K, V, mask=None):
batch_size = Q.size(0)
seq_len_q = Q.size(1)
seq_len_k = K.size(1)
Q = self.W_q(Q)
K = self.W_k(K)
V = self.W_v(V)
Q = Q.view(batch_size, seq_len_q, self.num_heads, self.d_k).transpose(1, 2)
K = K.view(batch_size, seq_len_k, self.num_heads, self.d_k).transpose(1, 2)
V = V.view(batch_size, seq_len_k, self.num_heads, self.d_k).transpose(1, 2)
output = self.scaled_dot_product_attention(Q, K, V, mask)
output = output.transpose(1, 2).contiguous().view(batch_size, seq_len_q, self.d_model)
return self.W_o(output)
class FeedForward(nn.Module):
def __init__(self, d_model, d_ff):
super().__init__()
self.linear1 = nn.Linear(d_model, d_ff)
self.linear2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()
def forward(self, x):
return self.linear2(self.relu(self.linear1(x)))
class EncoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
super().__init__()
self.mha = MultiHeadAttention(d_model, num_heads)
self.ff = FeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask=None):
attn_output = self.mha(x, x, x, mask)
x = self.norm1(x + self.dropout(attn_output))
ff_output = self.ff(x)
return self.norm2(x + self.dropout(ff_output))
class DecoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
super().__init__()
self.mha1 = MultiHeadAttention(d_model, num_heads)
self.mha2 = MultiHeadAttention(d_model, num_heads)
self.ff = FeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
attn1_output = self.mha1(x, x, x, tgt_mask)
x = self.norm1(x + self.dropout(attn1_output))
attn2_output = self.mha2(x, enc_output, enc_output, src_mask)
x = self.norm2(x + self.dropout(attn2_output))
ff_output = self.ff(x)
return self.norm3(x + self.dropout(ff_output))
class Transformer(nn.Module):
def __init__(self, src_vocab_size, tgt_vocab_size, d_model=256, num_heads=8, num_layers=3, d_ff=1024, max_len=52, dropout=0.1):
super().__init__()
self.d_model = d_model
self.src_embedding = nn.Embedding(src_vocab_size, d_model)
self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
self.pos_encoding = PositionalEncoding(d_model, max_len)
self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
self.fc_out = nn.Linear(d_model, tgt_vocab_size)
self.dropout = nn.Dropout(dropout)
def generate_mask(self, src, tgt):
src_mask = (src != word2idx['<pad>']).unsqueeze(1).unsqueeze(2)
tgt_mask = (tgt != word2idx['<pad>']).unsqueeze(1).unsqueeze(3)
seq_len = tgt.size(1)
nopeak_mask = (1 - torch.triu(torch.ones(1, seq_len, seq_len), diagonal=1)).bool().to(device)
tgt_mask = tgt_mask & nopeak_mask
return src_mask, tgt_mask
def forward(self, src, tgt):
src_mask, tgt_mask = self.generate_mask(src, tgt)
src_embedded = self.dropout(self.pos_encoding(self.src_embedding(src) * math.sqrt(self.d_model)))
tgt_embedded = self.dropout(self.pos_encoding(self.tgt_embedding(tgt) * math.sqrt(self.d_model)))
enc_output = src_embedded
for enc_layer in self.encoder_layers:
enc_output = enc_layer(enc_output, src_mask)
dec_output = tgt_embedded
for dec_layer in self.decoder_layers:
dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)
return self.fc_out(dec_output)
print("Revised Chunk 2 (Fourth Iteration) completed: Transformer model fixed with max_len=52.")
vocab_size = len(word2idx)
model = Transformer(
src_vocab_size=vocab_size,
tgt_vocab_size=vocab_size,
d_model=256,
num_heads=8,
num_layers=3,
d_ff=1024,
max_len=52,
dropout=0.1
).to(device)
# Loss and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<pad>'])
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# Training loop with progress feedback
def train(model, data, epochs=20):
model.train()
total_batches = len(data) // batch_size + (1 if len(data) % batch_size else 0)
print(f"Total batches per epoch: {total_batches}")
for epoch in range(epochs):
total_loss = 0
for batch_idx, (src_batch, tgt_batch) in enumerate(get_batches(data, batch_size, max_len=52), 1):
if batch_idx % 100 == 0: # Printing every 100 batches for feedback
print(f"Epoch {epoch + 1}, Batch {batch_idx}/{total_batches} ")
optimizer.zero_grad()
output = model(src_batch, tgt_batch[:, :-1])
loss = criterion(output.view(-1, vocab_size), tgt_batch[:, 1:].reshape(-1))
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / total_batches
print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}")
# Main function
def translate(model, sentence, max_len=52):
model.eval()
with torch.no_grad():
src = text_to_tensor(sentence, word2idx, tokenize, max_len).unsqueeze(0).to(device)
tgt = torch.tensor([word2idx['<sos>']], dtype=torch.long).unsqueeze(0).to(device)
for _ in range(max_len):
output = model(src, tgt)
next_token = output[:, -1, :].argmax(dim=-1).item()
if next_token == word2idx['<eos>']:
break
tgt = torch.cat([tgt, torch.tensor([[next_token]], dtype=torch.long).to(device)], dim=1)
translated = [idx2word[idx.item()] for idx in tgt[0] if idx.item() in idx2word]
return ' '.join(translated[1:])
# Testing
test_sentence = "ุนู…ุฑูƒ ุฑุงูŠุญ ุงู„ู…ูƒุณูŠูƒุŸ"
translated = translate(model, test_sentence)
print(f"Input: {test_sentence}")
print(f"Translated: {translated}")
print("Chunk 3 completed: Training and inference implemented.")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Instantiate the model (assuming train_dataset is already defined)
model = Transformer(
src_vocab_size=vocab_size,
tgt_vocab_size=vocab_size
).to(device)
# Load model checkpoint and set to evaluation mode
model.load_state_dict(torch.load("habibi.pth", map_location=device))
model.eval()
def gradio_translate(text):
return translate(model, text)
interface = gr.Interface(
fn=gradio_translate,
inputs=gr.Textbox(lines=2, placeholder="Enter Arabic sentence here..."),
outputs="text",
title="Habibi-Translator",
description="Translate Arabic sentences to English using a Transformer model."
)
interface.launch()
print("Chunk 4 completed: Gradio interface deployed.")