|
--- |
|
license: mit |
|
tags: |
|
- pytorch |
|
- stable-diffusion |
|
- text2image
|
- stabilityai/stable-diffusion-2-1 |
|
datasets: |
|
- xchuan/text2image-fupo |
|
language: |
|
- en |
|
base_model: |
|
- stabilityai/stable-diffusion-2-1 |
|
pipeline_tag: text-to-image |
|
library_name: diffusers |
|
--- |
|
|
|
# A LoRA fine-tuned from stabilityai/stable-diffusion-2-1

This LoRA was trained on the [xchuan/text2image-fupo](https://huggingface.co/datasets/xchuan/text2image-fupo) dataset. Training captions were prefixed with `fupo:`, so including that prefix in your prompts is recommended.
|
|
|
## Inference |
|
```python |
|
import torch
import matplotlib.pyplot as plt
from diffusers import StableDiffusionPipeline, DDIMScheduler

pretrained_model_name_or_path = "stabilityai/stable-diffusion-2-1"
weight_dtype = torch.float16

# Load the base model
pipeline = StableDiffusionPipeline.from_pretrained(pretrained_model_name_or_path, torch_dtype=weight_dtype).to("cuda")
pipeline.load_lora_weights("xchuan/lora-stable-diffusion-2-1-fupo")
pipeline.scheduler = DDIMScheduler.from_config(pipeline.scheduler.config)

prompt = "A cartoon woman with pigtails, round face, colorful dress, and sunglasses"

# Run inference with the loaded LoRA weights
image = pipeline(prompt).images[0]

# Display the generated image with matplotlib
plt.imshow(image)
plt.axis('off')  # hide the axes
plt.show()
|
|
|
``` |
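If you generate many images with the same LoRA, you can optionally fuse the adapter into the base weights once instead of applying it on every forward pass. A minimal sketch, assuming a recent diffusers release that provides `fuse_lora`/`unfuse_lora`:

```python
# Optional: fold the LoRA weights into the base UNet (one-time cost,
# then inference runs at base-model speed). Requires a recent diffusers version.
pipeline.fuse_lora()
image = pipeline(prompt).images[0]
pipeline.unfuse_lora()  # undo the fusion if you want the adapter separate again
```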
|
<div> |
|
<img src="./image1.jpg" width="300" style="display: inline-block;"/> |
|
<img src="./image2.jpg" width="300" style="display: inline-block;"/> |
|
</div> |
|
|
|
## Training code |
|
|
|
```python |
|
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

from datasets import load_dataset

dataset = load_dataset("xchuan/text2image-fupo", split="train")

from transformers import CLIPTokenizer
from huggingface_hub import login
# ========== LoRA libraries ==========
from peft import LoraConfig, PeftModel

login(token="<replace with your own Hugging Face token>", add_to_git_credential=True)
|
|
|
weight_dtype = torch.bfloat16
train_batch_size = 4
snr_gamma = 5  # Min-SNR gamma, the weighting coefficient for the SNR-weighted loss

# Set the random seed for reproducibility
seed = 1126
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

# Optimizer hyperparameters
unet_learning_rate = 1e-6  # learning rate for the UNet LoRA parameters
text_encoder_learning_rate = 1e-4  # learning rate for the text encoder (unused here, since the encoder stays frozen)

# Learning-rate scheduler hyperparameters
lr_scheduler_name = "cosine_with_restarts"  # cosine annealing with restarts
lr_warmup_steps = 100  # ramp the learning rate up to its maximum over the first 100 steps
max_train_steps = 500  # total number of training steps
num_cycles = 1  # number of cosine cycles; the learning rate decays and restarts once per cycle
|
|
|
pretrained_model_name_or_path = "stabilityai/stable-diffusion-2-1" |
|
|
|
# LoRA configuration
unet_lora_config = LoraConfig(
    r=32,  # LoRA rank: the dimension of the low-rank update matrices
    lora_alpha=16,  # scaling factor; the effective scale applied to the update is lora_alpha / r
    init_lora_weights="gaussian",
    target_modules=["to_k", "to_q", "to_v", "to_out.0"],
    lora_dropout=0  # dropout probability on the LoRA layers; 0 disables dropout
)
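
# For reference, a LoRA layer replaces a frozen weight W with
#     W' = W + (lora_alpha / r) * B @ A,
# where A (r x d_in) and B (d_out x r) are the only trainable matrices.
# With r=32 and lora_alpha=16, the update is scaled by 16/32 = 0.5.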
|
|
|
from torchvision import transforms
from torch.utils.data import DataLoader

resolution = 512

train_transform = transforms.Compose([
    transforms.Resize(resolution, interpolation=transforms.InterpolationMode.BILINEAR),  # resize the shorter side to `resolution`
    transforms.CenterCrop(resolution),  # center-crop to resolution x resolution
    transforms.RandomHorizontalFlip(),  # random horizontal flip for augmentation
    transforms.ToTensor(),  # convert the PIL image to a tensor in [0, 1]
    transforms.Normalize([0.5], [0.5]),  # map pixel values to [-1, 1], the range the SD VAE expects
])
|
|
|
# Instantiate the tokenizer once, rather than on every call to collate_fn
tokenizer = CLIPTokenizer.from_pretrained(pretrained_model_name_or_path, subfolder="tokenizer")

def collate_fn(examples):
    pixel_values = []
    input_ids = []

    for example in examples:
        image_tensor = train_transform(example["image"])
        if not isinstance(image_tensor, torch.Tensor):
            print(f"Expected Tensor, got {type(image_tensor)} instead.")
            continue
        pixel_values.append(image_tensor)

        input_text = "fupo:" + example["text"]
        encode_text = tokenizer(input_text, return_tensors="pt", padding="max_length", truncation=True)
        input_ids.append(encode_text["input_ids"].squeeze(0))

    # If no valid images were found, return empty tensors
    if len(pixel_values) == 0:
        return {"pixel_values": torch.empty(0), "input_ids": torch.empty(0)}

    pixel_values = torch.stack(pixel_values, dim=0).float()
    input_ids = torch.stack(input_ids, dim=0)
    return {"pixel_values": pixel_values, "input_ids": input_ids}
|
|
|
|
|
train_dataloader = DataLoader(dataset, shuffle=True, collate_fn=collate_fn, batch_size=train_batch_size) |
|
|
|
import os
from diffusers import DDIMScheduler, AutoencoderKL, UNet2DConditionModel
from transformers import CLIPTextModel
|
|
|
def prepare_lora_model(unet_lora_config, pretrained_model_name_or_path, model_path=None, resume=False, merge_lora=False):
    """
    (1) Goal:
        - Load the full Stable Diffusion stack, including the LoRA layers, and optionally merge the
          LoRA weights. This covers the tokenizer, noise scheduler, UNet, VAE, and text encoder.

    (2) Parameters:
        - unet_lora_config: LoraConfig, the LoRA configuration object
        - pretrained_model_name_or_path: str, model name or path on the Hugging Face Hub
        - model_path: str, path to a previously saved checkpoint
        - resume: bool, whether to resume from a previous training run
        - merge_lora: bool, whether to merge the LoRA weights for inference

    (3) Returns:
        - tokenizer: CLIPTokenizer
        - noise_scheduler: DDIMScheduler
        - unet: UNet2DConditionModel
        - vae: AutoencoderKL
        - text_encoder: CLIPTextModel
    """
    # Load the noise scheduler, which controls how noise is added and removed during diffusion
    noise_scheduler = DDIMScheduler.from_pretrained(pretrained_model_name_or_path, subfolder="scheduler")

    # Load the tokenizer, which converts text captions into tokens
    tokenizer = CLIPTokenizer.from_pretrained(
        pretrained_model_name_or_path,
        subfolder="tokenizer"
    )

    # Load the CLIP text encoder, which turns text captions into feature vectors
    text_encoder = CLIPTextModel.from_pretrained(
        pretrained_model_name_or_path,
        torch_dtype=weight_dtype,
        subfolder="text_encoder"
    )

    # Load the VAE, which maps images to and from the latent space
    vae = AutoencoderKL.from_pretrained(
        pretrained_model_name_or_path,
        subfolder="vae"
    )

    # Load the UNet, which predicts the noise during the diffusion process
    unet = UNet2DConditionModel.from_pretrained(
        pretrained_model_name_or_path,
        torch_dtype=weight_dtype,
        subfolder="unet"
    )

    # Freeze the base model parameters (VAE, text encoder, and UNet)
    vae.requires_grad_(False)
    text_encoder.requires_grad_(False)
    unet.requires_grad_(False)

    # When resuming, load the weights from the previous run
    if resume:
        if model_path is None or not os.path.exists(model_path):
            raise ValueError("A valid model_path must be provided when resume is True")
        # Load the LoRA weights with PEFT's from_pretrained
        # text_encoder = PeftModel.from_pretrained(text_encoder, os.path.join(model_path, "text_encoder"))
        unet = PeftModel.from_pretrained(unet, os.path.join(model_path, "unet"))

        # Make sure only the LoRA parameters in the target modules are trainable
        target_modules = ["to_k", "to_q", "to_v", "to_out.0"]
        for name, param in unet.named_parameters():
            if any(target_module in name for target_module in target_modules):
                param.requires_grad = True  # only the LoRA parameters are trainable

        print(f"✅ Restored model weights from {model_path}")
    else:
        # Apply the LoRA configuration to the UNet
        unet.add_adapter(unet_lora_config)

    # Print the number of trainable parameters
    print("📊 UNet trainable parameters:")
    trainable_params = 0
    for name, param in unet.named_parameters():
        if param.requires_grad:
            param_count = param.numel()  # number of elements in this parameter tensor
            trainable_params += param_count
            # print(f"Trainable parameter: {name}, shape: {param.shape}, count: {param_count}")
    print(f"Total LoRA trainable parameters: {trainable_params}")

    if merge_lora:
        # Merge the LoRA weights into the base model; only call this for inference
        # text_encoder = text_encoder.merge_and_unload()
        unet = unet.merge_and_unload()

        # Switch to evaluation mode
        text_encoder.eval()
        unet.eval()

    # Move the models to the GPU and set their weight dtype
    unet.to(device, dtype=weight_dtype)
    vae.to(device, dtype=weight_dtype)
    text_encoder.to(device, dtype=weight_dtype)

    return tokenizer, noise_scheduler, unet, vae, text_encoder
|
|
|
def prepare_optimizer(unet, text_encoder, unet_learning_rate=5e-4, text_encoder_learning_rate=1e-4):
    # Collect the trainable LoRA parameters from the UNet
    unet_lora_layers = [p for p in unet.parameters() if p.requires_grad]

    # Group the trainable parameters and assign each group its learning rate
    # (the text encoder is frozen here, so only the UNet group is used)
    trainable_params = [
        {"params": unet_lora_layers, "lr": unet_learning_rate},
    ]

    # Use the AdamW optimizer
    optimizer = torch.optim.AdamW(trainable_params)

    return optimizer
|
|
|
from diffusers import StableDiffusionPipeline
from diffusers.optimization import get_scheduler
from diffusers.training_utils import compute_snr
|
|
|
project_name = "fupo"
dataset_name = "fupo"
# Root and main directories
root_dir = "./"  # current directory
main_dir = os.path.join(root_dir, "SD-2-1")  # main directory
# Project directory
project_dir = os.path.join(main_dir, project_name)
model_path = os.path.join(project_dir, "logs", "checkpoint-last")
|
|
|
# Prepare the models
tokenizer, noise_scheduler, unet, vae, text_encoder = prepare_lora_model(
    unet_lora_config,
    pretrained_model_name_or_path,
    model_path,
    resume=False,
    merge_lora=False
)
|
|
|
# Prepare the optimizer
optimizer = prepare_optimizer(
    unet,
    text_encoder,
    unet_learning_rate=unet_learning_rate,
    text_encoder_learning_rate=text_encoder_learning_rate
)
|
|
|
# Set up the learning-rate scheduler
lr_scheduler = get_scheduler(
    lr_scheduler_name,
    optimizer=optimizer,
    num_warmup_steps=lr_warmup_steps,
    num_training_steps=max_train_steps,
    num_cycles=num_cycles
)
|
|
|
print("✅ 模型和优化器准备完成!可以开始训练。") |
|
|
|
import math
from tqdm.auto import tqdm
import torch.nn.functional as F
from peft.utils import get_peft_model_state_dict
from diffusers.utils import convert_state_dict_to_diffusers
|
|
|
accumulation_steps = 4  # gradient-accumulation steps
max_norm = 0.5
output_folder = os.path.join(project_dir, "logs")
# Disable tokenizer parallelism to avoid warnings
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Initialization
global_step = 0
best_loss = float("inf")  # track the best (lowest) loss seen so far

# Progress bar for the training loop
progress_bar = tqdm(
    range(max_train_steps),  # sized by max_train_steps
    desc="Training steps",
)
|
|
|
# Training loop
for epoch in range(math.ceil(max_train_steps / len(train_dataloader))):
    # Calling train() inside the loop matters if you later add evaluation during training
    unet.train()

    for step, batch in enumerate(train_dataloader):
        if global_step >= max_train_steps:
            break

        # Encode the images into latent representations
        latents = vae.encode(batch["pixel_values"].to(device, dtype=weight_dtype)).latent_dist.sample()
        latents = latents * vae.config.scaling_factor  # apply the VAE's latent scaling factor

        # Add noise to the latents to produce the noisy training inputs
        noise = torch.randn_like(latents)  # random noise with the same shape as the latents
        timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (latents.shape[0],), device=device).long()
        noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
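        # add_noise applies the standard forward-diffusion step:
        #     noisy_latents = sqrt(alpha_bar_t) * latents + sqrt(1 - alpha_bar_t) * noise,
        # where alpha_bar_t is the cumulative product of the noise schedule at timestep t.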
|
|
|
        # Get the text embeddings for the captions
        encoder_hidden_states = text_encoder(batch["input_ids"].to(device), return_dict=False)[0]
        assert encoder_hidden_states is not None, "Encoder hidden states should not be None"

        # Compute the training target
        if noise_scheduler.config.prediction_type == "epsilon":
            target = noise  # the model predicts the added noise
        elif noise_scheduler.config.prediction_type == "v_prediction":
            target = noise_scheduler.get_velocity(latents, noise, timesteps)  # the model predicts the velocity
        else:
            raise ValueError(f"Unsupported prediction type: {noise_scheduler.config.prediction_type}")

        # UNet forward pass (detect_anomaly is a debugging aid and slows training;
        # drop the context manager once the run is stable)
        with torch.autograd.detect_anomaly():
            model_pred = unet(noisy_latents, timesteps, encoder_hidden_states, return_dict=False)[0]
        assert model_pred is not None, "Model prediction should not be None"
|
|
|
        # Compute the loss
        if not snr_gamma:
            loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
        else:
            # Compute the signal-to-noise ratio (SNR) and weight the MSE loss by it
            snr = compute_snr(noise_scheduler, timesteps)
            mse_loss_weights = torch.stack([snr, snr_gamma * torch.ones_like(timesteps)], dim=1).min(dim=1)[0]
            if noise_scheduler.config.prediction_type == "epsilon":
                mse_loss_weights = mse_loss_weights / snr
            elif noise_scheduler.config.prediction_type == "v_prediction":
                mse_loss_weights = mse_loss_weights / (snr + 1)

            # Weighted MSE loss, averaged per sample and then over the batch
            loss = F.mse_loss(model_pred.float(), target.float(), reduction="none")
            loss = loss.mean(dim=list(range(1, len(loss.shape)))) * mse_loss_weights
            loss = loss.mean()
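        # The weighting above implements Min-SNR weighting (Hang et al., 2023):
        #     w_t = min(SNR_t, gamma) / SNR_t        for epsilon prediction
        #     w_t = min(SNR_t, gamma) / (SNR_t + 1)  for v prediction
        # which keeps easy, high-SNR timesteps from dominating the loss.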
|
|
|
        # Scale the loss for gradient accumulation, then backpropagate
        (loss / accumulation_steps).backward()
        # Gradient accumulation: clip and step only every accumulation_steps batches
        if (global_step + 1) % accumulation_steps == 0:
            torch.nn.utils.clip_grad_norm_(unet.parameters(), max_norm)
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()
        progress_bar.update(1)
        global_step += 1
|
|
|
        if global_step % 100 == 0:
            # Save a checkpoint whenever the current batch loss is the lowest seen so far
            # (note this is a single-batch loss, so it is a noisy selection criterion)
            if loss.item() < best_loss:
                best_loss = loss.item()
                save_path = os.path.join(output_folder, "best_checkpoint")
                os.makedirs(save_path, exist_ok=True)

                # Convert the PEFT state dict to diffusers format and save the LoRA weights
                unet_lora_state_dict = convert_state_dict_to_diffusers(get_peft_model_state_dict(unet))
                StableDiffusionPipeline.save_lora_weights(
                    save_directory=save_path,
                    unet_lora_layers=unet_lora_state_dict,
                    safe_serialization=True,
                )
                # text_encoder.save_pretrained(os.path.join(save_path, "text_encoder"))
                print(f"💾 Saved best checkpoint to {save_path}, current loss: {best_loss}")

# Save the final model to checkpoint-last
save_path = os.path.join(output_folder, "checkpoint-last")
os.makedirs(save_path, exist_ok=True)
unet_lora_state_dict = convert_state_dict_to_diffusers(get_peft_model_state_dict(unet))
StableDiffusionPipeline.save_lora_weights(
    save_directory=save_path,
    unet_lora_layers=unet_lora_state_dict,
    safe_serialization=True,
)
print(f"💾 Saved final model to {save_path}")
|
|
|
``` |
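
The checkpoints saved above can be loaded directly with `load_lora_weights`, which accepts a local directory containing `pytorch_lora_weights.safetensors`. A minimal sanity-check sketch, assuming the default paths from the script (note that training captions were prefixed with `fupo:`, so prompts should include that prefix):

```python
import torch
from diffusers import StableDiffusionPipeline, DDIMScheduler

# Path produced by the training script above (assumed to exist)
save_path = "./SD-2-1/fupo/logs/best_checkpoint"

pipeline = StableDiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-2-1", torch_dtype=torch.float16
).to("cuda")
pipeline.scheduler = DDIMScheduler.from_config(pipeline.scheduler.config)

# load_lora_weights accepts a local directory containing pytorch_lora_weights.safetensors
pipeline.load_lora_weights(save_path)

image = pipeline("fupo:A cartoon woman with pigtails and sunglasses").images[0]
image.save("sample.png")
```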