|
import functools |
|
import os |
|
import shutil |
|
import torch |
|
|
|
from pathlib import Path |
|
from urllib.request import Request, urlopen |
|
from typing import Optional |
|
|
|
|
|
def variant_cache_dir():
    """Return the directory used to cache downloaded variant checkpoints.

    The location is derived from the environment, in priority order:

    1. ``$HF_HUB_CACHE/md_variants`` when ``HF_HUB_CACHE`` is set.
    2. ``$HF_HOME/hub/md_variants`` when ``HF_HOME`` is set.
    3. ``~/.cache/huggingface/hub/md_variants`` (with ``~`` expanded) otherwise.

    The directory is only computed here; it is not created.

    Returns:
        A ``pathlib.Path`` pointing at the ``md_variants`` cache directory.
    """
    hub_cache_override = os.environ.get("HF_HUB_CACHE")
    hf_home_override = os.environ.get("HF_HOME")

    if hub_cache_override is not None:
        hub_root = Path(hub_cache_override)
    elif hf_home_override is not None:
        hub_root = Path(hf_home_override) / "hub"
    else:
        hub_root = Path("~/.cache/huggingface/hub").expanduser()

    return hub_root / "md_variants"
|
|
|
|
|
def cached_variant_path(variant_id: str):
    """Return the local path of a variant checkpoint, downloading it if absent.

    ``variant_id`` is either ``"<variant>"`` or ``"<variant>/<step>"`` (split
    on the first ``/`` only). When no step is given, ``"final"`` is used for
    the cache file name. The file lives at
    ``variant_cache_dir() / <variant> / "<step>.pt"``; the variant directory
    is created if needed.

    On a cache miss the file is fetched from
    ``{MOONDREAM_ENDPOINT}/v1/variants/{variant_id}/download`` (endpoint
    defaults to ``https://api.moondream.ai``). If ``MOONDREAM_API_KEY`` is
    set, it is sent in the ``X-Moondream-Auth`` header.

    The download is streamed to a uniquely named temporary file next to the
    destination and moved into place only after it completes. An interrupted
    transfer therefore never leaves a truncated file at the cache path, where
    later calls would mistake it for a valid cached checkpoint.

    Args:
        variant_id: Variant identifier, optionally suffixed with ``/<step>``.

    Returns:
        ``pathlib.Path`` of the cached ``.pt`` file.

    Raises:
        Exceptions raised by ``urlopen`` or by file I/O propagate to the
        caller; the temporary file is removed before they do.
    """
    variant, *rest = variant_id.split("/", 1)
    step = rest[0] if rest else "final"

    cache_dir = variant_cache_dir() / variant
    os.makedirs(cache_dir, exist_ok=True)
    dest = cache_dir / f"{step}.pt"
    if dest.exists():
        return dest

    md_endpoint = os.getenv("MOONDREAM_ENDPOINT", "https://api.moondream.ai")

    headers = {"User-Agent": "moondream-torch"}
    api_key = os.getenv("MOONDREAM_API_KEY")
    if api_key is not None:
        headers["X-Moondream-Auth"] = api_key

    req = Request(f"{md_endpoint}/v1/variants/{variant_id}/download", headers=headers)

    # Random suffix keeps concurrent downloads of the same variant from
    # writing into each other's temporary file.
    partial = dest.with_name(f"{dest.name}.{os.urandom(8).hex()}.part")
    try:
        with urlopen(req) as r, open(partial, "wb") as f:
            shutil.copyfileobj(r, f)
        # Same-directory rename: the destination appears fully written or
        # not at all.
        os.replace(partial, dest)
    except BaseException:
        # Covers KeyboardInterrupt too, so an aborted download leaves nothing
        # behind. The temp file may not exist (e.g. the request itself
        # failed), and a cleanup error must not mask the original exception.
        try:
            os.unlink(partial)
        except OSError:
            pass
        raise
    return dest
|
|
|
|
|
def nest(flat):
    """Convert a flat mapping with dotted keys into a nested dict.

    Each key is split on ``"."``; every component except the last names an
    intermediate dict (created on first use), and the last component is the
    key under which the value is stored. For example
    ``{"a.b": 1, "a.c": 2}`` becomes ``{"a": {"b": 1, "c": 2}}``.

    Args:
        flat: Mapping whose keys are dotted strings.

    Returns:
        A new nested ``dict``; values are stored as-is (not copied).
    """
    root = {}
    for dotted_key, value in flat.items():
        *branch, leaf = dotted_key.split(".")
        node = root
        for name in branch:
            # Descend, creating the intermediate level if it is missing.
            node = node.setdefault(name, {})
        node[leaf] = value
    return root
|
|
|
|
|
@functools.lru_cache(maxsize=5)
def variant_state_dict(variant_id: Optional[str] = None, device: str = "cpu"):
    """Load a variant checkpoint and return its weights as a nested dict.

    The checkpoint file is resolved with ``cached_variant_path`` and read with
    ``torch.load(..., weights_only=True)``. Each key is then rewritten by
    applying a fixed list of substring replacements, in order, and the result
    is grouped into nested dicts by ``nest`` (keys split on ``"."``).

    Results are memoized per ``(variant_id, device)`` by ``lru_cache`` (up to
    five entries), so repeat callers receive the same dict object.

    Args:
        variant_id: Variant identifier; ``None`` means "no variant".
        device: Passed to ``torch.load`` as ``map_location``.

    Returns:
        ``None`` when ``variant_id`` is ``None``; otherwise the nested dict of
        renamed entries.
    """
    if variant_id is None:
        return None

    checkpoint_path = cached_variant_path(variant_id)
    raw_weights = torch.load(checkpoint_path, map_location=device, weights_only=True)

    # (old substring, replacement) pairs, applied in this order to every key.
    # NOTE(review): presumably maps an older checkpoint naming scheme onto the
    # current module names — confirm against the model definition.
    substitutions = (
        ("text_model.transformer.h", "text.blocks"),
        (".mixer", ".attn"),
        (".out_proj", ".proj"),
        (".Wqkv", ".qkv"),
        (".parametrizations.weight.0", ""),
    )

    def translate(name):
        # str.replace leaves the name untouched when the substring is absent.
        for legacy, current in substitutions:
            name = name.replace(legacy, current)
        return name

    renamed = {translate(name): weight for name, weight in raw_weights.items()}
    return nest(renamed)
|
|