|
|
|
|
|
import logging |
|
from dataclasses import dataclass, field |
|
from typing import Dict, List, Optional, Set, Tuple |
|
|
|
import numpy as np |
|
|
|
from .parser import ( |
|
filter_area, |
|
filter_node, |
|
filter_way, |
|
match_to_group, |
|
parse_area, |
|
parse_node, |
|
parse_way, |
|
Patterns, |
|
) |
|
from .reader import OSMData, OSMNode, OSMRelation, OSMWay |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def glue(ways: List[OSMWay]) -> List[List[OSMNode]]: |
|
result: List[List[OSMNode]] = [] |
|
to_process: Set[Tuple[OSMNode]] = set() |
|
|
|
for way in ways: |
|
if way.is_cycle(): |
|
result.append(way.nodes) |
|
else: |
|
to_process.add(tuple(way.nodes)) |
|
|
|
while to_process: |
|
nodes: List[OSMNode] = list(to_process.pop()) |
|
glued: Optional[List[OSMNode]] = None |
|
other_nodes: Optional[Tuple[OSMNode]] = None |
|
|
|
for other_nodes in to_process: |
|
glued = try_to_glue(nodes, list(other_nodes)) |
|
if glued is not None: |
|
break |
|
|
|
if glued is not None: |
|
to_process.remove(other_nodes) |
|
if is_cycle(glued): |
|
result.append(glued) |
|
else: |
|
to_process.add(tuple(glued)) |
|
else: |
|
result.append(nodes) |
|
|
|
return result |
|
|
|
|
|
def is_cycle(nodes: List[OSMNode]) -> bool: |
|
"""Is way a cycle way or an area boundary.""" |
|
return nodes[0] == nodes[-1] |
|
|
|
|
|
def try_to_glue(nodes: List[OSMNode], other: List[OSMNode]) -> Optional[List[OSMNode]]: |
|
"""Create new combined way if ways share endpoints.""" |
|
if nodes[0] == other[0]: |
|
return list(reversed(other[1:])) + nodes |
|
if nodes[0] == other[-1]: |
|
return other[:-1] + nodes |
|
if nodes[-1] == other[-1]: |
|
return nodes + list(reversed(other[:-1])) |
|
if nodes[-1] == other[0]: |
|
return nodes + other[1:] |
|
return None |
|
|
|
|
|
def multipolygon_from_relation(rel: OSMRelation, osm: OSMData): |
|
inner_ways = [] |
|
outer_ways = [] |
|
for member in rel.members: |
|
if member.type_ == "way": |
|
if member.role == "inner": |
|
if member.ref in osm.ways: |
|
inner_ways.append(osm.ways[member.ref]) |
|
elif member.role == "outer": |
|
if member.ref in osm.ways: |
|
outer_ways.append(osm.ways[member.ref]) |
|
else: |
|
logger.warning(f'Unknown member role "{member.role}".') |
|
if outer_ways: |
|
inners_path = glue(inner_ways) |
|
outers_path = glue(outer_ways) |
|
return inners_path, outers_path |
|
|
|
|
|
@dataclass |
|
class MapElement: |
|
id_: int |
|
label: str |
|
group: str |
|
tags: Optional[Dict[str, str]] |
|
|
|
|
|
@dataclass |
|
class MapNode(MapElement): |
|
xy: np.ndarray |
|
|
|
@classmethod |
|
def from_osm(cls, node: OSMNode, label: str, group: str): |
|
return cls( |
|
node.id_, |
|
label, |
|
group, |
|
node.tags, |
|
xy=node.xy, |
|
) |
|
|
|
|
|
@dataclass |
|
class MapLine(MapElement): |
|
xy: np.ndarray |
|
|
|
@classmethod |
|
def from_osm(cls, way: OSMWay, label: str, group: str): |
|
xy = np.stack([n.xy for n in way.nodes]) |
|
return cls( |
|
way.id_, |
|
label, |
|
group, |
|
way.tags, |
|
xy=xy, |
|
) |
|
|
|
|
|
@dataclass |
|
class MapArea(MapElement): |
|
outers: List[np.ndarray] |
|
inners: List[np.ndarray] = field(default_factory=list) |
|
|
|
@classmethod |
|
def from_relation(cls, rel: OSMRelation, label: str, group: str, osm: OSMData): |
|
outers_inners = multipolygon_from_relation(rel, osm) |
|
if outers_inners is None: |
|
return None |
|
outers, inners = outers_inners |
|
outers = [np.stack([n.xy for n in way]) for way in outers] |
|
inners = [np.stack([n.xy for n in way]) for way in inners] |
|
return cls( |
|
rel.id_, |
|
label, |
|
group, |
|
rel.tags, |
|
outers=outers, |
|
inners=inners, |
|
) |
|
|
|
@classmethod |
|
def from_way(cls, way: OSMWay, label: str, group: str): |
|
xy = np.stack([n.xy for n in way.nodes]) |
|
return cls( |
|
way.id_, |
|
label, |
|
group, |
|
way.tags, |
|
outers=[xy], |
|
) |
|
|
|
|
|
class MapData: |
|
def __init__(self): |
|
self.nodes: Dict[int, MapNode] = {} |
|
self.lines: Dict[int, MapLine] = {} |
|
self.areas: Dict[int, MapArea] = {} |
|
|
|
@classmethod |
|
def from_osm(cls, osm: OSMData): |
|
self = cls() |
|
|
|
for node in filter(filter_node, osm.nodes.values()): |
|
label = parse_node(node.tags) |
|
if label is None: |
|
continue |
|
group = match_to_group(label, Patterns.nodes) |
|
if group is None: |
|
group = match_to_group(label, Patterns.ways) |
|
if group is None: |
|
continue |
|
self.nodes[node.id_] = MapNode.from_osm(node, label, group) |
|
|
|
for way in filter(filter_way, osm.ways.values()): |
|
label = parse_way(way.tags) |
|
if label is None: |
|
continue |
|
group = match_to_group(label, Patterns.ways) |
|
if group is None: |
|
group = match_to_group(label, Patterns.nodes) |
|
if group is None: |
|
continue |
|
self.lines[way.id_] = MapLine.from_osm(way, label, group) |
|
|
|
for area in filter(filter_area, osm.ways.values()): |
|
label = parse_area(area.tags) |
|
if label is None: |
|
continue |
|
group = match_to_group(label, Patterns.areas) |
|
if group is None: |
|
group = match_to_group(label, Patterns.ways) |
|
if group is None: |
|
group = match_to_group(label, Patterns.nodes) |
|
if group is None: |
|
continue |
|
self.areas[area.id_] = MapArea.from_way(area, label, group) |
|
|
|
for rel in osm.relations.values(): |
|
if rel.tags.get("type") != "multipolygon": |
|
continue |
|
label = parse_area(rel.tags) |
|
if label is None: |
|
continue |
|
group = match_to_group(label, Patterns.areas) |
|
if group is None: |
|
group = match_to_group(label, Patterns.ways) |
|
if group is None: |
|
group = match_to_group(label, Patterns.nodes) |
|
if group is None: |
|
continue |
|
area = MapArea.from_relation(rel, label, group, osm) |
|
assert rel.id_ not in self.areas |
|
if area is not None: |
|
self.areas[rel.id_] = area |
|
|
|
return self |
|
|