"""Download SAFE-challenge competition spaces from the Hugging Face Hub,
re-score every team submission against the solution file, and cache the
resulting leaderboards (public/private scores, ROC curves, submission
summaries) as CSVs under ``<cache>/cached_results``.
"""

import json
import os
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import pytz
import tqdm.auto as tqdm
from huggingface_hub import snapshot_download

from metric import _metric

# Use the fast Rust-based downloader and fail fast on stalled downloads.
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
os.environ["HF_HUB_DOWNLOAD_TIMEOUT"] = "20"

# Local cache root, overridable via the COMP_CACHE env var. Every read and
# write below goes through CACHE_ROOT so the override works end-to-end
# (previously "competition_cache" was hard-coded in several places, which
# broke the pipeline whenever COMP_CACHE pointed elsewhere).
COMP_CACHE = os.environ.get("COMP_CACHE", "./competition_cache")
CACHE_ROOT = Path(COMP_CACHE)
RESULTS_DIR = CACHE_ROOT / "cached_results"

# Numeric status codes used in submission_info JSON -> readable labels.
STATUS_MAP = {0: "PENDING", 1: "QUEUED", 2: "PROCESSING", 3: "SUCCESS", 4: "FAILED"}

## Make a directory to store computed results
os.makedirs(RESULTS_DIR, exist_ok=True)


def download_competition_data(competition_names: List[str]) -> None:
    """Snapshot each competition dataset repo into the local cache.

    Args:
        competition_names: Hugging Face dataset repo ids (e.g. "org/name").
    """
    for repo_id in tqdm.tqdm(competition_names):
        snapshot_download(
            repo_id=repo_id,
            local_dir=os.path.join(COMP_CACHE, repo_id),
            repo_type="dataset",
            token=os.environ.get("HF_TOKEN"),  # required for private/gated repos
        )


def load_teams(competition_space_path: Path) -> pd.DataFrame:
    """Load ``teams.json`` as a DataFrame with one row per team."""
    # teams.json is keyed by team id, so transpose to get row-per-team.
    return pd.read_json(Path(competition_space_path) / "teams.json").T


def json_to_dataframe(data, extra_column_name=None, extra_column_value=None):
    """Flatten submission dicts into a DataFrame.

    Keys containing "score" are assumed to hold nested dicts; only their
    "total_time" entry is kept, stored under the key with "score" replaced
    by "time" (-1 when absent). Optionally tags every row with one extra
    constant-valued column.
    """
    flat_data = []
    for entry in data:
        entry = dict(entry)  # shallow copy: never mutate the caller's data
        flat_entry = {k: v for k, v in entry.items() if "score" not in k}
        times = {
            k.replace("score", "time"): v.get("total_time", -1)
            for k, v in entry.items()
            if "score" in k
        }
        flat_entry.update(times)
        if extra_column_name:
            flat_entry[extra_column_name] = extra_column_value
        flat_data.append(flat_entry)
    return pd.DataFrame(flat_data)


def load_submission_map(competition_space_path: Path) -> Tuple[Dict[str, str], pd.DataFrame]:
    """Read every ``submission_info/*.json`` file for a competition space.

    Returns:
        A pair of (submission_id -> submitting member id, flattened summary
        DataFrame with a human-readable "status_reason" column).
    """
    info_files = (Path(competition_space_path) / "submission_info").glob("*.json")
    team_submissions: Dict[str, str] = {}
    summaries: List[pd.DataFrame] = []
    for file in info_files:
        with open(file, "r") as fn:
            json_data = json.load(fn)
        summaries.append(
            json_to_dataframe(
                data=json_data["submissions"],
                extra_column_name="team_id",
                extra_column_value=json_data["id"],
            )
        )
        # Reuse the JSON already parsed above instead of re-reading the
        # same file a second time with pd.read_json.
        for submission in json_data["submissions"]:
            team_submissions[submission["submission_id"]] = submission["submitted_by"]
    submission_summary = pd.concat(summaries, axis=0)
    submission_summary["status_reason"] = submission_summary["status"].apply(lambda x: STATUS_MAP[x])
    return team_submissions, submission_summary


def get_member_to_team_map(teams: pd.DataFrame, team_submissions: Dict[str, str]) -> Dict[str, str]:
    """Map each submitting member id to the id of the team they belong to."""
    member_map: Dict[str, str] = {}
    for member_id in team_submissions.values():
        # Pick the (assumed single) team whose member list contains this member.
        member_map[member_id] = teams[teams.members.apply(lambda x: member_id in x)].id.values[0]
    return member_map


