|
from pathlib import Path |
|
from typing import Tuple |
|
|
|
import numpy as np |
|
import torch |
|
from torch.utils import data |
|
import math |
|
|
|
|
|
def apply_random_y_rotation(point_cloud_data: torch.Tensor) -> torch.Tensor:
    """Rotate points by one random angle about the Y axis (coordinate index 1).

    The angle is drawn uniformly from [-180, 180) degrees with the global
    torch RNG (a single ``torch.rand(1)`` draw) and applied to every point.

    Args:
        point_cloud_data: Tensor whose last dimension holds the three
            coordinates; the matrix product below requires that size to be 3.

    Returns:
        A new tensor of the same shape holding the rotated points.  The input
        is not modified.
    """
    angle = math.radians((torch.rand(1).item() * 2 - 1) * 180)
    cos_a = math.cos(angle)
    sin_a = math.sin(angle)

    # Build the matrix where the data lives instead of assuming CUDA, so the
    # function also works on CPU-only hosts and with CPU tensors.
    if point_cloud_data.is_floating_point():
        dtype = point_cloud_data.dtype
    else:
        dtype = torch.get_default_dtype()
    rotation_matrix = torch.tensor(
        [[cos_a, 0.0, sin_a],
         [0.0, 1.0, 0.0],
         [-sin_a, 0.0, cos_a]],
        dtype=dtype,
        device=point_cloud_data.device,
    )

    # Points are stored as rows, so right-multiplying by the transpose applies
    # the rotation to each of them.
    return torch.matmul(point_cloud_data, rotation_matrix.T)
|
|
|
|
|
class PointCloudDataset(data.Dataset):
    """Synthetic multi-actor point clouds assembled from stored single clouds.

    Each sample stacks ``max_actors`` clouds loaded from ``file``.  Every
    cloud is randomly translated and rotated, and re-drawn until
    ``does_overlap`` accepts it against the clouds already placed in the same
    sample.

    NOTE(review): the column layout is inferred from the slicing in this
    class -- columns 0 and 1 are read as actor and marker labels and columns
    2-4 as coordinates.  Confirm against whatever writes ``file``.
    """

    # Multiplier used by ``fill_point_cloud`` for its padding target;
    # presumably the number of markers per actor -- TODO confirm.
    POINTS_PER_ACTOR = 73

    def __init__(self, file: Path,
                 n_samples=100,
                 max_actors: int = 8,
                 translation_factor=0.1,
                 max_overlap: Tuple[float, ...] = (0.2, 0.2, 0.2)):
        """Load the source clouds and pre-generate ``n_samples`` samples.

        Args:
            file: Path of a ``.npy`` array, indexed here as
                (cloud, point, column).
            n_samples: Number of samples to generate.
            max_actors: Number of source clouds stacked into each sample.
            translation_factor: Maximum absolute random shift applied to
                coordinate columns 0 and 2.
            max_overlap: Per-axis thresholds forwarded to ``does_overlap``.

        Raises:
            ValueError: If samples are requested but ``file`` holds no clouds.
        """
        # Fall back to the CPU when CUDA is unavailable instead of failing.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.sparse_point_clouds = torch.tensor(
            np.load(str(file)), dtype=torch.float32, device=self.device)
        self.n_samples = n_samples
        self.max_actors = max_actors
        self.translation_factor = translation_factor
        self.max_overlap = max_overlap

        self.indices = torch.randperm(len(self.sparse_point_clouds))
        n_sources = len(self.indices)
        if n_sources == 0 and n_samples > 0 and max_actors > 0:
            raise ValueError(f"{file} contains no point clouds to sample from")

        # Generated samples, one (points, 5) tensor each; read by __getitem__.
        self.dataset = []
        # NOTE(review): the original indexed with an undefined name here.
        # Walking the shuffled permutation and wrapping around is one
        # reasonable reading -- confirm the intended selection rule.
        cursor = 0
        for _ in range(n_samples):
            placed_coordinates = []  # coordinates only, for the overlap test
            labelled_clouds = []     # labels + transformed coordinates

            for _ in range(max_actors):
                source = self.sparse_point_clouds[self.indices[cursor % n_sources]]
                cursor += 1
                labels = source[:, 0:2]
                coordinates = source[:, 2:5]

                # NOTE(review): this retry loop has no upper bound; it never
                # terminates if no placement can satisfy ``max_overlap``.
                while True:
                    candidate = coordinates.clone()
                    self.apply_random_translation(candidate)
                    # Keep the rotated cloud; the original discarded it and
                    # stored the unrotated one.
                    candidate = apply_random_y_rotation(candidate)
                    if not does_overlap(placed_coordinates, candidate, self.max_overlap):
                        break

                placed_coordinates.append(candidate)
                labelled_clouds.append(torch.cat((labels, candidate), dim=1))

            if labelled_clouds:
                self.dataset.append(torch.cat(labelled_clouds, dim=0))
            else:
                # max_actors == 0: keep an empty sample so that indexing and
                # __len__ still agree with n_samples.
                self.dataset.append(self.sparse_point_clouds.new_zeros((0, 5)))

    def apply_random_translation(self, point_cloud: torch.Tensor) -> None:
        """Shift columns 0 and 2 of ``point_cloud`` in place by random offsets.

        Each offset is drawn uniformly from
        [-translation_factor, translation_factor); column 1 is left untouched.
        """
        x_translation = (torch.rand(1).item() * 2 - 1) * self.translation_factor
        z_translation = (torch.rand(1).item() * 2 - 1) * self.translation_factor
        # Create the offset next to the data rather than assuming CUDA.
        offset = torch.tensor([x_translation, z_translation],
                              dtype=point_cloud.dtype, device=point_cloud.device)
        point_cloud[:, [0, 2]] += offset

    def fill_point_cloud(self, point_cloud):
        """Pad ``point_cloud`` along dim 1 up to the target number of points.

        The target is ``POINTS_PER_ACTOR * max_actors``.  Missing points are
        filled with copies of randomly chosen existing points; the same
        indices are used for every entry along dim 0.  A cloud that already
        has at least the target number of points is returned unchanged.

        Args:
            point_cloud: Tensor indexed here as (batch, point, feature).

        Returns:
            The padded tensor, or ``point_cloud`` itself if no padding is
            needed.
        """
        target_num_points = self.POINTS_PER_ACTOR * self.max_actors
        current_num_points = point_cloud.shape[1]

        if current_num_points >= target_num_points:
            return point_cloud

        num_points_to_add = target_num_points - current_num_points
        random_indices = torch.randint(0, current_num_points, (num_points_to_add,))
        additional_points = point_cloud[:, random_indices, :]
        return torch.cat((point_cloud, additional_points), dim=1)

    def __getitem__(self, index):
        """Return ``(actor_labels, marker_labels, coordinates)`` of one sample."""
        sample = self.dataset[index]
        actor_labels = sample[:, 0]
        marker_labels = sample[:, 1]
        point_cloud_data = sample[:, 2:5]
        return actor_labels, marker_labels, point_cloud_data

    def __len__(self):
        """Return the number of generated samples."""
        return len(self.dataset)
