Spaces:
Running
Running
import spaces | |
def dummy_gpu():
    """Do nothing and return ``None``.

    Takes no arguments and has no side effects.
    """
    # NOTE(review): presumably a placeholder kept for the `spaces` runtime
    # imported just above — confirm before removing.
    return None
import gradio as gr | |
from huggingface_hub import HfApi, snapshot_download | |
from huggingface_hub.hf_api import RepoFile | |
from typing import Union | |
import tempfile | |
import shutil | |
from pathlib import Path | |
import time | |
def split_repo(repo_id: str, user_name: str, space_name: str, storage_name: str, storage_type: str, is_private: bool=True, threshold: int=10 * 1024 * 1024, hf_token: Union[str, None]="", progress=gr.Progress(track_tqdm=True)):
    """Copy a Space into a new Space without its large files, and move those files to a storage repo.

    Files of the source Space that are LFS-tracked and bigger than
    ``threshold`` are uploaded to ``{user_name}/{storage_name}``; everything
    else is uploaded to the new Space ``{user_name}/{space_name}``.

    Args:
        repo_id: ID of the source Space.
        user_name: Namespace under which both destination repos are created.
        space_name: Name of the destination Space.
        storage_name: Name of the repo that receives the large files.
        storage_type: Repo type of the storage repo (the UI offers
            ``"model"`` or ``"dataset"``).
        is_private: Whether the created repos are private.
        threshold: Size in bytes above which an LFS file counts as large.
        hf_token: Hugging Face token used for every Hub call; required.
        progress: Gradio progress tracker (not referenced in the body; it is
            declared with ``track_tqdm=True``).

    Returns:
        A ``(info_md, dl_code)`` tuple: Markdown with links to the two new
        repos, and a Python snippet that downloads the large files from the
        storage repo. Both are empty strings when the operation fails.

    Errors are not raised to the caller: any ``Exception`` is printed and
    shown through ``gr.Warning``.
    """
    temp_dir = tempfile.mkdtemp()
    exist_ok = False  # hard-wired: never reuse or overwrite existing destination repos
    use_dupe_api = False  # hard-wired: copy via download/upload instead of duplicate_space
    info_md = ""
    dl_code = ""
    space_id = f"{user_name}/{space_name}"
    storage_id = f"{user_name}/{storage_name}"
    try:
        if not hf_token:
            raise Exception("Token not found.")
        kwargs = {"token": hf_token}
        api = HfApi()

        if not exist_ok:
            if api.repo_exists(repo_id=space_id, repo_type="space", **kwargs):
                raise Exception(f"{space_id} already exists.")
            if api.repo_exists(repo_id=storage_id, repo_type=storage_type, **kwargs):
                # Report the repo that actually collides (was space_id).
                raise Exception(f"{storage_id} already exists.")

        # Large files = LFS-tracked files whose size exceeds the threshold.
        tree = api.list_repo_tree(repo_id=repo_id, repo_type="space", recursive=True, **kwargs)
        lfiles = [
            entry.path
            for entry in tree
            if isinstance(entry, RepoFile) and entry.lfs is not None and entry.lfs.size > threshold
        ]
        if not lfiles:
            raise Exception("Large file not found.")

        # The Hub API expects each variable as {"key": ..., "value": ...}
        # (description optional), not {name: str(SpaceVariable)}.
        source_vars = api.get_space_variables(repo_id=repo_id, **kwargs)
        sv = []
        for name, var in (source_vars or {}).items():
            value = getattr(var, "value", var)
            item = {"key": str(name), "value": "" if value is None else str(value)}
            description = getattr(var, "description", None)
            if description:
                item["description"] = str(description)
            sv.append(item)

        if exist_ok and api.repo_exists(repo_id=space_id, repo_type="space", **kwargs):
            api.delete_repo(repo_id=space_id, repo_type="space", **kwargs)

        if use_dupe_api:
            api.duplicate_space(from_id=repo_id, to_id=space_id, exist_ok=exist_ok, private=is_private, hardware="cpu-basic", variables=sv, **kwargs)
            time.sleep(10)  # wait for the space duplication to finish
            api.delete_files(repo_id=space_id, repo_type="space", delete_patterns=lfiles, **kwargs)
        else:
            # Fetch everything except the large files and push it to the new Space.
            snapshot_download(repo_id=repo_id, repo_type="space", ignore_patterns=lfiles, local_dir=temp_dir, **kwargs)
            api.create_repo(repo_id=space_id, repo_type="space", space_hardware="cpu-basic", space_variables=sv, space_sdk="gradio", exist_ok=exist_ok, private=is_private, **kwargs)
            api.upload_folder(repo_id=space_id, repo_type="space", ignore_patterns=lfiles, folder_path=temp_dir, path_in_repo=".", **kwargs)

        # Fetch the large files into the same directory; allow_patterns keeps
        # the upload below restricted to them.
        snapshot_download(repo_id=repo_id, repo_type="space", allow_patterns=lfiles, local_dir=temp_dir, **kwargs)
        api.create_repo(repo_id=storage_id, repo_type=storage_type, exist_ok=exist_ok, private=is_private, **kwargs)
        api.upload_folder(repo_id=storage_id, repo_type=storage_type, allow_patterns=lfiles, folder_path=temp_dir, path_in_repo=".", **kwargs)

        lfiles_str = "[" + ", ".join(['"' + s + '"' for s in lfiles]) + "]"
        dl_code = f'from huggingface_hub import snapshot_download\nlarge_files = {lfiles_str}\nsnapshot_download(repo_id="{storage_id}", repo_type="{storage_type}", allow_patterns=large_files, local_dir=".")\n'
        info_md = f'## Your new space URL: [{space_id}](https://hf.co/spaces/{space_id})<br>\n## Your new storage URL: [{storage_id}](https://hf.co/{storage_id if storage_type == "model" else "datasets/" + storage_id})'
    except Exception as e:
        # UI boundary: surface the failure to the user instead of crashing the handler.
        print(e)
        gr.Warning(f"Error: {e}")
    finally:
        if Path(temp_dir).exists() and Path(temp_dir).is_dir():
            shutil.rmtree(temp_dir)
    return info_md, dl_code