def load_submissions(competition_space_path: Path) -> Dict[str, Dict[str, pd.DataFrame]]:
    """Load ``submissions/*.csv`` grouped as {team_id: {submission_id: df}}.

    File names look like "<team-uuid>-<submission-uuid>.csv"; a UUID is five
    dash-separated groups, so the first five tokens form the team id and the
    remainder the submission id.
    """
    submissions: Dict[str, Dict[str, pd.DataFrame]] = defaultdict(dict)
    for file in (Path(competition_space_path) / "submissions").glob("*.csv"):
        tokens = file.stem.split("-")  # Path.stem: portable, no "/" splitting
        team_id = "-".join(tokens[:5])
        sub_id = "-".join(tokens[5:])
        submissions[team_id][sub_id] = pd.read_csv(file).set_index("id")
    return submissions


def compute_metric_per_team(solution_df: pd.DataFrame, team_submissions: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
    """Score every submission of one team against the solution.

    Best-effort: a submission that fails to score (e.g. malformed CSV) is
    skipped with a console warning instead of aborting the whole run.
    """
    results: Dict[str, Any] = {}
    for submission_id, submission in team_submissions.items():
        try:
            results[submission_id] = _metric(
                solution_df=solution_df, submission_df=submission, mode="detailed", full=True
            )
        except Exception as e:  # deliberate best-effort: warn and continue
            print("SKIPPING: ", submission_id, e)
    return results


def prep_public(public_results: Dict[str, Any]) -> Dict[str, Any]:
    """Drop non-tabular keys from a public score dict."""
    return {k: v for k, v in public_results.items() if k not in ("proportion", "roc", "original_source")}


def prep_private(private_results: Dict[str, Any]) -> Dict[str, Any]:
    """Drop non-tabular keys from a private score dict."""
    return {k: v for k, v in private_results.items() if k not in ("proportion", "roc", "anon_source")}


def extract_roc(results: Dict[str, Any]) -> Dict[str, Any]:
    """Pull the ROC curve arrays (flattened) plus the AUC out of a score dict."""
    new: Dict[str, Any] = {}
    for key, value in results.items():
        if key == "roc":
            new.update(value)  # hoists fpr/tpr/threshold to the top level
        elif key == "auc":
            new[key] = value
    return new


def _lookup_team(
    teams: pd.DataFrame,
    member_map: Dict[str, str],
    team_submissions: Dict[str, str],
    submission_id: str,
) -> pd.DataFrame:
    """Rows of ``teams`` belonging to the team that owns ``submission_id``."""
    return teams[teams.id == member_map[team_submissions[submission_id]]]


def _times_by_submission(submission_summaries: pd.DataFrame, submission_ids, column: str) -> Dict[str, Any]:
    """Map submission_id -> recorded run time taken from the summary table."""
    subset = submission_summaries[submission_summaries["submission_id"].isin(submission_ids)]
    return dict(zip(subset["submission_id"], subset[column]))


def _insert_identity_columns(
    df: pd.DataFrame,
    submission_ids: List[str],
    teams: pd.DataFrame,
    member_map: Dict[str, str],
    team_submissions: Dict[str, str],
    submission_summaries: pd.DataFrame,
) -> pd.DataFrame:
    """Prepend identity columns to a score table.

    Inserts at loc=0 in reverse, so the final column order is:
    datetime, team_id, team, submission_id, <metric columns...>.
    """
    df.insert(loc=0, column="submission_id", value=list(submission_ids))
    df.insert(
        loc=0,
        column="team",
        value=[_lookup_team(teams, member_map, team_submissions, s).name.values[0] for s in submission_ids],
    )
    df.insert(
        loc=0,
        column="team_id",
        value=[_lookup_team(teams, member_map, team_submissions, s).id.values[0] for s in submission_ids],
    )
    df.insert(
        loc=0,
        column="datetime",
        value=[
            submission_summaries[submission_summaries.submission_id == s].datetime.values[0]
            for s in submission_ids
        ],
    )
    return df


def _build_roc_frame(
    roc_results: Dict[str, Any],
    teams: pd.DataFrame,
    member_map: Dict[str, str],
    team_submissions: Dict[str, str],
    submission_summaries: pd.DataFrame,
) -> Optional[pd.DataFrame]:
    """Build ROC-curve rows for plotting; None when no submission had a ROC.

    Note: submission_repo and datetime are looked up per *team* (first row in
    the summary table), unlike the score tables which look up per submission.
    """
    roc_df = pd.json_normalize(roc_results.values())
    if len(roc_df) == 0:
        return None
    ids = list(roc_results.keys())
    roc_df.insert(loc=0, column="submission_id", value=ids)
    roc_df.insert(
        loc=0,
        column="team",
        value=[_lookup_team(teams, member_map, team_submissions, s).name.values[0] for s in ids],
    )
    roc_df.insert(
        loc=0,
        column="submission_repo",
        value=[
            submission_summaries[
                submission_summaries.team_id == member_map[team_submissions[s]]
            ].submission_repo.values[0]
            for s in ids
        ],
    )
    roc_df.insert(
        loc=0,
        column="datetime",
        value=[
            submission_summaries[
                submission_summaries.team_id == member_map[team_submissions[s]]
            ].datetime.values[0]
            for s in ids
        ],
    )
    roc_df["label"] = roc_df.apply(
        lambda x: f"AUC: {round(x['auc'], 2)} - {x['team']} - {x['submission_repo']}", axis=1
    )
    return roc_df


def main() -> None:
    """Run the full download -> score -> cache pipeline for every space."""
    spaces: List[str] = [
        "safe-challenge/video-challenge-pilot-config",
        "safe-challenge/video-challenge-task-1-config",
    ]
    download_competition_data(competition_names=spaces)

    for space in spaces:
        local_dir = CACHE_ROOT / space

        ## Load relevant data
        teams = load_teams(competition_space_path=local_dir)
        team_submissions, submission_summaries = load_submission_map(competition_space_path=local_dir)
        member_map = get_member_to_team_map(teams=teams, team_submissions=team_submissions)
        submissions = load_submissions(competition_space_path=local_dir)

        ## Load solutions
        solutions_df = pd.read_csv(local_dir / "solution.csv").set_index("id")

        ## Loop and save by team
        public, private, rocs = [], [], []
        for submission_set in submissions.values():
            results = compute_metric_per_team(solution_df=solutions_df, team_submissions=submission_set)
            public_results = {
                key: prep_public(value["public_score"])
                for key, value in results.items()
                if key in team_submissions
            }
            private_results = {
                key: prep_private(value["private_score"])
                for key, value in results.items()
                if key in team_submissions
            }

            ## Add timing
            public_times = _times_by_submission(submission_summaries, results.keys(), "public_time")
            private_times = _times_by_submission(submission_summaries, results.keys(), "private_time")
            for key in public_results:
                public_results[key]["total_time"] = public_times[key]
            for key in private_results:
                private_results[key]["total_time"] = private_times[key]

            ## Roc computations
            roc_results = {
                key: extract_roc(value["private_score"])
                for key, value in results.items()
                if key in team_submissions
            }
            roc_df = _build_roc_frame(roc_results, teams, member_map, team_submissions, submission_summaries)
            if roc_df is not None:
                rocs.append(roc_df)

            ## Append results to save in cache
            public.append(
                _insert_identity_columns(
                    pd.json_normalize(public_results.values()),
                    list(public_results.keys()),
                    teams,
                    member_map,
                    team_submissions,
                    submission_summaries,
                )
            )
            private.append(
                _insert_identity_columns(
                    pd.json_normalize(private_results.values()),
                    list(private_results.keys()),
                    teams,
                    member_map,
                    team_submissions,
                    submission_summaries,
                )
            )

        ## Save as csvs
        public_df = pd.concat(public, axis=0).sort_values(by="balanced_accuracy", ascending=False)
        private_df = pd.concat(private, axis=0).sort_values(by="balanced_accuracy", ascending=False)
        roc_long = pd.concat(rocs, axis=0).explode(["tpr", "fpr", "threshold"], ignore_index=True)

        public_df.to_csv(RESULTS_DIR / f"{local_dir.name}_public_score.csv", index=False)
        private_df.to_csv(RESULTS_DIR / f"{local_dir.name}_private_score.csv", index=False)
        roc_long.to_csv(RESULTS_DIR / f"{local_dir.name}_rocs.csv", index=False)
        submission_summaries.to_csv(RESULTS_DIR / f"{local_dir.name}_submissions.csv", index=False)

    ## Record when the cache was last refreshed (US Eastern time)
    est_timezone = pytz.timezone("US/Eastern")
    formatted_time = datetime.now(est_timezone).strftime("%Y-%m-%d %H:%M:%S %Z")
    with open(CACHE_ROOT / "updated.txt", "w") as file:
        file.write(f"Updated on {formatted_time}")


if __name__ == "__main__":
    main()