File size: 10,355 Bytes
412c852
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# Copyright (c) OpenMMLab. All rights reserved.
import threading
from queue import Queue
from typing import List, Optional, Tuple

import numpy as np
import torch
from mmengine import Config
from mmengine.model import BaseModel
from mmengine.registry import init_default_scope
from mmengine.runner import load_checkpoint

try:
    from osgeo import gdal
except ImportError:
    gdal = None

from mmseg.registry import MODELS
from .utils import _preprare_data


class RSImage:
    """Remote sensing image class.

    Wraps a GDAL dataset and provides whole-image or windowed reading and
    writing, plus generation of the sliding-window grids used for tiled
    inference.

    Args:
        image (str or gdal.Dataset): Image file path or gdal.Dataset.

    Raises:
        ImportError: If the optional ``osgeo.gdal`` import failed.
        AssertionError: If ``image`` cannot be resolved to a gdal.Dataset.
    """

    def __init__(self, image):
        if gdal is None:
            # The module-level import is optional; fail with a clear message
            # instead of an AttributeError on ``None``.
            raise ImportError(
                'RSImage requires GDAL (`from osgeo import gdal` failed).')
        self.dataset = gdal.Open(image, gdal.GA_ReadOnly) if isinstance(
            image, str) else image
        assert isinstance(self.dataset, gdal.Dataset), \
            f'{image} is not a image'
        self.width = self.dataset.RasterXSize
        self.height = self.dataset.RasterYSize
        self.channel = self.dataset.RasterCount
        self.trans = self.dataset.GetGeoTransform()
        self.proj = self.dataset.GetProjection()
        # GDAL band indices are 1-based.
        self.band_list = [
            self.dataset.GetRasterBand(c + 1) for c in range(self.channel)
        ]
        self.grids = []

    def read(self, grid: Optional[List] = None) -> np.ndarray:
        """Read image data. If grid is None, read the whole image.

        Args:
            grid (Optional[List], optional): Grid to read. Only its first
                four elements (x offset, y offset, x size, y size) are used.
                Defaults to None.

        Returns:
            np.ndarray: Image data with the first axis of the GDAL array
            moved to the last position (channel-last layout).
        """
        if grid is None:
            data = self.dataset.ReadAsArray()
        else:
            assert len(grid) >= 4, \
                'grid must be a list containing at least 4 elements'
            data = self.dataset.ReadAsArray(*grid[:4])
        # A single-band read comes back as a 2-D array; add the leading axis
        # so that both code paths feed a 3-D array to the axis permutation.
        if data.ndim == 2:
            data = data[np.newaxis, ...]
        return np.einsum('ijk->jki', data)

    def write(self, data: Optional[np.ndarray], grid: Optional[List] = None):
        """Write image data.

        Args:
            data (Optional[np.ndarray]): Data to write. With ``grid`` it is
                a 2-D window that is cropped and written to every band;
                without ``grid`` it is a channel-last array (a 2-D array is
                treated as a single channel) written from the origin.
            grid (Optional[List], optional): Grid to write, in the
                8-element layout produced by :meth:`create_grids`.
                Defaults to None.

        Raises:
            ValueError: If ``data`` is None.
            AssertionError: If ``grid`` does not have 8 elements.
        """
        if data is None:
            if grid is None:
                raise ValueError('Either grid or data must be provided.')
            raise ValueError('data must be provided when grid is given.')

        if grid is not None:
            assert len(grid) == 8, 'grid must be a list of 8 elements'
            x_off, y_off, _, _, crop_x, crop_y, crop_w, crop_h = grid
            # Slicing is bounded by the array extent, so a crop size that
            # exceeds the remaining window simply stops at the window edge.
            patch = data[crop_y:crop_y + crop_h, crop_x:crop_x + crop_w]
            for band in self.band_list:
                band.WriteArray(patch, x_off + crop_x, y_off + crop_y)
            return

        if data.ndim == 2:
            data = data[..., np.newaxis]
        for i, band in enumerate(self.band_list):
            band.WriteArray(data[..., i])

    def create_seg_map(self, output_path: Optional[str] = None):
        """Create a single-band GeoTIFF with this image's size and geo info.

        Args:
            output_path (Optional[str], optional): Path of the file to
                create. Defaults to None, which means 'output_label.tif'.

        Returns:
            RSImage: Wrapper around the new dataset, with an extra ``path``
            attribute set to ``output_path``.
        """
        if output_path is None:
            output_path = 'output_label.tif'
        driver = gdal.GetDriverByName('GTiff')
        seg_map = driver.Create(output_path, self.width, self.height, 1,
                                gdal.GDT_Byte)
        seg_map.SetGeoTransform(self.trans)
        seg_map.SetProjection(self.proj)
        seg_map_img = RSImage(seg_map)
        seg_map_img.path = output_path
        return seg_map_img

    def create_grids(self,
                     window_size: Tuple[int, int],
                     stride: Tuple[int, int] = (0, 0)):
        """Create grids for image inference.

        The result replaces ``self.grids``. Every grid is a list
        ``[x_offset, y_offset, x_size, y_size, x_crop_off, y_crop_off,
        x_crop_size, y_crop_size]``: the first four values locate the window
        in the image, the last four select the part of the window that is
        written back by :meth:`write`.

        Args:
            window_size (Tuple[int, int]): the size of the sliding window.
                A dimension larger than the image is clamped to the image.
            stride (Tuple[int, int], optional): the stride of the sliding
                window. A value of 0 means "same as the window size".
                Defaults to (0, 0).

        Raises:
            AssertionError: window_size must be a tuple of 2 elements.
            AssertionError: stride must be a tuple of 2 elements.
        """
        assert len(
            window_size) == 2, 'window_size must be a tuple of 2 elements'
        assert len(stride) == 2, 'stride must be a tuple of 2 elements'
        win_w, win_h = window_size
        stride_x, stride_y = stride

        # Never let a window extend past the image; otherwise the
        # end-aligned offset below would become negative.
        win_w = min(win_w, self.width)
        win_h = min(win_h, self.height)

        stride_x = win_w if stride_x == 0 else stride_x
        stride_y = win_h if stride_y == 0 else stride_y

        x_half_overlap = (win_w - stride_x + 1) // 2
        y_half_overlap = (win_h - stride_y + 1) // 2

        # Start from a clean list so repeated calls do not accumulate grids.
        self.grids = []

        for y in range(0, self.height, stride_y):
            y_end = y + win_h >= self.height
            # The last window is aligned to the image border.
            y_offset = self.height - win_h if y_end else y
            y_crop_off = 0 if y_offset == 0 else y_half_overlap
            y_crop_size = win_h if y_end else win_h - y_crop_off

            for x in range(0, self.width, stride_x):
                x_end = x + win_w >= self.width
                x_offset = self.width - win_w if x_end else x
                x_crop_off = 0 if x_offset == 0 else x_half_overlap
                x_crop_size = win_w if x_end else win_w - x_crop_off

                self.grids.append([
                    x_offset, y_offset, win_w, win_h, x_crop_off, y_crop_off,
                    x_crop_size, y_crop_size
                ])
                # Any further step would repeat this border-aligned window.
                if x_end:
                    break
            if y_end:
                break


