Spaces:
Paused
Paused
| import sys | |
| import os | |
| import pickle | |
| import json | |
| import threading | |
| import io | |
| import enum | |
| import hugsim_env | |
| import subprocess as sp | |
| import shutil | |
| import time | |
| from collections import deque, OrderedDict | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, Optional, List, Tuple | |
| from dataclasses import dataclass | |
| sys.path.append(os.getcwd()) | |
| from moviepy import ImageSequenceClip | |
| from fastapi import FastAPI, Body, Header, Depends, HTTPException, Query | |
| from fastapi.responses import HTMLResponse, Response | |
| from omegaconf import OmegaConf, DictConfig | |
| from huggingface_hub import HfApi | |
| import open3d as o3d | |
| import numpy as np | |
| import gymnasium | |
| import uvicorn | |
| import psutil | |
| import torch | |
| from glob import glob | |
| from sim.utils.sim_utils import traj2control, traj_transform_to_global | |
| from sim.utils.score_calculator import hugsim_evaluate | |
# Deployment settings, read once at import time. Each value is None when the
# corresponding environment variable is not set.
ADMIN_TOKEN = os.environ.get('ADMIN_TOKEN')
HF_TOKEN = os.environ.get('HF_TOKEN')
COMPETITION_ID = os.environ.get('COMPETITION_ID')

# Hugging Face Hub client shared by the Hub helper functions in this module.
hf_api = HfApi(token=HF_TOKEN)
@enum.unique
class SubmissionStatus(enum.Enum):
    """States a submission can be in.

    This module writes the numeric ``.value`` of a member into the
    submission record's ``status`` field.
    """

    PENDING = 0
    QUEUED = 1
    PROCESSING = 2
    SUCCESS = 3
    FAILED = 4
def to_video(observations: List[Any], output_path: str):
    """Tile the six camera views of every observation and write them as a video.

    For each observation the three front views are concatenated side by side
    as the top row, the three back views as the bottom row, and the two rows
    are stacked vertically into one frame. The clip is written at 4 fps.

    Args:
        observations: Per-step mappings indexed by camera name
            (``CAM_FRONT_LEFT``, ``CAM_FRONT``, ... ``CAM_BACK_LEFT``).
        output_path: Destination handed to ``write_videofile``.
    """
    top_cameras = ('CAM_FRONT_LEFT', 'CAM_FRONT', 'CAM_FRONT_RIGHT')
    bottom_cameras = ('CAM_BACK_RIGHT', 'CAM_BACK', 'CAM_BACK_LEFT')

    def _tile(obs):
        top_row = np.concatenate([obs[cam] for cam in top_cameras], axis=1)
        bottom_row = np.concatenate([obs[cam] for cam in bottom_cameras], axis=1)
        return np.concatenate([top_row, bottom_row], axis=0)

    frames = [_tile(obs) for obs in observations]
    ImageSequenceClip(frames, fps=4).write_videofile(output_path)
def get_gpu_memory():
    """Return the used-memory figure ``nvidia-smi`` reports for each GPU.

    Runs ``nvidia-smi --query-gpu=memory.used --format=csv``, skips the first
    output line (the CSV header) and collects the leading integer of every
    remaining non-blank line.

    Returns:
        list[int]: One value per reported row, in the unit printed by
        ``nvidia-smi`` (presumably MiB -- confirm against the tool's docs).

    Raises:
        RuntimeError: If ``nvidia-smi`` cannot be launched (e.g. it is not
            installed) or exits with a non-zero status.
    """
    command = ["nvidia-smi", "--query-gpu=memory.used", "--format=csv"]
    try:
        raw_output = sp.check_output(command, stderr=sp.STDOUT)
    except sp.CalledProcessError as e:
        raise RuntimeError(
            "command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output)
        ) from e
    except OSError as e:
        # Executable missing or not runnable: report it the same way as a
        # failing command instead of leaking FileNotFoundError to callers.
        raise RuntimeError("command '{}' could not be executed: {}".format(command, e)) from e

    # splitlines() copes with output that lacks a trailing newline; [1:] drops the header.
    rows = raw_output.decode('ascii').splitlines()[1:]
    return [int(row.split()[0]) for row in rows if row.strip()]
