# coding=utf-8 # Copyright 2024 Google AI and The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """PyTorch Siglip model.""" import math import warnings from dataclasses import dataclass from typing import Any, Optional, Tuple, Union import numpy as np import torch import torch.utils.checkpoint from torch import nn from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss from torch.nn.init import _calculate_fan_in_and_fan_out import torch.nn.functional as F from transformers.activations import ACT2FN from transformers.modeling_utils import PreTrainedModel from transformers.utils import (add_start_docstrings, add_start_docstrings_to_model_forward, is_flash_attn_2_available, is_flash_attn_greater_or_equal_2_10, logging, replace_return_docstrings) from .configuration_damovl_encoder import DAMOVLVisionConfig if is_flash_attn_2_available(): from flash_attn import flash_attn_varlen_func from transformers.modeling_flash_attention_utils import \ _flash_attention_forward else: flash_attn_varlen_func = None logger = logging.get_logger(__name__) def _trunc_normal_(tensor, mean, std, a, b): # Cut & paste from PyTorch official master until it's in a few official releases - RW # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf def norm_cdf(x): # Computes standard normal cumulative distribution function return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0 if (mean < a - 2 * std) or (mean > b + 2 * std): warnings.warn( "mean is more than 2 std from [a, b] in nn.init.trunc_normal_. " "The distribution of values may be incorrect.", stacklevel=2, ) # Values are generated by using a truncated uniform distribution and # then using the inverse CDF for the normal distribution. # Get upper and lower cdf values l = norm_cdf((a - mean) / std) u = norm_cdf((b - mean) / std) # Uniformly fill tensor with values from [l, u], then translate to # [2l-1, 2u-1]. tensor.uniform_(2 * l - 1, 2 * u - 1) # Use inverse cdf transform for normal distribution to get truncated # standard normal tensor.erfinv_() # Transform to proper mean, std tensor.mul_(std * math.sqrt(2.0)) tensor.add_(mean) # Clamp to ensure it's in the proper range tensor.clamp_(min=a, max=b) def trunc_normal_tf_( tensor: torch.Tensor, mean: float = 0.0, std: float = 1.0, a: float = -2.0, b: float = 2.0 ) -> torch.Tensor: """Fills the input Tensor with values drawn from a truncated normal distribution. The values are effectively drawn from the normal distribution :math:`\\mathcal{N}(\text{mean}, \text{std}^2)` with values outside :math:`[a, b]` redrawn until they are within the bounds. The method used for generating the random values works best when :math:`a \\leq \text{mean} \\leq b`. NOTE: this 'tf' variant behaves closer to Tensorflow / JAX impl where the bounds [a, b] are applied when sampling the normal distribution with mean=0, std=1.0 and the result is subsequently scaled and shifted by the mean and std args. Args: tensor: an n-dimensional `torch.Tensor` mean: the mean of the normal distribution std: the standard deviation of the normal distribution a: the minimum cutoff value b: the maximum cutoff value """ with torch.no_grad(): _trunc_normal_(tensor, 0, 1.0, a, b) tensor.mul_(std).add_(mean) def variance_scaling_(tensor, scale=1.0, mode="fan_in", distribution="normal"): fan_in, fan_out = _calculate_fan_in_and_fan_out(tensor) if mode == "fan_in": denom = fan_in elif mode == "fan_out": denom = fan_out elif mode == "fan_avg": denom = (fan_in + fan_out) / 2 variance = scale / denom if distribution == "truncated_normal": # constant is stddev of standard normal truncated to (-2, 2) trunc_normal_tf_(tensor, std=math.sqrt(variance) / 0.87962566103423978) elif distribution == "normal": with torch.no_grad(): tensor.normal_(std=math.sqrt(variance)) elif distribution == "uniform": bound = math.sqrt(3 * variance) with torch.no_grad(): tensor.uniform_(-bound, bound) else: raise ValueError(f"invalid distribution {distribution}") def lecun_normal_(tensor): variance_scaling_(tensor, mode="fan_in", distribution="truncated_normal") def default_flax_embed_init(tensor): variance_scaling_(tensor, mode="fan_in", distribution="normal") # Copied from transformers.models.llama.modeling_llama.rotate_half def rotate_half(x): """Rotates half the hidden dims of the input.""" x1 = x[..., : x.shape[-1] // 2] x2 = x[..., x.shape[-1] // 2 :] return torch.cat((-x2, x1), dim=-1) def apply_rotary_pos_emb_vision(tensor: torch.Tensor, freqs: torch.Tensor) -> torch.Tensor: orig_dtype = tensor.dtype tensor = tensor.float() cos = freqs.cos() sin = freqs.sin() cos = cos.unsqueeze(1).repeat(1, 1, 2).unsqueeze(0).float() sin = sin.unsqueeze(1).repeat(1, 1, 2).unsqueeze(0).float() output = (tensor * cos) + (rotate_half(tensor) * sin) output = output.to(orig_dtype) return output class VisionRotaryEmbedding(nn.Module): def __init__(self, dim: int, theta: float = 10000.0) -> None: super().__init__() inv_freq = 1.0 / (theta ** (torch.arange(0, dim, 2, dtype=torch.float) / dim)) self.register_buffer("inv_freq", inv_freq, persistent=False) def forward(self, seqlen: int) -> torch.Tensor: seq = torch.arange(seqlen, device=self.inv_freq.device, dtype=self.inv_freq.dtype) freqs = torch.outer(seq, self.inv_freq) return freqs class DAMOVLVisionEmbeddings(nn.Module): def __init__(self, config: DAMOVLVisionConfig): super().__init__() self.config = config self.embed_dim = config.hidden_size self.patch_size = config.patch_size self.patch_embedding = nn.Conv2d( in_channels=config.num_channels, out_channels=self.embed_dim, kernel_size=self.patch_size, stride=self.patch_size, padding="valid", ) def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: hidden_states = hidden_states.view( -1, self.config.num_channels, self.patch_size, self.patch_size ) patch_embeds = self.patch_embedding(hidden_states) # shape = [*, width, grid, grid] # embeddings = patch_embeds.flatten(2).transpose(1, 2) embeddings = patch_embeds.view(-1, self.embed_dim) return embeddings class VisionAttention(nn.Module): """Multi-headed attention from 'Attention Is All You Need' paper""" # Copied from transformers.models.clip.modeling_clip.CLIPAttention.__init__ def __init__(self, config): super().__init__() self.config = config self.embed_dim = config.hidden_size self.num_heads = config.num_attention_heads self.head_dim = self.embed_dim // self.num_heads if self.head_dim * self.num_heads != self.embed_dim: raise ValueError( f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:" f" {self.num_heads})." ) self.scale = self.head_dim**-0.5 self.dropout = config.attention_dropout self.k_proj = nn.Linear(self.embed_dim, self.embed_dim) self.v_proj = nn.Linear(self.embed_dim, self.embed_dim) self.q_proj = nn.Linear(self.embed_dim, self.embed_dim) self.out_proj = nn.Linear(self.embed_dim, self.embed_dim) def forward( self, hidden_states: torch.Tensor, cu_seqlens: torch.Tensor, rotary_pos_emb: torch.Tensor = None, ) -> torch.Tensor: """Input shape: Time x Channel""" q_len, _ = hidden_states.size() query_states = self.q_proj(hidden_states) key_states = self.k_proj(hidden_states) value_states = self.v_proj(hidden_states) query_states = query_states.view(q_len, self.num_heads, self.head_dim) key_states = key_states.view(q_len, self.num_heads, self.head_dim) value_states = value_states.view(q_len, self.num_heads, self.head_dim) query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0) key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0) attention_mask = torch.zeros([1, q_len, q_len], device=q.device, dtype=torch.bool) for i in range(1, len(cu_seqlens)): attention_mask[..., cu_seqlens[i - 1] : cu_seqlens[i], cu_seqlens[i - 1] : cu_seqlens[i]] = True query_states = query_states.transpose(0, 1) key_states = key_states.transpose(0, 1) value_states = value_states.transpose(0, 1) attn_weights = torch.matmul(query_states, key_states.transpose(1, 2)) / math.sqrt(self.head_dim) attn_weights = attn_weights + attention_mask # upcast attention to fp32 attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(q.dtype) attn_weights = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training) attn_output = torch.matmul(attn_weights, value_states) attn_output = attn_output.transpose(0, 1) attn_output = attn_output.reshape(q_len, -1) attn_output = self.out_proj(attn_output) return attn_output class VisionFlashAttention2(VisionAttention): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Adapted from transformers.models.llama.modeling_llama.LlamaFlashAttention2.forward def forward( self, hidden_states: torch.Tensor, cu_seqlens: torch.Tensor, rotary_pos_emb: torch.Tensor = None, ) -> torch.Tensor: q_len, _ = hidden_states.size() query_states = self.q_proj(hidden_states) key_states = self.k_proj(hidden_states) value_states = self.v_proj(hidden_states) # Flash attention requires the input to have the shape # batch_size x seq_length x head_dim x hidden_dim # therefore we just need to keep the original shape query_states = query_states.view(q_len, self.num_heads, self.head_dim) key_states = key_states.view(q_len, self.num_heads, self.head_dim) value_states = value_states.view(q_len, self.num_heads, self.head_dim) query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0) key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0) max_seqlen = (cu_seqlens[1:] - cu_seqlens[:-1]).max().item() attn_output = flash_attn_varlen_func(query_states, key_states, value_states, cu_seqlens, cu_seqlens, max_seqlen, max_seqlen).reshape( q_len, -1 ) attn_output = self.out_proj(attn_output) return attn_output class VisionSdpaAttention(VisionAttention): def forward( self, hidden_states: torch.Tensor, cu_seqlens: torch.Tensor, rotary_pos_emb: torch.Tensor = None, ) -> torch.Tensor: if output_attentions: # TODO: Improve this warning with e.g. `model.config.attn_implementation = "manual"` once this is implemented. logger.warning_once( "DAMOVLVisionModel is using VisionSdpaAttention, but `torch.nn.functional.scaled_dot_product_attention` does not support `output_attentions=True`. Falling back to the manual attention implementation, " 'but specifying the manual implementation will be required from Transformers version v5.0.0 onwards. This warning can be removed using the argument `attn_implementation="eager"` when loading the model.' ) return super().forward( hidden_states=hidden_states, cu_seqlens=cu_seqlens, rotary_pos_emb=rotary_pos_emb, ) seq_length = hidden_states.shape[0] query_states = self.q_proj(hidden_states) key_states = self.k_proj(hidden_states) value_states = self.v_proj(hidden_states) query_states = query_states.view(q_len, self.num_heads, self.head_dim) key_states = key_states.view(q_len, self.num_heads, self.head_dim) value_states = value_states.view(q_len, self.num_heads, self.head_dim) query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0) key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0) attention_mask = torch.zeros([1, seq_length, seq_length], device=q.device, dtype=torch.bool) for i in range(1, len(cu_seqlens)): attention_mask[..., cu_seqlens[i - 1] : cu_seqlens[i], cu_seqlens[i - 1] : cu_seqlens[i]] = True query_states = query_states.transpose(0, 1) key_states = key_states.transpose(0, 1) value_states = value_states.transpose(0, 1) attn_output = F.scaled_dot_product_attention(query_states, key_states, value_states, attention_mask, dropout_p=0.0) attn_output = attn_output.transpose(0, 1) attn_output = attn_output.reshape(seq_length, -1) attn_output = self.proj(attn_output) return attn_output DAMOVL_VISION_ATTENTION_CLASSES = { "eager": VisionAttention, "flash_attention_2": VisionFlashAttention2, "sdpa": VisionSdpaAttention, } # Copied from transformers.models.clip.modeling_clip.CLIPMLP with CLIP->DAMOVL class DAMOVLVisionMLP(nn.Module): def __init__(self, config): super().__init__() self.config = config self.activation_fn = ACT2FN[config.hidden_act] self.fc1 = nn.Linear(config.hidden_size, config.intermediate_size) self.fc2 = nn.Linear(config.intermediate_size, config.hidden_size) def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: hidden_states = self.fc1(hidden_states) hidden_states = self.activation_fn(hidden_states) hidden_states = self.fc2(hidden_states) return hidden_states class DAMOVLVisionEncoderLayer(nn.Module): def __init__(self, config: DAMOVLVisionConfig): super().__init__() self.embed_dim = config.hidden_size self.self_attn = DAMOVL_VISION_ATTENTION_CLASSES[config._attn_implementation](config=config) self.layer_norm1 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps) self.mlp = DAMOVLVisionMLP(config) self.layer_norm2 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps) # Ignore copy def forward(self, hidden_states, cu_seqlens, rotary_pos_emb) -> torch.Tensor: hidden_states = hidden_states + self.self_attn( self.layer_norm1(hidden_states), cu_seqlens=cu_seqlens, rotary_pos_emb=rotary_pos_emb ) hidden_states = hidden_states + self.mlp(self.layer_norm2(hidden_states)) return hidden_states class DAMOVLPreTrainedModel(PreTrainedModel): """ An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained models. """ config_class = DAMOVLVisionConfig base_model_prefix = "damovl" supports_gradient_checkpointing = True _no_split_modules = [ "DAMOVLVisionEncoderLayer", "DAMOVLVisionEmbeddings", ] _supports_flash_attn_2 = True _supports_sdpa = True def _init_weights(self, module): """Initialize the weights""" if isinstance(module, nn.Embedding): default_flax_embed_init(module.weight) elif isinstance(module, VisionAttention): nn.init.xavier_uniform_(module.q_proj.weight) nn.init.xavier_uniform_(module.k_proj.weight) nn.init.xavier_uniform_(module.v_proj.weight) nn.init.xavier_uniform_(module.out_proj.weight) nn.init.zeros_(module.q_proj.bias) nn.init.zeros_(module.k_proj.bias) nn.init.zeros_(module.v_proj.bias) nn.init.zeros_(module.out_proj.bias) elif isinstance(module, DAMOVLVisionMLP): nn.init.xavier_uniform_(module.fc1.weight) nn.init.xavier_uniform_(module.fc2.weight) nn.init.normal_(module.fc1.bias, std=1e-6) nn.init.normal_(module.fc2.bias, std=1e-6) elif isinstance(module, (nn.Linear, nn.Conv2d)): lecun_normal_(module.weight) if module.bias is not None: nn.init.zeros_(module.bias) elif isinstance(module, nn.LayerNorm): module.bias.data.zero_() module.weight.data.fill_(1.0) class DAMOVLVisionEncoder(nn.Module): def __init__(self, config: DAMOVLVisionConfig): super().__init__() self.config = config head_dim = config.hidden_size // config.num_attention_heads self.spatial_merge_size = config.spatial_merge_size self.rotary_pos_emb = VisionRotaryEmbedding(head_dim // 2) self.layers = nn.ModuleList([DAMOVLVisionEncoderLayer(config) for _ in range(config.num_hidden_layers)]) self.gradient_checkpointing = False def rot_pos_emb(self, grid_thw, strides): pos_ids = [] for (t, h, w), stride in zip(grid_thw, strides): hpos_ids = torch.arange(h).unsqueeze(1).expand(-1, w) hpos_ids = hpos_ids.reshape( h // stride, stride, w // stride, stride, ) hpos_ids = hpos_ids.permute(0, 2, 1, 3) hpos_ids = hpos_ids.flatten() wpos_ids = torch.arange(w).unsqueeze(0).expand(h, -1) wpos_ids = wpos_ids.reshape( h // stride, stride, w // stride, stride, ) wpos_ids = wpos_ids.permute(0, 2, 1, 3) wpos_ids = wpos_ids.flatten() pos_ids.append(torch.stack([hpos_ids, wpos_ids], dim=-1).repeat(t, 1)) pos_ids = torch.cat(pos_ids, dim=0) max_grid_size = grid_thw[:, 1:].max() rotary_pos_emb_full = self.rotary_pos_emb(max_grid_size) rotary_pos_emb = rotary_pos_emb_full[pos_ids].flatten(1) return rotary_pos_emb def forward(self, hidden_states, grid_thws, strides) -> torch.Tensor: # BUG: These codes will cause deepspeed issue: `RuntimeError: disagreement between rank0 and rankx` # rotary_pos_emb = [] # for thw in grid_thws: # rotary_pos_emb.append(self.rot_pos_emb(thw).unsqueeze(0)) # rotary_pos_emb1 = torch.cat(rotary_pos_emb, dim=1).squeeze(0) # grid_thws = torch.cat(grid_thws, dim = 0) # new version of creating rotary position embedding # grid_thws shapes like [batch_flatten_image_num, 3] # grid_thws = torch.cat(grid_thws, dim = 0) # is conducted in the `encoder.py` rotary_pos_emb = self.rot_pos_emb(grid_thws, strides) cu_seqlens = torch.repeat_interleave(grid_thws[:, 1] * grid_thws[:, 2], grid_thws[:, 0]).cumsum(dim=0, dtype=torch.int32) cu_seqlens = F.pad(cu_seqlens, (1, 0), value=0) for blk in self.layers: if self.gradient_checkpointing and self.training: hidden_states = self._gradient_checkpointing_func( blk.__call__, hidden_states, cu_seqlens, rotary_pos_emb ) else: hidden_states = blk(hidden_states, cu_seqlens=cu_seqlens, rotary_pos_emb=rotary_pos_emb) return hidden_states class DAMOVLVisionTransformer(nn.Module): def __init__(self, config: DAMOVLVisionConfig): super().__init__() self.config = config embed_dim = config.hidden_size self.embeddings = DAMOVLVisionEmbeddings(config) self.encoder = DAMOVLVisionEncoder(config) self.post_layernorm = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps) def forward(self, hidden_states, grid_thws, strides) -> torch.Tensor: # print(hidden_states) # hidden_states = torch.cat(hidden_states, dim = 1) hidden_states = self.embeddings(hidden_states) hidden_states = self.encoder(hidden_states, grid_thws, strides) hidden_states = self.post_layernorm(hidden_states) return hidden_states class DAMOVLVisionModel(DAMOVLPreTrainedModel): config_class = DAMOVLVisionConfig main_input_name = "hidden_states" def __init__(self, config: DAMOVLVisionConfig): super().__init__(config) self.vision_model = DAMOVLVisionTransformer(config) # Initialize weights and apply final processing self.post_init() def get_input_embeddings(self) -> nn.Module: return self.vision_model.embeddings.patch_embedding def forward(self, hidden_states, grid_thws, strides) -> torch.Tensor: return self.vision_model(hidden_states=hidden_states, grid_thws=grid_thws, strides=strides)