import spaces
@spaces.GPU
def dummy_gpu():
pass
import gradio as gr
from huggingface_hub import HfApi, snapshot_download
from huggingface_hub.hf_api import RepoFile
from typing import Union
import tempfile
import shutil
from pathlib import Path
import time
def split_repo(repo_id: str, user_name: str, space_name: str, storage_name: str, storage_type: str, is_private: bool=True, threshold: int=10 * 1024 * 1024, hf_token: Union[str, None]="", progress=gr.Progress(track_tqdm=True)):
TEMP_DIR = tempfile.mkdtemp()
exist_ok = False
use_dupe_api = False
info_md = ""
dl_code = ""
space_id = f"{user_name}/{space_name}"
storage_id = f"{user_name}/{storage_name}"
try:
kwargs = {}
if hf_token: kwargs["token"] = hf_token
else: raise Exception("Token not found.")
api = HfApi()
if not exist_ok:
if api.repo_exists(repo_id=space_id, repo_type="space", **kwargs): raise Exception(f"{space_id} already exists.")
if api.repo_exists(repo_id=storage_id, repo_type=storage_type, **kwargs): raise Exception(f"{space_id} already exists.")
info = api.list_repo_tree(repo_id=repo_id, repo_type="space", recursive=True, **kwargs)
lfiles = []
sfiles = []
for i in info:
if not isinstance(i, RepoFile): continue
if i.lfs is not None and i.lfs.size > threshold: lfiles.append(i.path)
else: sfiles.append(i.path)
#print("Large files: ", lfiles)
#print("Small files: ", sfiles)
if len(lfiles) == 0: raise Exception("Large file not found.")
lfiles_str = "[" + ", ".join(['"' + s + '"' for s in lfiles]) + "]"
sv = api.get_space_variables(repo_id=repo_id, **kwargs)
sv = [{str(k): str(v)} for k, v in sv.items()] if sv and len(sv) > 0 else []
if api.repo_exists(repo_id=space_id, repo_type="space", **kwargs) and exist_ok: api.delete_repo(repo_id=space_id, repo_type="space", **kwargs)
if use_dupe_api:
api.duplicate_space(from_id=repo_id, to_id=space_id, exist_ok=exist_ok, private=is_private, hardware="cpu-basic", variables=sv, **kwargs)
time.sleep(10) # wait for finishing of space duplication
api.delete_files(repo_id=space_id, repo_type="space", delete_patterns=lfiles, **kwargs)
else:
snapshot_download(repo_id=repo_id, repo_type="space", ignore_patterns=lfiles, local_dir=TEMP_DIR, **kwargs)
api.create_repo(repo_id=space_id, repo_type="space", space_hardware="cpu-basic", space_variables=sv, space_sdk="gradio", exist_ok=exist_ok, private=is_private, **kwargs)
api.upload_folder(repo_id=space_id, repo_type="space", ignore_patterns=lfiles, folder_path=TEMP_DIR, path_in_repo=".", **kwargs)
snapshot_download(repo_id=repo_id, repo_type="space", allow_patterns=lfiles, local_dir=TEMP_DIR, **kwargs)
api.create_repo(repo_id=storage_id, repo_type=storage_type, exist_ok=exist_ok, private=is_private, **kwargs)
api.upload_folder(repo_id=storage_id, repo_type=storage_type, allow_patterns=lfiles, folder_path=TEMP_DIR, path_in_repo=".", **kwargs)
lfiles_str = "[" + ", ".join(['"' + s + '"' for s in lfiles]) + "]"
dl_code = f'from huggingface_hub import snapshot_download\nlarge_files = {lfiles_str}\nsnapshot_download(repo_id="{storage_id}", repo_type="{storage_type}", allow_patterns=large_files, local_dir=".")\n'
info_md = f'## Your new space URL: [{space_id}](https://hf.co/spaces/{space_id})
\n## Your new storage URL: [{storage_id}](https://hf.co/{storage_id if storage_type == "model" else "datasets/" + storage_id})'
except Exception as e:
print(e)
gr.Warning(f"Error: {e}")
finally:
if Path(TEMP_DIR).exists() and Path(TEMP_DIR).is_dir(): shutil.rmtree(TEMP_DIR)
return info_md, dl_code
css = """
.title { font-size: 3em; align-items: center; text-align: center; }
.info { align-items: center; text-align: center; }
.block.result { margin: 1em 0; padding: 1em; box-shadow: 0 0 3px 3px #664422, 0 0 3px 2px #664422 inset; border-radius: 6px; background: #665544; }
.desc [src$='#float'] { float: right; margin: 20px; }
"""
with gr.Blocks(theme="NoCrypt/miku@>=1.2.2", fill_width=True, css=css, delete_cache=(60, 3600)) as demo:
repo_id = gr.Textbox(label="Source repo ID", placeholder="levihsu/OOTDiffusion", value="")
with gr.Row(equal_height=True):
user_name = gr.Textbox(label="Your user name", value="")
space_name = gr.Textbox(label="Destination repo name", placeholder="OOTDiffusion", value="")
storage_name = gr.Textbox(label="Storage repo name", placeholder="OOTDiffusion-storage", value="")
storage_type = gr.Radio(label="Storage repo type", choices=["dataset", "model"], value="model")
with gr.Column():
hf_token = gr.Textbox(label="Your HF write token", placeholder="hf_...", value="")
gr.Markdown("Your token is available at [hf.co/settings/tokens](https://huggingface.co/settings/tokens).", elem_classes="info")
threshold = gr.Number(label="Size threshold (bytes)", value=10 * 1024 * 1024, minimum=1, maximum=5 * 1024 * 1024 * 1024, step=1)
is_private = gr.Checkbox(label="Private", value=True)
run_button = gr.Button("Submit", variant="primary")
dl_code = gr.Textbox(label="Code", value="", show_copy_button=True)
info_md = gr.Markdown("
", elem_classes="result")
run_button.click(split_repo, [repo_id, user_name, space_name, storage_name, storage_type, is_private, threshold, hf_token], [info_md, dl_code])
demo.queue().launch(ssr_mode=False)