def get_system_status():
    """Collect a snapshot of CPU, RAM and GPU usage.

    The CPU percentage is sampled over a one second interval, so this call
    blocks for about that long.

    Returns:
        dict: ``cpu_percent``, ``cpu_count`` (logical), ``total_memory_gb``
        and ``used_memory_gb`` (bytes divided by 1024**3, rounded to two
        decimals), ``memory_percent`` and ``gpus`` (the result of
        ``get_gpu_memory()``).
    """
    bytes_per_gib = 1024 ** 3

    cpu_load = psutil.cpu_percent(interval=1)
    logical_cpus = psutil.cpu_count(logical=True)
    memory = psutil.virtual_memory()

    return {
        "cpu_percent": cpu_load,
        "cpu_count": logical_cpus,
        "total_memory_gb": round(memory.total / bytes_per_gib, 2),
        "used_memory_gb": round(memory.used / bytes_per_gib, 2),
        "memory_percent": memory.percent,
        "gpus": get_gpu_memory(),
    }
def get_token_info(token: str) -> Dict[str, Any]:
    """Download and parse the JSON record stored for ``token``.

    The record is fetched from ``token_data_info/{token}.json`` in the
    ``COMPETITION_ID`` dataset repository.

    Args:
        token (str): The token whose record should be loaded.
    Returns:
        Dict[str, Any]: The parsed JSON content.

    Errors raised by the download or by JSON parsing propagate to the caller.
    """
    token_info_path = hf_api.hf_hub_download(
        repo_id=COMPETITION_ID,
        filename=f"token_data_info/{token}.json",
        repo_type="dataset",
    )
    # JSON is read as UTF-8 explicitly rather than with the platform's locale encoding.
    with open(token_info_path, 'r', encoding='utf-8') as f:
        return json.load(f)
def download_submission_info(team_id: str) -> Dict[str, Any]:
    """
    Download the submission info from Hugging Face Hub.

    The file is fetched from ``submission_info/{team_id}.json`` in the
    ``COMPETITION_ID`` dataset repository.

    Args:
        team_id (str): The team ID.
    Returns:
        Dict[str, Any]: The submission info.
    """
    submission_info_path = hf_api.hf_hub_download(
        repo_id=COMPETITION_ID,
        filename=f"submission_info/{team_id}.json",
        repo_type="dataset",
    )
    # upload_submission_info writes this file as UTF-8, so read it back the
    # same way instead of relying on the platform's locale encoding.
    with open(submission_info_path, 'r', encoding='utf-8') as f:
        return json.load(f)
def upload_submission_info(team_id: str, user_submission_info: Dict[str, Any]):
    """Serialize a team's submission info and upload it to the Hub.

    The mapping is dumped as JSON (4-space indent), encoded as UTF-8 and
    uploaded from memory to ``submission_info/{team_id}.json`` in the
    ``COMPETITION_ID`` dataset repository.

    Args:
        team_id (str): The team ID, used to build the path in the repo.
        user_submission_info (Dict[str, Any]): The data to store.
    """
    payload = io.BytesIO(json.dumps(user_submission_info, indent=4).encode("utf-8"))
    hf_api.upload_file(
        path_or_fileobj=payload,
        path_in_repo=f"submission_info/{team_id}.json",
        repo_id=COMPETITION_ID,
        repo_type="dataset",
    )
def update_submission_data(team_id: str, submission_id: str, data: Dict[str, Any]):
    """Merge ``data`` into one submission entry and upload the result.

    Downloads the team's submission info, updates the first entry of its
    ``"submissions"`` list whose ``"submission_id"`` matches, and uploads the
    whole document again. The upload happens even when no entry matched.

    Args:
        team_id (str): The team ID.
        submission_id (str): The submission to update.
        data (Dict[str, Any]): Fields to merge into the matching entry.
    """
    info = download_submission_info(team_id)
    target = next(
        (entry for entry in info["submissions"] if entry["submission_id"] == submission_id),
        None,
    )
    if target is not None:
        target.update(data)
    upload_submission_info(team_id, info)
