Spaces:
Running
Running
| import os | |
| from typing import Dict, Optional | |
| from huggingface_hub import HfApi, get_token | |
class HFDeployer:
    """Creates Hugging Face Spaces and uploads files to them through ``HfApi``."""

    def __init__(self, token: Optional[str] = None):
        """
        Initializes the Hugging Face deployer.

        The token is resolved in this order: the explicit ``token`` argument,
        the ``HF_TOKEN`` environment variable, then ``get_token()``
        (local cache).

        Args:
            token: Hugging Face access token, or None to auto-detect it.

        Raises:
            ValueError: If no token could be found.
        """
        self.token = token or os.environ.get("HF_TOKEN") or get_token()
        if not self.token:
            raise ValueError("No Hugging Face token found. Please set HF_TOKEN.")
        self.api = HfApi(token=self.token)

    def _sanitize_repo_id(self, input_name: str, current_username: str) -> str:
        """
        Normalizes a repo/space name into a ``namespace/name`` repo id.

        Accepted inputs:
          * a full URL (``https://<host>/spaces/user/repo``), optionally with
            extra path segments, a query string or a fragment;
          * a scheme-less ``huggingface.co/...`` path;
          * ``user/repo`` (a trailing slash is tolerated);
          * a bare ``repo`` name, which is prefixed with ``current_username``.

        Args:
            input_name: Raw name or URL supplied by the caller.
            current_username: Namespace used when the input carries none.

        Returns:
            The repo id in ``namespace/name`` form.
        """
        name = input_name.strip()
        is_url = False

        if "://" in name:
            # Full URL on any host: drop the scheme and the host so that the
            # URL returned by deploy_space() can be fed back in as well.
            without_scheme = name.split("://", 1)[1]
            name = without_scheme.partition("/")[2]
            is_url = True
        elif "huggingface.co/" in name:
            # Scheme-less URL such as "huggingface.co/spaces/user/repo".
            name = name.split("huggingface.co/", 1)[1]
            is_url = True

        if is_url:
            # Query strings and fragments are never part of a repo id.
            for separator in ("?", "#"):
                name = name.split(separator, 1)[0]

        # Splitting and dropping empty parts removes leading/trailing slashes.
        segments = [segment for segment in name.split("/") if segment]

        # Drop the "spaces/" route prefix. Outside of URLs this is only done
        # when a namespace and a name still remain afterwards.
        if segments and segments[0] == "spaces" and (is_url or len(segments) > 2):
            segments = segments[1:]

        if len(segments) >= 2:
            # A repo id has exactly two parts; anything after them
            # (e.g. "tree/main") is page navigation, not identity.
            return "/".join(segments[:2])

        # Bare repo name (or empty input): qualify with the current user.
        bare_name = segments[0] if segments else ""
        return f"{current_username}/{bare_name}"

    def deploy_space(self,
                     space_name: str,
                     files: Dict[str, str],
                     username: Optional[str] = None,
                     sdk: str = "gradio",
                     private: bool = False) -> str:
        """
        Creates a Space and deploys files.

        Each file is uploaded exactly once, sequentially, in its own commit.

        Args:
            space_name: Space name (e.g. 'strawberry-counter'), 'user/repo',
                or a full Space URL.
            files: Dictionary {filename: content} (e.g. {'app.py': '...'})
            username: Target username or organization. If None, uses current user.
            sdk: 'gradio', 'streamlit', or 'docker'
            private: If True, creates a private repo

        Returns:
            The deployed Space URL.

        Raises:
            RuntimeError: If the repo creation or any file upload fails; the
                underlying exception is chained as ``__cause__``.
        """
        # 1. Determine full repo_id
        if not username:
            user_info = self.api.whoami()
            username = user_info["name"]

        repo_id = self._sanitize_repo_id(space_name, username)
        print(f"🚀 Preparing deployment to {repo_id}...")

        # 2. Repo creation (exist_ok=True: does not fail if it already exists)
        try:
            self.api.create_repo(
                repo_id=repo_id,
                repo_type="space",
                space_sdk=sdk,
                private=private,
                exist_ok=True,
            )
        except Exception as e:
            raise RuntimeError(f"Error creating repo: {str(e)}") from e
        print(f"✅ Repo {repo_id} ready.")

        # 3. File upload
        # Plain sequential upload_file calls. The previous version also
        # scheduled the same uploads through run_as_future() and never awaited
        # them, so every file was pushed twice and background errors were lost.
        try:
            for filename, content in files.items():
                print(f"📤 Uploading {filename}...")
                # upload_file expects bytes (or a path / file object).
                content_bytes = content.encode("utf-8")
                self.api.upload_file(
                    path_or_fileobj=content_bytes,
                    path_in_repo=filename,
                    repo_id=repo_id,
                    repo_type="space",
                    commit_message=f"Deploy {filename} via Meta-MCP",
                )
        except Exception as e:
            raise RuntimeError(f"Error uploading files: {str(e)}") from e
        print("✅ All files uploaded.")

        # 4. URL construction: <host>/spaces/NAMESPACE/SPACE_NAME
        space_url = f"https://huggingface.co/spaces/{repo_id}"
        print(f"🎉 Deployment finished! Space accessible here: {space_url}")
        return space_url