Spaces:
Running
Running
import datetime | |
import os | |
import threading | |
import time | |
import io | |
import traceback | |
import json | |
from typing import Dict, Any, List, Optional | |
from urllib.parse import urlencode | |
from fastapi import Depends, FastAPI, Form, HTTPException, Request, UploadFile, Body, Query | |
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, StreamingResponse | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from huggingface_hub import hf_hub_download | |
from huggingface_hub.utils import disable_progress_bars | |
from huggingface_hub.utils._errors import EntryNotFoundError | |
from loguru import logger | |
from pydantic import BaseModel, Field | |
from requests.exceptions import RequestException | |
from filelock import FileLock, Timeout as FileLockTimeout | |
from competitions import __version__, utils | |
from competitions.errors import AuthenticationError, PastDeadlineError, SubmissionError, SubmissionLimitError | |
from competitions.info import CompetitionInfo | |
from competitions.leaderboard import Leaderboard | |
from competitions.oauth import attach_oauth | |
from competitions.runner import JobRunner | |
from competitions.submissions import Submissions | |
from competitions.text import SUBMISSION_SELECTION_TEXT, SUBMISSION_TEXT | |
from competitions.utils import team_file_api, submission_api, leaderboard_api | |
from competitions.enums import SubmissionStatus | |
HF_TOKEN = os.environ.get("HF_TOKEN", None) | |
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
COMPETITION_ID = os.environ.get("COMPETITION_ID") | |
OUTPUT_PATH = os.environ.get("OUTPUT_PATH", "/tmp/model") | |
START_DATE = os.environ.get("START_DATE", "2000-12-31") | |
DISABLE_PUBLIC_LB = int(os.environ.get("DISABLE_PUBLIC_LB", 0)) | |
disable_progress_bars() | |
try: | |
REQUIREMENTS_FNAME = hf_hub_download( | |
repo_id=COMPETITION_ID, | |
filename="requirements.txt", | |
token=HF_TOKEN, | |
repo_type="dataset", | |
) | |
except EntryNotFoundError: | |
REQUIREMENTS_FNAME = None | |
if REQUIREMENTS_FNAME: | |
logger.info("Uninstalling and installing requirements") | |
utils.uninstall_requirements(REQUIREMENTS_FNAME) | |
utils.install_requirements(REQUIREMENTS_FNAME) | |
class LeaderboardRequest(BaseModel): | |
lb: str | |
class UpdateSelectedSubmissionsRequest(BaseModel): | |
submission_ids: str | |
class UpdateTeamNameRequest(BaseModel): | |
new_team_name: str | |
# "majar": { | |
# "type": "string", | |
# "title": "Majar", | |
# "minLength": 1, | |
# }, | |
# "degree": { | |
# "type": "string", | |
# "title": "Degree", | |
# "enum": ["Bachelor", "Master", "PhD"] | |
# }, | |
# "advising_professor": { | |
# "type": "string", | |
# "title": "Advising Professo", | |
# "minLength": 1, | |
# }, | |
# "grade": { | |
# "type": "string", | |
# "title": "Grade", | |
# "minLength": 1, | |
# }, | |
# "expected_graduation_year": { | |
# "type": "string", | |
# "title": "Expected Graduation Year", | |
# "minLength": 1, | |
# } | |
class TeamMember(BaseModel): | |
name: str = Field(..., max_length=50, description="Name of the team member") | |
email: str = Field(..., max_length=100, description="Email of the team member") | |
institution: str = Field(..., max_length=100, description="Institution of the team member") | |
is_student: bool = Field(..., description="Is the team member a student?") | |
majar: Optional[str] = Field(None, max_length=50, description="Major of the team member") | |
degree: Optional[str] = Field(None, description="Degree of the team member") | |
advising_professor: Optional[str] = Field(None, max_length=50, description="Advising professor of the team member") | |
grade: Optional[str] = Field(None, max_length=50, description="Grade of the team member") | |
expected_graduation_year: Optional[str] = Field(None, max_length=4, description="Expected graduation year of the team member") | |
class RegisterRequest(BaseModel): | |
team_name: str = Field(..., max_length=20, description="Name of the team") | |
team_members: List[TeamMember] = Field(min_length=1, max_length=100, description="List of team members") | |
def run_job_runner(): | |
job_runner = JobRunner( | |
competition_id=COMPETITION_ID, | |
token=HF_TOKEN, | |
output_path=OUTPUT_PATH, | |
) | |
job_runner.run() | |
def start_job_runner_thread(): | |
thread = threading.Thread(target=run_job_runner) | |
# thread.daemon = True | |
thread.start() | |
return thread | |
def watchdog(job_runner_thread): | |
while True: | |
if not job_runner_thread.is_alive(): | |
logger.warning("Job runner thread stopped. Restarting...") | |
job_runner_thread = start_job_runner_thread() | |
time.sleep(10) | |
job_runner_thread = start_job_runner_thread() | |
watchdog_thread = threading.Thread(target=watchdog, args=(job_runner_thread,)) | |
watchdog_thread.daemon = True | |
watchdog_thread.start() | |
app = FastAPI() | |
attach_oauth(app) | |
static_path = os.path.join(BASE_DIR, "static") | |
app.mount("/static", StaticFiles(directory=static_path), name="static") | |
templates_path = os.path.join(BASE_DIR, "templates") | |
templates = Jinja2Templates(directory=templates_path) | |
def read_form(request: Request): | |
""" | |
This function is used to render the HTML file | |
:param request: | |
:return: | |
""" | |
if HF_TOKEN is None: | |
return HTTPException(status_code=500, detail="HF_TOKEN is not set.") | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
context = { | |
"request": request, | |
"logo": competition_info.logo_url, | |
"competition_type": competition_info.competition_type, | |
"version": __version__, | |
"rules_available": competition_info.rules is not None, | |
} | |
return templates.TemplateResponse("index.html", context) | |
def use_oauth(request: Request, user_token: str = Depends(utils.user_authentication)): | |
result = { | |
"is_login": False, | |
"is_admin": False, | |
"is_registered": False, | |
} | |
comp_org = COMPETITION_ID.split("/")[0] | |
if user_token: | |
result["is_login"] = True | |
result["is_admin"] = utils.is_user_admin(user_token, comp_org) | |
team_info = utils.team_file_api.get_team_info(user_token) | |
result["is_registered"] = team_info is not None | |
result["is_white_team"] = team_info and team_info["id"] in team_file_api.get_team_white_list() | |
return {"response": result} | |
def user_logout(request: Request): | |
"""Endpoint that logs out the user (e.g. delete cookie session).""" | |
if "oauth_info" in request.session: | |
request.session.pop("oauth_info", None) | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
context = { | |
"request": request, | |
"logo": competition_info.logo_url, | |
"competition_type": competition_info.competition_type, | |
"__version__": __version__, | |
"rules_available": competition_info.rules is not None, | |
} | |
return templates.TemplateResponse("index.html", context) | |
def get_comp_info(request: Request): | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
info = competition_info.competition_desc | |
resp = {"response": info} | |
return resp | |
def get_dataset_info(request: Request): | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
info = competition_info.dataset_desc | |
resp = {"response": info} | |
return resp | |
# @app.get("/rules", response_class=JSONResponse) | |
# def get_rules(request: Request): | |
# competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
# if competition_info.rules is not None: | |
# return {"response": competition_info.rules} | |
# return {"response": "No rules available."} | |
def get_submission_info(request: Request, user_token: str = Depends(utils.user_authentication)): | |
if utils.team_file_api.get_team_info(user_token) is None: | |
return {"response": "Please register your team first."} | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
info = competition_info.submission_desc | |
resp = {"response": info} | |
return resp | |
def fetch_leaderboard(_: str = Depends(utils.user_authentication)): | |
df = leaderboard_api.get_leaderboard() | |
if len(df) == 0: | |
return {"response": "No teams yet. Why not make a submission?"} | |
resp = {"response": df.to_markdown(index=False)} | |
return resp | |
def my_submissions(request: Request, user_token: str = Depends(utils.user_authentication)): | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
if user_token is None: | |
return { | |
"response": { | |
"submissions": "", | |
"submission_text": SUBMISSION_TEXT.format(competition_info.submission_limit), | |
"error": "**Invalid token. Please login.**", | |
"team_name": "", | |
} | |
} | |
team_info = utils.team_file_api.get_team_info(user_token) | |
if team_info["id"] not in team_file_api.get_team_white_list(): | |
raise HTTPException(status_code=403, detail="You are not allowed to access this resource.") | |
sub = Submissions( | |
end_date=competition_info.end_date, | |
submission_limit=competition_info.submission_limit, | |
competition_id=COMPETITION_ID, | |
token=HF_TOKEN, | |
competition_type=competition_info.competition_type, | |
hardware=competition_info.hardware, | |
) | |
try: | |
subs = sub.my_submissions(user_token) | |
except AuthenticationError: | |
return { | |
"response": { | |
"submissions": "", | |
"submission_text": SUBMISSION_TEXT.format(competition_info.submission_limit), | |
"error": "**Invalid token. Please login.**", | |
"team_name": "", | |
} | |
} | |
subs = subs.to_dict(orient="records") | |
error = "" | |
if len(subs) == 0: | |
error = "**You have not made any submissions yet.**" | |
subs = "" | |
submission_text = SUBMISSION_TEXT.format(competition_info.submission_limit) | |
submission_selection_text = SUBMISSION_SELECTION_TEXT.format(competition_info.selection_limit) | |
team_name = team_info["name"] | |
for sub in subs: | |
sub.pop("final_score", "") | |
resp = { | |
"response": { | |
"submissions": subs, | |
"submission_text": submission_text + submission_selection_text, | |
"error": error, | |
"team_name": team_name, | |
} | |
} | |
return resp | |
def new_submission( | |
hub_model: str = Form(...), | |
submission_comment: str = Form(None), | |
user_token: str = Depends(utils.user_authentication), | |
): | |
if submission_comment is None: | |
submission_comment = "" | |
if user_token is None: | |
return {"response": "Invalid token. Please login."} | |
todays_date = datetime.datetime.now() | |
start_date = datetime.datetime.strptime(START_DATE, "%Y-%m-%d") | |
if todays_date < start_date: | |
comp_org = COMPETITION_ID.split("/")[0] | |
if not utils.is_user_admin(user_token, comp_org): | |
return {"response": "Competition has not started yet!"} | |
team_id = team_file_api.get_team_info(user_token)["id"] | |
if team_id not in team_file_api.get_team_white_list(): | |
return {"response": "You are not allowed to make submissions."} | |
team_submission_limit_dict = team_file_api.get_team_submission_limit() | |
lock = FileLock(f"./submission_lock/{team_id}.lock", blocking=False) | |
try: | |
with lock: | |
if submission_api.exists_submission_info(team_id) and submission_api.count_by_status(team_id, [SubmissionStatus.QUEUED, SubmissionStatus.PENDING, SubmissionStatus.PROCESSING]) > 0: | |
return {"response": "Another submission is being processed. Please wait a moment."} | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
if team_id in team_submission_limit_dict.keys(): | |
submission_limit = team_submission_limit_dict[team_id] | |
else: | |
submission_limit = competition_info.submission_limit | |
sub = Submissions( | |
end_date=competition_info.end_date, | |
submission_limit=submission_limit, | |
competition_id=COMPETITION_ID, | |
token=HF_TOKEN, | |
competition_type=competition_info.competition_type, | |
hardware=competition_info.hardware, | |
) | |
if competition_info.competition_type == "script": | |
resp = sub.new_submission(user_token, hub_model, submission_comment) | |
return {"response": f"Success! You have {resp} submissions remaining today."} | |
return {"response": "Invalid competition type"} | |
except RequestException: | |
logger.error("Hugging Face Hub is unreachable, please try again later:", traceback.format_exc()) | |
return {"response": "Hugging Face Hub is unreachable, please try again later"} | |
except AuthenticationError: | |
return {"response": "Invalid token"} | |
except PastDeadlineError: | |
return {"response": "Competition has ended"} | |
except SubmissionError: | |
return {"response": "Invalid submission file"} | |
except SubmissionLimitError: | |
return {"response": "Submission limit reached"} | |
except FileLockTimeout: | |
return {"response": "Another submission is being processed. Please wait a moment."} | |
def update_selected_submissions( | |
request: Request, body: UpdateSelectedSubmissionsRequest, user_token: str = Depends(utils.user_authentication) | |
): | |
submission_ids = body.submission_ids | |
if user_token is None: | |
return {"success": False, "error": "Invalid token, please login."} | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
sub = Submissions( | |
end_date=competition_info.end_date, | |
submission_limit=competition_info.submission_limit, | |
competition_id=COMPETITION_ID, | |
token=HF_TOKEN, | |
competition_type=competition_info.competition_type, | |
hardware=competition_info.hardware, | |
) | |
submission_ids = submission_ids.split(",") | |
submission_ids = [s.strip() for s in submission_ids] | |
if len(submission_ids) > competition_info.selection_limit: | |
return { | |
"success": False, | |
"error": f"Please select at most {competition_info.selection_limit} submissions.", | |
} | |
sub.update_selected_submissions(user_token=user_token, selected_submission_ids=submission_ids) | |
return {"success": True, "error": ""} | |
def update_team_name( | |
request: Request, body: UpdateTeamNameRequest, user_token: str = Depends(utils.user_authentication) | |
): | |
new_team_name = body.new_team_name | |
if user_token is None: | |
return {"success": False, "error": "Invalid token. Please login."} | |
if str(new_team_name).strip() == "": | |
return {"success": False, "error": "Team name cannot be empty."} | |
try: | |
utils.team_file_api.update_team_name(user_token, new_team_name) | |
return {"success": True, "error": ""} | |
except Exception as e: | |
return {"success": False, "error": str(e)} | |
def admin_comp_info(request: Request, user_token: str = Depends(utils.user_authentication)): | |
comp_org = COMPETITION_ID.split("/")[0] | |
user_is_admin = utils.is_user_admin(user_token, comp_org) | |
if not user_is_admin: | |
return {"response": "You are not an admin."}, 403 | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
markdowns = { | |
"competition_desc": competition_info.competition_desc, | |
"rules": competition_info.rules, | |
"submission_desc": competition_info.submission_desc, | |
"dataset_desc": competition_info.dataset_desc, | |
} | |
if markdowns["rules"] is None: | |
markdowns["rules"] = "No rules available." | |
config = { | |
"SUBMISSION_LIMIT": competition_info.submission_limit, | |
"SELECTION_LIMIT": competition_info.selection_limit, | |
"END_DATE": competition_info.end_date.strftime("%Y-%m-%d"), | |
"EVAL_HIGHER_IS_BETTER": competition_info.eval_higher_is_better, | |
"SUBMISSION_COLUMNS": competition_info.submission_columns_raw, | |
"SUBMISSION_ID_COLUMN": competition_info.submission_id_col, | |
"LOGO": competition_info.logo_url, | |
"COMPETITION_TYPE": competition_info.competition_type, | |
"EVAL_METRIC": competition_info.metric, | |
"SUBMISSION_ROWS": competition_info.submission_rows, | |
"TIME_LIMIT": competition_info.time_limit, | |
"DATASET": competition_info.dataset, | |
"SUBMISSION_FILENAMES": competition_info.submission_filenames, | |
"SCORING_METRIC": competition_info.scoring_metric, | |
"HARDWARE": competition_info.hardware, | |
} | |
return {"response": {"config": config, "markdowns": markdowns}} | |
def update_comp_info(data: Dict[str, Any] = Body(...), user_token: str = Depends(utils.user_authentication)): | |
comp_org = COMPETITION_ID.split("/")[0] | |
user_is_admin = utils.is_user_admin(user_token, comp_org) | |
if not user_is_admin: | |
return {"response": "You are not an admin."}, 403 | |
competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN) | |
config = data["config"] | |
markdowns = data["markdowns"] | |
valid_keys = [ | |
"SUBMISSION_LIMIT", | |
"SELECTION_LIMIT", | |
"END_DATE", | |
"EVAL_HIGHER_IS_BETTER", | |
"SUBMISSION_COLUMNS", | |
"SUBMISSION_ID_COLUMN", | |
"LOGO", | |
"COMPETITION_TYPE", | |
"EVAL_METRIC", | |
"SUBMISSION_ROWS", | |
"TIME_LIMIT", | |
"DATASET", | |
"SUBMISSION_FILENAMES", | |
"SCORING_METRIC", | |
"HARDWARE", | |
] | |
for key in config: | |
if key not in valid_keys: | |
return {"success": False, "error": f"Invalid key: {key}"} | |
try: | |
competition_info.update_competition_info(config, markdowns, HF_TOKEN) | |
except Exception as e: | |
logger.error(e) | |
return {"success": False}, 500 | |
return {"success": True} | |
def register_example_file(): | |
file_path = os.path.join(BASE_DIR, "static", "Competition Participants Information Collection Template.xlsx") | |
return FileResponse(file_path, media_type="application/octet-stream", filename="Competition Participants Information Collection Template.xlsx") | |
def register( | |
request: RegisterRequest, | |
user_token: str = Depends(utils.user_authentication) | |
): | |
if user_token is None: | |
return {"success": False, "response": "Please login."} | |
team_info = utils.team_file_api.get_team_info(user_token) | |
if team_info is not None: | |
return {"success": False, "response": "You have already registered your team."} | |
utils.team_file_api.create_team( | |
user_token, | |
request.team_name, | |
request.model_dump() | |
) | |
return {"success": True, "response": "Team created successfully."} | |
def register_page(request: Request): | |
""" | |
This function is used to render the registration HTML page. | |
""" | |
if HF_TOKEN is None: | |
return HTTPException(status_code=500, detail="HF_TOKEN is not set.") | |
context = { | |
"request": request, | |
"version": __version__, | |
} | |
return templates.TemplateResponse("register_page.html", context) | |
def update_team_info( | |
request: RegisterRequest, | |
user_token: str = Depends(utils.user_authentication) | |
): | |
if user_token is None: | |
return {"success": False, "response": "Please login."} | |
utils.team_file_api.update_team( | |
user_token, | |
request.team_name, | |
request.model_dump() | |
) | |
return {"success": True, "response": "Update team info success."} | |
def update_team_info_page(request: Request, user_token: str = Depends(utils.user_authentication)): | |
""" | |
This function is used to render the update team info HTML page. | |
""" | |
if user_token is None: | |
return HTTPException(status_code=403, detail="Please login to access this page.") | |
team_info = utils.team_file_api.get_team_info(user_token) | |
if team_info is None: | |
return HTTPException(status_code=403, detail="You have not registered your team yet.") | |
print(team_info) | |
context = { | |
"request": request, | |
"team_info_json_string": json.dumps(team_info), | |
"version": __version__, | |
} | |
return templates.TemplateResponse("update_team_info_page.html", context) | |