# Custom stylesheet handed to gr.Blocks(css=css) below. The "info" and
# "result" classes are attached to components through elem_classes.
css = """
.title { font-size: 3em; align-items: center; text-align: center; }
.info { align-items: center; text-align: center; }
.block.result { margin: 1em 0; padding: 1em; box-shadow: 0 0 3px 3px #664422, 0 0 3px 2px #664422 inset; border-radius: 6px; background: #665544; }
.desc [src$='#float'] { float: right; margin: 20px; }
"""
# Gradio UI: collects the source Space, destination names and token, then runs
# split_repo and shows the resulting links and download snippet.
# NOTE(review): the nesting of Row/Column below is reconstructed — the
# original indentation was not available; confirm the intended layout.
with gr.Blocks(theme="NoCrypt/miku@>=1.2.2", fill_width=True, css=css, delete_cache=(60, 3600)) as demo:
    repo_id = gr.Textbox(label="Source repo ID", placeholder="levihsu/OOTDiffusion", value="")
    with gr.Row(equal_height=True):
        user_name = gr.Textbox(label="Your user name", value="")
        space_name = gr.Textbox(label="Destination repo name", placeholder="OOTDiffusion", value="")
        storage_name = gr.Textbox(label="Storage repo name", placeholder="OOTDiffusion-storage", value="")
        storage_type = gr.Radio(label="Storage repo type", choices=["dataset", "model"], value="model")
        with gr.Column():
            # Mask the write token so it is not displayed in clear text.
            hf_token = gr.Textbox(label="Your HF write token", placeholder="hf_...", value="", type="password")
            # Link to the official token settings page; the previous target
            # was a third-party mirror domain that did not match the label.
            gr.Markdown("Your token is available at [hf.co/settings/tokens](https://huggingface.co/settings/tokens).", elem_classes="info")
    threshold = gr.Number(label="Size threshold (bytes)", value=10 * 1024 * 1024, minimum=1, maximum=5 * 1024 * 1024 * 1024, step=1)
    is_private = gr.Checkbox(label="Private", value=True)
    run_button = gr.Button("Submit", variant="primary")
    dl_code = gr.Textbox(label="Code", value="", show_copy_button=True)
    info_md = gr.Markdown("<br><br><br>", elem_classes="result")

    # Input order must match split_repo's positional parameters.
    run_button.click(split_repo, [repo_id, user_name, space_name, storage_name, storage_type, is_private, threshold, hf_token], [info_md, dl_code])

demo.queue().launch(ssr_mode=False)