# Copyright (c) Meta Platforms, Inc. and affiliates.

import io
import pickle
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
from PIL import Image
import rtree

from utils.geo import BoundaryBox, Projection
from .data import MapData
from .download import get_osm
from .parser import Groups
from .raster import Canvas, render_raster_map, render_raster_masks
from .reader import OSMData, OSMNode, OSMWay


class MapIndex:
    """Spatial index (one R-tree per element kind) over a ``MapData`` object."""

    def __init__(
        self,
        data: MapData,
    ):
        """Index every node, line and area of ``data`` by its bounding box."""
        self.index_nodes = rtree.index.Index()
        for i, node in data.nodes.items():
            # A point is inserted as a degenerate box: (x, y, x, y).
            self.index_nodes.insert(i, tuple(node.xy) * 2)

        self.index_lines = rtree.index.Index()
        for i, line in data.lines.items():
            bbox = tuple(np.r_[line.xy.min(0), line.xy.max(0)])
            self.index_lines.insert(i, bbox)

        self.index_areas = rtree.index.Index()
        for i, area in data.areas.items():
            # The box of an area covers all of its outer and inner rings.
            xy = np.concatenate(area.outers + area.inners)
            bbox = tuple(np.r_[xy.min(0), xy.max(0)])
            self.index_areas.insert(i, bbox)

        self.data = data

    def query(self, bbox: BoundaryBox) -> Tuple[List, List, List]:
        """Return the elements whose bounding box intersects ``bbox``.

        Returns:
            A ``(nodes, lines, areas)`` tuple of lists holding the matching
            entries of ``data.nodes``, ``data.lines`` and ``data.areas``.
        """
        query = tuple(np.r_[bbox.min_, bbox.max_])
        nodes = [self.data.nodes[i] for i in self.index_nodes.intersection(query)]
        lines = [self.data.lines[i] for i in self.index_lines.intersection(query)]
        areas = [self.data.areas[i] for i in self.index_areas.intersection(query)]
        return nodes, lines, areas


def bbox_to_slice(bbox: BoundaryBox, canvas: Canvas):
    """Convert ``bbox`` into a ``(rows, columns)`` pair of slices on ``canvas``.

    Both corners are mapped with ``canvas.to_uv`` and rounded up. The row slice
    runs from the v of ``bbox.max_`` to the v of ``bbox.min_``.
    NOTE(review): presumably because ``to_uv`` flips the vertical axis (image
    rows grow downwards) - confirm against ``Canvas.to_uv``.
    """
    uv_min = np.ceil(canvas.to_uv(bbox.min_)).astype(int)
    uv_max = np.ceil(canvas.to_uv(bbox.max_)).astype(int)
    slice_ = (slice(uv_max[1], uv_min[1]), slice(uv_min[0], uv_max[0]))
    return slice_


def round_bbox(bbox: BoundaryBox, origin: np.ndarray, ppm: int):
    """Snap the corners of ``bbox`` to the grid of step ``1 / ppm`` at ``origin``."""
    bbox = bbox.translate(-origin)
    bbox = BoundaryBox(np.round(bbox.min_ * ppm) / ppm, np.round(bbox.max_ * ppm) / ppm)
    return bbox.translate(origin)


def _group_definitions() -> Dict:
    """Return the non-dunder class attributes of ``Groups`` as a dict."""
    return {k: v for k, v in vars(Groups).items() if not k.startswith("__")}


def _split_into_tiles(bbox: BoundaryBox, tile_size: int) -> Dict:
    """Split ``bbox`` into a grid of boxes of side ``tile_size``, keyed by (i, j).

    The last row/column is clipped to ``bbox.max_`` and may therefore be
    smaller than ``tile_size``.
    """
    bounds_x, bounds_y = [
        np.r_[np.arange(min_, max_, tile_size), max_]
        for min_, max_ in zip(bbox.min_, bbox.max_)
    ]
    bbox_tiles = {}
    for i, xmin in enumerate(bounds_x[:-1]):
        for j, ymin in enumerate(bounds_y[:-1]):
            bbox_tiles[i, j] = BoundaryBox(
                [xmin, ymin], [bounds_x[i + 1], bounds_y[j + 1]]
            )
    return bbox_tiles


