# Copyright (c) OpenMMLab. All rights reserved.
import logging
from typing import List, Optional

import torch
import torch.nn as nn
import torch.nn.functional as F
from mmengine.logging import print_log
from mmengine.structures import PixelData
from torch import Tensor

from mmseg.registry import MODELS
from mmseg.structures import SegDataSample
from mmseg.utils import (ConfigType, OptConfigType, OptMultiConfig,
                         OptSampleList, SampleList, add_prefix)
from ..utils import resize
from .encoder_decoder import EncoderDecoder


@MODELS.register_module()
class DepthEstimator(EncoderDecoder):
    """Encoder Decoder depth estimator.

    EncoderDecoder typically consists of backbone, decode_head, auxiliary_head.
    Note that auxiliary_head is only used for deep supervision during training,
    which could be dumped during inference.

    1. The ``loss`` method is used to calculate the loss of model,
    which includes two steps: (1) Extracts features to obtain the feature maps
    (2) Call the decode head loss function to forward decode head model and
    calculate losses.

    .. code:: text

     loss(): extract_feat() -> _decode_head_forward_train() -> _auxiliary_head_forward_train (optional)
     _decode_head_forward_train(): decode_head.loss()
     _auxiliary_head_forward_train(): auxiliary_head.loss (optional)

    2. The ``predict`` method is used to predict depth estimation results,
    which includes two steps: (1) Run inference function to obtain the list of
    depth (2) Call post-processing function to obtain list of
    ``SegDataSample`` including ``pred_depth_map``.

    .. code:: text

     predict(): inference() -> postprocess_result()
     inference(): whole_inference()/slide_inference()/slide_flip_inference()
     whole_inference()/slide_inference()/slide_flip_inference(): encoder_decoder()
     encoder_decoder(): extract_feat() -> decode_head.predict()

    3. The ``_forward`` method is used to output the tensor by running the model,
    which includes two steps: (1) Extracts features to obtain the feature maps
    (2) Call the decode head forward function to forward decode head model.

    .. code:: text

     _forward(): extract_feat() -> _decode_head.forward()

    Args:

        backbone (ConfigType): The config for the backbone of depth estimator.
        decode_head (ConfigType): The config for the decode head of depth estimator.
        neck (OptConfigType): The config for the neck of depth estimator.
            Defaults to None.
        auxiliary_head (OptConfigType): The config for the auxiliary head of
            depth estimator. Defaults to None.
        train_cfg (OptConfigType): The config for training. Defaults to None.
        test_cfg (OptConfigType): The config for testing. Defaults to None.
        data_preprocessor (dict, optional): The pre-process config of
            :class:`BaseDataPreprocessor`.
        pretrained (str, optional): The path for pretrained model.
            Defaults to None.
        init_cfg (dict, optional): The weight initialized config for
            :class:`BaseModule`.
    """  # noqa: E501

    def __init__(self,
                 backbone: ConfigType,
                 decode_head: ConfigType,
                 neck: OptConfigType = None,
                 auxiliary_head: OptConfigType = None,
                 train_cfg: OptConfigType = None,
                 test_cfg: OptConfigType = None,
                 data_preprocessor: OptConfigType = None,
                 pretrained: Optional[str] = None,
                 init_cfg: OptMultiConfig = None):
        super().__init__(
            backbone=backbone,
            decode_head=decode_head,
            neck=neck,
            auxiliary_head=auxiliary_head,
            train_cfg=train_cfg,
            test_cfg=test_cfg,
            data_preprocessor=data_preprocessor,
            pretrained=pretrained,
            init_cfg=init_cfg)

    def extract_feat(self,
                     inputs: Tensor,
                     batch_img_metas: Optional[List[dict]] = None) -> Tensor:
        """Extract features from images.

        When the backbone exposes a truthy ``class_embed_select`` attribute
        and ``batch_img_metas`` is a list whose first entry contains
        ``'category_id'``, the category ids of all metas are gathered into a
        tensor on the device of ``inputs`` and handed to the backbone
        together with the images as the tuple ``(inputs, cat_ids)``.
        Otherwise the backbone receives ``inputs`` alone.

        Args:
            inputs (Tensor): Input images.
            batch_img_metas (List[dict], optional): List of image metainfo.
                Defaults to None.

        Returns:
            The output of the backbone, passed through the neck when the
            model has one.
        """
        if getattr(self.backbone, 'class_embed_select', False) and \
                isinstance(batch_img_metas, list) and \
                'category_id' in batch_img_metas[0]:
            cat_ids = [meta['category_id'] for meta in batch_img_metas]
            cat_ids = torch.tensor(cat_ids).to(inputs.device)
            inputs = (inputs, cat_ids)

        x = self.backbone(inputs)
        if self.with_neck:
            x = self.neck(x)
        return x

    def encode_decode(self, inputs: Tensor,
                      batch_img_metas: List[dict]) -> Tensor:
        """Encode images with backbone and decode into a depth map of the same
        size as input."""
        x = self.extract_feat(inputs, batch_img_metas)
        depth = self.decode_head.predict(x, batch_img_metas, self.test_cfg)

        return depth

    def _decode_head_forward_train(self, inputs: List[Tensor],
                                   data_samples: SampleList) -> dict:
        """Run forward function and calculate loss for decode head in
        training."""
        losses = dict()
        loss_decode = self.decode_head.loss(inputs, data_samples,
                                            self.train_cfg)

        # Loss keys of the decode head are namespaced with 'decode'.
        losses.update(add_prefix(loss_decode, 'decode'))
        return losses

    def _auxiliary_head_forward_train(self, inputs: List[Tensor],
                                      data_samples: SampleList) -> dict:
        """Run forward function and calculate loss for auxiliary head in
        training."""
        losses = dict()
        if isinstance(self.auxiliary_head, nn.ModuleList):
            # Several auxiliary heads: namespace each one by its index.
            for idx, aux_head in enumerate(self.auxiliary_head):
                loss_aux = aux_head.loss(inputs, data_samples, self.train_cfg)
                losses.update(add_prefix(loss_aux, f'aux_{idx}'))
        else:
            loss_aux = self.auxiliary_head.loss(inputs, data_samples,
                                                self.train_cfg)
            losses.update(add_prefix(loss_aux, 'aux'))

        return losses

    def loss(self, inputs: Tensor, data_samples: SampleList) -> dict:
        """Calculate losses from a batch of inputs and data samples.

        Args:
            inputs (Tensor): Input images.
            data_samples (list[:obj:`SegDataSample`]): The seg data samples.
                It usually includes information such as `metainfo` and
                `gt_depth_map`.

        Returns:
            dict[str, Tensor]: a dictionary of loss components
        """
        if data_samples is not None:
            batch_img_metas = [
                data_sample.metainfo for data_sample in data_samples
            ]
        else:
            # No samples given: synthesize metainfo from the input shape,
            # describing unpadded images.
            batch_img_metas = [
                dict(
                    ori_shape=inputs.shape[2:],
                    img_shape=inputs.shape[2:],
                    pad_shape=inputs.shape[2:],
                    padding_size=[0, 0, 0, 0])
            ] * inputs.shape[0]

        x = self.extract_feat(inputs, batch_img_metas)

        losses = dict()

        loss_decode = self._decode_head_forward_train(x, data_samples)
        losses.update(loss_decode)

        if self.with_auxiliary_head:
            loss_aux = self._auxiliary_head_forward_train(x, data_samples)
            losses.update(loss_aux)

        return losses

    def predict(self,
                inputs: Tensor,
                data_samples: OptSampleList = None) -> SampleList:
        """Predict results from a batch of inputs and data samples with post-
        processing.

        Args:
            inputs (Tensor): Inputs with shape (N, C, H, W).
            data_samples (List[:obj:`SegDataSample`], optional): The seg data
                samples. It usually includes information such as `metainfo`
                and `gt_depth_map`.

        Returns:
            list[:obj:`SegDataSample`]: Depth estimation results of the
            input images. Each SegDataSample usually contain:

            - ``pred_depth_map``(PixelData): Prediction of depth estimation.
        """
        if data_samples is not None:
            batch_img_metas = [
                data_sample.metainfo for data_sample in data_samples
            ]
        else:
            # No samples given: synthesize metainfo from the input shape,
            # describing unpadded images.
            batch_img_metas = [
                dict(
                    ori_shape=inputs.shape[2:],
                    img_shape=inputs.shape[2:],
                    pad_shape=inputs.shape[2:],
                    padding_size=[0, 0, 0, 0])
            ] * inputs.shape[0]

        depth = self.inference(inputs, batch_img_metas)

        return self.postprocess_result(depth, data_samples)

    def _forward(self,
                 inputs: Tensor,
                 data_samples: OptSampleList = None) -> Tensor:
        """Network forward process.

        Args:
            inputs (Tensor): Inputs with shape (N, C, H, W).
            data_samples (List[:obj:`SegDataSample`]): The seg
                data samples. It usually includes information such
                as `metainfo` and `gt_depth_map`.

        Returns:
            Tensor: Forward output of model without any post-processes.
        """
        x = self.extract_feat(inputs)
        return self.decode_head.forward(x)

    def slide_flip_inference(self, inputs: Tensor,
                             batch_img_metas: List[dict]) -> Tensor:
        """Inference by sliding-window with overlap and flip.

        If h_crop > h_img or w_crop > w_img, the small patch will be used to
        decode without padding.

        Note:
            ``batch_img_metas[0]['img_shape']`` is overwritten in place with
            the shape of each crop and is not restored afterwards.

        Args:
            inputs (tensor): the tensor should have a shape NxCxHxW,
                which contains all images in the batch.
            batch_img_metas (List[dict]): List of image metainfo where each may
                also contain: 'img_shape', 'scale_factor', 'flip', 'img_path',
                'ori_shape', and 'pad_shape'.
                For details on the values of these keys see
                `mmseg/datasets/pipelines/formatting.py:PackSegInputs`.

        Returns:
            Tensor: The depth estimation results.
        """

        h_stride, w_stride = self.test_cfg.stride
        h_crop, w_crop = self.test_cfg.crop_size
        batch_size, _, h_img, w_img = inputs.size()
        out_channels = self.out_channels
        # Number of windows per axis (ceil division, at least one window).
        h_grids = max(h_img - h_crop + h_stride - 1, 0) // h_stride + 1
        w_grids = max(w_img - w_crop + w_stride - 1, 0) // w_stride + 1
        # Accumulated predictions and per-pixel window coverage count.
        preds = inputs.new_zeros((batch_size, out_channels, h_img, w_img))
        count_mat = inputs.new_zeros((batch_size, 1, h_img, w_img))
        for h_idx in range(h_grids):
            for w_idx in range(w_grids):
                y1 = h_idx * h_stride
                x1 = w_idx * w_stride
                y2 = min(y1 + h_crop, h_img)
                x2 = min(x1 + w_crop, w_img)
                # Shift the window back so that it stays inside the image
                # while keeping the full crop size where possible.
                y1 = max(y2 - h_crop, 0)
                x1 = max(x2 - w_crop, 0)
                crop_img = inputs[:, :, y1:y2, x1:x2]
                # change the image shape to patch shape
                batch_img_metas[0]['img_shape'] = crop_img.shape[2:]
                # the output of encode_decode is depth tensor map
                # with shape [N, C, H, W]
                crop_depth_map = self.encode_decode(crop_img, batch_img_metas)

                # average out the original and flipped prediction
                crop_depth_map_flip = self.encode_decode(
                    crop_img.flip(dims=(3, )), batch_img_metas)
                crop_depth_map_flip = crop_depth_map_flip.flip(dims=(3, ))
                crop_depth_map = (crop_depth_map + crop_depth_map_flip) / 2.0

                # Zero-pad the crop prediction back to full image size so it
                # can be added onto the accumulator at the window position.
                preds += F.pad(crop_depth_map,
                               (int(x1), int(preds.shape[3] - x2), int(y1),
                                int(preds.shape[2] - y2)))

                count_mat[:, :, y1:y2, x1:x2] += 1
        # Every pixel must be covered by at least one window.
        assert (count_mat == 0).sum() == 0
        depth = preds / count_mat

        return depth

    def inference(self, inputs: Tensor, batch_img_metas: List[dict]) -> Tensor:
        """Inference with slide/whole style.

        The test mode is read from ``self.test_cfg['mode']`` and defaults to
        ``'whole'`` when the key is absent. Exactly one inference routine is
        run per call.

        Args:
            inputs (Tensor): The input image of shape (N, 3, H, W).
            batch_img_metas (List[dict]): List of image metainfo where each may
                also contain: 'img_shape', 'scale_factor', 'flip', 'img_path',
                'ori_shape', 'pad_shape', and 'padding_size'.
                For details on the values of these keys see
                `mmseg/datasets/pipelines/formatting.py:PackSegInputs`.

        Returns:
            Tensor: The depth estimation results.
        """
        # Resolve the mode once so that validation and dispatch agree on the
        # same value (including the 'whole' default).
        mode = self.test_cfg.get('mode', 'whole')
        assert mode in ['slide', 'whole', 'slide_flip'], \
            f'Only "slide", "slide_flip" or "whole" test mode are ' \
            f'supported, but got {mode}.'
        ori_shape = batch_img_metas[0]['ori_shape']
        if not all(meta['ori_shape'] == ori_shape
                   for meta in batch_img_metas):
            print_log(
                'Image shapes are different in the batch.',
                logger='current',
                level=logging.WARN)
        # The branches are mutually exclusive: with two independent ``if``
        # statements the 'slide' result would be overwritten by the trailing
        # ``else`` (whole-image inference).
        if mode == 'slide':
            depth_map = self.slide_inference(inputs, batch_img_metas)
        elif mode == 'slide_flip':
            depth_map = self.slide_flip_inference(inputs, batch_img_metas)
        else:
            depth_map = self.whole_inference(inputs, batch_img_metas)

        return depth_map

    def postprocess_result(self,
                           depth: Tensor,
                           data_samples: OptSampleList = None) -> SampleList:
        """Convert results list to `SegDataSample`.

        Args:
            depth (Tensor): The depth estimation results.
            data_samples (list[:obj:`SegDataSample`]): The seg data samples.
                It usually includes information such as `metainfo` and
                `gt_depth_map`. Default to None.

        Returns:
            list[:obj:`SegDataSample`]: Depth estimation results of the
            input images. Each SegDataSample usually contain:

            - ``pred_depth_map``(PixelData): Prediction of depth estimation.
        """
        batch_size, C, H, W = depth.shape

        if data_samples is None:
            # Without metainfo the raw per-image prediction is stored as is.
            data_samples = [SegDataSample() for _ in range(batch_size)]
            only_prediction = True
        else:
            only_prediction = False

        for i in range(batch_size):
            if not only_prediction:
                img_meta = data_samples[i].metainfo
                # remove padding area
                if 'img_padding_size' not in img_meta:
                    padding_size = img_meta.get('padding_size', [0] * 4)
                else:
                    padding_size = img_meta['img_padding_size']
                padding_left, padding_right, padding_top, padding_bottom =\
                    padding_size
                # i_depth shape is 1, C, H, W after remove padding
                i_depth = depth[i:i + 1, :, padding_top:H - padding_bottom,
                                padding_left:W - padding_right]

                # Undo a flip recorded in the metainfo.
                flip = img_meta.get('flip', None)
                if flip:
                    flip_direction = img_meta.get('flip_direction', None)
                    assert flip_direction in ['horizontal', 'vertical']
                    if flip_direction == 'horizontal':
                        i_depth = i_depth.flip(dims=(3, ))
                    else:
                        i_depth = i_depth.flip(dims=(2, ))

                # resize as original shape
                i_depth = resize(
                    i_depth,
                    size=img_meta['ori_shape'],
                    mode='bilinear',
                    align_corners=self.align_corners,
                    warning=False).squeeze(0)
            else:
                i_depth = depth[i]

            data_samples[i].set_data(
                {'pred_depth_map': PixelData(**{'data': i_depth})})

        return data_samples