import argparse
import os
from datasets import load_dataset, Dataset
from huggingface_hub import HfApi

# The Hub token is read from the DEBUG environment variable and is used for
# all dataset reads and uploads below.
TOKEN = os.environ.get("DEBUG")
api = HfApi(token=TOKEN)

REQUESTS_DSET = "AIEnergyScore/requests_debug"
RESULTS_DSET = "AIEnergyScore/results_debug"
PENDING = 'PENDING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--run_dir",
        default="/runs",
        type=str,
        required=False,
        help="Path to the run directory.",
    )
    parser.add_argument(
        "--attempts",
        default="/attempts.txt",
        type=str,
        required=False,
        help="File with per-line run attempt directories. Assumes format '/runs/{task}/{model}/{timestamp}'",
    )
    parser.add_argument(
        "--failed_attempts",
        default="/failed_attempts.txt",
        type=str,
        required=False,
        help="File with per-line failed run directories. Assumes format '/runs/{task}/{model}/{timestamp}'",
    )
    args = parser.parse_args()
    return args

def check_for_traceback(run_dir):
    """Returns the traceback found in {run_dir}/error.log, or "" if there is none."""
    # run_dir="./runs/${experiment_name}/${backend_model}/${now}"
    found_error = False
    error_message = ""
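    # A failed run's error.log typically ends with a Python traceback, e.g.
    # (hypothetical error):
    #   Traceback (most recent call last):
    #     File "benchmark.py", line 42, in <module>
    #   torch.cuda.OutOfMemoryError: CUDA out of memory.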
    try:
        # Read error message
        with open(f"{run_dir}/error.log", 'r') as f:
            # Collect everything from the first "Traceback" line through the
            # end of the file. A non-empty error.log alone likely signals a
            # failure, but keying on the Traceback marker captures just the
            # stack trace rather than warnings logged before it.
            for line in f:
                if 'Traceback (most recent call last):' in line:
                    found_error = True
                if found_error:
                    error_message += line
    except FileNotFoundError:
        # Runs that never write to stderr may not produce an error.log.
        print(f"Could not find {run_dir}/error.log")
    return error_message

def update_requests(requests, all_attempts, failed_attempts):
    """
     Sets All PENDING requests with the given model & task to 'COMPLETED' or 'FAILED.'
     Reads in the all_attempts text file and failed_attempts text file, in which
      each line is a run directory run_dir="/runs/${experiment_name}/${backend_model}/${now}"

    :param requests: requests Dataset
    :param all_attempts: text file of the run directories of each task/model/timestamp
    :param failed_attempts: text file of the run directories of each task/model/timestamp
    :return:
    """
    requests_df = requests.to_pandas()
    # readlines() keeps trailing newlines; strip them so the membership check
    # against failed_attempts below compares like with like.
    failed_attempts = {attempt.strip() for attempt in failed_attempts}
    # Each line is a run directory of the form
    # run_dir="/runs/${experiment_name}/${backend_model}/${now}",
    # where ${backend_model} is ${organization}/${model_name}.
    for line in all_attempts:
        line = line.strip()
        print(f"Checking {line}")
        split_run_dir = line.strip("/").split("/")
        print(f"Processing run directory {split_run_dir}")
        task = split_run_dir[1]
        print(f"Task is {task}")
        # The naming of the optimum benchmark configs uses an underscore.
        # The naming of the HF Api list models function uses a hyphen.
        # We therefore need to adapt this task string name depending on
        # which part of our pipeline we're talking to.
        hyphenated_task_name = "-".join(task.split("_"))
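        # e.g. a run-dir task of "text_generation" (hypothetical task name)
        # becomes "text-generation" for the requests dataset.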
        model = "/".join([split_run_dir[2], split_run_dir[3]])
        print(f"Model is {model}")
        traceback_error = check_for_traceback(line)
        # Build the row selection once: re-filtering on status == PENDING after
        # the first assignment below would no longer match the rows just updated.
        pending_mask = (
            (requests_df["status"] == PENDING)
            & (requests_df["model"] == model)
            & (requests_df["task"] == hyphenated_task_name)
        )
        if traceback_error != "":
            print("Found a traceback error!")
            print(traceback_error)
            requests_df.loc[pending_mask, "error_message"] = traceback_error
            requests_df.loc[pending_mask, "status"] = FAILED
        elif line in failed_attempts:
            print(f"Job failed, but not sure why -- didn't find a traceback in {line}.")
            print(f"Setting {model}, {hyphenated_task_name}, status {PENDING} to {FAILED}.")
            print(requests_df[pending_mask])
            requests_df.loc[pending_mask, "status"] = FAILED
        else:
            requests_df.loc[pending_mask, "status"] = COMPLETED
    updated_dset = Dataset.from_pandas(requests_df)
    return updated_dset

if __name__ == '__main__':
    args = parse_args()
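    # Typical invocation (hypothetical script name; paths are the argparse defaults):
    #   python process_runs.py --run_dir /runs --attempts /attempts.txt --failed_attempts /failed_attempts.txt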
    # Uploads all run output to the results dataset.
    print(f"Uploading {args.run_dir} to {RESULTS_DSET}")
    api.upload_folder(
        folder_path=args.run_dir,
        repo_id=RESULTS_DSET,
        repo_type="dataset",
    )
    # Update requests dataset based on whether things have failed or not.
    print(f"Examining the run directory for each model & task to determine if it {FAILED} or {COMPLETED}.")
    requests = load_dataset(REQUESTS_DSET, split="test", token=TOKEN)
    with open(args.attempts, "r") as f:
        all_attempts = f.readlines()
    with open(args.failed_attempts, "r") as f:
        failed_attempts = f.readlines()
    updated_requests = update_requests(requests, all_attempts, failed_attempts)
    print(f"Uploading updated {REQUESTS_DSET}.")
    updated_requests.push_to_hub(REQUESTS_DSET, split="test", token=TOKEN)
    print("Done.")