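"""Gradio app for the 3D-POPE leaderboard Space.

Downloads the evaluation queue and results from the Hugging Face Hub, renders
the leaderboard with search and column filtering, and accepts new submissions
as zipped prediction files that are scored and uploaded back to the results repo.
"""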
import subprocess
import zipfile
import os
import shutil
import json

import gradio as gr
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download, Repository, HfFolder

from src.about import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    EVALUATION_QUEUE_TEXT,
    INTRODUCTION_TEXT,
    LLM_BENCHMARKS_TEXT,
    TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
    BENCHMARK_COLS,
    COLS,
    EVAL_COLS,
    EVAL_TYPES,
    NUMERIC_INTERVALS,
    TYPES,
    AutoEvalColumn,
    ModelType,
    fields,
    WeightType,
    Precision,
)
from src.envs import (
    API,
    EVAL_REQUESTS_PATH,
    EVAL_RESULTS_PATH,
    QUEUE_REPO,
    REPO_ID,
    RESULTS_REPO,
    TOKEN,
    EVAL_REQUESTS_PATH_BACKEND,
    EVAL_RESULTS_PATH_BACKEND,
)
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
from src.submission.evaluate import calculate_metrics

def handle_new_eval_submission(model_name, model_zip, model_link=None) -> str:
    """Validate a user submission, score it, and upload the results to the Hub.

    `model_link` is currently unused; it is reserved for the (disabled) model
    page link field in the submission form.
    """
    try:
        # Input validation
        if not model_name:
            return "Please enter a model name."
        if not isinstance(model_name, str):
            return "Model name must be a string."
        if len(model_name.split()) > 1:
            return "Model name should be a single word; use hyphens instead of spaces."
        # Check if the model name is already on the leaderboard
        if model_name in leaderboard_df[AutoEvalColumn.model.name].values:
            return "Model name already exists in the leaderboard. Please choose a different name."
        if model_zip is None:
            return "Please provide a zip file."
        if not zipfile.is_zipfile(model_zip):
            return "Please upload a valid zip file."

        # Extract the archive into the backend results directory
        extraction_path = os.path.join(EVAL_RESULTS_PATH_BACKEND, model_name)
        os.makedirs(extraction_path, exist_ok=True)
        try:
            with zipfile.ZipFile(model_zip, "r") as zip_ref:
                zip_ref.extractall(extraction_path)
        except zipfile.BadZipFile:
            return "The uploaded file is not a valid zip file."
        except Exception as e:
            return f"An error occurred while extracting the zip file: {str(e)}"
        print("File unzipped successfully to:", extraction_path)

        # Evaluate the model's performance
        try:
            calculate_metrics(extraction_path, model_name)
        except Exception as e:
            return f"An error occurred while calculating metrics: {str(e)}"

        # Upload results to the results repo
        results_file_path = os.path.join(os.getcwd(), EVAL_RESULTS_PATH, "3d-pope", model_name, "results.json")
        if not os.path.exists(results_file_path):
            return f"Results file not found at {results_file_path}"
        try:
            with open(results_file_path, "r") as f:
                json.load(f)  # Validate JSON structure before uploading
        except json.JSONDecodeError:
            return "The results file is not a valid JSON file."
        try:
            API.upload_file(
                path_or_fileobj=results_file_path,
                path_in_repo=f"3d-pope/{model_name}/results.json",  # Hub repo paths always use forward slashes
                repo_id=RESULTS_REPO,
                repo_type="dataset",
            )
        except Exception as e:
            return f"An error occurred while uploading results: {str(e)}"

        # Restart the Space so the leaderboard is rebuilt with the new results
        try:
            restart_space()
        except Exception as e:
            return f"An error occurred while restarting the space: {str(e)}"

        return "Submission received and results are being processed. Please check the leaderboard for updates."
    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"

def restart_space():
    API.restart_space(repo_id=REPO_ID)


# Download the evaluation queue and results snapshots at startup; if either
# download fails, restart the Space so it retries with a clean state.
try:
    print(EVAL_REQUESTS_PATH)
    snapshot_download(
        repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN
    )
except Exception:
    restart_space()
try:
    print(EVAL_RESULTS_PATH)
    snapshot_download(
        repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN
    )
except Exception:
    restart_space()

raw_data, original_df = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)
leaderboard_df = original_df.copy()

def custom_format(x):
    """Format numeric cells compactly: integers without decimals, floats with at most two."""
    if pd.isna(x):
        return x  # Return as-is if NaN
    try:
        float_x = float(x)
        if float_x.is_integer():
            return f"{int(float_x)}"
        else:
            return f"{float_x:.2f}".rstrip("0").rstrip(".")
    except ValueError:
        return x  # Return as-is if conversion to float fails
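
# Examples of the formatting behavior:
#   custom_format(7.0)    -> "7"
#   custom_format(0.500)  -> "0.5"
#   custom_format(0.1234) -> "0.12"
#   custom_format("N/A")  -> "N/A"  (non-numeric values pass through unchanged)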

numeric_cols = [col for col in leaderboard_df.columns if leaderboard_df[col].dtype in ["float64", "float32"]]
# Note: DataFrame.applymap is deprecated in favor of DataFrame.map as of pandas 2.1.
leaderboard_df[numeric_cols] = leaderboard_df[numeric_cols].applymap(custom_format)

(
    finished_eval_queue_df,
    running_eval_queue_df,
    pending_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)


# Searching and filtering
def update_table(
    hidden_df: pd.DataFrame,
    columns: list,
    # type_query: list,
    # precision_query: str,
    # size_query: list,
    # show_deleted: bool,
    query: str,
):
    # filtered_df = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted)
    filtered_df = filter_queries(query, hidden_df)
    df = select_columns(filtered_df, columns)
    return df

def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
    # regex=False so characters like "(" in a query are matched literally instead of raising
    return df[df[AutoEvalColumn.model.name].str.contains(query, case=False, regex=False)]

def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    always_here_cols = [
        # AutoEvalColumn.model_type_symbol.name,
        AutoEvalColumn.model.name,
    ]
    # We use COLS to maintain sorting
    filtered_df = df[
        always_here_cols + [c for c in COLS if c in df.columns and c in columns]
    ]
    return filtered_df

def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
    final_df = []
    if query != "":
        queries = [q.strip() for q in query.split(";")]
        for _q in queries:
            if _q != "":
                temp_filtered_df = search_table(filtered_df, _q)
                if len(temp_filtered_df) > 0:
                    final_df.append(temp_filtered_df)
        if len(final_df) > 0:
            filtered_df = pd.concat(final_df)
            existing_columns = [
                col
                for col in [AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name]
                if col in filtered_df.columns
            ]
            filtered_df = filtered_df.drop_duplicates(subset=existing_columns)
    return filtered_df
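
# Multiple search terms are separated by ";": e.g. filter_queries("gpt; llama", df)
# keeps every row whose model name contains "gpt" or "llama" (case-insensitive).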

def filter_models(
    df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool
) -> pd.DataFrame:
    # Show all models
    # if show_deleted:
    #     filtered_df = df
    # else:  # Show only models still on the hub
    #     filtered_df = df[df[AutoEvalColumn.still_on_hub.name] == True]
    filtered_df = df
    type_emoji = [t[0] for t in type_query]
    # filtered_df = filtered_df.loc[df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)]
    # filtered_df = filtered_df.loc[df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])]
    numeric_interval = pd.IntervalIndex(sorted([NUMERIC_INTERVALS[s] for s in size_query]))
    params_column = pd.to_numeric(df[AutoEvalColumn.params.name], errors="coerce")
    mask = params_column.apply(lambda x: any(numeric_interval.contains(x)))
    filtered_df = filtered_df.loc[mask]
    return filtered_df
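
# NOTE: filter_models is currently unused; its only call site in update_table is
# commented out. Restoring the type/size/precision filters would mean re-adding
# the corresponding widgets and uncommenting that call.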

demo = gr.Blocks(css=custom_css)
with demo:
    gr.HTML(TITLE)
    gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        with gr.TabItem("🏅 3D-POPE Benchmark", elem_id="llm-benchmark-tab-table", id=0):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        search_bar = gr.Textbox(
                            placeholder=" 🔍 Search for your model (separate multiple queries with `;`) and press ENTER...",
                            show_label=False,
                            elem_id="search-bar",
                        )
                    with gr.Row():
                        shown_columns = gr.CheckboxGroup(
                            choices=[
                                c.name
                                for c in fields(AutoEvalColumn)
                                if not c.hidden and not c.never_hidden
                            ],
                            value=[
                                c.name
                                for c in fields(AutoEvalColumn)
                                if c.displayed_by_default and not c.hidden and not c.never_hidden
                            ],
                            label="Select columns to show",
                            elem_id="column-select",
                            interactive=True,
                        )
                    # with gr.Row():
                    #     deleted_models_visibility = gr.Checkbox(
                    #         value=False, label="Show gated/private/deleted models", interactive=True
                    #     )
                # with gr.Column(min_width=320):
                #     with gr.Box(elem_id="box-filter"):

            leaderboard_table = gr.components.Dataframe(
                value=leaderboard_df[
                    [c.name for c in fields(AutoEvalColumn) if c.never_hidden]
                    + shown_columns.value
                ],
                headers=[c.name for c in fields(AutoEvalColumn) if c.never_hidden] + shown_columns.value,
                datatype=TYPES,
                elem_id="leaderboard-table",
                interactive=False,
                visible=True,
            )
            # Dummy leaderboard for handling the case when the user uses the backspace key
            hidden_leaderboard_table_for_search = gr.components.Dataframe(
                value=original_df[COLS],
                headers=COLS,
                datatype=TYPES,
                visible=False,
            )
            search_bar.submit(
                update_table,
                [
                    hidden_leaderboard_table_for_search,
                    shown_columns,
                    # deleted_models_visibility,
                    search_bar,
                ],
                leaderboard_table,
            )
            for selector in [shown_columns]:
                selector.change(
                    update_table,
                    [
                        hidden_leaderboard_table_for_search,
                        shown_columns,
                        # deleted_models_visibility,
                        search_bar,
                    ],
                    leaderboard_table,
                    queue=True,
                )
| with gr.TabItem("π About", elem_id="llm-benchmark-tab-table", id=2): | |
| gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text") | |
| with gr.TabItem("π Submit here! ", elem_id="llm-benchmark-tab-table", id=3): | |
| with gr.Column(): | |
| with gr.Row(): | |
| gr.Markdown(EVALUATION_QUEUE_TEXT, elem_classes="markdown-text") | |
| with gr.Row(): | |
| gr.Markdown("# π Submit your results here!", elem_classes="markdown-text") | |
| with gr.Row(): | |
| model_name_textbox = gr.Textbox(label="Model name") | |
| model_zip_file = gr.File(label="Upload model prediction result ZIP file") | |
| # model_link_textbox = gr.Textbox(label="Link to model page") | |
| with gr.Row(): | |
| gr.Column() | |
| with gr.Column(scale=2): | |
| submit_button = gr.Button("Submit Model") | |
| submission_result = gr.Markdown() | |
| submit_button.click( | |
| handle_new_eval_submission, | |
| [model_name_textbox, model_zip_file], | |
| submission_result | |
| ) | |
| gr.Column() | |
| with gr.Row(): | |
| with gr.Accordion("π Citation", open=False): | |
| citation_button = gr.Textbox( | |
| value=CITATION_BUTTON_TEXT, | |
| label=CITATION_BUTTON_LABEL, | |
| lines=20, | |
| elem_id="citation-button", | |
| show_copy_button=True, | |
| ) | |

scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
demo.queue(default_concurrency_limit=40).launch()