def _render_tiles(map_data: MapData, bbox: BoundaryBox, ppm, tile_size: int) -> Dict:
    """Rasterize ``map_data`` over ``bbox`` into one ``Canvas`` per tile."""
    map_index = MapIndex(map_data)
    tiles = {}
    for ij, bbox_tile in _split_into_tiles(bbox, tile_size).items():
        canvas = Canvas(bbox_tile, ppm)
        nodes, lines, areas = map_index.query(bbox_tile)
        masks = render_raster_masks(nodes, lines, areas, canvas)
        canvas.raster = render_raster_map(masks)
        tiles[ij] = canvas
    return tiles


def _compose_query(manager, bbox: BoundaryBox) -> Canvas:
    """Assemble the raster covering ``bbox`` from the tiles held by ``manager``.

    ``manager`` must expose ``bbox``, ``origin``, ``ppm``, ``tile_size`` and
    ``tiles``. Pixels of ``bbox`` that fall outside ``manager.bbox`` stay zero.
    """
    bbox = round_bbox(bbox, manager.bbox.min_, manager.ppm)
    canvas = Canvas(bbox, manager.ppm)
    raster = np.zeros((3, canvas.h, canvas.w), np.uint8)
    # Only tiles overlapping the part of the query inside the map are visited.
    bbox_all = bbox & manager.bbox
    ij_min = np.floor((bbox_all.min_ - manager.origin) / manager.tile_size).astype(int)
    ij_max = (
        np.ceil((bbox_all.max_ - manager.origin) / manager.tile_size).astype(int) - 1
    )
    for i in range(ij_min[0], ij_max[0] + 1):
        for j in range(ij_min[1], ij_max[1] + 1):
            tile = manager.tiles[i, j]
            bbox_select = tile.bbox & bbox
            slice_query = bbox_to_slice(bbox_select, canvas)
            slice_tile = bbox_to_slice(bbox_select, tile)
            raster[(slice(None),) + slice_query] = tile.raster[
                (slice(None),) + slice_tile
            ]
    canvas.raster = raster
    return canvas


def _save_tiles(manager, path: Path) -> None:
    """Pickle the tiles and metadata of ``manager`` to ``path``.

    Each tile raster is stored as an in-memory PNG (``io.BytesIO``). The key
    ``"ref_latlonalt"`` is only written when ``manager.projection`` is set.
    """
    dump = {
        "bbox": manager.bbox.format(),
        "tile_size": manager.tile_size,
        "ppm": manager.ppm,
        "groups": manager.groups,
        "tiles_bbox": {},
        "tiles_raster": {},
    }
    if manager.projection is not None:
        dump["ref_latlonalt"] = manager.projection.latlonalt
    for ij, canvas in manager.tiles.items():
        dump["tiles_bbox"][ij] = canvas.bbox.format()
        raster_bytes = io.BytesIO()
        # Rasters are channel-first; PIL expects channel-last.
        raster = Image.fromarray(canvas.raster.transpose(1, 2, 0).astype(np.uint8))
        raster.save(raster_bytes, format="PNG")
        dump["tiles_raster"][ij] = raster_bytes
    with open(path, "wb") as fp:
        pickle.dump(dump, fp)


def _load_tiles(path: Path):
    """Read a file written by ``_save_tiles``.

    Returns:
        ``(tiles, bbox, tile_size, ppm, projection, groups)``; ``projection``
        is ``None`` when the file holds no ``"ref_latlonalt"`` entry.
    """
    # SECURITY: pickle.load can execute arbitrary code - only load trusted files.
    with open(path, "rb") as fp:
        dump = pickle.load(fp)
    tiles = {}
    for ij, bbox in dump["tiles_bbox"].items():
        tiles[ij] = Canvas(BoundaryBox.from_string(bbox), dump["ppm"])
        raster = np.asarray(Image.open(dump["tiles_raster"][ij]))
        # Back to channel-first; copy() makes the array contiguous and writable.
        tiles[ij].raster = raster.transpose(2, 0, 1).copy()
    # save() omits the key when there is no projection, so it may be absent.
    latlonalt = dump.get("ref_latlonalt")
    projection = None if latlonalt is None else Projection(*latlonalt)
    return (
        tiles,
        BoundaryBox.from_string(dump["bbox"]),
        dump["tile_size"],
        dump["ppm"],
        projection,
        dump["groups"],
    )