def delete_client_space(client_space_id: str):
    """Delete a Space repository on a best-effort basis.

    Any error raised by the Hub client is printed and suppressed so callers
    are not interrupted by a failed clean-up.

    Args:
        client_space_id (str): Repo ID of the Space to delete.
    """
    try:
        hf_api.delete_repo(
            repo_id=client_space_id,
            repo_type="space"
        )
    except Exception as e:
        # Not a bare ``except``: KeyboardInterrupt/SystemExit must still propagate.
        print(f"Failed to delete space {client_space_id}. It may not exist or already deleted. ({e})")
class FifoDict:
    """Bounded mapping that evicts its oldest entry when full.

    All reads and writes happen under ``locker``.
    """

    def __init__(self, max_size: int):
        """
        Args:
            max_size (int): Maximum number of entries kept. With a value of
                zero or less nothing is ever stored.
        """
        self.max_size = max_size
        self._order_dict = OrderedDict()
        self.locker = threading.Lock()

    def push(self, key: str, value: Any):
        """Store ``value`` under ``key`` unless the key is already present.

        For an existing key the stored value is kept and the key is only
        moved to the newest position. Otherwise the oldest entries are
        evicted until there is room for the new one.
        """
        with self.locker:
            if key in self._order_dict:
                self._order_dict.move_to_end(key)
                return
            if self.max_size <= 0:
                # No capacity: avoid popitem() on an empty dict (KeyError).
                return
            while len(self._order_dict) >= self.max_size:
                self._order_dict.popitem(last=False)
            self._order_dict[key] = value

    def get(self, key: str) -> Any:
        """Return the value stored under ``key``, or ``None`` if absent."""
        with self.locker:
            return self._order_dict.get(key, None)
@dataclass
class SceneConfig:
    """One scene to evaluate: a name plus its merged configuration.

    Declared as a dataclass because instances are created with keyword
    arguments (``SceneConfig(name=..., cfg=...)``).
    """

    # Scene name; used when building the per-scene output directory name.
    name: str
    # Configuration object passed to the environment as ``cfg``.
    cfg: DictConfig
@dataclass
class EnvExecuteResult:
    """Outcome flags of executing one action.

    Declared as a dataclass because instances are created with keyword
    arguments (``EnvExecuteResult(cur_scene_done=..., done=...)``).
    """

    # True when the scene that was being stepped has finished.
    cur_scene_done: bool
    # True when the last scene has finished, i.e. the whole run is over.
    done: bool
