MedSam / sam2 /sam2_video_trainer.py
Dhruv-Ty's picture
initial commit
ac239ba
import unittest
import torch
import torch.nn as nn
import torch.nn.functional as F
from sam2.modeling.sam2_base import SAM2Base
from sam2.build_sam import build_sam2_video_predictor
from sam2.utils.transforms import SAM2Transforms
from sam2.modeling.sam2_utils import get_1d_sine_pe
class SAM2VideoTrainer(nn.Module):
"""
SAM2VideoTrainer is a PyTorch module for training a video segmentation model using SAM2.
Attributes:
device (torch.device): The device to run the model on.
model (nn.Module): The SAM2 video predictor model.
num_feature_levels (int): Number of feature levels in the model.
memory_size (int): Size of the memory for storing features.
_transforms (SAM2Transforms): Transformations applied to the input data.
_bb_feat_sizes (list): Spatial dimensions for backbone feature maps.
num_maskmem (int): Number of mask memory features.
sam_point_coords (torch.Tensor): Placeholder for SAM point coordinates.
sam_point_labels (torch.Tensor): Placeholder for SAM point labels.
_orig_hw (list): Original height and width of the input frames.
maskmem_features (list): List of mask memory features.
maskmem_pos_enc (list): List of mask memory positional encodings.
batch_size (int): Batch size of the input data.
obj_ptrs (list): List of object pointers.
"""
def __init__(self, model_cfg, sam2_checkpoint, device, memory_size=7, mask_threshold=0.5, use_mask_threshold=False):
"""
Initializes the SAM2VideoTrainer class.
Args:
model_cfg (dict): Configuration dictionary for the model.
sam2_checkpoint (str): Path to the SAM2 checkpoint file.
device (torch.device): The device to run the model on (e.g., 'cpu' or 'cuda').
memory_size (int, optional): Size of the memory. Defaults to 7.
mask_threshold (float, optional): Threshold for mask prediction. Defaults to 0.5.
use_mask_threshold (bool, optional): Flag to use mask thresholding. Defaults to False.
Attributes:
device (torch.device): The device to run the model on.
model (SAM2VideoPredictor): The SAM2 video predictor model.
num_feature_levels (int): Number of feature levels in the model.
memory_size (int): Size of the memory.
_transforms (SAM2Transforms): Transformations applied to the input data.
_bb_feat_sizes (list): Spatial dimensions for backbone feature maps.
num_maskmem (int): Number of mask memories.
sam_point_coords (torch.Tensor): Tensor for SAM point coordinates.
sam_point_labels (torch.Tensor): Tensor for SAM point labels.
mask_threshold (float): Threshold for mask prediction.
use_mask_threshold (bool): Flag to use mask thresholding.
"""
super().__init__()
self.device = device
self.model = build_sam2_video_predictor(
model_cfg, sam2_checkpoint, device=self.device, mode="train"
)
self.model.train()
self.num_feature_levels = self.model.num_feature_levels
self.num_maskmem = 7
self.memory_size = (
memory_size if memory_size <= self.num_maskmem else self.num_maskmem
)
self._transforms = SAM2Transforms(
resolution=self.model.image_size,
mask_threshold=0.5,
max_hole_area=0,
max_sprinkle_area=0,
)
# Spatial dim for backbone feature maps
self._bb_feat_sizes = [
(256, 256),
(128, 128),
(64, 64),
]
self.sam_point_coords = torch.zeros(1, 1, 2, device=device)
self.sam_point_labels = -torch.ones(1, 1, dtype=torch.int32, device=device)
self.mask_threshold = mask_threshold
self.use_mask_threshold = use_mask_threshold
self.init_state()
def init_state(self):
"""
Initializes the state variables for the video trainer.
This method sets the initial state of various attributes used in the video
training process. It resets the original height and width, mask memory
features, mask memory positional encoding, batch size, and object pointers
to their default values.
Attributes:
_orig_hw (tuple or None): Original height and width of the video frames.
maskmem_features (Any or None): Features related to mask memory.
maskmem_pos_enc (Any or None): Positional encoding for mask memory.
batch_size (int or None): Size of the batch for training.
obj_ptrs (list): List of object pointers used in the training process.
"""
self._orig_hw = None
self.maskmem_features = None
self.maskmem_pos_enc = None
self.batch_size = None
self.current_frame_idx = 0
self.obj_ptrs = []
self.num_frames = 0
def reset_state(self):
"""
Resets the state of the video trainer.
This method clears the internal state variables, setting them to their initial values:
- `_orig_hw`: Set to None. Represents the original height and width.
- `maskmem_features`: Set to None. Represents the mask memory features.
- `maskmem_pos_enc`: Set to None. Represents the mask memory positional encoding.
- `batch_size`: Set to None. Represents the batch size.
- `obj_ptrs`: Set to an empty list. Represents the object pointers.
"""
self._orig_hw = None
self.maskmem_features = None
self.maskmem_pos_enc = None
self.batch_size = None
self.current_frame_idx = 0
self.obj_ptrs = []
self.num_frames = 0
def forward(self, videos, bboxes, labels=None):
"""
Forward pass for processing video frames and predicting masks, logits, and IoUs.
Args:
videos (torch.Tensor): A tensor of shape (batch_size, num_frames, C, H, W) representing the input video frames.
bboxes (torch.Tensor): A tensor of shape (batch_size, 4) representing the bounding boxes for the first frame.
labels (torch.Tensor, optional): A tensor of shape (batch_size, num_frames, H, W) representing the ground truth masks for each frame. Defaults to None.
Returns:
tuple: A tuple containing:
- all_masks (list of torch.Tensor): A list of tensors representing the predicted masks for each frame.
- all_logits (list of torch.Tensor): A list of tensors representing the predicted logits for each frame.
- all_ious (list of torch.Tensor): A list of tensors representing the predicted IoUs for each frame.
"""
self.init_state()
batch_size, num_frames, C, H, W = videos.shape
self.num_frames = num_frames
self._orig_hw = [H, W]
self.batch_size = batch_size
# Extract features for all frames in the video
videos = videos.view(batch_size * num_frames, C, H, W)
features = self.model.forward_image(videos) # Extract features for all frames
features = {
k: (
v.view(batch_size, num_frames, *v.shape[1:])
if not isinstance(v, list)
else ([_v.view(batch_size, num_frames, *_v.shape[1:]) for _v in v])
)
for k, v in features.items()
}
frame_features = self.preprocess_frame_features(
features, batch_size, num_frames
)
# Process the first frame with bounding boxes as prompts
first_frame_features = frame_features[0]
first_frame_bbox = bboxes.view(batch_size, 4)
# Predict the first frame masks and IoUs
first_frame_masks, first_frame_logits, first_frame_ious, object_score_logits = (
self._predict_first_frame(first_frame_features, first_frame_bbox)
)
# Initialize memory with first frame predictions
prev_pred_mask = first_frame_masks if labels is None else labels[:, 0]
memory = self._initialize_memory(first_frame_features, prev_pred_mask, object_score_logits)
# Process remaining frames
all_masks, all_logits, all_ious = (
[first_frame_masks],
[first_frame_logits],
[first_frame_ious],
)
for t in range(1, num_frames):
self.current_frame_idx = t
frame_feature = frame_features[t]
masks, logits, ious, object_score_logits = self._predict_frame(
frame_feature, memory, prev_pred_mask
)
all_masks.append(masks)
all_logits.append(logits)
all_ious.append(ious)
if t < num_frames - 1:
prev_pred_mask = masks if labels is None else labels[:, t]
memory = self._update_memory(frame_feature, prev_pred_mask, memory, object_score_logits)
self.reset_state()
return all_masks, all_logits, all_ious
def normalize_bbox(self, bbox):
"""
Normalize the given bounding box coordinates.
This method transforms the bounding box coordinates to a normalized form
based on the original height and width of the image.
Args:
bbox (list or ndarray): The bounding box coordinates to be normalized.
Returns:
list or ndarray: The normalized bounding box coordinates.
"""
unnorm_bbox = self._transforms.transform_boxes(
bbox, normalize=True, orig_hw=self._orig_hw
)
return unnorm_bbox
def _get_points_placeholder(self, batch_size=None):
"""
Generates a placeholder for point coordinates and labels.
Args:
batch_size (int, optional): The size of the batch. If not provided,
defaults to the instance's batch_size attribute.
Returns:
tuple: A tuple containing:
- torch.Tensor: Expanded point coordinates tensor of shape (batch_size, -1, -1).
- torch.Tensor: Expanded point labels tensor of shape (batch_size, -1).
"""
batch_size = self.batch_size if batch_size is None else batch_size
points_placeholder = (
self.sam_point_coords.expand(batch_size, -1, -1),
self.sam_point_labels.expand(batch_size, -1),
)
return points_placeholder
def unbind_frame_features(self, frame_features, num_frames):
"""
Unbind image features from the model.
"""
keys = frame_features.keys()
unbinded_frame_features = []
for frame_idx in range(num_frames):
frame_feature = {}
for k in keys:
frame_feature[k] = (
frame_features[k][:, frame_idx]
if not isinstance(frame_features[k], list)
else [v[:, frame_idx] for v in frame_features[k]]
)
unbinded_frame_features.append(frame_feature)
return unbinded_frame_features
def preprocess_frame_features(self, frame_features, batch_size, num_frames):
"""
Preprocess frame features.
"""
frame_features = self.unbind_frame_features(frame_features, num_frames)
preprocessed_frame_features = []
for frame_idx, frame_feature in enumerate(frame_features):
feature_maps = frame_feature["backbone_fpn"][-self.num_feature_levels :]
# flatten NxCxHxW to HWxNxC
vision_feats = [x.flatten(2).permute(2, 0, 1) for x in feature_maps]
if (
frame_idx == 0 and self.model.directly_add_no_mem_embed
): # Add no memory embedding
vision_feats[-1] = vision_feats[-1] + self.model.no_mem_embed
# HWxNxC to NxCxHxW
feats = [
feat.permute(1, 2, 0).view(batch_size, -1, *feat_size)
for feat, feat_size in zip(
vision_feats[::-1], self._bb_feat_sizes[::-1]
)
][::-1]
_features = {
"image_embed": feats[-1],
"high_res_feats": feats[:-1],
"backbone_fpn": frame_feature["backbone_fpn"][
-self.num_feature_levels :
],
"vision_pos_enc": frame_feature["vision_pos_enc"][
-self.num_feature_levels :
],
}
preprocessed_frame_features.append(_features)
return preprocessed_frame_features
def _embed_bbox(self, bbox):
"""
Embed bounding boxes.
"""
bbox = self.normalize_bbox(bbox)
box_coords = bbox.reshape(-1, 2, 2)
box_labels = torch.tensor([[2, 3]], dtype=torch.int, device=bbox.device)
box_labels = box_labels.repeat(bbox.size(0), 1)
concat_points = (box_coords, box_labels)
sparse_embeddings, dense_embeddings = self.model.sam_prompt_encoder(
points=concat_points, boxes=None, masks=None
)
return sparse_embeddings, dense_embeddings
def _predict_first_frame(self, features, bbox):
"""
Predict masks and IoUs for the first frame.
"""
sparse_embeddings, dense_embeddings = self._embed_bbox(bbox)
low_res_masks, ious, sam_output_tokens, object_score_logits = (
self.model.sam_mask_decoder(
image_embeddings=features["image_embed"],
image_pe=self.model.sam_prompt_encoder.get_dense_pe(),
sparse_prompt_embeddings=sparse_embeddings,
dense_prompt_embeddings=dense_embeddings,
multimask_output=False,
repeat_image=False,
high_res_features=features["high_res_feats"],
)
)
sam_output_token = sam_output_tokens[:, -1]
obj_ptr = self.model.obj_ptr_proj(sam_output_token)
self.obj_ptrs.append(obj_ptr)
pred_mask, pred_logit = self._postprocess_masks(low_res_masks)
return pred_mask, pred_logit, ious[:, -1], object_score_logits
def _postprocess_masks(self, logits, size=None):
"""
Perform post-processing on output masks.
"""
size = self._orig_hw if size is None else size
logits = F.interpolate(logits, size, mode="bilinear", align_corners=False)
logits = logits[:, -1].unsqueeze(1)
masks = torch.sigmoid(logits)
if self.use_mask_threshold:
masks = (masks > self.mask_threshold).float()
return masks, logits
def _extract_memory_features(self, features, masks, object_score_logits):
"""
Extracts memory features from the given features and masks.
Args:
features (dict): A dictionary containing feature maps from the backbone FPN.
masks (Tensor): A tensor representing the masks to be used by the memory encoder.
Returns:
dict: A dictionary containing:
- "vision_features" (Tensor): The vision features extracted and processed by the memory encoder.
- "vision_pos_enc" (Tensor): The positional encoding of the vision features.
"""
pix_feat = features["backbone_fpn"][-1]
maskmem_out = self.model.memory_encoder(
pix_feat, masks, skip_mask_sigmoid=True # sigmoid already applied
)
maskmem_features = maskmem_out["vision_features"]
if self.model.no_obj_embed_spatial is not None:
is_obj_appearing = (object_score_logits > 0).float()
maskmem_features += (
1 - is_obj_appearing[..., None, None]
) * self.model.no_obj_embed_spatial[..., None, None].expand(
*maskmem_features.shape
)
maskmem_features = maskmem_features.flatten(2).permute(2, 0, 1)
maskmem_pos_enc = maskmem_out["vision_pos_enc"][-1].flatten(2).permute(2, 0, 1)
return {"vision_features": maskmem_features, "vision_pos_enc": maskmem_pos_enc}
def _initialize_memory(self, features, masks, object_score_logits):
"""
Initialize memory for the first frame.
"""
maskmem_out = self._extract_memory_features(features, masks, object_score_logits)
self.maskmem_features = [maskmem_out["vision_features"]]
self.maskmem_pos_enc = [maskmem_out["vision_pos_enc"]]
return self.maskmem_features, self.maskmem_pos_enc
def _update_memory(self, features, masks, memory=None, object_score_logits=None):
"""
Update memory with new frame data.
"""
if memory is None:
maskmem_features, maskmem_pos_enc = (
self.maskmem_features,
self.maskmem_pos_enc,
)
else:
maskmem_features, maskmem_pos_enc = memory
maskmem_out = self._extract_memory_features(features, masks, object_score_logits)
maskmem_features.append(maskmem_out["vision_features"])
maskmem_pos_enc.append(maskmem_out["vision_pos_enc"])
if len(maskmem_features) > self.memory_size:
self.maskmem_features = maskmem_features[-self.memory_size :]
self.maskmem_pos_enc = maskmem_pos_enc[-self.memory_size :]
return maskmem_features, maskmem_pos_enc
def _prepare_memory(self, memory):
"""
Prepare memory for the current frame.
"""
if memory is None:
maskmem_features, maskmem_pos_enc = (
self.maskmem_features,
self.maskmem_pos_enc,
)
else:
maskmem_features, maskmem_pos_enc = memory
for idx in range(len(maskmem_pos_enc)):
rel_pos = len(maskmem_pos_enc) - idx
maskmem_pos_enc[idx] = (
maskmem_pos_enc[idx] + self.model.maskmem_tpos_enc[rel_pos - 1]
)
obj_ptrs = torch.stack(self.obj_ptrs, dim=0)
if self.model.add_tpos_enc_to_obj_ptrs:
max_obj_ptrs_in_encoder = self.num_frames
pos_list = [self.current_frame_idx]
t_diff_max = max_obj_ptrs_in_encoder - 1
tpos_dim = self.model.hidden_dim if self.model.proj_tpos_enc_in_obj_ptrs else self.model.mem_dim
obj_pos = torch.tensor(pos_list, device=obj_ptrs.device)
obj_pos = get_1d_sine_pe(obj_pos / t_diff_max, dim=tpos_dim)
obj_pos = self.model.obj_ptr_tpos_proj(obj_pos)
obj_pos = obj_pos.unsqueeze(1).expand(-1, self.batch_size, self.model.mem_dim)
else:
obj_pos = obj_ptrs.new_zeros(
len(self.obj_ptrs), self.batch_size, self.model.mem_dim
)
C = self.model.hidden_dim
if self.model.mem_dim < C:
# split a pointer into (C // self.mem_dim) tokens for self.mem_dim < C
obj_ptrs = obj_ptrs.reshape(
-1, self.batch_size, C // self.model.mem_dim, self.model.mem_dim
)
obj_ptrs = obj_ptrs.permute(0, 2, 1, 3).flatten(0, 1)
obj_pos = obj_pos.repeat_interleave(C // self.model.mem_dim, dim=0)
num_obj_ptr_tokens = obj_ptrs.shape[0]
memory = torch.cat(maskmem_features + [obj_ptrs], dim=0)
memory_pos_embed = torch.cat(maskmem_pos_enc + [obj_pos], dim=0)
return memory, memory_pos_embed, num_obj_ptr_tokens
def _predict_frame(self, features, memory, prev_mask=None):
"""
Predict masks and IoUs for subsequent frames using memory.
"""
memory, memory_pos_embed, num_obj_ptr_tokens = self._prepare_memory(memory)
current_vision_feats = [
x.flatten(2).permute(2, 0, 1) for x in features["backbone_fpn"]
]
current_vision_pos_embeds = [
x.flatten(2).permute(2, 0, 1) for x in features["vision_pos_enc"]
]
pix_feat_with_mem = self.model.memory_attention(
curr=current_vision_feats[-1:],
curr_pos=current_vision_pos_embeds[-1:],
memory=memory,
memory_pos=memory_pos_embed,
num_obj_ptr_tokens=num_obj_ptr_tokens,
)
pix_feat_with_mem = pix_feat_with_mem.permute(1, 2, 0).view(
*features["backbone_fpn"][-1].shape
)
sparse_embeddings, dense_embeddings = self.model.sam_prompt_encoder(
points=self._get_points_placeholder(),
boxes=None,
masks=None,
)
low_res_masks, ious, _, object_score_logits = self.model.sam_mask_decoder(
image_embeddings=pix_feat_with_mem,
image_pe=self.model.sam_prompt_encoder.get_dense_pe(),
sparse_prompt_embeddings=sparse_embeddings,
dense_prompt_embeddings=dense_embeddings,
multimask_output=False,
repeat_image=False,
high_res_features=features["high_res_feats"],
)
pred_mask, pred_logit = self._postprocess_masks(low_res_masks)
return pred_mask, pred_logit, ious[:, -1], object_score_logits
class TestSAM2VideoTrainer(unittest.TestCase):
def setUp(self):
# Initialize parameters
sam2_checkpoint = "./checkpoints/sam2_hiera_tiny.pt"
model_cfg = "sam2_hiera_t.yaml"
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Create an instance of SAM2VideoTrainer
self.trainer = SAM2VideoTrainer(model_cfg, sam2_checkpoint, device=self.device)
# Define input video and bounding boxes
self.batch_size = 2
self.num_frames = 2
self.C = 3
self.H = 1024
self.W = 1024
# Create random video data and bounding boxes
self.videos = torch.randn(
self.batch_size, self.num_frames, self.C, self.H, self.W
).to(self.device)
self.masks = torch.zeros(
self.batch_size, self.num_frames, 1, self.H, self.W
).to(self.device)
self.bboxes = torch.tensor(
[[100, 100, 200, 200], [150, 150, 250, 250]], dtype=torch.float32
).to(self.device)
def test_forward(self):
# Execute the forward method
# masks, ious = self.trainer(self.videos, self.bboxes, self.masks)
masks, ious = self.trainer(self.videos, self.bboxes, None)
print("Masks shape:", masks[0].shape)
print("IoUs shape:", ious[0].shape)
print("Masks:", masks)
print("IoUs:", ious)
if __name__ == "__main__":
unittest.main()