Spaces:
Sleeping
Sleeping
# Copyright (c) Meta Platforms, Inc. and affiliates. | |
import numpy as np | |
import torch | |
from .voting import argmax_xyr, log_softmax_spatial, sample_xyr | |
from .utils import deg2rad, make_grid, rotmat2d | |
def log_gaussian(points, mean, sigma): | |
return -1 / 2 * torch.sum((points - mean) ** 2, -1) / sigma**2 | |
def log_laplace(points, mean, sigma): | |
return -torch.sum(torch.abs(points - mean), -1) / sigma | |
def propagate_belief( | |
Δ_xy, Δ_yaw, canvas_target, canvas_source, belief, num_rotations=None | |
): | |
# We allow a different sampling resolution in the target frame | |
if num_rotations is None: | |
num_rotations = belief.shape[-1] | |
angles = torch.arange( | |
0, 360, 360 / num_rotations, device=Δ_xy.device, dtype=Δ_xy.dtype | |
) | |
uv_grid = make_grid(canvas_target.w, canvas_target.h, device=Δ_xy.device) | |
xy_grid = canvas_target.to_xy(uv_grid.to(Δ_xy)) | |
Δ_xy_world = torch.einsum("nij,j->ni", rotmat2d(deg2rad(-angles)), Δ_xy) | |
xy_grid_prev = xy_grid[..., None, :] + Δ_xy_world[..., None, None, :, :] | |
uv_grid_prev = canvas_source.to_uv(xy_grid_prev).to(Δ_xy) | |
angles_prev = angles + Δ_yaw | |
angles_grid_prev = angles_prev.tile((canvas_target.h, canvas_target.w, 1)) | |
prior, valid = sample_xyr( | |
belief[None, None], | |
uv_grid_prev.to(belief)[None], | |
angles_grid_prev.to(belief)[None], | |
nearest_for_inf=True, | |
) | |
return prior, valid | |
def markov_filtering(observations, canvas, xys, yaws, idxs=None): | |
assert len(observations) == len(canvas) == len(xys) == len(yaws) | |
if idxs is None: | |
idxs = range(len(observations)) | |
belief = None | |
beliefs = [] | |
for i in idxs: | |
obs = observations[i] | |
if belief is None: | |
belief = obs | |
else: | |
Δ_xy = rotmat2d(deg2rad(yaws[i])) @ (xys[i - 1] - xys[i]) | |
Δ_yaw = yaws[i - 1] - yaws[i] | |
prior, valid = propagate_belief( | |
Δ_xy, Δ_yaw, canvas[i], canvas[i - 1], belief | |
) | |
prior = prior[0, 0].masked_fill_(~valid[0], -np.inf) | |
belief = prior + obs | |
belief = log_softmax_spatial(belief) | |
beliefs.append(belief) | |
uvt_seq = torch.stack([argmax_xyr(p) for p in beliefs]) | |
return beliefs, uvt_seq | |
def integrate_observation( | |
source, | |
target, | |
xy_source, | |
xy_target, | |
yaw_source, | |
yaw_target, | |
canvas_source, | |
canvas_target, | |
**kwargs | |
): | |
Δ_xy = rotmat2d(deg2rad(yaw_target)) @ (xy_source - xy_target) | |
Δ_yaw = yaw_source - yaw_target | |
prior, valid = propagate_belief( | |
Δ_xy, Δ_yaw, canvas_target, canvas_source, source, **kwargs | |
) | |
prior = prior[0, 0].masked_fill_(~valid[0], -np.inf) | |
target.add_(prior) | |
target.sub_(target.max()) # normalize to avoid overflow | |
return prior | |
class RigidAligner: | |
def __init__( | |
self, | |
canvas_ref=None, | |
xy_ref=None, | |
yaw_ref=None, | |
num_rotations=None, | |
track_priors=False, | |
): | |
self.canvas = canvas_ref | |
self.xy_ref = xy_ref | |
self.yaw_ref = yaw_ref | |
self.rotmat_ref = None | |
self.num_rotations = num_rotations | |
self.belief = None | |
self.priors = [] if track_priors else None | |
self.yaw_slam2geo = None | |
self.Rt_slam2geo = None | |
def update(self, observation, canvas, xy, yaw): | |
# initialization | |
if self.canvas is None: | |
self.canvas = canvas | |
if self.xy_ref is None: | |
self.xy_ref = xy | |
self.yaw_ref = yaw | |
self.rotmat_ref = rotmat2d(deg2rad(self.yaw_ref)) | |
if self.num_rotations is None: | |
self.num_rotations = observation.shape[-1] | |
if self.belief is None: | |
self.belief = observation.new_zeros( | |
(self.canvas.h, self.canvas.w, self.num_rotations) | |
) | |
prior = integrate_observation( | |
observation, | |
self.belief, | |
xy, | |
self.xy_ref, | |
yaw, | |
self.yaw_ref, | |
canvas, | |
self.canvas, | |
num_rotations=self.num_rotations, | |
) | |
if self.priors is not None: | |
self.priors.append(prior.cpu()) | |
return prior | |
def update_with_ref(self, observation, canvas, xy, yaw): | |
if self.belief is not None: | |
observation = observation.clone() | |
integrate_observation( | |
self.belief, | |
observation, | |
self.xy_ref, | |
xy, | |
self.yaw_ref, | |
yaw, | |
self.canvas, | |
canvas, | |
num_rotations=observation.shape[-1], | |
) | |
self.belief = observation | |
self.canvas = canvas | |
self.xy_ref = xy | |
self.yaw_ref = yaw | |
def compute(self): | |
uvt_align_ref = argmax_xyr(self.belief) | |
self.yaw_ref_align = uvt_align_ref[-1] | |
self.xy_ref_align = self.canvas.to_xy(uvt_align_ref[:2].double()) | |
self.yaw_slam2geo = self.yaw_ref - self.yaw_ref_align | |
R_slam2geo = rotmat2d(deg2rad(self.yaw_slam2geo)) | |
t_slam2geo = self.xy_ref_align - R_slam2geo @ self.xy_ref | |
self.Rt_slam2geo = (R_slam2geo, t_slam2geo) | |
def transform(self, xy, yaw): | |
if self.Rt_slam2geo is None or self.yaw_slam2geo is None: | |
raise ValueError("Missing transformation, call `compute()` first!") | |
xy_geo = self.Rt_slam2geo[1].to(xy) + xy @ self.Rt_slam2geo[0].T.to(xy) | |
return xy_geo, (self.yaw_slam2geo.to(yaw) + yaw) % 360 | |
class GPSAligner(RigidAligner): | |
def __init__(self, distribution=log_laplace, **kwargs): | |
self.distribution = distribution | |
super().__init__(**kwargs) | |
if self.num_rotations is None: | |
raise ValueError("Rotation number is required.") | |
angles = torch.arange(0, 360, 360 / self.num_rotations) | |
self.rotmats = rotmat2d(deg2rad(-angles)) | |
self.xy_grid = None | |
def update(self, xy_gps, accuracy, canvas, xy, yaw): | |
# initialization | |
if self.canvas is None: | |
self.canvas = canvas | |
if self.xy_ref is None: | |
self.xy_ref = xy | |
self.yaw_ref = yaw | |
self.rotmat_ref = rotmat2d(deg2rad(self.yaw_ref)) | |
if self.xy_grid is None: | |
self.xy_grid = self.canvas.to_xy(make_grid(self.canvas.w, self.canvas.h)) | |
if self.belief is None: | |
self.belief = xy_gps.new_zeros( | |
(self.canvas.h, self.canvas.w, self.num_rotations) | |
) | |
# integration | |
Δ_xy = self.rotmat_ref @ (xy - self.xy_ref) | |
Δ_xy_world = torch.einsum("nij,j->ni", self.rotmats.to(xy), Δ_xy) | |
xy_grid_prev = ( | |
self.xy_grid.to(xy)[..., None, :] + Δ_xy_world[..., None, None, :, :] | |
) | |
prior = self.distribution(xy_grid_prev, xy_gps, accuracy) | |
self.belief.add_(prior) | |
self.belief.sub_(self.belief.max()) # normalize to avoid overflow | |
if self.priors is not None: | |
self.priors.append(prior.cpu()) | |
return prior | |