class RSInferencer:
    """Remote sensing inference class.

    Runs a reader thread, ``thread`` inference threads and a writer thread
    that communicate through two bounded queues. The end of the stream is
    signalled with ``END_FLAG``: the reader emits one flag per inference
    thread, each inference thread forwards exactly one flag, and the writer
    stops after receiving ``thread`` flags. Both queues are therefore empty
    again once :meth:`run` returns.

    Args:
        model (BaseModel): The loaded model.
        batch_size (int, optional): Maximum number of items held in each of
            the read and write buffers. Defaults to 1.
        thread (int, optional): Number of inference threads. Defaults to 1.

    Raises:
        ValueError: If ``thread`` is smaller than 1.
    """

    def __init__(self, model: BaseModel, batch_size: int = 1, thread: int = 1):
        if thread < 1:
            # Without an inference thread the reader and writer would block
            # on their queues forever.
            raise ValueError('thread must be a positive integer.')
        self.model = model
        self.batch_size = batch_size
        # Unique sentinel; always compared by identity.
        self.END_FLAG = object()
        self.read_buffer = Queue(self.batch_size)
        self.write_buffer = Queue(self.batch_size)
        self.thread = thread

    @classmethod
    def from_config_path(cls,
                         config_path: str,
                         checkpoint_path: str,
                         batch_size: int = 1,
                         thread: int = 1,
                         device: Optional[str] = 'cpu'):
        """Initialize a segmentor from config file.

        Args:
            config_path (str): Config file path.
            checkpoint_path (str): Checkpoint path.
            batch_size (int, optional): Batch size. Defaults to 1.
            thread (int, optional): Number of inference threads.
                Defaults to 1.
            device (Optional[str], optional): Device the model is moved to.
                Defaults to 'cpu'.

        Returns:
            RSInferencer: Inferencer wrapping the built model in eval mode.
        """
        init_default_scope('mmseg')
        cfg = Config.fromfile(config_path)
        model = MODELS.build(cfg.model)
        model.cfg = cfg
        load_checkpoint(model, checkpoint_path, map_location='cpu')
        model.to(device)
        model.eval()
        return cls(model, batch_size, thread)

    @classmethod
    def from_model(cls,
                   model: BaseModel,
                   checkpoint_path: Optional[str] = None,
                   batch_size: int = 1,
                   thread: int = 1,
                   device: Optional[str] = 'cpu'):
        """Initialize a segmentor from model.

        Args:
            model (BaseModel): The loaded model.
            checkpoint_path (Optional[str]): Checkpoint path. If given, the
                checkpoint is loaded into ``model``.
            batch_size (int, optional): Batch size. Defaults to 1.
            thread (int, optional): Number of inference threads.
                Defaults to 1.
            device (Optional[str], optional): Device the model is moved to.
                Defaults to 'cpu'.

        Returns:
            RSInferencer: Inferencer wrapping ``model`` in eval mode.
        """
        if checkpoint_path is not None:
            load_checkpoint(model, checkpoint_path, map_location='cpu')
        model.to(device)
        # Keep both constructors consistent: inference runs in eval mode.
        model.eval()
        return cls(model, batch_size, thread)

    def read(self,
             image: RSImage,
             window_size: Tuple[int, int],
             strides: Tuple[int, int] = (0, 0)):
        """Load image data to read buffer.

        Puts one ``[grid, data]`` item per grid into the read buffer,
        followed by one ``END_FLAG`` per inference thread.

        Args:
            image (RSImage): The image to read.
            window_size (Tuple[int, int]): The size of the sliding window.
            strides (Tuple[int, int], optional): The stride of the sliding
                window. Defaults to (0, 0).
        """
        image.create_grids(window_size, strides)
        for grid in image.grids:
            self.read_buffer.put([grid, image.read(grid=grid)])
        # Each inference thread consumes exactly one flag, so nothing is
        # left behind in the queue for a later run.
        for _ in range(self.thread):
            self.read_buffer.put(self.END_FLAG)

    def inference(self):
        """Inference image data from read buffer and put the result to write
        buffer.

        Loops until an ``END_FLAG`` is received, forwards that flag to the
        write buffer and returns.
        """
        while True:
            item = self.read_buffer.get()
            if item is self.END_FLAG:
                self.read_buffer.task_done()
                self.write_buffer.put(self.END_FLAG)
                break
            data, _ = _preprare_data(item[1], self.model)
            with torch.no_grad():
                result = self.model.test_step(data)
            # Replace the input window with the first prediction's label
            # array (index 0 presumably drops a leading channel axis --
            # confirm against the model's output format).
            item[1] = result[0].pred_sem_seg.cpu().data.numpy()[0]
            self.write_buffer.put(item)
            self.read_buffer.task_done()

    def write(self, image: RSImage, output_path: Optional[str] = None):
        """Write image data from write buffer.

        Creates the output segmentation map and writes every result until
        one ``END_FLAG`` per inference thread has been received.

        Args:
            image (RSImage): The image to write.
            output_path (Optional[str], optional): The path to save the
                segmentation map. Defaults to None.
        """
        seg_map = image.create_seg_map(output_path)
        # Stopping at the first flag would drop results that other
        # inference threads are still producing.
        pending_workers = self.thread
        while pending_workers > 0:
            item = self.write_buffer.get()
            if item is self.END_FLAG:
                pending_workers -= 1
            else:
                seg_map.write(data=item[1], grid=item[0])
            self.write_buffer.task_done()

    def run(self,
            image: RSImage,
            window_size: Tuple[int, int],
            strides: Tuple[int, int] = (0, 0),
            output_path: Optional[str] = None):
        """Run inference with multi-threading.

        Starts the reader, the inference threads and the writer, then waits
        for all of them to finish.

        Args:
            image (RSImage): The image to inference.
            window_size (Tuple[int, int]): The size of the sliding window.
            strides (Tuple[int, int], optional): The stride of the sliding
                window. Defaults to (0, 0).
            output_path (Optional[str], optional): The path to save the
                segmentation map. Defaults to None.
        """
        read_thread = threading.Thread(
            target=self.read, args=(image, window_size, strides))
        read_thread.start()
        inference_threads = []
        for _ in range(self.thread):
            inference_thread = threading.Thread(target=self.inference)
            inference_thread.start()
            inference_threads.append(inference_thread)
        write_thread = threading.Thread(
            target=self.write, args=(image, output_path))
        write_thread.start()
        read_thread.join()
        for inference_thread in inference_threads:
            inference_thread.join()
        write_thread.join()