"""Few-shot GSM8K benchmark for the backend selected on the command line.

The script builds few-shot prompts from GSM8K records, sends them through the
callable returned by ``get_call_generate``, extracts the last integer from
each completion, and reports accuracy, invalid-answer rate and wall-clock
latency. One JSON line per run is appended to ``args.result_file``.
"""

import argparse
import ast
import asyncio
import json
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from tqdm import tqdm

from sglang.test.test_utils import add_common_other_args_and_parse, get_call_generate
from sglang.utils import download_and_cache_file, dump_state_text, read_jsonl

# Sentinel returned by get_answer_value when no integer can be extracted.
INVALID = -9999999

# Location of the GSM8K test split, used when --data-path is not a local file.
GSM8K_TEST_URL = "https://raw.githubusercontent.com/openai/grade-school-math/master/grade_school_math/data/test.jsonl"


def get_one_example(lines, i, include_answer):
    """Format record ``i`` of ``lines`` as a question/answer prompt.

    Args:
        lines: Indexable collection of records with "question" and "answer" keys.
        i: Index of the record to format.
        include_answer: When true, append the record's answer after "Answer:";
            otherwise the prompt ends at "Answer:" so a model can complete it.

    Returns:
        The formatted prompt string.
    """
    ret = "Question: " + lines[i]["question"] + "\nAnswer:"
    if include_answer:
        ret += " " + lines[i]["answer"]
    return ret


def get_few_shot_examples(lines, k):
    """Concatenate the first ``k`` records of ``lines`` as solved examples.

    Each example is followed by a blank line. Raises ``IndexError`` if ``k``
    exceeds the number of records.
    """
    return "".join(get_one_example(lines, i, True) + "\n\n" for i in range(k))


def get_answer_value(answer_str):
    """Return the last run of digits in ``answer_str`` as an int.

    Thousands separators (",") are removed first. Only digit runs are matched,
    so a sign or decimal point is not part of the extracted value ("-5" gives
    5, "3.14" gives 14).

    Returns:
        The parsed integer, or ``INVALID`` when the text contains no digits or
        the last digit run is not a valid Python literal.
    """
    answer_str = answer_str.replace(",", "")
    numbers = re.findall(r"\d+", answer_str)
    if not numbers:
        return INVALID
    try:
        # literal_eval rejects digit runs with leading zeros (e.g. "007") with
        # SyntaxError; ValueError is caught as well so that any parse failure
        # maps to the sentinel instead of aborting the whole run.
        return ast.literal_eval(numbers[-1])
    except (SyntaxError, ValueError):
        return INVALID


def main(args):
    """Run the benchmark described by ``args`` and record the results.

    Reads ``backend``, ``parallel``, ``num_questions``, ``num_shots``,
    ``result_file`` and (optionally) ``data_path`` from ``args``.

    Raises:
        ValueError: If a reference answer in the dataset contains no integer.
    """
    # Select backend
    call_generate = get_call_generate(args)

    # Read data: honour --data-path when it names an existing file, otherwise
    # fall back to the cached download of the GSM8K test split.
    data_path = getattr(args, "data_path", None)
    if not data_path or not os.path.isfile(data_path):
        data_path = download_and_cache_file(GSM8K_TEST_URL)
    lines = list(read_jsonl(data_path))

    # Construct prompts
    # NOTE(review): the few-shot examples are records 0..num_shots-1 and the
    # evaluated questions start at record 0 too, so the first num_shots
    # questions appear with their answers in the prompt. Left unchanged to
    # keep results comparable with earlier runs.
    num_questions = args.num_questions
    num_shots = args.num_shots
    few_shot_examples = get_few_shot_examples(lines, num_shots)

    num_selected = len(lines[:num_questions])
    questions = [get_one_example(lines, i, False) for i in range(num_selected)]
    labels = [get_answer_value(lines[i]["answer"]) for i in range(num_selected)]

    # Explicit check rather than assert, so it still runs under "python -O".
    bad_labels = [i for i, label in enumerate(labels) if label == INVALID]
    if bad_labels:
        raise ValueError(
            f"No integer answer found in dataset records: {bad_labels}"
        )

    states = [None] * len(labels)

    # Run requests
    if args.backend != "lmql":
        # Use thread pool
        def get_one_answer(i):
            states[i] = call_generate(
                prompt=few_shot_examples + questions[i],
                temperature=0,
                max_tokens=256,
                stop=["Question", "Assistant:", "<|separator|>"],
            )

        tic = time.time()
        if args.parallel == 1:
            for i in tqdm(range(len(questions))):
                get_one_answer(i)
        else:
            with ThreadPoolExecutor(args.parallel) as executor:
                # Draining the map iterator drives the progress bar and
                # re-raises any exception raised inside a worker.
                list(
                    tqdm(
                        executor.map(get_one_answer, range(len(questions))),
                        total=len(questions),
                    )
                )
    else:
        # Use asyncio: on this path call_generate is awaited, one batch of
        # args.parallel prompts at a time.
        async def batched_call(batch_size):
            for start in range(0, len(questions), batch_size):
                tasks = [
                    call_generate(
                        few_shot_examples + q,
                        temperature=0,
                        max_tokens=256,
                        stop="Question",
                    )
                    for q in questions[start : start + batch_size]
                ]
                rets = await asyncio.gather(*tasks)
                for offset, ret in enumerate(rets):
                    states[start + offset] = ret

        tic = time.time()
        asyncio.run(batched_call(batch_size=args.parallel))
    latency = time.time() - tic

    preds = np.array([get_answer_value(state) for state in states])

    # Compute accuracy
    # Converted to plain floats so the values serialise as ordinary JSON numbers.
    acc = float(np.mean(preds == np.array(labels)))
    invalid = float(np.mean(preds == INVALID))

    # Print results
    print(f"Accuracy: {acc:.3f}")
    print(f"Invalid: {invalid:.3f}")
    print(f"Latency: {latency:.3f} s")

    # Dump results
    dump_state_text(f"tmp_output_{args.backend}.txt", states)

    value = {
        "task": "gsm8k",
        "backend": args.backend,
        "num_gpus": 1,
        "latency": round(latency, 3),
        "accuracy": round(acc, 3),
        # Number of prompts actually sent; smaller than the requested count
        # when the dataset has fewer records than --num-questions.
        "num_requests": len(questions),
        "other": {
            "num_questions": args.num_questions,
            "parallel": args.parallel,
        },
    }
    # Append mode: each run adds one JSON line to the result file.
    with open(args.result_file, "a", encoding="utf-8") as fout:
        fout.write(json.dumps(value) + "\n")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-shots", type=int, default=5)
    parser.add_argument(
        "--data-path",
        type=str,
        default="test.jsonl",
        help="Local GSM8K jsonl file; the test split is downloaded if this path does not exist.",
    )
    parser.add_argument("--num-questions", type=int, default=200)
    args = add_common_other_args_and_parse(parser)
    main(args)