class EnvHandler:
    """A class to handle the environment for HUGSim.

    This can include multiple scene and configurations. The handler works
    through ``scene_list`` one scene at a time: it steps the current scene's
    gymnasium environment, records every frame, and when a scene finishes it
    evaluates it, stores the artifacts in ``cur_output`` and switches to the
    next scene. ``_lock`` only guards the log list.
    """

    # A scene is force-finished once more than this many steps were executed.
    MAX_SCENE_STEPS = 400

    def __init__(self, scene_list: "List[SceneConfig]", base_output: str):
        """
        Args:
            scene_list: Scenes to run, in order.
            base_output (str): Directory under which one sub-directory per
                scene is created.
        """
        self._created_time = datetime.now(timezone.utc)
        self._last_active_time = datetime.now(timezone.utc)
        self._lock = threading.Lock()
        self.scene_list = scene_list
        self.base_output = base_output
        self.env = None
        self.reset_env()

    def _switch_scene(self, scene_index: int):
        """
        Switch to a different scene based on the index.

        Closes the current environment, creates the output directory and the
        environment for the new scene, clears the per-scene recording state
        and performs the initial ``reset()``.

        Args:
            scene_index (int): The index of the scene to switch to.
        Raises:
            ValueError: If ``scene_index`` is outside ``scene_list``.
        """
        if scene_index < 0 or scene_index >= len(self.scene_list):
            raise ValueError("Invalid scene index.")
        self.close()
        self.cur_scene_index = scene_index
        scene_config = self.scene_list[scene_index]
        self._log(f"Switch to scene: {scene_config.name}_{scene_config.cfg.scenario.mode}")
        print(f"Switch to scene: {scene_config.name}_{scene_config.cfg.scenario.mode}")
        self.cur_output = os.path.join(
            self.base_output,
            f"{scene_config.name}_{scene_config.cfg.scenario.mode}",
        )
        os.makedirs(self.cur_output, exist_ok=True)
        self.env = gymnasium.make('hugsim_env/HUGSim-v0', cfg=scene_config.cfg, output=self.cur_output)
        self._scene_cnt = 0
        self._scene_done = False
        self._save_data = {'type': 'closeloop', 'frames': []}
        self._observations_save = []
        self._obs, self._info = self.env.reset()
        self._log(f"Switched to scene: {scene_config.name}")

    def close(self):
        """
        Close the environment and release resources.

        Only drops this handler's reference to the environment; no explicit
        ``env.close()`` call is made.
        """
        if self.env is not None:
            self.env = None
            self._log("Environment closed.")

    def reset_env(self):
        """
        Reset the environment and initialize variables.

        Starts a fresh log list, clears the collected scores and switches
        back to the first scene.
        """
        self._last_active_time = datetime.now(timezone.utc)
        self._log_list = deque(maxlen=100)
        self._done = False
        self._score_list = []
        self._switch_scene(0)
        self._log("Environment reset complete.")

    def get_current_state(self):
        """
        Get the current state of the environment.

        Also refreshes the last-active timestamp.

        Returns:
            dict: The latest observation under ``"obs"`` and the latest info
            under ``"info"``.
        """
        self._last_active_time = datetime.now(timezone.utc)
        return {
            "obs": self._obs,
            "info": self._info,
        }

    @property
    def created_time(self) -> datetime:
        """
        Get the creation time of the environment handler.
        Returns:
            datetime: The creation time.
        """
        return self._created_time

    @property
    def last_active_time(self) -> datetime:
        """
        Get the last active time of the environment handler.
        Returns:
            datetime: The last active time.
        """
        return self._last_active_time

    @property
    def has_done(self) -> bool:
        """
        Check if the episode is done.
        Returns:
            bool: True if the episode is done, False otherwise.
        """
        return self._done

    @property
    def has_scene_done(self) -> bool:
        """
        Check if the current scene is done.
        Returns:
            bool: True if the current scene is done, False otherwise.
        """
        return self._scene_done

    @property
    def log_list(self) -> deque:
        """
        Get the log list.
        Returns:
            deque: The log list containing recent log messages.
        """
        return self._log_list

    def execute_action(self, plan_traj: np.ndarray) -> "EnvExecuteResult":
        """
        Execute the action based on the planned trajectory.

        Converts the trajectory to a control action, steps the environment
        and records the frame. When the scene finishes, its results are
        saved and the next scene (if any) is started.

        Args:
            plan_traj (np.ndarray): The planned trajectory to follow; indexed
                as a 2-D array whose columns 0 and 1 are used.
        Returns:
            EnvExecuteResult: ``cur_scene_done`` tells whether the stepped
            scene finished, ``done`` whether that was the last scene.
        """
        self._last_active_time = datetime.now(timezone.utc)
        acc, steer_rate = traj2control(plan_traj, self._info)
        action = {'acc': acc, 'steer_rate': steer_rate}
        self._log("Executing action:", action)

        self._obs, _, terminated, truncated, self._info = self.env.step(action)
        self._scene_cnt += 1
        self._scene_done = terminated or truncated or self._scene_cnt > self.MAX_SCENE_STEPS

        # Swap the two columns and negate the new second one before the
        # global transform. Fancy indexing copies, so plan_traj is untouched.
        imu_plan_traj = plan_traj[:, [1, 0]]
        imu_plan_traj[:, 1] *= -1
        global_traj = traj_transform_to_global(imu_plan_traj, self._info['ego_box'])
        self._save_data['frames'].append({
            'time_stamp': self._info['timestamp'],
            'is_key_frame': True,
            'ego_box': self._info['ego_box'],
            'obj_boxes': self._info['obj_boxes'],
            'obj_names': ['car' for _ in self._info['obj_boxes']],
            'planned_traj': {
                'traj': global_traj,
                'timestep': 0.5
            },
            'collision': self._info['collision'],
            'rc': self._info['rc']
        })
        self._observations_save.append(self._obs['rgb'])

        if not self._scene_done:
            return EnvExecuteResult(cur_scene_done=False, done=False)

        self._save_scene_results()

        if self.cur_scene_index < len(self.scene_list) - 1:
            self._switch_scene(self.cur_scene_index + 1)
            return EnvExecuteResult(cur_scene_done=True, done=False)

        self._done = True
        return EnvExecuteResult(cur_scene_done=True, done=True)

    def _save_scene_results(self):
        """Persist, evaluate and render the scene that just finished.

        Writes ``data.pkl``, ``eval.json`` and ``video.mp4`` into
        ``cur_output`` and appends the evaluation result to the score list.
        ``ground.ply`` and ``scene.ply`` are read from the same directory
        (presumably written there by the environment -- confirm).
        """
        out_dir = self.cur_output
        with open(os.path.join(out_dir, 'data.pkl'), 'wb') as wf:
            pickle.dump([self._save_data], wf)

        ground_xyz = np.asarray(o3d.io.read_point_cloud(os.path.join(out_dir, 'ground.ply')).points)
        scene_xyz = np.asarray(o3d.io.read_point_cloud(os.path.join(out_dir, 'scene.ply')).points)
        results = hugsim_evaluate([self._save_data], ground_xyz, scene_xyz)
        with open(os.path.join(out_dir, 'eval.json'), 'w') as f:
            json.dump(results, f)
        self._score_list.append(results.copy())

        to_video(self._observations_save, os.path.join(out_dir, 'video.mp4'))
        self._log(f"Scene {self.cur_scene_index} completed. Evaluation results saved.")

    def _log(self, *messages):
        """Append a timestamped line built from ``messages`` to the log list."""
        log_message = f"[{str(datetime.now())}]" + " ".join([str(msg) for msg in messages]) + "\n"
        with self._lock:
            self._log_list.append(log_message)

    def calculate_score(self) -> Dict[str, Any]:
        """
        Calculate the score based on the current environment state.

        Averages the ``rc`` and ``hdscore`` entries of all finished scenes
        and rounds each mean to four decimals.

        Returns:
            Dict[str, Any]: The score dictionary.
        Raises:
            ValueError: If not every scene has finished yet.
        """
        if not self._done:
            raise ValueError("Environment is not done yet. Cannot calculate score.")
        rc = np.mean([float(score['rc']) for score in self._score_list]).round(4)
        hdscore = np.mean([float(score['hdscore']) for score in self._score_list]).round(4)
        return {"rc": rc, "hdscore": hdscore}
