#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : object_centric_vision.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 08/05/2023
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import os.path as osp
from typing import Optional, Union, Tuple, List, Dict
from functools import cached_property
from dataclasses import dataclass
import jacinle
import jacinle.io as io
import open3d as o3d
import numpy as np
import cv2
import matplotlib.pyplot as plt
from concepts.vision.george_vision_pipeline.segmentation_models import random_colored_mask, visualize_instance_segmentation, ImageBasedPCDSegmentationModel, PointGuidedImageSegmentationModel, InstanceSegmentationResult
from concepts.math.rotationlib_wxyz import axisangle2quat, quat2mat
from concepts.simulator.pybullet.client import BulletClient
from concepts.simulator.urdf_utils.obj2urdf import ObjectUrdfBuilder
__all__ = [
'CameraTransformation', 'ObjectDetectionResult', 'ObjectCentricVisionPipeline',
'get_pointcloud', 'get_pointcloud_multicam', 'filter_pointcloud_range', 'ransac_table_detection', 'threshold_table_detection',
'project_pointcloud_to_plane', 'object_reconstruction_by_extrusion', 'remove_outliers', 'mesh_reconstruction_alpha_shape',
'canonize_mesh_center_', 'compute_transformation_from_plane_equation', 'visualize_with_camera_matrix',
'load_scene_in_pybullet',
]
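# NOTE: `CameraTransformation` is exported above and used throughout this file,
# but its definition lives elsewhere in the package. A minimal sketch of the
# interface assumed below, based solely on how it is used in this file
# (`c2w(x, y, z)` unprojects pixel coordinates plus metric depth into world
# coordinates, and `extrinsics` is a 4x4 world-to-camera matrix):
#
#     @dataclass
#     class CameraTransformation:
#         intrinsics: np.ndarray  # (3, 3) pinhole intrinsics matrix.
#         extrinsics: np.ndarray  # (4, 4) world-to-camera transform.
#
#         def c2w(self, x, y, z):
#             # Unproject pixel coordinates (x, y) at depth z (meters).
#             xc = (x - self.intrinsics[0, 2]) * z / self.intrinsics[0, 0]
#             yc = (y - self.intrinsics[1, 2]) * z / self.intrinsics[1, 1]
#             pts = np.stack([xc, yc, z, np.ones_like(z)])
#             xw, yw, zw, _ = np.linalg.inv(self.extrinsics) @ pts
#             return xw, yw, zw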
@dataclass
class ObjectDetectionResult(object):
"""The result of object detection."""
label: str
"""The label of the object."""
pcd: o3d.geometry.PointCloud
"""The point cloud of the object."""
reconstructed_pcd: Optional[o3d.geometry.PointCloud] = None
"""The reconstructed point cloud of the object."""
reconstructed_mesh: Optional[o3d.geometry.TriangleMesh] = None
"""The mesh of the object."""
pcd_mask: Optional[np.ndarray] = None
@property
def mesh(self) -> o3d.geometry.TriangleMesh:
"""Returns the mesh of the object. If the mesh is not reconstructed, an error will be raised."""
if self.reconstructed_mesh is None:
raise ValueError('The mesh is not reconstructed yet. Please call construct_object_meshes() first.')
return self.reconstructed_mesh
@property
def mesh_bbox(self) -> o3d.geometry.AxisAlignedBoundingBox:
"""Returns the bounding box of the mesh of the object. If the mesh is not reconstructed, an error will be raised."""
if self.reconstructed_mesh is None:
raise ValueError('The mesh is not reconstructed yet. Please call construct_object_meshes() first.')
return self.reconstructed_mesh.get_axis_aligned_bounding_box()
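# Example (hypothetical usage): accessing a detection after the meshes have
# been reconstructed by ObjectCentricVisionPipeline.construct_object_meshes():
#
#     det = pipeline.detected_objects[0]
#     print(det.label, len(det.pcd.points))
#     extent = det.mesh_bbox.get_extent()  # raises ValueError if no mesh yet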
class ObjectCentricVisionPipeline(object):
"""The pipeline for object-centric vision."""
def __init__(self, color_image: np.ndarray, depth_image: np.ndarray):
"""Initializes the pipeline.
Args:
color_image: The color image.
depth_image: The depth image.
"""
self.color_image = color_image
self.depth_image = depth_image
self.camera_transform = None
self.foreground_ranges = None
self._pcd = None
self._foreground_segmentation = None
self._table_model = None
self._table_segmentation = None
self._image_detected_objects = None
self._detected_objects = None
@property
def pcd(self) -> o3d.geometry.PointCloud:
"""The point cloud of the scene."""
if self._pcd is None:
raise ValueError('Point cloud is not constructed yet. Please call construct_pointcloud() first.')
return self._pcd
    def manual_set_pcd(self, pcd: o3d.geometry.PointCloud):
        """Manually set the point cloud of the scene, bypassing construct_pointcloud()."""
        self._pcd = pcd
@property
def foreground_segmentation(self) -> np.ndarray:
"""The segmentation of the foreground, in the form of a boolean array of the same length as the point cloud."""
if self._foreground_segmentation is None:
            raise ValueError('Foreground segmentation is not constructed yet. Please call construct_foreground_by_cropping() first.')
return self._foreground_segmentation
@cached_property
def foreground_pcd(self) -> o3d.geometry.PointCloud:
"""The point cloud of the foreground."""
return self.pcd.select_by_index(np.where(self.foreground_segmentation)[0])
@property
def table_model(self) -> np.ndarray:
"""The model of the table, in the form of a length-4 array representing the plane equation ax + by + cz + d = 0."""
if self._table_model is None:
raise ValueError('Table model is not constructed yet. Please call construct_table_segmentation() first.')
return self._table_model
@property
def table_segmentation(self) -> np.ndarray:
"""The segmentation of the table, in the form of a boolean array of the same length as the point cloud."""
if self._table_segmentation is None:
raise ValueError('Table segmentation is not constructed yet. Please call construct_table_segmentation() first.')
return self._table_segmentation
@cached_property
def tabletop_segmentation(self) -> np.ndarray:
"""The segmentation of the tabletop, in the form of a boolean array of the same length as the point cloud. Tabletop is defined as the foreground points that are not table points."""
return ~self.table_segmentation & self.foreground_segmentation
@cached_property
def table_pcd(self) -> o3d.geometry.PointCloud:
"""The point cloud of the table."""
return self.pcd.select_by_index(np.where(self.table_segmentation)[0])
@cached_property
def tabletop_pcd(self) -> o3d.geometry.PointCloud:
"""The point cloud of the tabletop. Tabletop is defined as the foreground points that are not table points."""
return self.pcd.select_by_index(np.where(self.tabletop_segmentation)[0])
@property
def image_detected_objects(self) -> InstanceSegmentationResult:
"""The result of object detection in the image."""
if self._image_detected_objects is None:
raise ValueError('Object detection is not performed yet. Please call construct_object_segmentation() first.')
return self._image_detected_objects
@property
def detected_objects(self) -> List[ObjectDetectionResult]:
"""The result of object detection in the scene."""
if self._detected_objects is None:
raise ValueError('Object detection is not performed yet. Please call construct_object_segmentation() first.')
return self._detected_objects
def construct_pointcloud(self, camera_transform: Optional[CameraTransformation] = None) -> o3d.geometry.PointCloud:
"""Constructs the point cloud from the color and depth images.
Args:
            camera_transform: the camera transformation. If None, the point cloud is left in raw (pixel-u, pixel-v, depth) coordinates instead of world coordinates.
Returns:
the point cloud.
"""
self.camera_transform = camera_transform
self._pcd = get_pointcloud(self.color_image, self.depth_image, camera_transform)
return self._pcd
def construct_foreground_by_cropping(
self,
x_range: Optional[Tuple[float, float]] = None,
y_range: Optional[Tuple[float, float]] = None,
z_range: Optional[Tuple[float, float]] = None,
depth_range: Optional[Tuple[float, float]] = None
) -> np.ndarray:
"""Constructs the foreground segmentation by cropping the point cloud.
Args:
x_range: the range of x-axis in the world coordinate system.
y_range: the range of y-axis in the world coordinate system.
z_range: the range of z-axis in the world coordinate system.
depth_range: the range of depth in the camera coordinate system.
Returns:
the foreground segmentation as a boolean array of shape (N,). N is the number of points in the point cloud.
"""
depth_mask = np.ones(len(self.pcd.points), dtype=np.bool_)
if depth_range is not None:
depth_mask = depth_mask & (self.depth_image.flatten() > depth_range[0] * 1000) & (self.depth_image.flatten() < depth_range[1] * 1000)
self.foreground_ranges = (x_range, y_range, z_range)
self._foreground_segmentation = filter_pointcloud_range(self.pcd, x_range, y_range, z_range, return_condition=True) & depth_mask
return self._foreground_segmentation
def construct_table_segmentation(self, method: str = 'ransac', **kwargs):
"""Constructs the table segmentation.
Args:
method: the method to use for table segmentation. Available methods are {'ransac', 'threshold'}.
Returns:
the table segmentation as a boolean array of shape (N,). N is the number of points in the point cloud.
"""
pcd = self.foreground_pcd
if method == 'ransac':
plane_model, raw_table_segmentation = ransac_table_detection(pcd, **kwargs, return_condition=True)
elif method == 'threshold':
plane_model, raw_table_segmentation = threshold_table_detection(pcd, **kwargs, return_condition=True)
else:
raise ValueError(f'Unknown method {method}. Available methods are {{ransac, threshold}}.')
table_segmentation = np.zeros(len(self.pcd.points), dtype=np.bool_)
table_segmentation[self.foreground_segmentation] = raw_table_segmentation
self._table_model = plane_model
self._table_segmentation = table_segmentation
return self._table_segmentation
def construct_object_segmentation(
self, image_segmentation_model: Optional[ImageBasedPCDSegmentationModel] = None, detection_min_points: int = 50, enforce_detected_points: bool = True,
dbscan: bool = True, dbscan_eps: float = 0.01, dbscan_min_samples: int = 500, dbscan_min_points: int = 1500, dbscan_with_sam: bool = True, sam_model: Optional[PointGuidedImageSegmentationModel] = None, sam_max_points: int = 100000,
outlier_filter: bool = True, outlier_filter_radius: float = 0.005, outlier_filter_min_neighbors: int = 50,
min_height: float = 0.015, verbose: bool = False
):
"""Constructs the object segmentation. The segmentation is performed in the following steps:
1. If image_segmentation_model is not None, use the image segmentation model to segment the image.
2. If dbscan is True, use DBSCAN to cluster the remaining points from the previous step.
3. If outlier_filter is True, use radius outlier filter to remove outliers for each object obtained from the previous two steps.
4. Use height filter to remove objects that are too thin.
Args:
image_segmentation_model: the image segmentation model. If None, skip the image segmentation step.
detection_min_points: the minimum number of points for an object to be considered as a valid object.
            enforce_detected_points: if True, keep all points detected by the image segmentation model that fall inside the foreground.
                Otherwise, keep only the detected points that are also in the "tabletop" segmentation, which equals the foreground minus the table.
dbscan: if True, use DBSCAN to cluster the remaining points from the previous step.
dbscan_eps: the maximum distance between two samples for one to be considered as in the neighborhood of the other.
dbscan_min_samples: the number of samples in a neighborhood for a point to be considered as a core point.
dbscan_min_points: the minimum number of points for an object to be considered as a valid object.
dbscan_with_sam: if True, use SAM to filter the points before DBSCAN.
sam_model: the SAM model. If None, skip the SAM step.
sam_max_points: the maximum number of points to use for SAM. This option is useful to avoid the table being detected as an object.
outlier_filter: if True, use radius outlier filter to remove outliers for each object obtained from the previous two steps.
outlier_filter_radius: the radius of the sphere that will determine which points are neighbors.
outlier_filter_min_neighbors: the minimum number of neighbors that a point must have to be considered as an inlier.
min_height: the minimum height of an object.
verbose: if True, print out debug information.
"""
remain_condition = self.tabletop_segmentation.copy()
detected_objects = list()
if image_segmentation_model is not None:
results = image_segmentation_model.segment_image(self.color_image)
self._image_detected_objects = results
            rect_th: int = 3
            text_size: int = 1
            text_th: int = 3
image = self.color_image.copy()
for i in range(results.nr_objects):
label = results.pred_cls[i]
masked_image = results.masks[i]
v, u = np.where(masked_image)
rgb_mask = random_colored_mask(masked_image)
image = cv2.addWeighted(image, 1, rgb_mask, 0.5, 0)
cv2.rectangle(image, (u.min(), v.min()), (u.max(), v.max()), color=(0, 255, 0), thickness=rect_th)
cv2.putText(image, label, (u.min(), v.min() + 10), cv2.FONT_HERSHEY_SIMPLEX, text_size, (0, 255, 0), thickness=text_th)
plt.figure(figsize=(10, 10))
plt.imshow(image)
plt.tight_layout()
plt.show()
for i in range(results.nr_objects):
label = results.pred_cls[i]
if 'table' in label:
continue
mask = results.masks[i].flatten()
if enforce_detected_points:
mask = mask & self.foreground_segmentation
else:
mask = mask & self.tabletop_segmentation
if verbose:
print('Detected object with label', results.pred_cls[i], 'and mask size', mask.sum(), 'points.')
if mask.sum() < detection_min_points or mask.sum() > sam_max_points:
continue
pcd = self.pcd.select_by_index(np.where(mask)[0])
detected_objects.append(ObjectDetectionResult(label, pcd, pcd_mask=mask))
remain_condition = remain_condition & (~mask)
# Run DBSCAN on the remaining points
if dbscan:
pcd = self.pcd.select_by_index(np.where(remain_condition)[0])
if verbose:
print('Running DBSCAN on remaining points with size', len(pcd.points))
labels = np.asarray(pcd.cluster_dbscan(eps=dbscan_eps, min_points=dbscan_min_samples, print_progress=verbose))
unique_labels = np.unique(labels)
for label in unique_labels:
if label == -1:
continue
mask = labels == label
label_name = f'unknown_{label}'
global_mask = np.zeros(len(self.pcd.points), dtype=np.bool_)
global_mask[remain_condition] = mask
if global_mask.sum() < dbscan_min_points:
continue
if dbscan_with_sam and sam_model is not None:
y, x = np.where(global_mask.reshape(self.color_image.shape[:2]))
y_mean, x_mean = int(y.mean()), int(x.mean())
if verbose:
print(' Running SAM on DBSCAN cluster with size', len(y), 'points.')
mask = sam_model.segment_from_point(self.color_image, (x_mean, y_mean))
if verbose:
print(' SAM mask size:', mask.sum())
if mask.sum() > sam_max_points:
if verbose:
print(' SAM mask too large, skipping.')
continue
mask = mask.flatten()
if enforce_detected_points:
mask = mask & self.foreground_segmentation
else:
mask = mask & self.tabletop_segmentation
global_mask = mask
pcd = self.pcd.select_by_index(np.where(global_mask)[0])
# pcd.paint_uniform_color(np.random.rand(3))
detected_objects.append(ObjectDetectionResult(label_name, pcd, pcd_mask=global_mask))
if verbose:
print('Detected object with label', label_name, 'and mask size', mask.sum(), 'points.')
if verbose:
print('Detected', len(detected_objects), 'objects before filtering.')
# Remove outliers and thin objects
filtered_objects = list()
for obj in detected_objects:
if outlier_filter:
obj.pcd = remove_outliers(obj.pcd, radius=outlier_filter_radius, min_neighbors=outlier_filter_min_neighbors)
if not dbscan_with_sam:
min_bounds = obj.pcd.get_min_bound()
max_bounds = obj.pcd.get_max_bound()
if max_bounds[2] - min_bounds[2] < min_height:
if verbose:
print('Object', obj.label, 'is too thin, skipping.')
continue
if np.asarray(obj.pcd.points).shape[0] < 10:
if verbose:
print('Object', obj.label, 'has too few points, skipping.')
continue
filtered_objects.append(obj)
if verbose:
print('Detected', len(filtered_objects), 'objects.')
self._detected_objects = filtered_objects
return self._detected_objects
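    # Example (hypothetical usage; `seg_model` and `sam` would be constructed
    # from the segmentation_models module imported above):
    #
    #     pipeline.construct_object_segmentation(
    #         image_segmentation_model=seg_model, sam_model=sam,
    #         dbscan=True, verbose=True,
    #     )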
def construct_object_completion_by_extrusion(self, min_height: Optional[float] = 0.05, verbose: bool = False):
"""Construct object completion by extrusion the top surface of the table.
Args:
min_height: the minimum height of an object. If specified as None, skip the thin object filtering step.
Otherwise, if an object is too thin, it will be extruded to the minimum height.
verbose: if True, print out debug information.
"""
for obj in self.detected_objects:
if verbose:
print('Shape reconstructing object', obj.label, 'with', len(obj.pcd.points), 'points.')
obj.reconstructed_pcd = object_reconstruction_by_extrusion(obj.pcd, self.table_model, min_height=min_height)
def construct_object_meshes(self, alpha: float = 0.1, verbose: bool = False):
"""Construct meshes for all detected objects using alpha shape.
Args:
alpha: the alpha value for alpha shape.
verbose: if True, print out debug information.
"""
for obj in self.detected_objects:
if verbose:
print('Mesh reconstruction for object', obj.label, 'with', len(obj.pcd.points), 'points.')
obj.reconstructed_mesh = mesh_reconstruction_alpha_shape(obj.reconstructed_pcd, alpha=alpha)
def export_scene(self, output_dir: str, verbose: bool = False):
"""Export the scene to a directory. All the metadata will be saved as "metainfo.json".
Args:
            output_dir: the output directory.
            verbose: if True, print the output of the URDF builder.
"""
io.mkdir(output_dir)
metainfo = {
'table_model': self.table_model.tolist(),
'table_transformation': compute_transformation_from_plane_equation(*self.table_model).tolist(),
'objects': list()
}
cv2.imwrite(osp.join(output_dir, 'color.png'), self.color_image[:, :, ::-1])
cv2.imwrite(osp.join(output_dir, 'depth.png'), self.depth_image)
builder = ObjectUrdfBuilder(output_dir)
for idx, detection in jacinle.tqdm_gofor(self.detected_objects, desc='Exporting objects'):
det_metainfo = {
'index': idx,
'label': detection.label,
'unique_label': f'o{idx:02d}_{detection.label}',
'pcd': f'{idx}_pcd.ply',
'mesh': f'{idx}_mesh.ply',
'mesh_obj': f'{idx}_mesh.obj',
'urdf': f'{idx}_mesh.obj.urdf',
}
metainfo['objects'].append(det_metainfo)
o3d.io.write_point_cloud(osp.join(output_dir, det_metainfo['pcd']), detection.pcd)
mesh = detection.reconstructed_mesh
mesh, center = canonize_mesh_center_(mesh)
det_metainfo['pos'] = center.tolist()
o3d.io.write_triangle_mesh(osp.join(output_dir, det_metainfo['mesh']), mesh)
o3d.io.write_triangle_mesh(osp.join(output_dir, det_metainfo['mesh_obj']), mesh, write_triangle_uvs=True)
with jacinle.cond_with(jacinle.suppress_stdout(), not verbose): # if verbose, print out the output of urdf builder
builder.build_urdf(osp.join(output_dir, det_metainfo['mesh_obj']), force_overwrite=True, decompose_concave=True, force_decompose=False, center=None)
io.dump_json(osp.join(output_dir, 'metainfo.json'), metainfo)
def visualize_pcd(self, name: str):
"""Visualize a point cloud. It will also visualize two coordinate frames: the camera frame and the world frame.
Args:
name: the name of the point cloud to visualize. Can be one of the following:
- 'full': the full point cloud.
- 'foreground': the foreground point cloud.
- 'table': the table point cloud.
- 'tabletop': the tabletop point cloud.
"""
geometries = list()
if name == 'full':
geometries.append(self.pcd)
elif name == 'foreground':
geometries.append(self.foreground_pcd)
# visualize the bounding box
bbox = o3d.geometry.AxisAlignedBoundingBox(
min_bound=(self.foreground_ranges[0][0], self.foreground_ranges[1][0], self.foreground_ranges[2][0]),
max_bound=(self.foreground_ranges[0][1], self.foreground_ranges[1][1], self.foreground_ranges[2][1])
)
bbox.color = (0, 1, 0)
geometries.append(bbox)
elif name == 'table':
pcd = self.table_pcd
# pcd.paint_uniform_color([1, 0, 0])
geometries.append(pcd)
elif name == 'tabletop':
pcd = self.tabletop_pcd
geometries.append(pcd)
else:
raise ValueError(f'Unknown name {name}. Available names are {{full, foreground, table, tabletop}}.')
world_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.05, origin=[0, 0, 0])
geometries.append(world_frame)
if self.camera_transform is not None:
camera = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.05).transform(np.linalg.inv(self.camera_transform.extrinsics))
geometries.append(camera)
        if self.camera_transform is not None:
            visualize_with_camera_matrix(geometries, np.linalg.inv(self.camera_transform.extrinsics))
        else:
            # Fall back to the default viewer when no camera transform is available.
            o3d.visualization.draw_geometries(geometries)
def visualize_objects_raw_detection(self):
"""Visualize the raw detection results in 2D."""
visualize_instance_segmentation(self.color_image, self.image_detected_objects)
def visualize_objects_2d(self, rect_th: int = 3, text_size: int = 1, text_th: int = 3):
"""Visualize the detected objects back-projected to the color image.
Args:
rect_th: the thickness of the rectangle.
text_size: the size of the text.
text_th: the thickness of the text.
"""
image = self.color_image.copy()
for detection in self.detected_objects:
label = detection.label
masked_image = detection.pcd_mask.reshape(self.color_image.shape[:2])
v, u = np.where(masked_image)
rgb_mask = random_colored_mask(masked_image)
image = cv2.addWeighted(image, 1, rgb_mask, 0.5, 0)
cv2.rectangle(image, (u.min(), v.min()), (u.max(), v.max()), color=(0, 255, 0), thickness=rect_th)
cv2.putText(image, label, (u.min(), v.min() + 10), cv2.FONT_HERSHEY_SIMPLEX, text_size, (0, 255, 0), thickness=text_th)
plt.figure(figsize=(10, 10))
plt.imshow(image)
plt.tight_layout()
plt.show()
def visualize_objects_3d(self, name: str = 'mesh', overlay_background_pcd: bool = False):
"""Visualize the detected objects in 3D.
Args:
name: the name of the geometry to visualize. Can be one of the following:
- 'mesh': the reconstructed mesh.
- 'pcd': the reconstructed point cloud.
- 'raw_pcd': the raw point cloud.
overlay_background_pcd: whether to overlay the background point cloud.
"""
print('Visualizing point cloud.')
geometries = list()
for obj in self.detected_objects:
if name == 'mesh' and obj.reconstructed_mesh is not None:
geometries.append(obj.reconstructed_mesh)
elif name == 'pcd' and obj.reconstructed_pcd is not None:
geometries.append(obj.reconstructed_pcd)
elif name == 'raw_pcd':
geometries.append(obj.pcd)
# also add the bounding box
bbox = geometries[-1].get_axis_aligned_bounding_box()
bbox.color = (0, 1, 0)
geometries.append(bbox)
world_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.05, origin=[0, 0, 0])
geometries.append(world_frame)
if self.camera_transform is not None:
camera = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.05).transform(np.linalg.inv(self.camera_transform.extrinsics))
geometries.append(camera)
# visualize a table plane
if self.table_model is not None:
# table_model was estimated by RANSAC. So it's represented as a plane equation.
# We need to convert it to a plane mesh.
table_plane = o3d.geometry.TriangleMesh.create_box(width=1, height=1, depth=0.001).translate(np.array([0, -0.5, 0]))
table_plane.paint_uniform_color([0.5, 0.5, 0.5])
T = compute_transformation_from_plane_equation(*self.table_model)
table_plane = table_plane.transform(T)
geometries.append(table_plane)
if overlay_background_pcd:
geometries.append(self.foreground_pcd)
        if self.camera_transform is not None:
            visualize_with_camera_matrix(geometries, np.linalg.inv(self.camera_transform.extrinsics))
        else:
            # Fall back to the default viewer when no camera transform is available.
            o3d.visualization.draw_geometries(geometries)
def run_default_pipeline(
self, camera_transformation,
x_range: Tuple[float, float] = (0.2, 1.0), y_range: Tuple[float, float] = (-0.5, 0.5), z_range: Tuple[float, float] = (-0.1, 0.5), depth_range: Tuple[float, float] = (0.05, 2.0),
table_ransac_distance_threshold: float = 0.01, table_ransac_n: int = 3, table_ransac_num_iterations: int = 1000,
image_segmentation_model: Optional[ImageBasedPCDSegmentationModel] = None, detection_min_points: int = 50, enforce_detected_points: bool = True,
dbscan: bool = True, dbscan_eps: float = 0.01, dbscan_min_samples: int = 500, dbscan_min_points: int = 1500,
dbscan_with_sam: bool = True, sam_model: Optional[PointGuidedImageSegmentationModel] = None, sam_max_points: int = 50000,
outlier_filter: bool = True, outlier_filter_radius: float = 0.005, outlier_filter_min_neighbors: int = 50,
min_height: float = 0.015,
extrusion_min_height: float = 0.03,
mesh_alpha: float = 0.1
):
"""Running the entire pipeline with the default parameters."""
pbar = jacinle.tqdm_pbar(total=7, desc='Running object-centric vision pipeline', leave=False)
with pbar:
pbar.set_description('Reconstructing point cloud')
self.construct_pointcloud(camera_transformation)
pbar.update(); pbar.set_description('Segmenting the foreground')
self.construct_foreground_by_cropping(x_range, y_range, z_range, depth_range)
pbar.update(); pbar.set_description('Estimating the table plane')
self.construct_table_segmentation(method='ransac', distance_threshold=table_ransac_distance_threshold, ransac_n=table_ransac_n, num_iterations=table_ransac_num_iterations)
pbar.update(); pbar.set_description('Segmenting the objects')
self.construct_object_segmentation(
image_segmentation_model, detection_min_points=detection_min_points, enforce_detected_points=enforce_detected_points,
dbscan=dbscan, dbscan_eps=dbscan_eps, dbscan_min_samples=dbscan_min_samples, dbscan_min_points=dbscan_min_points,
dbscan_with_sam=dbscan_with_sam, sam_model=sam_model, sam_max_points=sam_max_points,
outlier_filter=outlier_filter, outlier_filter_radius=outlier_filter_radius, outlier_filter_min_neighbors=outlier_filter_min_neighbors,
min_height=min_height, verbose=True
)
pbar.update(); pbar.set_description('3D-reconstructing the objects')
self.construct_object_completion_by_extrusion(min_height=extrusion_min_height)
pbar.update(); pbar.set_description('Mesh-reconstructing the objects')
self.construct_object_meshes(alpha=mesh_alpha)
pbar.update(); pbar.set_description('Running NMS')
self._detected_objects = mesh_nms(self.detected_objects, 0.1)
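# Example (hypothetical end-to-end usage; `color`, `depth`, and `camera` are
# assumed to come from the caller's RGB-D capture and calibration code):
#
#     pipeline = ObjectCentricVisionPipeline(color, depth)
#     pipeline.run_default_pipeline(camera)
#     pipeline.visualize_objects_3d('mesh')
#     pipeline.export_scene('./scene_output')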
def get_pointcloud(color_image: np.ndarray, depth_image: np.ndarray, camera_transformation: Optional[CameraTransformation] = None) -> o3d.geometry.PointCloud:
"""Reconstruct a colored point cloud from a color and depth image.
Args:
color_image: a numpy array of shape (H, W, 3) representing the color image.
depth_image: a numpy array of shape (H, W) representing the depth image.
        camera_transformation: a CameraTransformation object. If None, the points are left in raw (pixel-u, pixel-v, depth) coordinates.
Returns:
a colored open3d point cloud.
"""
xs = np.arange(0, color_image.shape[1])
ys = np.arange(0, color_image.shape[0])
xx, yy = np.meshgrid(xs, ys)
xx = xx.flatten()
yy = yy.flatten()
zz = depth_image.flatten() / 1000.0
colors = color_image[yy, xx, :].astype(np.float64) / 255.0
if camera_transformation is not None:
xx, yy, zz = camera_transformation.c2w(xx, yy, zz)
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(np.stack([xx, yy, zz], axis=1))
pcd.colors = o3d.utility.Vector3dVector(colors)
return pcd
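# Note: the depth image is expected in millimeters (hence the division by 1000
# above) and the color image in uint8 RGB. A hypothetical usage sketch:
#
#     pcd_world = get_pointcloud(color, depth, camera)  # world frame
#     pcd_raw = get_pointcloud(color, depth)            # raw (u, v, depth) coordinates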
def get_pointcloud_multicam(color_images: np.ndarray, depth_images: np.ndarray, camera_transformations: List[CameraTransformation]) -> o3d.geometry.PointCloud:
"""Reconstruct a colored point cloud from a color and depth image.
Args:
color_images: a numpy array of shape (N, H, W, 3) representing the color image.
depth_images: a numpy array of shape (N, H, W) representing the depth image.
camera_transformations: N CameraTransformation object.
Returns:
a colored open3d point cloud.
"""
all_points = list()
all_colors = list()
for color_image, depth_image, camera_transformation in zip(color_images, depth_images, camera_transformations):
xs = np.arange(0, color_image.shape[1])
ys = np.arange(0, color_image.shape[0])
xx, yy = np.meshgrid(xs, ys)
xx = xx.flatten()
yy = yy.flatten()
zz = depth_image.flatten() / 1000.0
colors = color_image[yy, xx, :].astype(np.float64) / 255.0
xx, yy, zz = camera_transformation.c2w(xx, yy, zz)
all_points.append(np.stack([xx, yy, zz], axis=1))
all_colors.append(colors)
all_points = np.concatenate(all_points)
all_colors = np.concatenate(all_colors)
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(all_points)
pcd.colors = o3d.utility.Vector3dVector(all_colors)
return pcd
def filter_pointcloud_range(
pcd: o3d.geometry.PointCloud,
x_range: Optional[Tuple[float, float]] = None, y_range: Optional[Tuple[float, float]] = None, z_range: Optional[Tuple[float, float]] = None,
return_condition: bool = False
) -> Union[np.ndarray, o3d.geometry.PointCloud]:
"""Filter a point cloud by a range of x, y, z coordinates.
Args:
pcd: an open3d point cloud.
x_range: a tuple of (min, max) x coordinates.
y_range: a tuple of (min, max) y coordinates.
z_range: a tuple of (min, max) z coordinates.
return_condition: whether to return the condition used to filter the point cloud.
Returns:
an open3d point cloud or a numpy array of shape (N,) representing the condition.
"""
condition = np.ones(len(pcd.points), dtype=np.bool_)
xyz = np.asarray(pcd.points)
if x_range is not None:
condition &= (xyz[:, 0] >= x_range[0]) & (xyz[:, 0] <= x_range[1])
if y_range is not None:
condition &= (xyz[:, 1] >= y_range[0]) & (xyz[:, 1] <= y_range[1])
if z_range is not None:
condition &= (xyz[:, 2] >= z_range[0]) & (xyz[:, 2] <= z_range[1])
if return_condition:
return condition
pcd = pcd.select_by_index(np.where(condition)[0])
return pcd
def ransac_table_detection(
pcd: o3d.geometry.PointCloud, distance_threshold: float = 0.005, ransac_n: int = 3, num_iterations: int = 1000, return_condition: bool = False
) -> Union[Tuple[np.ndarray, np.ndarray], Tuple[o3d.geometry.PointCloud, o3d.geometry.PointCloud]]:
"""Detect the table plane using RANSAC.
Args:
pcd: an open3d point cloud.
distance_threshold: the distance threshold for RANSAC.
ransac_n: the number of points to sample for RANSAC.
num_iterations: the number of iterations for RANSAC.
return_condition: whether to return the condition used to filter the point cloud.
Returns:
If return_condition is True, returns a tuple of (model, condition). Condition is a numpy array of shape (N,) representing the condition.
Otherwise, returns a tuple of (inliers, outliers). Inliers and outliers are open3d point clouds.
"""
model, inliers = pcd.segment_plane(distance_threshold, ransac_n, num_iterations)
if return_condition:
condition = np.zeros(len(pcd.points), dtype=np.bool_)
condition[inliers] = True
return model, condition
inlier_cloud = pcd.select_by_index(inliers)
outlier_cloud = pcd.select_by_index(inliers, invert=True)
return inlier_cloud, outlier_cloud
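# Worked example: the returned plane model [a, b, c, d] satisfies
# ax + by + cz + d = 0 on the table, so the signed distance of every point to
# the plane can be computed as follows (hypothetical usage):
#
#     model, condition = ransac_table_detection(pcd, return_condition=True)
#     n = np.asarray(model)
#     dist = (np.asarray(pcd.points) @ n[:3] + n[3]) / np.linalg.norm(n[:3])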
def threshold_table_detection(
pcd: o3d.geometry.PointCloud, z_threshold: float = 0.005, return_condition: bool = False
) -> Union[Tuple[np.ndarray, np.ndarray], Tuple[o3d.geometry.PointCloud, o3d.geometry.PointCloud]]:
"""Detect the table plane using a simple threshold.
Args:
pcd: an open3d point cloud.
z_threshold: the threshold for the z coordinate.
return_condition: whether to return the condition used to filter the point cloud.
Returns:
        If return_condition is True, returns a tuple of (model, condition), where condition is a numpy array of shape (N,).
Otherwise, returns a tuple of (inliers, outliers). Inliers and outliers are open3d point clouds.
"""
xyz = np.asarray(pcd.points)
condition = np.abs(xyz[:, 2]) < z_threshold
if return_condition:
# TODO(Jiayuan Mao): check whether this should be + or -.
model = np.array([0, 0, 1, -z_threshold])
return model, condition
inlier_cloud = pcd.select_by_index(np.where(condition)[0])
outlier_cloud = pcd.select_by_index(np.where(condition)[0], invert=True)
return inlier_cloud, outlier_cloud
def project_pointcloud_to_plane(pcd: o3d.geometry.PointCloud, model: np.ndarray) -> o3d.geometry.PointCloud:
"""Project a point cloud to a plane (into a 2D shape)
Args:
pcd: an open3d point cloud.
model: a numpy array of shape (4,) representing the plane model.
Returns:
an open3d point cloud.
"""
xyz = np.asarray(pcd.points).copy()
d = np.dot(xyz, model[:3]) + model[3]
xyz -= d[:, None] * model[None, :3]
new_pcd = o3d.geometry.PointCloud()
new_pcd.points = o3d.utility.Vector3dVector(xyz)
new_pcd.colors = pcd.colors
return new_pcd
def _cdist(xyz1: np.ndarray, xyz2: np.ndarray) -> np.ndarray:
"""Compute the pairwise distance between two point clouds.
Args:
xyz1: a numpy array of shape (N, 3).
xyz2: a numpy array of shape (M, 3).
Returns:
a numpy array of shape (N, M).
"""
return np.sqrt(np.sum((xyz1[:, None, :] - xyz2[None, :, :]) ** 2, axis=-1))
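# Note: this broadcasted implementation materializes an (N, M, 3) intermediate
# array, so it is only suitable for the small, voxel-downsampled clouds used in
# object_reconstruction_by_extrusion() below. scipy.spatial.distance.cdist
# computes the same matrix without the memory blow-up, if SciPy is available.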
def object_reconstruction_by_extrusion(
pcd: o3d.geometry.PointCloud, model: np.ndarray, extrusion_distance: float = 0.005, min_height: Optional[float] = 0.05,
include_inner_points: bool = False
) -> o3d.geometry.PointCloud:
"""Reconstruct an object by extrusion.
Args:
pcd: an open3d point cloud.
model: a numpy array of shape (4,) representing the plane model.
extrusion_distance: the distance to extrude.
min_height: the minimum height of the extrusion. If an object is too thin, we will extrude it to this height.
include_inner_points: whether to include the inner points of the object.
"""
# Compute the max distance from the plane.
xyz = np.asarray(pcd.points, dtype=np.float32).copy()
d = np.abs(model[:3].dot(xyz.T) + model[3]) / np.linalg.norm(model[:3])
max_d = np.max(d)
# Extrude the point cloud.
max_extrusion_nr = int(max_d / extrusion_distance)
projection = project_pointcloud_to_plane(pcd, model)
# Subsample the projected point cloud.
projection = projection.voxel_down_sample(voxel_size=0.005)
# projection = projection.uniform_down_sample(every_k_points=10)
projected_xyz = np.asarray(projection.points)
projected_rgb = np.asarray(projection.colors)
points = [(projected_xyz, projected_rgb)]
if include_inner_points:
for i in range(max_extrusion_nr):
extruded_xyz = projected_xyz + (i + 1) * extrusion_distance * model[:3]
# Compute the distance of all points to the extruded point cloud.
distances = _cdist(extruded_xyz, xyz)
# Find the closest point on the original pcd for each point.
closest_points = np.min(distances, axis=1)
# If the closest point is too close, we stop extruding for this point.
mask = closest_points > extrusion_distance
extruded_xyz = extruded_xyz[mask]
extruded_rgb = projected_rgb[mask]
points.append((extruded_xyz, extruded_rgb))
projected_xyz = projected_xyz[mask]
projected_rgb = projected_rgb[mask]
if min_height is not None and np.median(xyz[:, 2]) < min_height:
# if the object is just too thin, we add a top surface.
        points.append((projected_xyz + np.array([0, 0, min_height]), projected_rgb))
else:
points.append((xyz, np.asarray(pcd.colors)))
# Concatenate all points.
xyz = np.concatenate([p[0] for p in points], axis=0)
rgb = np.concatenate([p[1] for p in points], axis=0)
pcd = o3d.geometry.PointCloud()
pcd.points, pcd.colors = o3d.utility.Vector3dVector(xyz), o3d.utility.Vector3dVector(rgb)
return pcd
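# Example (hypothetical usage): completing a single detection against the
# estimated table plane, as construct_object_completion_by_extrusion() does:
#
#     full_pcd = object_reconstruction_by_extrusion(
#         det.pcd, pipeline.table_model, min_height=0.03)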
def remove_outliers(pcd: o3d.geometry.PointCloud, min_neighbors: int, radius: float, verbose: bool = False) -> o3d.geometry.PointCloud:
"""Remove outliers from a point cloud.
    Note: in this pipeline, it is invoked by construct_object_segmentation() when outlier_filter is True.
Args:
pcd: an open3d point cloud.
min_neighbors: the minimum number of neighbors.
radius: the radius to search for neighbors.
Returns:
an open3d point cloud.
"""
if verbose:
print(" Removing outliers... Before: ", len(pcd.points))
pcd, _ = pcd.remove_radius_outlier(nb_points=min_neighbors, radius=radius)
if verbose:
print(" Removing outliers... After: ", len(pcd.points))
return pcd
def mesh_reconstruction_alpha_shape(pcd: o3d.geometry.PointCloud, alpha: float = 0.1) -> o3d.geometry.TriangleMesh:
"""Reconstruct a mesh from a point cloud.
Args:
pcd: an open3d point cloud.
alpha: the alpha value for the alpha shape.
Returns:
an open3d triangle mesh.
"""
pcd = pcd.voxel_down_sample(voxel_size=0.0025)
pcd.estimate_normals()
# Project the object to the table plane
mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_alpha_shape(pcd, alpha=alpha)
# t_mesh = o3d.t.geometry.TriangleMesh.from_legacy(mesh)
# t_filled = t_mesh.fill_holes(hole_size=1)
# mesh = t_filled.to_legacy()
mesh.compute_vertex_normals()
mesh.compute_triangle_normals()
return mesh
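# Note: smaller alpha values make the alpha shape follow the points more
# tightly (possibly introducing holes), while larger values approach the
# convex hull. A hypothetical usage sketch:
#
#     mesh = mesh_reconstruction_alpha_shape(det.reconstructed_pcd, alpha=0.1)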
def mesh_nms(detected_objects: List[ObjectDetectionResult], bbox_overlapping_threshold: float = 0.1) -> List[ObjectDetectionResult]:
"""Perform non-maximum suppression on the detected objects.
Args:
detected_objects: a list of ObjectDetectionResult.
bbox_overlapping_threshold: the threshold for overlapping.
Returns:
a list of ObjectDetectionResult.
"""
    # There are no detection scores to sort by, so objects are kept in their original order.
    # Compute the pairwise IoU.
ious = np.zeros((len(detected_objects), len(detected_objects)))
for i in range(len(detected_objects)):
for j in range(i + 1, len(detected_objects)):
ious[i, j] = ious[j, i] = compute_iou_3d(detected_objects[i].mesh_bbox, detected_objects[j].mesh_bbox)
ious_visualization = [['' for _ in range(len(detected_objects) + 1)] for _ in range(len(detected_objects) + 1)]
for i in range(len(detected_objects)):
for j in range(len(detected_objects)):
if i == j:
ious_visualization[i + 1][j + 1] = 'X'
else:
ious_visualization[i + 1][j + 1] = f'{ious[i, j]:.2f}'
ious_visualization[i + 1][0] = f'{detected_objects[i].label}'
ious_visualization[0][i + 1] = f'{detected_objects[i].label}'
print(jacinle.tabulate(ious_visualization, tablefmt='simple'))
# Perform NMS.
selected_objects_indices = []
for i in range(len(detected_objects)):
if len(selected_objects_indices) == 0:
selected_objects_indices.append(i)
else:
if np.max(ious[i, selected_objects_indices]) > bbox_overlapping_threshold:
continue
selected_objects_indices.append(i)
return [detected_objects[i] for i in selected_objects_indices]
def compute_iou_3d(bbox1: o3d.geometry.AxisAlignedBoundingBox, bbox2: o3d.geometry.AxisAlignedBoundingBox) -> float:
"""Compute the 3D IoU between two bounding boxes.
Args:
bbox1: an open3d axis aligned bounding box.
bbox2: an open3d axis aligned bounding box.
Returns:
the 3D IoU.
"""
# Compute the intersection.
bbox1 = np.asarray(bbox1.get_box_points())
bbox2 = np.asarray(bbox2.get_box_points())
a, b = bbox1.min(axis=0), bbox1.max(axis=0)
c, d = bbox2.min(axis=0), bbox2.max(axis=0)
x, y = np.maximum(a, c), np.minimum(b, d)
intersection = np.maximum(y - x, 0)
intersection_volume = np.prod(intersection)
a_volume = np.prod(b - a)
b_volume = np.prod(d - c)
union_volume = a_volume + b_volume - intersection_volume
return intersection_volume / union_volume
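# Worked example: two unit cubes overlapping by half along x share a volume of
# 0.5, so IoU = 0.5 / (1 + 1 - 0.5) = 1/3:
#
#     b1 = o3d.geometry.AxisAlignedBoundingBox((0, 0, 0), (1, 1, 1))
#     b2 = o3d.geometry.AxisAlignedBoundingBox((0.5, 0, 0), (1.5, 1, 1))
#     assert abs(compute_iou_3d(b1, b2) - 1 / 3) < 1e-6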
def canonize_mesh_center_(mesh_: o3d.geometry.TriangleMesh) -> Tuple[o3d.geometry.TriangleMesh, np.ndarray]:
"""Canonize the mesh center. Note that this function modifies the mesh in place.
Args:
mesh_: an open3d triangle mesh.
Returns:
a tuple of an open3d triangle mesh, the center of the mesh.
"""
mesh_copy = o3d.geometry.TriangleMesh(mesh_)
# Compute the center of the mesh.
center = mesh_copy.get_center()
# Compute the transformation matrix.
T = np.eye(4)
T[:3, 3] = -center
# Apply the transformation matrix.
mesh_copy.transform(T)
return mesh_copy, center
def visualize_with_camera_matrix(geometries, camera_extrinsics):
"""Visualize the geometries with a camera matrix.
Args:
geometries: a list of open3d geometries.
camera_extrinsics: a 4x4 camera extrinsics matrix.
"""
vis = o3d.visualization.VisualizerWithKeyCallback()
vis.create_window()
vis.get_render_option().background_color = np.asarray([0, 0, 0])
view_ctl = vis.get_view_control()
for geom in geometries:
vis.add_geometry(geom)
cam = view_ctl.convert_to_pinhole_camera_parameters()
cam.extrinsic = camera_extrinsics
view_ctl.convert_from_pinhole_camera_parameters(cam)
vis.run()
vis.destroy_window()
def load_scene_in_pybullet(client: BulletClient, scene_dir: str, verbose: bool = False, simplify_name: bool = True, static: bool = False) -> Dict[str, Dict]:
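    """Load a scene exported by ObjectCentricVisionPipeline.export_scene() into a PyBullet simulation.
    Args:
        client: the pybullet client.
        scene_dir: the directory containing the exported scene (must contain metainfo.json).
        verbose: if True, print the loaded objects.
        simplify_name: if True, use the (deduplicated) object label as the body name instead of the unique label.
        static: if True, load the objects as static bodies.
    Returns:
        a dict mapping body names to dicts with keys 'id' (the pybullet body index) and 'label'.
    """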
scene_metafile = osp.join(scene_dir, 'metainfo.json')
scene_metainfo = jacinle.load(scene_metafile)
unique_labels = set()
non_unique_labels = set()
non_unique_label_counts = dict()
if simplify_name:
for o in scene_metainfo['objects']:
label = o['label'].replace(' ', '_')
if label in unique_labels:
non_unique_labels.add(label)
non_unique_label_counts[label] = 0
unique_labels.remove(label)
else:
unique_labels.add(label)
load_metainfo = dict()
for o in scene_metainfo['objects']:
urdf_filename = osp.basename(o['urdf'])
urdf_filename = osp.join(scene_dir, urdf_filename)
pos = o['pos']
        pos = (pos[0], pos[1], pos[2] + 0.01)  # lift slightly to avoid initial penetration with the table.
if simplify_name:
name = o['label'].replace(' ', '_')
if name in non_unique_labels:
non_unique_label_counts[name] += 1
name = name + '_' + str(non_unique_label_counts[name])
else:
assert name in unique_labels
else:
name = o['unique_label'].replace(' ', '_')
index = client.load_urdf(urdf_filename, body_name=name, pos=pos, static=static)
load_metainfo[name] = dict(id=index, label=o['label'])
if verbose:
print('Loaded objects:')
for name, info in load_metainfo.items():
print(f' {name}: {info["label"]}')
return load_metainfo
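# Example (hypothetical; the BulletClient constructor arguments may differ):
#
#     client = BulletClient(is_gui=True)
#     objects = load_scene_in_pybullet(client, './scene_output', verbose=True)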