class MapTileManager:
    """Tile manager bound to one OSM file that is parsed once at construction.

    The tile attributes (``tiles``, ``bbox``, ``ppm``, ...) only exist after
    ``from_bbox`` has been called or when the instance comes from ``load``.
    """

    def __init__(
        self,
        osmpath: Path,
    ):
        """Parse the OSM file at ``osmpath``."""
        self.osm = OSMData.from_file(osmpath)

    def from_bbox(
        self,
        projection: Projection,
        bbox: BoundaryBox,
        ppm: int,
        tile_size: int = 128,
    ):
        """Rasterize the loaded OSM data over ``bbox`` and return its raster.

        Unlike ``TileManager.from_bbox`` this is an instance method: it fills
        in the tile attributes of ``self`` and returns ``self.query(bbox)``.
        NOTE: nothing checks that the OSM file actually covers ``bbox``.
        """
        self.osm.add_xy_to_nodes(projection)
        map_data = MapData.from_osm(self.osm)
        tiles = _render_tiles(map_data, bbox, ppm, tile_size)

        self.origin = bbox.min_
        self.bbox = bbox
        self.tiles = tiles
        self.tile_size = tile_size
        self.ppm = ppm
        self.projection = projection
        self.groups = _group_definitions()
        self.map_data = map_data
        return self.query(bbox)

    def query(self, bbox: BoundaryBox) -> Canvas:
        """Return a ``Canvas`` whose raster covers ``bbox`` (zeros off the map)."""
        return _compose_query(self, bbox)

    def save(self, path: Path):
        """Pickle the tiles and their metadata to ``path``."""
        _save_tiles(self, path)

    @classmethod
    def load(cls, path: Path):
        """Restore a manager from a file written by ``save``.

        ``__init__`` requires an OSM file, which a dump does not contain, so
        the instance is created without calling it. ``osm`` and ``map_data``
        are ``None`` on the result: it can serve ``query`` and ``save`` but
        not ``from_bbox``.
        """
        tiles, bbox, tile_size, ppm, projection, groups = _load_tiles(path)
        manager = cls.__new__(cls)
        manager.osm = None
        manager.origin = bbox.min_
        manager.bbox = bbox
        manager.tiles = tiles
        manager.tile_size = tile_size
        manager.ppm = ppm
        manager.projection = projection
        manager.groups = groups
        manager.map_data = None
        return manager


class TileManager:
    """Grid of pre-rendered raster tiles covering a bounding box."""

    def __init__(
        self,
        tiles: Dict,
        bbox: BoundaryBox,
        tile_size: int,
        ppm: int,
        projection: Projection,
        groups: Dict[str, List[str]],
        map_data: Optional[MapData] = None,
    ):
        """Store the tiles and check that they are consistent with ``bbox``.

        Tile ``(0, 0)`` must start at ``bbox.min_`` and every tile must lie
        inside ``bbox``; both conditions are enforced with ``assert``.
        """
        self.origin = bbox.min_
        self.bbox = bbox
        self.tiles = tiles
        self.tile_size = tile_size
        self.ppm = ppm
        self.projection = projection
        self.groups = groups
        self.map_data = map_data
        assert np.all(tiles[0, 0].bbox.min_ == self.origin)
        for tile in tiles.values():
            assert bbox.contains(tile.bbox)

    @classmethod
    def from_bbox(
        cls,
        projection: Projection,
        bbox: BoundaryBox,
        ppm: float,
        path: Optional[Path] = None,
        tile_size: int = 128,
    ):
        """Build a manager by rasterizing the OSM data that covers ``bbox``.

        When ``path`` is an existing file the OSM data is read from it,
        otherwise it is obtained through ``get_osm(bbox_osm, path)``.
        """
        bbox_osm = projection.unproject(bbox)
        if path is not None and path.is_file():
            osm = OSMData.from_file(path)
            if osm.box is not None:
                assert osm.box.contains(bbox_osm)
        else:
            osm = OSMData.from_dict(get_osm(bbox_osm, path))

        osm.add_xy_to_nodes(projection)
        map_data = MapData.from_osm(osm)
        tiles = _render_tiles(map_data, bbox, ppm, tile_size)
        groups = _group_definitions()
        return cls(tiles, bbox, tile_size, ppm, projection, groups, map_data)

    def query(self, bbox: BoundaryBox) -> Canvas:
        """Return a ``Canvas`` whose raster covers ``bbox`` (zeros off the map)."""
        return _compose_query(self, bbox)

    def save(self, path: Path):
        """Pickle the tiles and their metadata to ``path``."""
        _save_tiles(self, path)

    @classmethod
    def load(cls, path: Path):
        """Restore a manager (without ``map_data``) from a file written by ``save``."""
        tiles, bbox, tile_size, ppm, projection, groups = _load_tiles(path)
        return cls(tiles, bbox, tile_size, ppm, projection, groups)