class EnvHandlerManager:
    """Registry of ``EnvHandler`` instances keyed by environment ID.

    Handlers are created lazily, and a daemon thread started in ``__init__``
    closes handlers that lived or idled for too long. A closed ID stays in
    the registry with a ``None`` value, so ``get_env_handler`` returns
    ``None`` for it instead of creating a new handler.
    """

    # A handler expires this many seconds after creation ...
    MAX_LIFETIME_SECONDS = 3600 * 1.5
    # ... or after this many seconds without activity.
    MAX_IDLE_SECONDS = 180
    # Pause between two clean-up passes.
    CLEAN_INTERVAL_SECONDS = 15

    def __init__(self):
        self._env_handlers = {}
        self._token_info_map = {}
        self._lock = threading.Lock()
        threading.Thread(target=self._clean_expired_env_handlers, daemon=True).start()

    def _get_scene_list(self, base_output: str) -> "List[SceneConfig]":
        """
        Load the scene configurations from the YAML files.

        For every dataset type the base and camera configs are combined with
        the shared kinematic config and each scenario file found for that
        dataset; the scene's own ``cfg.yaml`` is then merged in.

        Args:
            base_output (str): Stored as ``cfg.base.output_dir`` of each scene.
        Returns:
            List[SceneConfig]: A list of scene configurations.
        """
        config_dir = os.path.join(os.path.dirname(__file__), "web_server_config")
        # Same file for every dataset type, so it is loaded once.
        kinematic_config = OmegaConf.load(os.path.join(config_dir, 'kinematic.yaml'))

        scene_list = []
        for data_type in ['kitti360', 'waymo', 'nuscenes', 'pandaset']:
            base_config = OmegaConf.load(os.path.join(config_dir, f'{data_type}_base.yaml'))
            camera_config = OmegaConf.load(os.path.join(config_dir, f'{data_type}_camera.yaml'))

            scenarios_list = glob(f"/app/app_datas/ss/scenarios/{data_type}/*.yaml")
            for scenario_path in scenarios_list:
                scenario_config = OmegaConf.load(scenario_path)
                cfg = OmegaConf.merge(
                    {"scenario": scenario_config},
                    {"base": base_config},
                    {"camera": camera_config},
                    {"kinematic": kinematic_config}
                )

                model_path = os.path.join(cfg.base.model_base, cfg.scenario.scene_name)
                model_config = OmegaConf.load(os.path.join(model_path, 'cfg.yaml'))
                model_config.update({"model_path": f"/app/app_datas/ss/scenes/{data_type}/{cfg.scenario.scene_name}"})
                cfg.update(model_config)
                cfg.base.output_dir = base_output
                scene_list.append(SceneConfig(name=cfg.scenario.scene_name, cfg=cfg))
        return scene_list

    def _generate_env_handler(self, env_id: str):
        """Build a new handler whose output goes to ``{env_id}_hugsim_env``."""
        base_output = "/app/app_datas/env_output"
        scene_list = self._get_scene_list(base_output)
        output = os.path.join(base_output, f"{env_id}_hugsim_env")
        os.makedirs(output, exist_ok=True)
        return EnvHandler(scene_list, base_output=output)

    def exists_env_handler(self, env_id: str) -> bool:
        """
        Check if the environment handler for the given environment ID exists.

        IDs that were closed still count as existing.

        Args:
            env_id (str): The environment ID.
        Returns:
            bool: True if the environment handler exists, False otherwise.
        """
        with self._lock:
            return env_id in self._env_handlers

    def get_env_handler(self, env_id: str, token_info: Dict[str, Any]) -> "EnvHandler":
        """
        Get the environment handler for the given environment ID.

        A handler is created (and ``token_info`` remembered) the first time
        an ID is seen.

        Args:
            env_id (str): The environment ID.
            token_info (Dict[str, Any]): Stored for the ID on creation.
        Returns:
            EnvHandler: The environment handler instance, or ``None`` if the
            ID was closed before.
        """
        with self._lock:
            if env_id not in self._env_handlers:
                self._env_handlers[env_id] = self._generate_env_handler(env_id)
                self._token_info_map[env_id] = token_info
            return self._env_handlers[env_id]

    def close_env_handler(self, env_id: str):
        """
        Close the environment handler for the given environment ID.

        Args:
            env_id (str): The environment ID.
        """
        with self._lock:
            env = self._env_handlers.pop(env_id, None)
            # Keep the ID registered with a None value to mark it as closed.
            self._env_handlers[env_id] = None
            if env is not None:
                env.close()
                torch.cuda.empty_cache()

    def _expire_env_handler(self, env_id: str):
        """Close one expired handler, mark its submission failed, drop its space."""
        self.close_env_handler(env_id)
        token_info = self._token_info_map.pop(env_id, None)
        if not token_info:
            return
        try:
            update_submission_data(
                token_info["team_id"],
                token_info["submission_id"],
                {"status": SubmissionStatus.FAILED.value, "error_message": "SPACE_TIMEOUT"},
            )
        finally:
            # The token info is already popped, so this is the only chance to
            # remove the space even if the status update failed.
            delete_client_space(token_info["client_space_id"])

    def _clean_expired_env_handlers(self):
        """
        Clean up expired environment handlers based on the last active time.

        Runs forever; intended as the target of the daemon thread.
        """
        while True:
            try:
                current_time = datetime.now(timezone.utc)
                with self._lock:
                    expired_env_ids = [
                        env_id
                        for env_id, handler in self._env_handlers.items()
                        if handler and (
                            (current_time - handler.created_time).total_seconds() > self.MAX_LIFETIME_SECONDS
                            or (current_time - handler.last_active_time).total_seconds() > self.MAX_IDLE_SECONDS
                        )
                    ]
                # The lock must be released here: close_env_handler acquires
                # it again and threading.Lock is not reentrant.
                for env_id in expired_env_ids:
                    try:
                        self._expire_env_handler(env_id)
                    except Exception as e:
                        # One failing handler must not stop the others from being cleaned.
                        print(f"Error in cleaning expired environment handlers: {e}")
            except Exception as e:
                print(f"Error in cleaning expired environment handlers: {e}")
            time.sleep(self.CLEAN_INTERVAL_SECONDS)
