"""Linea 522 es donde estan los formatos"""
import os
import subprocess
import signal
import tempfile
from pathlib import Path
from textwrap import dedent
from typing import Optional, Tuple, List, Union
from dataclasses import dataclass, field
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
import gradio as gr
from huggingface_hub import HfApi, ModelCard, whoami
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from apscheduler.schedulers.background import BackgroundScheduler
@dataclass
class QuantizationConfig:
"""Configuration for model quantization."""
method: str
use_imatrix: bool = False
imatrix_method: str = "IQ4_NL"
train_data: str = ""
quant_embedding: bool = False
embedding_tensor_method: str = "Q8_0"
leave_output: bool = False
quant_output: bool = False
output_tensor_method: str = "Q8_0"
# Generated values - These will be set during processing
fp16_model: str = field(default="", init=False)
quantized_gguf: str = field(default="", init=False)
imatrix_file: str = field(default="", init=False)
@dataclass
class SplitConfig:
"""Configuration for model splitting."""
enabled: bool = False
max_tensors: int = 256
max_size: Optional[str] = None
@dataclass
class OutputConfig:
"""Configuration for output settings."""
private_repo: bool = False
repo_name: str = ""
filename: str = ""
@dataclass
class ModelProcessingConfig:
"""Configuration for the entire model processing pipeline."""
token: str
model_id: str
model_name: str
outdir: str
quant_config: QuantizationConfig
split_config: SplitConfig
output_config: OutputConfig
# Generated values - These will be set during processing
new_repo_url: str = field(default="", init=False)
new_repo_id: str = field(default="", init=False)
class GGUFConverterError(Exception):
"""Custom exception for GGUF conversion errors."""
pass
class HuggingFaceModelProcessor:
"""Handles the processing of Hugging Face models to GGUF format."""
ERROR_LOGIN = "Debes loguearte para usar mi versión modificada de GGUF-my-repo."
DOWNLOAD_FOLDER = "./downloads"
OUTPUT_FOLDER = "./outputs"
CALIBRATION_FILE = "calibration_data_v5_rc.txt"
QUANTIZE_TIMEOUT=86400
HF_TO_GGUF_TIMEOUT=3600
IMATRIX_TIMEOUT=86400
SPLIT_TIMEOUT=3600
KILL_TIMEOUT=5
def __init__(self):
self.SPACE_ID = os.environ.get("SPACE_ID", "")
self.SPACE_URL = f"https://{self.SPACE_ID.replace('/', '-')}.hf.space/" if self.SPACE_ID else "http://localhost:7860/"
self.HF_TOKEN = os.environ.get("HF_TOKEN")
self.RUN_LOCALLY = os.environ.get("RUN_LOCALLY")
# Create necessary folders
self._create_folder(self.DOWNLOAD_FOLDER)
self._create_folder(self.OUTPUT_FOLDER)
def _create_folder(self, folder_name: str) -> str:
"""Create a folder if it doesn't exist."""
if not os.path.exists(folder_name):
print(f"Creating folder: {folder_name}")
os.makedirs(folder_name)
return folder_name
def _validate_token(self, oauth_token: Optional[gr.OAuthToken]) -> str:
"""Validate the OAuth token and return the token string."""
if oauth_token is None or oauth_token.token is None:
raise GGUFConverterError(self.ERROR_LOGIN)
try:
whoami(oauth_token.token)
return oauth_token.token
except Exception as e:
raise GGUFConverterError(self.ERROR_LOGIN)
def _escape_html(self, s: str) -> str:
"""Escape HTML characters for safe display."""
replacements = [
("&", "&"),
("<", "<"),
(">", ">"),
('"', """),
("\n", "
")
]
for old, new in replacements:
s = s.replace(old, new)
return s
def _get_model_creator(self, model_id: str) -> str:
"""Extract model creator from model ID."""
return model_id.split('/')[0]
def _get_model_name(self, model_id: str) -> str:
"""Extract model name from model ID."""
return model_id.split('/')[-1]
def _upload_file(self, processing_config: ModelProcessingConfig, path_or_fileobj: str, path_in_repo: str) -> None:
"""Upload a file to Hugging Face repository."""
if self.RUN_LOCALLY == "1":
print("Saltar subida...")
return
api = HfApi(token=processing_config.token)
api.upload_file(
path_or_fileobj=path_or_fileobj,
path_in_repo=path_in_repo,
repo_id=processing_config.new_repo_id,
)
def _generate_importance_matrix(self, quant_config: QuantizationConfig) -> None:
"""Generate importance matrix for quantization."""
if not os.path.isfile(quant_config.fp16_model):
raise GGUFConverterError(f"Model file not found: {quant_config.fp16_model}")
if quant_config.train_data:
train_data_path = quant_config.train_data
else:
train_data_path = self.CALIBRATION_FILE
if not os.path.isfile(train_data_path):
raise GGUFConverterError(f"Training data file not found: {train_data_path}")
print(f"Training data file path: {train_data_path}")
print("Corriendo comando imatrix...")
imatrix_command = [
"llama-imatrix",
"-m", quant_config.fp16_model,
"-f", train_data_path,
"-ngl", "99",
"--output-frequency", "10",
"-o", quant_config.imatrix_file,
]
process = subprocess.Popen(imatrix_command, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.IMATRIX_TIMEOUT)
except subprocess.TimeoutExpired:
print("Cálculo de Imatrix agotó el tiempo. Enviando SIGINT para permitir una terminación ordenada...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("El proceso Imatrix aún no terminó. Terminando el proceso a la fuerza....")
process.kill()
raise GGUFConverterError("Error al generar Imatrix: operación agotó el tiempo.")
if process.returncode != 0:
raise GGUFConverterError(f"Error generating imatrix: code={process.returncode}.")
print(f"Importance matrix generation completed: {os.path.abspath(quant_config.imatrix_file)}")
def _split_and_upload_model(self, processing_config: ModelProcessingConfig) -> None:
"""Split large model files and upload shards."""
quant_config = processing_config.quant_config
split_config = processing_config.split_config
print(f"Model path: {quant_config.quantized_gguf}")
print(f"Output dir: {processing_config.outdir}")
split_cmd = ["llama-gguf-split", "--split"]
if split_config.max_size:
split_cmd.extend(["--split-max-size", split_config.max_size])
else:
split_cmd.extend(["--split-max-tensors", str(split_config.max_tensors)])
model_path_prefix = '.'.join(quant_config.quantized_gguf.split('.')[:-1])
split_cmd.extend([quant_config.quantized_gguf, model_path_prefix])
print(f"Split command: {split_cmd}")
process = subprocess.Popen(split_cmd, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.SPLIT_TIMEOUT)
except subprocess.TimeoutExpired:
print("División agotó el tiempo. Enviando SIGINT para permitir una terminación ordenada...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("La división agotó el tiempo. Matando el proceso...")
process.kill()
raise GGUFConverterError("Error al dividir el modelo: operación agotó el tiempo.")
if process.returncode != 0:
raise GGUFConverterError(f"Error splitting the model: code={process.returncode}")
print("División del modelo completada con éxito.!")
# Remove original model file
if os.path.exists(quant_config.quantized_gguf):
os.remove(quant_config.quantized_gguf)
model_file_prefix = model_path_prefix.split('/')[-1]
print(f"Model file name prefix: {model_file_prefix}")
sharded_model_files = [
f for f in os.listdir(processing_config.outdir)
if f.startswith(model_file_prefix) and f.endswith(".gguf")
]
if not sharded_model_files:
raise GGUFConverterError("No se encontraron archivos shardeados.")
print(f"Sharded model files: {sharded_model_files}")
for file in sharded_model_files:
file_path = os.path.join(processing_config.outdir, file)
try:
print(f"Uploading file: {file_path}")
self._upload_file(processing_config, file_path, file)
except Exception as e:
raise GGUFConverterError(f"Error uploading file {file_path}: {e}")
print("El modelo shardado se subió con éxito.!")
def _download_base_model(self, processing_config: ModelProcessingConfig) -> str:
"""Download and convert Hugging Face model to GGUF FP16 format."""
print(f"Downloading model {processing_config.model_name}")
if os.path.exists(processing_config.quant_config.fp16_model):
print("Omitiendo conversión a fp16....")
print(f"Converted model path: {os.path.abspath(processing_config.quant_config.fp16_model)}")
return processing_config.quant_config.fp16_model
with tempfile.TemporaryDirectory(dir=self.DOWNLOAD_FOLDER) as tmpdir:
local_dir = f"{Path(tmpdir)}/{processing_config.model_name}"
print(f"Local directory: {os.path.abspath(local_dir)}")
# Download model
api = HfApi(token=processing_config.token)
pattern = (
"*.safetensors"
if any(
file.path.endswith(".safetensors")
for file in api.list_repo_tree(
repo_id=processing_config.model_id,
recursive=True,
)
)
else "*.bin"
)
dl_pattern = ["*.md", "*.json", "*.model"]
dl_pattern += [pattern]
api.snapshot_download(repo_id=processing_config.model_id, local_dir=local_dir, allow_patterns=dl_pattern)
print("Modelo descargado con éxito.!")
print(f"Model directory contents: {os.listdir(local_dir)}")
config_dir = os.path.join(local_dir, "config.json")
adapter_config_dir = os.path.join(local_dir, "adapter_config.json")
if os.path.exists(adapter_config_dir) and not os.path.exists(config_dir):
raise GGUFConverterError(
'adapter_config.json is present.
If you are converting a LoRA adapter to GGUF, '
'please use GGUF-my-lora.'
)
# Convert HF to GGUF
print(f"Converting to GGUF FP16: {os.path.abspath(processing_config.quant_config.fp16_model)}")
convert_command = [
"python3", "/app/convert_hf_to_gguf.py", local_dir,
"--outtype", "f16", "--outfile", processing_config.quant_config.fp16_model
]
process = subprocess.Popen(convert_command, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.HF_TO_GGUF_TIMEOUT)
except subprocess.TimeoutExpired:
print("Conversión agotó el tiempo. Enviando SIGINT para permitir una terminación ordenada...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Conversión agotó el tiempo. Matando el proceso....")
process.kill()
raise GGUFConverterError("Error al convertir a fp16: operación agotó el tiempo.")
if process.returncode != 0:
raise GGUFConverterError(f"Error converting to fp16: code={process.returncode}")
print("Modelo convertido a fp16 con éxito!")
print(f"Converted model path: {os.path.abspath(processing_config.quant_config.fp16_model)}")
return processing_config.quant_config.fp16_model
def _quantize_model(self, quant_config: QuantizationConfig) -> str:
"""Quantize the GGUF model."""
quantize_cmd = ["llama-quantize"]
if quant_config.quant_embedding:
quantize_cmd.extend(["--token-embedding-type", quant_config.embedding_tensor_method])
if quant_config.leave_output:
quantize_cmd.append("--leave-output-tensor")
else:
if quant_config.quant_output:
quantize_cmd.extend(["--output-tensor-type", quant_config.output_tensor_method])
# Set imatrix file path if needed
if quant_config.use_imatrix:
self._generate_importance_matrix(quant_config)
quantize_cmd.extend(["--imatrix", quant_config.imatrix_file])
else:
print("No se está usando cuantización imatrix.")
quantize_cmd.append(quant_config.fp16_model)
quantize_cmd.append(quant_config.quantized_gguf)
quantize_cmd.append(quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method)
print(f"Quantizing model with {quantize_cmd}")
# Use Popen for quantization
process = subprocess.Popen(quantize_cmd, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.QUANTIZE_TIMEOUT)
except subprocess.TimeoutExpired:
print("Cuantización agotó el tiempo. Enviando SIGINT para permitir una terminación ordenada...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Cuantización agotó el tiempo. Matando el proceso...")
process.kill()
raise GGUFConverterError("Error al cuantizar: operación agotó el tiempo.")
if process.returncode != 0:
raise GGUFConverterError(f"Error quantizing: code={process.returncode}")
print(f"Quantized successfully with {quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method} option!")
print(f"Quantized model path: {os.path.abspath(quant_config.quantized_gguf)}")
return quant_config.quantized_gguf
def _create_empty_repo(self, processing_config: ModelProcessingConfig):
api = HfApi(token=processing_config.token)
new_repo_url = api.create_repo(
repo_id=processing_config.output_config.repo_name,
exist_ok=True,
private=processing_config.output_config.private_repo
)
processing_config.new_repo_url = new_repo_url.url
processing_config.new_repo_id = new_repo_url.repo_id
print("Repositorio creado satisfactoriamente!", processing_config.new_repo_url)
return new_repo_url
def _generate_readme(self, processing_config: ModelProcessingConfig, quant_config: QuantizationConfig) -> str:
"""Generate README.md for the quantized model."""
creator = self._get_model_creator(processing_config.model_id)
username = whoami(processing_config.token)["name"]
try:
card = ModelCard.load(processing_config.model_id, token=processing_config.token)
except:
card = ModelCard("")
if card.data.tags is None:
card.data.tags = []
card.data.tags.extend(["llama-cpp", "gguf-my-repo"])
card.data.base_model = processing_config.model_id
card.text = dedent(
f"""
# {processing_config.model_name}
**Model creator:** [{creator}](https://huggingface.co/{creator})
**Original model**: [{processing_config.model_id}](https://huggingface.co/{processing_config.model_id})
**GGUF quantization:** provided by [{username}](https:/huggingface.co/{username}) using `llama.cpp`
## Special thanks
🙏 Special thanks to [Georgi Gerganov](https://github.com/ggerganov) and the whole team working on [llama.cpp](https://github.com/ggerganov/llama.cpp/) for making all of this possible.
## Use with Ollama
```bash
ollama run "hf.co/{processing_config.new_repo_id}:{quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method}"
```
## Use with LM Studio
```bash
lms load "{processing_config.new_repo_id}"
```
## Use with llama.cpp CLI
```bash
llama-cli --hf "{processing_config.new_repo_id}:{quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method}" -p "The meaning to life and the universe is"
```
## Use with llama.cpp Server:
```bash
llama-server --hf "{processing_config.new_repo_id}:{quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method}" -c 4096
```
"""
)
readme_path = f"{processing_config.outdir}/README.md"
card.save(readme_path)
return readme_path
def process_model(self, processing_config: ModelProcessingConfig) -> Tuple[str, str]:
"""Main method to process a model through the entire pipeline."""
quant_config = processing_config.quant_config
split_config = processing_config.split_config
output_config = processing_config.output_config
print(f"Current working directory: {os.path.abspath(os.getcwd())}")
# Download and convert base model
self._download_base_model(processing_config)
# Quantize the model
self._quantize_model(quant_config)
# Create empty repo
self._create_empty_repo(processing_config)
# Upload model
if split_config.enabled:
print(f"Splitting quantized model: {os.path.abspath(quant_config.quantized_gguf)}")
self._split_and_upload_model(processing_config)
else:
try:
print(f"Uploading quantized model: {os.path.abspath(quant_config.quantized_gguf)}")
self._upload_file(processing_config, quant_config.quantized_gguf, output_config.filename)
except Exception as e:
raise GGUFConverterError(f"Error uploading quantized model: {e}")
# Upload imatrix if it exists
if quant_config.use_imatrix and os.path.isfile(quant_config.imatrix_file):
try:
print(f"Uploading imatrix.dat: {os.path.abspath(quant_config.imatrix_file)}")
self._upload_file(processing_config, quant_config.imatrix_file, f"{processing_config.model_name}-imatrix.gguf")
except Exception as e:
raise GGUFConverterError(f"Error uploading imatrix.dat: {e}")
# Upload README.md
readme_path = self._generate_readme(processing_config, quant_config)
self._upload_file(processing_config, readme_path, "README.md")
print(f"Uploaded successfully with {quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method} option!")
class GGUFConverterUI:
"""Gradio UI for the GGUF Converter."""
def __init__(self):
self.processor = HuggingFaceModelProcessor()
self.css = """/* Custom CSS to allow scrolling */
.gradio-container {overflow-y: auto;}
"""
# Initialize components
self._initialize_components()
self._setup_interface()
def _initialize_components(self):
"""Initialize all UI components."""
#####
# Base model section
#####
self.model_id = HuggingfaceHubSearch(
label="ID del Modelo en el Hub",
placeholder="Buscar el ID del modelo en Hugging Face",
search_type="model",
)
#####
# Quantization section
#####
self.use_imatrix = gr.Checkbox(
value=False,
label="Usar cuantización Imatrix",
info="Usar matriz de importancia para la cuantización."
)
self.q_method = gr.Dropdown(
choices=["Q3_K_S", "Q3_K_M", "Q3_K_L", "Q4_0", "Q4_1", "Q4_K_S", "Q4_K_M", "MXFP4_MOE", "Q5_0", "Q5_1", "Q5_K_S", "Q5_K_M", "Q6_K", "Q8_0", "F16", "BF16", "COPY"],
label="Método de cuantización",
info="Tipo de cuantización GGML",
value="Q4_K_M",
filterable=False,
visible=True
)
self.imatrix_q_method = gr.Dropdown(
choices=["IQ1_S", "IQ1_M", "IQ2_XXS", "IQ2_XS", "IQ2_S", "IQ2_M", "Q2_K_S", "Q2_K", "IQ3_XXS", "IQ3_XS", "IQ3_S", "IQ3_M", "Q3_K_S", "Q3_K_M", "Q3_K_L", "Q4_K_S", "Q4_K_M", "IQ4_XS", "IQ4_NL", "Q5_K_M", "Q5_K_S"],
label="Método de cuantización Imatrix",
info="Tipo de cuantización GGML imatrix",
value="IQ4_NL",
filterable=False,
visible=False
)
self.train_data_file = gr.File(
label="Dataset de entrenamiento",
file_types=[".txt", ".json", ".jsonl", ".parquet", ".csv"],
visible=False
)
#####
# Advanced Options section
#####
self.split_model = gr.Checkbox(
value=False,
label="Dividir modelo",
info="Particionar el modelo usando gguf-split."
)
self.split_max_tensors = gr.Number(
value=256,
label="Máximo de tensores por archivo",
info="Número máximo de tensores por archivo al particionar el modelo.",
visible=False
)
self.split_max_size = gr.Textbox(
label="Tamaño máximo de archivo",
info="Tamaño máximo de archivo al particionar el modelo (--split-max-size). Puedes dejarlo vacío para usar el valor predeterminado. Sufijos aceptados: M, G. Ejemplo: 256M, 5G",
visible=False
)
self.leave_output = gr.Checkbox(
value=False,
label="Dejar tensor de salida",
info="Dejar output.weight sin (re)cuantizar"
)
self.quant_embedding = gr.Checkbox(
value=False,
label="Cuantificar tensor de embeddings",
info="Cuantizar el tensor de embeddings por separado."
)
self.embedding_tensor_method = gr.Dropdown(
choices=["Q2_K", "Q3_K", "Q4_K", "Q5_K", "Q6_K", "Q8_0", "F16"],
label="Método de cuantización de embeddings",
info="usar un tipo de cuantización específico para el tensor de embeddings de tokens.",
value="Q8_0",
filterable=False,
visible=False
)
self.quant_output = gr.Checkbox(
value=False,
label="Cuantizar tensor de salida",
info="Cuantizar el tensor de salida por separado."
)
self.output_tensor_method = gr.Dropdown(
choices=["Q2_K", "Q3_K", "Q4_K", "Q5_K", "Q6_K", "Q8_0", "F16"],
label="Método de cuantización de salida",
info="usar un tipo de cuantización específico para el tensor output.weight",
value="Q8_0",
filterable=False,
visible=False
)
#####
# Output Settings section
#####
self.private_repo = gr.Checkbox(
value=False,
label="Repositorio Privado",
info="Crear un repositorio privado bajo tu nombre de usuario."
)
self.repo_name = gr.Textbox(
label="Nombre del repositorio de salida",
info="Establece el nombre de tu repositorio",
max_lines=1
)
self.gguf_name = gr.Textbox(
label="Nombre del archivo de salida",
info="Establece el nombre del archivo de salida",
max_lines=1
)
#####
# Buttons section
#####
self.clear_btn = gr.ClearButton(
value="Clear",
variant="secondary",
components=[
self.model_id,
self.q_method,
self.use_imatrix,
self.imatrix_q_method,
self.private_repo,
self.train_data_file,
self.leave_output,
self.quant_embedding,
self.embedding_tensor_method,
self.quant_output,
self.output_tensor_method,
self.split_model,
self.split_max_tensors,
self.split_max_size,
self.repo_name,
self.gguf_name,
]
)
self.submit_btn = gr.Button(
value="Submit",
variant="primary"
)
#####
# Outputs section
#####
self.output_label = gr.Markdown(label="output")
self.output_image = gr.Image(
show_label=False,
show_download_button=False,
interactive=False
)
@staticmethod
def _update_output_repo(model_id: str, oauth_token: Optional[gr.OAuthToken]) -> str:
"""Update output repository name based on model and user."""
if oauth_token is None or not oauth_token.token:
return ""
if not model_id:
return ""
try:
username = whoami(oauth_token.token)["name"]
model_name = model_id.split('/')[-1]
return f"{username}/{model_name}-GGUF"
except:
return ""
@staticmethod
def _update_output_filename(model_id: str, use_imatrix: bool, q_method: str, imatrix_q_method: str) -> str:
"""Update output filename based on model and quantization settings."""
if not model_id:
return ""
model_name = model_id.split('/')[-1]
if use_imatrix:
return f"{model_name}-{imatrix_q_method.upper()}-imat.gguf"
return f"{model_name}-{q_method.upper()}.gguf"
def _setup_interface(self):
"""Set up the Gradio interface."""
with gr.Blocks(css=self.css) as self.demo:
#####
# Layout
#####
gr.Markdown(HuggingFaceModelProcessor.ERROR_LOGIN)
gr.LoginButton(min_width=250)
gr.HTML("
{self.processor._escape_html(str(e))}', "error.png")
def launch(self):
"""Launch the Gradio interface."""
# Set up space restart scheduler
def restart_space():
HfApi().restart_space(repo_id=self.processor.SPACE_ID, token=self.processor.HF_TOKEN, factory_reboot=True)
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=21600)
scheduler.start()
# Launch the interface
self.demo.queue(default_concurrency_limit=1, max_size=5).launch(debug=True, show_api=False)
# Main execution
if __name__ == "__main__":
ui = GGUFConverterUI()
ui.launch()