Ahmed Ahmed
Add model-tracing code for p-value computation (without binary files)
de071e9
import torch
import glob
from typing import List
from datasets import load_dataset, concatenate_datasets, Dataset
from accelerate.data_loader import DataLoaderShard
from transformers import AutoTokenizer
def prepare_hf_dataset(hf_path, block_size, tokenizer, split="test"):
raw_dataset = load_dataset(hf_path, split=split)
dataset = raw_dataset.map(
lambda examples: tokenize_function(examples, tokenizer),
batched=True,
remove_columns=["text"],
).map(lambda examples: group_texts(examples, block_size), batched=True, batch_size=1)
dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])
return dataset
def prepare_programming_dataset(
json_path: str, block_size: int, tokenizer: AutoTokenizer, columns_ignored: List[str]
):
raw_dataset = load_dataset("json", data_files=json_path)
dataset = (
raw_dataset["train"]
.map(
lambda examples: tokenize_function(examples, tokenizer),
batched=True,
num_proc=4,
remove_columns=columns_ignored,
)
.map(
lambda examples: group_texts(examples, block_size),
batched=True,
batch_size=1,
num_proc=1,
)
)
dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])
return dataset
def prepare_random_sample_dataset(num_samples, block_size, vocab_size=32000):
tokens = torch.randint(low=0, high=vocab_size, size=(num_samples, block_size))
dictionary = {"input_ids": tokens, "attention_mask": torch.ones(tokens.shape), "labels": tokens}
dataset = Dataset.from_dict(dictionary)
dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])
return dataset
def load_m2d2_datasets(
test_name: str,
block_size: int,
tokenizer: AutoTokenizer,
columns_ignored: List[str],
):
base_path = "/juice4/scr4/nlp/model-tracing/m2d2_s2orc"
json_dir = f"{base_path}/{test_name}"
json_files = glob.glob(f"{json_dir}/*.json")
if not json_files:
raise ValueError(f"No JSON files found for test case: {test_name}")
datasets = []
for json_file in json_files:
dataset = prepare_programming_dataset(json_file, block_size, tokenizer, columns_ignored)
datasets.append(dataset)
combined_dataset = concatenate_datasets(datasets)
return combined_dataset
def load_dolma_programming_datasets(
test_name: str,
block_size: int,
tokenizer: AutoTokenizer,
columns_ignored: List[str],
):
base_path = "/juice4/scr4/nlp/model-tracing/dolma_program_languages"
json_dir = f"{base_path}/json_files_{test_name}"
json_files = glob.glob(f"{json_dir}/*.json")
datasets = []
for json_file in json_files:
dataset = prepare_programming_dataset(json_file, block_size, tokenizer, columns_ignored)
datasets.append(dataset)
combined_dataset = concatenate_datasets(datasets)
return combined_dataset
def load_generated_datasets(base_model_name, ft_model_name, block_size, tokenizer, columns_ignored):
json_file_base = (
"/juice4/scr4/nlp/model-tracing/generations/"
+ base_model_name.replace("/", "-")
+ "_gentext.json"
)
json_file_ft = (
"/juice4/scr4/nlp/model-tracing/generations/"
+ ft_model_name.replace("/", "-")
+ "_gentext.json"
)
dataset_base = prepare_programming_dataset(
json_file_base, block_size, tokenizer, columns_ignored
)
dataset_ft = prepare_programming_dataset(json_file_ft, block_size, tokenizer, columns_ignored)
datasets = []
datasets.append(dataset_base)
datasets.append(dataset_ft)
combined_dataset = concatenate_datasets(datasets)
return combined_dataset
def prepare_hf_dataloader(dataset, batch_size: int):
return DataLoaderShard(dataset, batch_size=batch_size)
def evaluate_70b(model, dataloader, device: str = "cuda:0"):
losses = []
with torch.no_grad():
for batch in dataloader:
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
labels = batch["labels"].to(device)
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels,
)
loss = outputs.loss
losses.append(loss.item())
return losses
def evaluate(model, dataloader, device: str = "cuda"):
losses = []
model.to(device)
with torch.no_grad():
for batch in dataloader:
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
labels = batch["labels"].to(device)
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels,
)
loss = outputs.loss
losses.append(loss.item())
model.to("cpu")
return losses
def prepare_aya_dataset(subset: str, language: str, block_size: int, tokenizer: AutoTokenizer):
"""
Prepare the Aya dataset for a specific subset and language.
"""
raw_dataset = load_dataset("CohereForAI/aya_evaluation_suite", subset)
filtered_dataset = raw_dataset.filter(lambda example: example["language"] == language)
dataset = filtered_dataset.map(
lambda examples: tokenize_function(examples, tokenizer),
batched=True,
remove_columns=filtered_dataset.column_names,
).map(lambda examples: group_texts(examples, block_size), batched=True, batch_size=1)
dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])
return dataset
def tokenize_aya_function(examples, tokenizer: AutoTokenizer):
"""
Tokenize Aya dataset examples.
"""
return tokenizer(examples["inputs"])
def tokenize_function(examples, tokenizer):
if "text" in examples:
return tokenizer(examples["text"])
elif "inputs" in examples:
return tokenizer(examples["inputs"])
else:
raise ValueError("Neither 'text' nor 'inputs' found in examples")
def group_texts(examples, block_size):
concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()}
total_length = len(concatenated_examples["input_ids"])
total_length = (total_length // block_size) * block_size
# Split by chunks of max_len.
result = {
k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
for k, t in concatenated_examples.items()
}
result["labels"] = result["input_ids"].copy()
return result