# FastAPI application served by this module.
app = FastAPI()

# Bounded cache (100 entries) of serialized responses, keyed by transaction ID.
_result_dict = FifoDict(max_size=100)

# Registry of environment handlers; constructing it starts the clean-up thread.
env_manager = EnvHandlerManager()
def _get_env_handler(
    auth_token: Optional[str] = Header(None),
    query_token: Optional[str] = Query(None)
) -> EnvHandler:
    """Resolve the caller's token to its environment handler.

    The token is taken from the header value or, failing that, from the
    query value. On the first request for a submission its status is set to
    PROCESSING before the handler is created.

    Raises:
        HTTPException: 401 if no token is given or its info cannot be
            loaded; 404 if the submission's handler was already closed.
    """
    token = auth_token or query_token
    if not token:
        raise HTTPException(status_code=401, detail="Authorization token is required.")

    try:
        token_info = get_token_info(token)
    except Exception:
        raise HTTPException(status_code=401)

    submission_id, team_id = token_info["submission_id"], token_info["team_id"]

    is_first_request = not env_manager.exists_env_handler(submission_id)
    if is_first_request:
        update_submission_data(team_id, submission_id, {"status": SubmissionStatus.PROCESSING.value})

    handler = env_manager.get_env_handler(submission_id, token_info)
    if handler is None:
        raise HTTPException(status_code=404, detail="Environment handler already closed.")
    return handler
