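"""Gradio leaderboard app for DGEB (Diverse Genomic Embedding Benchmark).

Loads TaskResult JSON files from ./submissions/**, flattens them into a pandas
DataFrame, and renders per-category and per-task leaderboard tabs.
"""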
import importlib.util
import json
import math
from pathlib import Path
from typing import List

import gradio as gr
import pandas as pd
from pydantic import ValidationError, parse_obj_as

DECIMAL_PLACES = 4


def _load_module_from_path(module_name: str, file_path: str):
    """Load a module directly from a file path, without importing its parent package."""
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


# HACK: very hacky way to import from parent directory, while avoiding needing all the deps of the parent package
modality = _load_module_from_path("modality", "../dgeb/modality.py")
Modality = modality.Modality

tasks = _load_module_from_path("tasks", "../dgeb/tasks/tasks.py")
TaskResult = tasks.TaskResult
DGEBModel = tasks.DGEBModel


# TaskResult and DGEBModel are loaded above via importlib, so the full dgeb package does not need to be installed.


def format_num_params(param: int) -> str:
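    """Format a parameter count for display, e.g. 650_000_000 -> "650M" and 1_234 -> "1,234"."""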
    # Display counts of 1 million or more in millions; billion formatting is currently disabled.
    million = 1_000_000
    # billion = 1_000_000_000
    # if param >= billion:
    #     num_billions = int(param / 1_000_000_000)
    #     return f"{num_billions}B"
    if param >= million:
        num_millions = int(param / 1_000_000)
        return f"{num_millions}M"
    else:
        return f"{param:,}"


def load_json_files_from_directory(directory_path: Path) -> List[dict]:
    """
    Recursively load all JSON files within the specified directory path.

    :param directory_path: Path to the directory to search for JSON files.
    :return: List of dictionaries loaded from JSON files.
    """
    json_files_content = []
    for json_file in directory_path.rglob("*.json"):  # Recursively find all JSON files
        try:
            with open(json_file, "r", encoding="utf-8") as file:
                json_content = json.load(file)
                json_files_content.append(json_content)
        except Exception as e:
            print(f"Error loading {json_file}: {e}")
    return json_files_content


def load_results() -> List[TaskResult]:
    """
    Recursively load JSON files in ./submissions/** and return a list of TaskResult objects.
    """
    submissions_path = Path("./submissions")
    json_contents = load_json_files_from_directory(submissions_path)

    task_results_objects = []
    for content in json_contents:
        try:
            task_result = parse_obj_as(
                TaskResult, content
            )  # Using Pydantic's parse_obj_as for creating TaskResult objects
            task_results_objects.append(task_result)
        except ValidationError as e:
            print(f"Error parsing TaskResult object: {e}")
            raise e

    return task_results_objects


def task_results_to_dgeb_score(
    model: DGEBModel, model_results: List[TaskResult]
) -> dict:
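    """
    Collapse a model's task results into one "DGEB Score" leaderboard row:
    for each task, take the best primary-metric value across layers, then
    average those best scores over all tasks.
    """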
    best_scores_per_task = []
    modalities_seen = set()
    for task_result in model_results:
        modalities_seen.add(task_result.task.modality)
        assert (
            task_result.model.hf_name == model.hf_name
        ), f"Model names do not match, {task_result.model.hf_name} != {model.hf_name}"
        primary_metric_id = task_result.task.primary_metric_id
        scores = []
        # Get the primary score for each layer.
        for result in task_result.results:
            for metric in result.metrics:
                if metric.id == primary_metric_id:
                    scores.append(metric.value)
        best_score = max(scores)
        best_scores_per_task.append(best_score)

    assert (
        len(modalities_seen) == 1
    ), f"Multiple modalities found for model {model.hf_name}"
    # Calculate the average of the best scores for each task.
    assert len(best_scores_per_task) > 0, f"No tasks found for model {model.hf_name}"
    dgeb_score = sum(best_scores_per_task) / len(best_scores_per_task)
    return {
        "Task Name": "DGEB Score",
        "Task Category": "DGEB",
        "Model": model.hf_name,
        "Modality": list(modalities_seen)[0],
        "Num. Parameters (millions)": format_num_params(model.num_params),
        "Emb. Dimension": model.embed_dim,
        "Score": dgeb_score,
    }


def task_results_to_df(model_results: List[TaskResult]) -> pd.DataFrame:
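    """
    Flatten TaskResult objects into a leaderboard DataFrame with one row per
    (task, model, layer), plus an aggregate "DGEB Score" row per model.
    """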
    # Initialize an empty list to hold all rows of data
    data_rows = []
    all_models = {}
    for res in model_results:
        task = res.task
        model = res.model
        all_models[model.hf_name] = model
        print(f"Processing {task.display_name} for {model.hf_name}")
        for layer in res.results:
            # The code treats num_layers - 1 as the last layer index and its ceil-half as the mid layer.
            last_layer = model.num_layers - 1
            mid_layer = math.ceil(last_layer / 2)
            if mid_layer == layer.layer_number:
                layer.layer_display_name = "mid"
            elif last_layer == layer.layer_number:
                layer.layer_display_name = "last"

            if layer.layer_display_name not in ["mid", "last"]:
                # Only mid and last layers are shown on the leaderboard; skip everything else.
                print(
                    f"Layer {layer.layer_number} is not mid or last (last layer index {last_layer}). Skipping"
                )
                continue
            else:
                # Pivot the layer's metrics into columns: each metric id becomes
                # a column in this row, with the primary metric listed first.
                metric_ids = []
                primary_metric_label = f"{task.primary_metric_id} (primary metric)"
                for metric in layer.metrics:
                    if task.primary_metric_id == metric.id:
                        metric_ids.append(primary_metric_label)
                    else:
                        metric_ids.append(metric.id)

                metric_values = [metric.value for metric in layer.metrics]
                zipped = zip(metric_ids, metric_values)
                # sort primary metric id first
                sorted_zip = sorted(
                    zipped,
                    key=lambda x: x[0] != primary_metric_label,
                )
                data_rows.append(
                    {
                        "Task Name": task.display_name,
                        "Task Category": task.type,
                        "Model": model.hf_name,
                        "Num. Parameters (millions)": format_num_params(
                            model.num_params
                        ),
                        "Emb. Dimension": model.embed_dim,
                        "Modality": task.modality,
                        "Layer": layer.layer_display_name,
                        **dict(sorted_zip),
                    }
                )
    for model_name, model in all_models.items():
        results_for_model = [
            res for res in model_results if res.model.hf_name == model_name
        ]
        assert len(results_for_model) > 0, f"No results found for model {model_name}"
        dgeb_score_record = task_results_to_dgeb_score(model, results_for_model)
        print(f'model {model.hf_name} dgeb score: {dgeb_score_record["Score"]}')
        data_rows.append(dgeb_score_record)
    print("Finished processing all results")
    df = pd.DataFrame(data_rows)
    return df


df = task_results_to_df(load_results())
image_path = "./DGEB_Figure.png"
with gr.Blocks() as demo:
    gr.Label("Diverse Genomic Embedding Benchmark", show_label=False, scale=2)
    gr.HTML(
        f"<img src='file/{image_path}' alt='DGEB Figure' style='border-radius: 0.8rem; width: 50%; margin-left: auto; margin-right: auto; margin-top:12px;'>"
    )
    gr.HTML(
        """
<div style='width: 50%; margin-left: auto; margin-right: auto; padding-bottom: 8px;text-align: center;'>
DGEB Leaderboard. To submit, refer to the <a href="https://github.com/TattaBio/DGEB/blob/leaderboard/README.md" target="_blank" style="text-decoration: underline">DGEB GitHub repository</a>. Refer to the <a href="https://www.tatta.bio/dgeb" target="_blank" style="text-decoration: underline">DGEB paper</a> for details on metrics, tasks, and models.
</div>
"""
    )

    unique_categories = df["Task Category"].unique()
    # sort "DGEB" to the start
    unique_categories = sorted(unique_categories, key=lambda x: x != "DGEB")
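    # Build one leaderboard tab per task category ("DGEB" first), each containing a tab per task.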
    for category in unique_categories:
        with gr.Tab(label=category):
            unique_tasks_in_category = df[df["Task Category"] == category][
                "Task Name"
            ].unique()
            # sort "Overall" to the start
            unique_tasks_in_category = sorted(
                unique_tasks_in_category, key=lambda x: x != "Overall"
            )
            for task in unique_tasks_in_category:
                with gr.Tab(label=task):
                    columns_to_hide = ["Task Name", "Task Category"]
                    # get rows where Task Name == task and Task Category == category
                    filtered_df = (
                        df[
                            (df["Task Name"] == task)
                            & (df["Task Category"] == category)
                        ].drop(columns=columns_to_hide)
                    ).dropna(axis=1, how="all")  # drop all NaN columns for Overall tab
                    # round all values to DECIMAL_PLACES decimal places
                    rounded_df = filtered_df.round(DECIMAL_PLACES)

                    # calculate ranking column
                    # if in the Overall tab, rank by the "Average" column
                    if task == "Overall":
                        # rank by average col
                        rounded_df["Rank"] = filtered_df["Average"].rank(
                            ascending=False
                        )
                    else:
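                        # For individual task tabs, rank by the sum of all metric
                        # columns (non-metric columns are dropped below before summing).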
                        avoid_cols = [
                            "Model",
                            "Emb. Dimension",
                            "Num. Parameters (millions)",
                            "Modality",
                            "Layer",
                        ]
                        rounded_df["Rank"] = (
                            rounded_df.drop(columns=avoid_cols, errors="ignore")
                            .sum(axis=1)
                            .rank(ascending=False)
                        )
                    # make Rank first column
                    cols = list(rounded_df.columns)
                    cols.insert(0, cols.pop(cols.index("Rank")))
                    rounded_df = rounded_df[cols]
                    # sort by rank
                    rounded_df = rounded_df.sort_values("Rank")
                    data_frame = gr.DataFrame(rounded_df)


# allowed_paths lets Gradio serve local files (e.g. the figure image) from the current directory.
demo.launch(allowed_paths=["."])