kebeliu's picture
Create app.py
fae3a33 verified
import gradio as gr
import requests
import json
from transformers import AutoConfig
import math
from typing import Dict, Tuple, Optional
class LLMMemoryCalculator:
def __init__(self):
self.precision_bytes = {
'fp32': 4,
'fp16': 2,
'bf16': 2,
'int8': 1,
'int4': 0.5
}
# -------------------------------------------------
# 📥 基础工具
# -------------------------------------------------
def get_model_config(self, model_id: str) -> Dict:
"""获取模型配置"""
try:
config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
return config
except Exception as e:
raise Exception(f"无法获取模型配置: {str(e)}")
def get_file_size_from_url(self, model_id: str, filename: str) -> int:
"""通过 HEAD 请求获取文件大小(备用)"""
try:
url = f"https://huggingface.co/{model_id}/resolve/main/{filename}"
response = requests.head(url, timeout=10)
if response.status_code == 200:
content_length = response.headers.get('Content-Length')
if content_length:
return int(content_length)
return 0
except:
return 0
# -------------------------------------------------
# 📦 获取模型权重大小
# -------------------------------------------------
def get_model_size_from_hf(self, model_id: str) -> Tuple[float, str]:
"""优先使用 *.index.json 中的 metadata.total_size,回退到文件列表/HEAD"""
try:
# 1️⃣ 尝试读取 index.json(safetensors > pytorch)
for index_name, tag in [
("model.safetensors.index.json", "safetensors_index"),
("pytorch_model.bin.index.json", "pytorch_index")
]:
url = f"https://huggingface.co/{model_id}/resolve/main/{index_name}"
resp = requests.get(url, timeout=10)
if resp.status_code == 200:
try:
data = resp.json()
except ValueError:
# 某些仓库 index.json 以文本形式存储,需要手动解析
data = json.loads(resp.text)
total_bytes = data.get("metadata", {}).get("total_size", 0)
if total_bytes > 0:
return total_bytes / (1024 ** 3), tag
# 2️⃣ 调用 Hub API,尝试直接读取 size 字段
api_url = f"https://huggingface.co/api/models/{model_id}"
response = requests.get(api_url, timeout=10)
if response.status_code != 200:
raise Exception(f"API请求失败: {response.status_code}")
model_info = response.json()
# 2a. 查找 siblings 列表中带 size 的 .safetensors 文件
safetensors_files = [f for f in model_info.get('siblings', [])
if f['rfilename'].endswith('.safetensors') and 'size' in f]
if safetensors_files:
total_size = sum(f['size'] for f in safetensors_files)
return total_size / (1024 ** 3), "safetensors_files"
# 2b. 使用 HEAD 请求补全未包含 size 的 .safetensors 文件
safetensors_no_size = [f for f in model_info.get('siblings', [])
if f['rfilename'].endswith('.safetensors')]
if safetensors_no_size:
total_size = 0
for f in safetensors_no_size:
total_size += self.get_file_size_from_url(model_id, f['rfilename'])
if total_size > 0:
return total_size / (1024 ** 3), "safetensors_head"
# 2c. 同理处理 pytorch_model-xxxxx.bin
pytorch_files = [f for f in model_info.get('siblings', [])
if f['rfilename'].endswith('.bin') and 'size' in f]
if pytorch_files:
total_size = sum(f['size'] for f in pytorch_files)
return total_size / (1024 ** 3), "pytorch_files"
pytorch_no_size = [f for f in model_info.get('siblings', [])
if f['rfilename'].endswith('.bin')]
if pytorch_no_size:
total_size = 0
for f in pytorch_no_size:
total_size += self.get_file_size_from_url(model_id, f['rfilename'])
if total_size > 0:
return total_size / (1024 ** 3), "pytorch_head"
# 3️⃣ 如果仍然无法确定大小,走估算逻辑
raise Exception("未找到权重大小信息")
except Exception:
# 估算
return self.estimate_model_size_from_config(model_id)
# -------------------------------------------------
# 📐 估算逻辑(与原始保持一致)
# -------------------------------------------------
def estimate_model_size_from_config(self, model_id: str) -> Tuple[float, str]:
"""根据 config.json 估算模型大小(FP16)"""
try:
config = self.get_model_config(model_id)
vocab_size = getattr(config, 'vocab_size', 50000)
hidden_size = getattr(config, 'hidden_size', getattr(config, 'd_model', 4096))
num_layers = getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 32))
intermediate_size = getattr(config, 'intermediate_size', hidden_size * 4)
# Embedding
embedding_params = vocab_size * hidden_size
# Transformer layer
attention_params = 4 * hidden_size * hidden_size
ffn_params = 2 * hidden_size * intermediate_size
ln_params = 2 * hidden_size
params_per_layer = attention_params + ffn_params + ln_params
total_params = embedding_params + num_layers * params_per_layer
if hasattr(config, 'tie_word_embeddings') and not config.tie_word_embeddings:
total_params += vocab_size * hidden_size
model_size_gb = (total_params * 2) / (1024 ** 3) # 默认 fp16
return model_size_gb, "estimated"
except Exception as e:
raise Exception(f"无法估算模型大小: {str(e)}")
# -------------------------------------------------
# 🗄️ KV Cache 计算(原逻辑保持)
# -------------------------------------------------
def calculate_kv_cache_size(self, config, context_length: int, batch_size: int = 1) -> Dict[str, float]:
try:
num_layers = getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 32))
hidden_size = getattr(config, 'hidden_size', getattr(config, 'd_model', 4096))
num_attention_heads = getattr(config, 'num_attention_heads', getattr(config, 'num_heads', 32))
num_key_value_heads = getattr(config, 'num_key_value_heads', num_attention_heads)
is_mla = hasattr(config, 'kv_lora_rank') and config.kv_lora_rank is not None
head_dim = hidden_size // num_attention_heads
if is_mla:
kv_lora_rank = getattr(config, 'kv_lora_rank', 512)
kv_cache_per_token = kv_lora_rank * 2
attention_type = "MLA"
elif num_key_value_heads < num_attention_heads:
kv_cache_per_token = num_key_value_heads * head_dim * 2
attention_type = "GQA"
else:
kv_cache_per_token = num_attention_heads * head_dim * 2
attention_type = "MHA"
total_kv_cache = (kv_cache_per_token * context_length * num_layers * batch_size * 2) / (1024 ** 3)
return {
'size_gb': total_kv_cache,
'attention_type': attention_type,
'num_kv_heads': num_key_value_heads,
'num_attention_heads': num_attention_heads,
'head_dim': head_dim
}
except Exception as e:
raise Exception(f"计算KV Cache失败: {str(e)}")
# -------------------------------------------------
# 🧮 综合内存需求计算(保持不变)
# -------------------------------------------------
def calculate_memory_requirements(self, model_id: str, gpu_memory_gb: float, num_gpus: int,
context_length: int, utilization_rate: float = 0.9) -> Dict:
try:
config = self.get_model_config(model_id)
model_size_gb, size_source = self.get_model_size_from_hf(model_id)
kv_info = self.calculate_kv_cache_size(config, context_length)
available_memory = gpu_memory_gb * num_gpus * utilization_rate
other_overhead = model_size_gb * 0.1
total_memory_needed = model_size_gb + kv_info['size_gb'] + other_overhead
is_feasible = total_memory_needed <= available_memory
memory_margin = available_memory - total_memory_needed
memory_per_gpu = total_memory_needed / num_gpus
return {
'model_id': model_id,
'model_size_gb': round(model_size_gb, 2),
'size_source': size_source,
'kv_cache_gb': round(kv_info['size_gb'], 2),
'attention_type': kv_info['attention_type'],
'other_overhead_gb': round(other_overhead, 2),
'total_memory_needed_gb': round(total_memory_needed, 2),
'available_memory_gb': round(available_memory, 2),
'memory_margin_gb': round(memory_margin, 2),
'memory_per_gpu_gb': round(memory_per_gpu, 2),
'is_feasible': is_feasible,
'utilization_per_gpu': round((memory_per_gpu / gpu_memory_gb) * 100, 1),
'config_info': {
'num_layers': getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 'N/A')),
'hidden_size': getattr(config, 'hidden_size', getattr(config, 'd_model', 'N/A')),
'num_attention_heads': kv_info['num_attention_heads'],
'num_kv_heads': kv_info['num_kv_heads'],
'head_dim': kv_info['head_dim']
}
}
except Exception as e:
return {'error': str(e)}
# -------------------------------------------------
# 🌟 Gradio 界面构建(保持原逻辑)
# -------------------------------------------------
def create_gradio_interface():
calculator = LLMMemoryCalculator()
def calculate_memory(model_id, gpu_memory, num_gpus, context_length, utilization_rate):
if not model_id.strip():
return "请输入模型ID"
try:
result = calculator.calculate_memory_requirements(
model_id.strip(),
float(gpu_memory),
int(num_gpus),
int(context_length),
float(utilization_rate) / 100
)
if 'error' in result:
return f"❌ 错误: {result['error']}"
status = "✅ 可以运行" if result['is_feasible'] else "❌ 显存不足"
output = f"""
## 模型分析结果
**模型**: {result['model_id']}
**状态**: {status}
### 📊 内存分析
- **模型大小**: {result['model_size_gb']} GB ({result['size_source']})
- **KV Cache**: {result['kv_cache_gb']} GB
- **其他开销**: {result['other_overhead_gb']} GB
- **总需求**: {result['total_memory_needed_gb']} GB
- **可用显存**: {result['available_memory_gb']} GB
- **剩余显存**: {result['memory_margin_gb']} GB
### 🔧 模型配置
- **注意力类型**: {result['attention_type']}
- **层数**: {result['config_info']['num_layers']}
- **隐藏维度**: {result['config_info']['hidden_size']}
- **注意力头数**: {result['config_info']['num_attention_heads']}
- **KV头数**: {result['config_info']['num_kv_heads']}
- **头维度**: {result['config_info']['head_dim']}
### 💾 GPU使用情况
- **每GPU内存**: {result['memory_per_gpu_gb']} GB
- **每GPU利用率**: {result['utilization_per_gpu']}%
### 💡 建议
"""
if result['is_feasible']:
output += f"✅ 当前配置可以成功运行该模型。剩余 {result['memory_margin_gb']} GB 显存。"
else:
needed_extra = abs(result['memory_margin_gb'])
output += f"❌ 需要额外 {needed_extra} GB 显存才能运行。\n建议:\n- 增加GPU数量\n- 使用更大显存的GPU\n- 减少上下文长度\n- 使用模型量化(如int8/int4)"
return output
except Exception as e:
return f"❌ 计算出错: {str(e)}"
with gr.Blocks(title="LLM GPU内存计算器", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🚀 LLM GPU内存需求计算器")
gr.Markdown("输入模型信息和硬件配置,计算是否能够成功运行大语言模型")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## 📝 输入参数")
model_id = gr.Textbox(label="🤗 Hugging Face 模型ID",
placeholder="例如: deepseek-ai/DeepSeek-R1-0528-Qwen3-8B",
value="deepseek-ai/DeepSeek-R1-0528-Qwen3-8B")
with gr.Row():
gpu_memory = gr.Number(label="💾 单张GPU显存 (GB)", value=24, minimum=1, maximum=1000)
num_gpus = gr.Number(label="🔢 GPU数量", value=1, minimum=1, maximum=64, precision=0)
with gr.Row():
context_length = gr.Number(label="📏 上下文长度", value=16384, minimum=512, maximum=1000000, precision=0)
utilization_rate = gr.Slider(label="⚡ 显存利用率 (%)", minimum=50, maximum=95, value=90, step=5)
calculate_btn = gr.Button("🔍 计算内存需求", variant="primary")
with gr.Column(scale=2):
gr.Markdown("## 📊 计算结果")
output = gr.Markdown("点击计算按钮开始分析...")
calculate_btn.click(fn=calculate_memory,
inputs=[model_id, gpu_memory, num_gpus, context_length, utilization_rate],
outputs=output)
gr.Markdown("""
## 📚 使用示例
**小型模型**: `microsoft/DialoGPT-medium`
**中型模型**: `microsoft/DialoGPT-large`
**大型模型**: `meta-llama/Llama-2-7b-hf`
**超大模型**: `meta-llama/Llama-2-13b-hf`
注意:某些模型可能需要申请访问权限。
""")
return demo
if __name__ == "__main__":
demo = create_gradio_interface()
demo.launch(share=True, debug=True)