def _load_numpy_ndarray_json_str(json_str: str) -> np.ndarray:
    """
    Load a numpy ndarray from a JSON string.

    The JSON object must provide ``"data"`` (the values), ``"dtype"`` and
    ``"shape"``; the values are converted with that dtype and then reshaped.
    """
    payload = json.loads(json_str)
    values = np.array(payload["data"], dtype=payload["dtype"])
    return values.reshape(payload["shape"])
def reset_endpoint(env_handler: EnvHandler = Depends(_get_env_handler)):
    """
    Reset the caller's environment back to its first scene.

    Returns:
        dict: ``{"success": True}`` once the reset finished.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view -- confirm it is registered on ``app``.
    env_handler.reset_env()
    return {"success": True}
def get_current_state_endpoint(env_handler: EnvHandler = Depends(_get_env_handler)):
    """
    Return the pickled current state of the caller's environment.

    The pickled mapping holds ``done``, ``cur_scene_done`` and ``state``
    and is sent as ``application/octet-stream``.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view -- confirm it is registered on ``app``.
    current_state = env_handler.get_current_state()
    payload = pickle.dumps({
        "done": env_handler.has_done,
        "cur_scene_done": env_handler.has_scene_done,
        "state": current_state,
    })
    return Response(content=payload, media_type="application/octet-stream")