|
|
|
|
|
def does_overlap(accumulated_point_cloud, new_point_cloud, overlap_thresholds=(0.2, 0.2, 0.2)):
    """Report whether ``new_point_cloud`` overlaps any already accepted cloud.

    Two clouds are compared through their axis-aligned bounding boxes.  For
    each axis ``k`` listed in ``overlap_thresholds`` the boxes are projected
    onto the plane obtained by dropping axis ``k``; the overlap ratio is the
    area shared by the two projected boxes divided by the smaller of the two
    projected areas.  The clouds count as overlapping only when every ratio
    reaches its threshold.

    Args:
        accumulated_point_cloud: Iterable of tensors already accepted.
        new_point_cloud: Candidate tensor.  Like the accumulated ones it is
            flattened to (points, coordinates) using its last dimension, so
            batched tensors are treated as a single cloud.
        overlap_thresholds: ``overlap_thresholds[k]`` is the minimum ratio for
            the projection that drops axis ``k``.

    Returns:
        ``True`` if the candidate overlaps at least one accumulated cloud,
        otherwise ``False``.  Empty clouds never overlap anything, and a
        projection with zero area contributes a ratio of 0.
    """
    def bounding_box(points):
        flat = points.reshape(-1, points.shape[-1])
        return flat.min(dim=0).values, flat.max(dim=0).values

    def overlap_ratio(extent_a, extent_b, shared_extent, dropped_axis):
        kept = [k for k in range(extent_a.shape[0]) if k != dropped_axis]
        smaller_area = torch.min(extent_a[kept].prod(), extent_b[kept].prod()).item()
        if smaller_area <= 0:
            # Degenerate (flat) projection: avoid dividing by zero.
            return 0.0
        return shared_extent[kept].prod().item() / smaller_area

    if new_point_cloud.numel() == 0:
        return False

    new_min, new_max = bounding_box(new_point_cloud)
    new_extent = new_max - new_min

    for pc in accumulated_point_cloud:
        if pc.numel() == 0:
            continue
        pc_min, pc_max = bounding_box(pc)
        pc_extent = pc_max - pc_min

        # Clamp each axis separately so that two disjoint axes cannot multiply
        # into a spurious positive area.
        shared_extent = (torch.min(new_max, pc_max) - torch.max(new_min, pc_min)).clamp(min=0)

        if all(
            overlap_ratio(new_extent, pc_extent, shared_extent, axis) >= threshold
            for axis, threshold in enumerate(overlap_thresholds)
        ):
            return True

    return False
|
|
|
|
|
class NoOverlapDataLoader(data.DataLoader):
    """DataLoader that skips batches overlapping a batch it already yielded.

    Each batch produced by the parent loader is unpacked as
    ``(actor_labels, marker_labels, point_cloud_data)``.  A batch is yielded
    only when ``does_overlap`` reports no overlap between its point cloud data
    and the point cloud data of the batches yielded so far in this pass.
    """

    def __init__(self, dataset: data.Dataset, max_overlap: Tuple[float] = (0.2, 0.2, 0.2), *args, **kwargs):
        """Create the loader.

        Args:
            dataset: Dataset handed to ``DataLoader``.
            max_overlap: Thresholds forwarded to ``does_overlap``.
            *args: Further positional arguments for ``DataLoader``.
            **kwargs: Further keyword arguments for ``DataLoader``.
        """
        super().__init__(dataset, *args, **kwargs)
        self.max_overlap = max_overlap

    def __iter__(self):
        """Yield the non-overlapping batches of one pass over the dataset."""
        kept_clouds = []
        for batch in super().__iter__():
            actor_labels, marker_labels, point_cloud_data = batch
            if does_overlap(kept_clouds, point_cloud_data, self.max_overlap):
                # Overlaps something already emitted: drop the batch.
                continue
            kept_clouds.append(point_cloud_data)
            yield actor_labels, marker_labels, point_cloud_data
|
|