Spaces:
Runtime error
Runtime error
# Copyright (c) OpenMMLab. All rights reserved. | |
import logging | |
from typing import List, Optional | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from mmengine.logging import print_log | |
from mmengine.structures import PixelData | |
from torch import Tensor | |
from mmseg.registry import MODELS | |
from mmseg.structures import SegDataSample | |
from mmseg.utils import (ConfigType, OptConfigType, OptMultiConfig, | |
OptSampleList, SampleList, add_prefix) | |
from ..utils import resize | |
from .encoder_decoder import EncoderDecoder | |
class DepthEstimator(EncoderDecoder):
    """Encoder Decoder depth estimator.

    EncoderDecoder typically consists of backbone, decode_head, auxiliary_head.
    Note that auxiliary_head is only used for deep supervision during training,
    which could be dumped during inference.

    1. The ``loss`` method is used to calculate the loss of model,
    which includes two steps: (1) Extracts features to obtain the feature maps
    (2) Call the decode head loss function to forward decode head model and
    calculate losses.

    .. code:: text

     loss(): extract_feat() -> _decode_head_forward_train() -> _auxiliary_head_forward_train (optional)
     _decode_head_forward_train(): decode_head.loss()
     _auxiliary_head_forward_train(): auxiliary_head.loss (optional)

    2. The ``predict`` method is used to predict depth estimation results,
    which includes two steps: (1) Run inference function to obtain the list of
    depth (2) Call post-processing function to obtain list of
    ``SegDataSample`` including ``pred_depth_map``.

    .. code:: text

     predict(): inference() -> postprocess_result()
     inference(): whole_inference()/slide_inference()/slide_flip_inference()
     whole_inference()/slide_inference(): encoder_decoder()
     encoder_decoder(): extract_feat() -> decode_head.predict()

    3. The ``_forward`` method is used to output the tensor by running the model,
    which includes two steps: (1) Extracts features to obtain the feature maps
    (2) Call the decode head forward function to forward decode head model.

    .. code:: text

     _forward(): extract_feat() -> decode_head.forward()

    Args:
        backbone (ConfigType): The config for the backbone of depth estimator.
        decode_head (ConfigType): The config for the decode head of depth
            estimator.
        neck (OptConfigType): The config for the neck of depth estimator.
            Defaults to None.
        auxiliary_head (OptConfigType): The config for the auxiliary head of
            depth estimator. Defaults to None.
        train_cfg (OptConfigType): The config for training. Defaults to None.
        test_cfg (OptConfigType): The config for testing. Defaults to None.
        data_preprocessor (dict, optional): The pre-process config of
            :class:`BaseDataPreprocessor`.
        pretrained (str, optional): The path for pretrained model.
            Defaults to None.
        init_cfg (dict, optional): The weight initialized config for
            :class:`BaseModule`.
    """  # noqa: E501

    def __init__(self,
                 backbone: ConfigType,
                 decode_head: ConfigType,
                 neck: OptConfigType = None,
                 auxiliary_head: OptConfigType = None,
                 train_cfg: OptConfigType = None,
                 test_cfg: OptConfigType = None,
                 data_preprocessor: OptConfigType = None,
                 pretrained: Optional[str] = None,
                 init_cfg: OptMultiConfig = None):
        # Pure pass-through: all construction is delegated to EncoderDecoder.
        super().__init__(
            backbone=backbone,
            decode_head=decode_head,
            neck=neck,
            auxiliary_head=auxiliary_head,
            train_cfg=train_cfg,
            test_cfg=test_cfg,
            data_preprocessor=data_preprocessor,
            pretrained=pretrained,
            init_cfg=init_cfg)

    @staticmethod
    def _collect_batch_img_metas(inputs: Tensor,
                                 data_samples: OptSampleList) -> List[dict]:
        """Build the per-image meta list used by ``loss`` and ``predict``.

        Args:
            inputs (Tensor): Batched input images; dims 2 and onwards are
                taken as the spatial shape.
            data_samples (List[:obj:`SegDataSample`], optional): When given,
                the ``metainfo`` of every sample is returned.

        Returns:
            List[dict]: One meta dict per image. Without ``data_samples`` a
            default meta (no padding, every shape equal to the input's
            spatial shape) is synthesised for each image.
        """
        if data_samples is not None:
            return [data_sample.metainfo for data_sample in data_samples]

        spatial_shape = inputs.shape[2:]
        # One independent dict per image: ``[dict(...)] * N`` would alias a
        # single dict, so an in-place update of one entry (as done by
        # ``slide_flip_inference``) would leak into all the others.
        return [
            dict(
                ori_shape=spatial_shape,
                img_shape=spatial_shape,
                pad_shape=spatial_shape,
                padding_size=[0, 0, 0, 0]) for _ in range(inputs.shape[0])
        ]

    def extract_feat(self,
                     inputs: Tensor,
                     batch_img_metas: Optional[List[dict]] = None) -> Tensor:
        """Extract features from images.

        Args:
            inputs (Tensor): Batched input images.
            batch_img_metas (List[dict], optional): Per-image meta dicts.
                Only consulted for ``'category_id'`` when the backbone has a
                truthy ``class_embed_select`` attribute.

        Returns:
            The backbone output, passed through the neck if one is
            configured.
        """
        # The non-empty check guards the ``[0]`` lookup against an empty list.
        use_category_ids = (
            getattr(self.backbone, 'class_embed_select', False)
            and isinstance(batch_img_metas, list) and len(batch_img_metas) > 0
            and 'category_id' in batch_img_metas[0])
        if use_category_ids:
            cat_ids = [meta['category_id'] for meta in batch_img_metas]
            cat_ids = torch.tensor(cat_ids, device=inputs.device)
            # In this mode the backbone receives an (images, ids) tuple.
            inputs = (inputs, cat_ids)

        x = self.backbone(inputs)
        if self.with_neck:
            x = self.neck(x)
        return x

    def encode_decode(self, inputs: Tensor,
                      batch_img_metas: List[dict]) -> Tensor:
        """Encode images with backbone and decode into a depth map of the same
        size as input."""
        x = self.extract_feat(inputs, batch_img_metas)
        depth = self.decode_head.predict(x, batch_img_metas, self.test_cfg)

        return depth

    def _decode_head_forward_train(self, inputs: List[Tensor],
                                   data_samples: SampleList) -> dict:
        """Run forward function and calculate loss for decode head in
        training.

        Returns:
            dict: Decode-head losses with keys prefixed by ``'decode'``.
        """
        losses = dict()
        loss_decode = self.decode_head.loss(inputs, data_samples,
                                            self.train_cfg)

        losses.update(add_prefix(loss_decode, 'decode'))
        return losses

    def _auxiliary_head_forward_train(self, inputs: List[Tensor],
                                      data_samples: SampleList) -> dict:
        """Run forward function and calculate loss for auxiliary head in
        training.

        Returns:
            dict: Auxiliary losses. Keys are prefixed with ``'aux_<idx>'``
            when ``auxiliary_head`` is an ``nn.ModuleList`` and with
            ``'aux'`` otherwise.
        """
        losses = dict()
        if isinstance(self.auxiliary_head, nn.ModuleList):
            for idx, aux_head in enumerate(self.auxiliary_head):
                loss_aux = aux_head.loss(inputs, data_samples, self.train_cfg)
                losses.update(add_prefix(loss_aux, f'aux_{idx}'))
        else:
            loss_aux = self.auxiliary_head.loss(inputs, data_samples,
                                                self.train_cfg)
            losses.update(add_prefix(loss_aux, 'aux'))

        return losses

    def loss(self, inputs: Tensor, data_samples: SampleList) -> dict:
        """Calculate losses from a batch of inputs and data samples.

        Args:
            inputs (Tensor): Input images.
            data_samples (list[:obj:`SegDataSample`]): The seg data samples.
                It usually includes information such as `metainfo` and
                `gt_depth_map`.

        Returns:
            dict[str, Tensor]: a dictionary of loss components
        """
        batch_img_metas = self._collect_batch_img_metas(inputs, data_samples)

        x = self.extract_feat(inputs, batch_img_metas)

        losses = dict()

        loss_decode = self._decode_head_forward_train(x, data_samples)
        losses.update(loss_decode)

        if self.with_auxiliary_head:
            loss_aux = self._auxiliary_head_forward_train(x, data_samples)
            losses.update(loss_aux)

        return losses

    def predict(self,
                inputs: Tensor,
                data_samples: OptSampleList = None) -> SampleList:
        """Predict results from a batch of inputs and data samples with post-
        processing.

        Args:
            inputs (Tensor): Inputs with shape (N, C, H, W).
            data_samples (List[:obj:`SegDataSample`], optional): The seg data
                samples. It usually includes information such as `metainfo`
                and `gt_depth_map`.

        Returns:
            list[:obj:`SegDataSample`]: Depth estimation results of the
            input images. Each SegDataSample usually contain:

            - ``pred_depth_map``(PixelData): Prediction of depth estimation.
        """
        batch_img_metas = self._collect_batch_img_metas(inputs, data_samples)

        depth = self.inference(inputs, batch_img_metas)

        return self.postprocess_result(depth, data_samples)

    def _forward(self,
                 inputs: Tensor,
                 data_samples: OptSampleList = None) -> Tensor:
        """Network forward process.

        Args:
            inputs (Tensor): Inputs with shape (N, C, H, W).
            data_samples (List[:obj:`SegDataSample`]): The seg
                data samples. It usually includes information such
                as `metainfo` and `gt_depth_map`. Not used by this method.

        Returns:
            Tensor: Forward output of model without any post-processes.
        """
        x = self.extract_feat(inputs)
        return self.decode_head.forward(x)

    def slide_flip_inference(self, inputs: Tensor,
                             batch_img_metas: List[dict]) -> Tensor:
        """Inference by sliding-window with overlap and flip.

        If h_crop > h_img or w_crop > w_img, the small patch will be used to
        decode without padding.

        Args:
            inputs (tensor): the tensor should have a shape NxCxHxW,
                which contains all images in the batch.
            batch_img_metas (List[dict]): List of image metainfo where each may
                also contain: 'img_shape', 'scale_factor', 'flip', 'img_path',
                'ori_shape', and 'pad_shape'.
                For details on the values of these keys see
                `mmseg/datasets/pipelines/formatting.py:PackSegInputs`.
                Note that ``batch_img_metas[0]['img_shape']`` is overwritten
                in place with the shape of each processed patch.

        Returns:
            Tensor: The depth estimation results.
        """
        h_stride, w_stride = self.test_cfg.stride
        h_crop, w_crop = self.test_cfg.crop_size
        batch_size, _, h_img, w_img = inputs.size()
        out_channels = self.out_channels
        # Number of window positions needed to cover each axis.
        h_grids = max(h_img - h_crop + h_stride - 1, 0) // h_stride + 1
        w_grids = max(w_img - w_crop + w_stride - 1, 0) // w_stride + 1
        preds = inputs.new_zeros((batch_size, out_channels, h_img, w_img))
        count_mat = inputs.new_zeros((batch_size, 1, h_img, w_img))
        for h_idx in range(h_grids):
            for w_idx in range(w_grids):
                y1 = h_idx * h_stride
                x1 = w_idx * w_stride
                y2 = min(y1 + h_crop, h_img)
                x2 = min(x1 + w_crop, w_img)
                # Shift the window back inside the image so that border
                # patches keep the full crop size whenever the image allows.
                y1 = max(y2 - h_crop, 0)
                x1 = max(x2 - w_crop, 0)
                crop_img = inputs[:, :, y1:y2, x1:x2]
                # change the image shape to patch shape
                batch_img_metas[0]['img_shape'] = crop_img.shape[2:]
                # the output of encode_decode is depth tensor map
                # with shape [N, C, H, W]
                crop_depth_map = self.encode_decode(crop_img, batch_img_metas)

                # average out the original and flipped prediction
                crop_depth_map_flip = self.encode_decode(
                    crop_img.flip(dims=(3, )), batch_img_metas)
                crop_depth_map_flip = crop_depth_map_flip.flip(dims=(3, ))
                crop_depth_map = (crop_depth_map + crop_depth_map_flip) / 2.0

                # Zero-pad the patch prediction to full image size so it can
                # be accumulated at its window position.
                preds += F.pad(crop_depth_map,
                               (int(x1), int(preds.shape[3] - x2), int(y1),
                                int(preds.shape[2] - y2)))

                count_mat[:, :, y1:y2, x1:x2] += 1
        # Every pixel must be covered by at least one window.
        assert (count_mat == 0).sum() == 0
        depth = preds / count_mat

        return depth

    def inference(self, inputs: Tensor, batch_img_metas: List[dict]) -> Tensor:
        """Inference with slide/whole style.

        Args:
            inputs (Tensor): The input image of shape (N, 3, H, W).
            batch_img_metas (List[dict]): List of image metainfo where each may
                also contain: 'img_shape', 'scale_factor', 'flip', 'img_path',
                'ori_shape', 'pad_shape', and 'padding_size'.
                For details on the values of these keys see
                `mmseg/datasets/pipelines/formatting.py:PackSegInputs`.

        Returns:
            Tensor: The depth estimation results.
        """
        # Read the mode once, with the same 'whole' default used by the
        # validation below, so validation and dispatch cannot disagree.
        mode = self.test_cfg.get('mode', 'whole')
        assert mode in ['slide', 'whole', 'slide_flip'], \
            f'Only "slide", "slide_flip" or "whole" test mode are ' \
            f'supported, but got {mode}.'
        ori_shape = batch_img_metas[0]['ori_shape']
        if not all(_['ori_shape'] == ori_shape for _ in batch_img_metas):
            print_log(
                'Image shapes are different in the batch.',
                logger='current',
                level=logging.WARN)
        # The branches are mutually exclusive. With two independent ``if``
        # statements the 'slide' result would be overwritten by the trailing
        # ``else`` (whole inference).
        if mode == 'slide':
            depth_map = self.slide_inference(inputs, batch_img_metas)
        elif mode == 'slide_flip':
            depth_map = self.slide_flip_inference(inputs, batch_img_metas)
        else:
            depth_map = self.whole_inference(inputs, batch_img_metas)

        return depth_map

    def postprocess_result(self,
                           depth: Tensor,
                           data_samples: OptSampleList = None) -> SampleList:
        """Convert results list to `SegDataSample`.

        Args:
            depth (Tensor): The depth estimation results.
            data_samples (list[:obj:`SegDataSample`]): The seg data samples.
                It usually includes information such as `metainfo` and
                `gt_depth_map`. Default to None.

        Returns:
            list[:obj:`SegDataSample`]: Depth estimation results of the
            input images. Each SegDataSample usually contain:

            - ``pred_depth_map``(PixelData): Prediction of depth estimation.
        """
        batch_size, _, H, W = depth.shape

        if data_samples is None:
            # No metainfo available: the raw predictions are stored as-is.
            data_samples = [SegDataSample() for _ in range(batch_size)]
            only_prediction = True
        else:
            only_prediction = False

        for i in range(batch_size):
            if not only_prediction:
                img_meta = data_samples[i].metainfo
                # remove padding area; 'img_padding_size' takes precedence
                # over 'padding_size', and no padding is assumed if neither
                # key is present
                padding_size = img_meta.get(
                    'img_padding_size', img_meta.get('padding_size', [0] * 4))
                padding_left, padding_right, padding_top, padding_bottom = \
                    padding_size
                # i_depth shape is 1, C, H, W after remove padding
                i_depth = depth[i:i + 1, :, padding_top:H - padding_bottom,
                                padding_left:W - padding_right]

                flip = img_meta.get('flip', None)
                if flip:
                    # Undo the test-time flip so the prediction lines up
                    # with the unflipped image.
                    flip_direction = img_meta.get('flip_direction', None)
                    assert flip_direction in ['horizontal', 'vertical']
                    if flip_direction == 'horizontal':
                        i_depth = i_depth.flip(dims=(3, ))
                    else:
                        i_depth = i_depth.flip(dims=(2, ))

                # resize as original shape
                i_depth = resize(
                    i_depth,
                    size=img_meta['ori_shape'],
                    mode='bilinear',
                    align_corners=self.align_corners,
                    warning=False).squeeze(0)
            else:
                i_depth = depth[i]

            data_samples[i].set_data(
                {'pred_depth_map': PixelData(**{'data': i_depth})})

        return data_samples