def execute_action_endpoint(
    plan_traj: str = Body(..., embed=True),
    transaction_id: str = Body(..., embed=True),
    auth_token: str = Header(...),
    env_handler: EnvHandler = Depends(_get_env_handler)
):
    """
    Execute the action based on the planned trajectory.

    Responses are cached per ``transaction_id``, so a repeated request gets
    the stored response back without stepping the environment again. When
    the last scene finishes, the handler is closed, the client space is
    deleted, the score and the output folder are uploaded and the local
    output folder is removed.

    Args:
        plan_traj (str): The planned trajectory in JSON format.
        transaction_id (str): The unique transaction ID for caching results.
        auth_token (str): Token used to look up the submission on completion.
        env_handler (EnvHandler): The environment handler instance.
    Returns:
        Response: The response containing the execution result.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view -- confirm it is registered on ``app``.

    def _respond(payload: bytes) -> Response:
        return Response(content=payload, media_type="application/octet-stream")

    def _store_and_respond(done, cur_scene_done) -> Response:
        payload = pickle.dumps({"done": done, "cur_scene_done": cur_scene_done, "state": env_handler.get_current_state()})
        _result_dict.push(transaction_id, payload)
        return _respond(payload)

    cached_payload = _result_dict.get(transaction_id)
    if cached_payload is not None:
        return _respond(cached_payload)

    if env_handler.has_done:
        return _store_and_respond(True, True)

    trajectory = _load_numpy_ndarray_json_str(plan_traj)
    outcome = env_handler.execute_action(trajectory)

    if outcome.done:
        token_info = get_token_info(auth_token)
        submission_id = token_info["submission_id"]
        env_manager.close_env_handler(submission_id)
        delete_client_space(token_info["client_space_id"])

        final_score = env_handler.calculate_score()
        update_submission_data(token_info["team_id"], token_info["submission_id"], {"status": SubmissionStatus.SUCCESS.value, "score": final_score})
        hf_api.upload_folder(
            repo_id=COMPETITION_ID,
            folder_path=env_handler.base_output,
            repo_type="dataset",
            path_in_repo=f"eval_results/{token_info['submission_id']}",
        )
        shutil.rmtree(env_handler.base_output, ignore_errors=True)

    return _store_and_respond(outcome.done, outcome.cur_scene_done)
def main_page_endpoint(env_handler: EnvHandler = Depends(_get_env_handler)):
    """
    Render the caller's recent log messages as an HTML page.

    The page reloads itself every five seconds.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view, and a later function in this module reuses the same name --
    # confirm how both are registered on ``app``.
    log_str = "\n".join(env_handler.log_list)
    html_content = f"""
<html><body><pre>{log_str}</pre></body></html>
<script>
setTimeout(function() {{
window.location.reload();
}}, 5000);
</script>
"""
    return HTMLResponse(content=html_content)
def main_page_endpoint(
    admin_token: Optional[str] = Query(None),
):
    """
    Main page endpoint: system status for admins, a plain "running" page otherwise.

    The status page is shown only when ``ADMIN_TOKEN`` is configured and the
    supplied ``admin_token`` matches it. When either is missing or empty the
    request is treated as unauthorized, so an unset ``ADMIN_TOKEN`` no longer
    exposes the status page to requests that send no token.

    Args:
        admin_token (Optional[str]): Token supplied as a query parameter.
    Returns:
        HTMLResponse: The rendered page.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view, and an earlier function in this module has the same name --
    # confirm how both are registered on ``app``.
    import hmac

    authorized = bool(ADMIN_TOKEN) and bool(admin_token) and hmac.compare_digest(
        # Compared as bytes: compare_digest rejects non-ASCII str arguments.
        admin_token.encode("utf-8"),
        ADMIN_TOKEN.encode("utf-8"),
    )
    if not authorized:
        html_content = """
<html>
running
</html>
"""
        return HTMLResponse(content=html_content)

    system_info = get_system_status()
    html_content = f"""
<html>
<head>
<title>System Status</title>
</head>
<body>
<h1>System Status</h1>
<pre>{json.dumps(system_info, indent=4)}</pre>
</body>
</html>
"""
    return HTMLResponse(content=html_content)
# Start the server only when this file is executed as a script, so importing
# the module (e.g. for tests or an external ASGI runner) does not block.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860, workers=1)