import json
from pathlib import Path
import os
import numpy as np
import open3d as o3d
import torch
import klampt
from .point_vsf import PointVSF
from ..utils.data_utils import clean_light_points
from ..visualize.common import O3DViewParams
from dataclasses import dataclass, field, asdict
from typing import Tuple, List, Optional, Union, Dict
[docs]
@dataclass
class VSFFactoryConfig:
"""Configuration for the VSF factory process.
"""
features : List[str] = field(default_factory=list)
"""the factory will produce features according to this order"""
fps_num : Optional[int] = None
"""if present, performs Farthest Point Sampling to downsample to this number of points"""
voxel_size : Optional[float] = 0.005
"""performs voxel subsampling to this resolution"""
debug : bool = False
"""whether to visually debug the factory construction process"""
verbose : int = 0
"""verbosity level"""
class StandardVSFFeatures:
"""Gives a list of features that can be automatically generated by name.
You can also include 'colors' and 'normals'
"""
@staticmethod
def height(pts : np.ndarray) -> np.ndarray:
"""Feature: height above minimum height."""
zmin = np.min(pts[:,3])
return pts[:,3] - zmin
@staticmethod
def distance_to_centroid(pts : np.ndarray) -> np.ndarray:
"""Feature: distance to centroid"""
pts_feats = pts.copy()
pts_feats -= np.mean(pts_feats, axis=0)
max_dist = np.linalg.norm(pts_feats, axis=1).max()
pts_feats /= max_dist
return pts_feats
@staticmethod
def signed_distance(pts : np.ndarray, sdf : klampt.Geometry3D):
"""Feature: signed distance to a geometry"""
d = []
for p in pts:
d.append(sdf.distance_point(list(p)).d)
return np.array(d)
@staticmethod
def closest_distance(pts: np.ndarray, center_points : np.ndarray, knn : int = 10) -> np.ndarray:
"""Features are the closest distances to any of the centers in
center_indices.
"""
subset_tsr = o3d.core.Tensor(center_points)
subset_knn = o3d.core.nns.NearestNeighborSearch(subset_tsr)
subset_knn.knn_index()
pts_tsr = o3d.core.Tensor(pts)
prox_idx_ary, prox_eud_ary = subset_knn.knn_search(pts_tsr, knn)
return prox_eud_ary.numpy()
@staticmethod
def extrapolate(pts : np.ndarray, center_points : np.ndarray, center_features : np.ndarray, knn : int = 10) -> np.ndarray:
"""Not a feature itself, but rather extrapolates features from some
set of points to all other points. A KNN method is used."""
subset_tsr = o3d.core.Tensor(center_points)
subset_knn = o3d.core.nns.NearestNeighborSearch(subset_tsr)
subset_knn.knn_index()
pts_tsr = o3d.core.Tensor(pts)
prox_idx_ary, prox_eud_ary = subset_knn.knn_search(pts_tsr, knn)
pts_knn_feats = center_features[prox_idx_ary.numpy()]
pts_feats = np.mean(pts_knn_feats, axis=1)
return pts_feats
[docs]
class VSFFactory:
"""A VSF factory that produces PointVSF objects according to some
configuration. You can downsample, create features, etc.
Automatic usage:
.. code-block:: python
config = VSFFactoryConfig(...) #set up the desired voxel resolution, features, etc.
factory = VSFFactory(config)
for i,input in enumerate(inputs):
#input can be a bounding box, point cloud, or other geometry
vsf = factory.process(input) #does everything
#save the result
path = f'vsf_template{i}/'
os.path.mkdir(path)
vsf.save(path)
"""
def __init__(self, config : VSFFactoryConfig):
self.config = config
self.rest_pts:np.ndarray = np.zeros((0, 3))
self.features:Dict[str,np.ndarray] = {}
[docs]
def process(self, input : Union[str,Tuple[np.ndarray,np.ndarray],np.ndarray,o3d.geometry.PointCloud, klampt.Geometry3D],
features : Dict[str,np.ndarray]= None) -> PointVSF:
"""Performs the entire VSF construction pipeline from a given input.
Will do the best it can to extract appropriate features.
Args:
input: a filename, bounding box, set of rest points, PointCloud, or klampt
Geometry3D. For a bounding box, the particles will be sampled
with resolution config.voxel_size. For a Geometry3D, point clouds will
be converted directly. Other geometry types will be converted to a SDF
at resolution config.voxel_size.
features: a dictionary of provided features.
Returns:
PointVSF: the instantiated VSF.
"""
if features is None:
features = {}
sampled_voxel = False
surface_pts = None
if isinstance(input,str):
geom = klampt.Geometry3D()
if not geom.loadFile(input):
raise ValueError('Could not load geometry from file '+input)
input = geom
if isinstance(input,tuple) or isinstance(input,np.ndarray) and input.shape == (2,3):
#it's a bounding box, sample densely
assert self.config.voxel_size is not None
sampled_voxel = True
bmin,bmax = input
input = np.column_stack(np.meshgrid(*[np.arange(bmin[i],bmax[i],self.config.voxel_size) for i in range(3)]))
elif isinstance(input,o3d.geometry.PointCloud):
input = input.points
if 'colors' not in features:
features['colors'] = input.colors
if 'normals' not in features:
features['normals'] = input.normals
elif isinstance(input,klampt.Geometry3D):
if input.type() == 'PointCloud':
from klampt.model.geometry import point_cloud_normals
pc = input.getPointCloud()
input = pc.points
if 'colors' not in features:
features['colors'] = pc.getColors(('r','g','b'))
if 'normals' not in features:
features['normals'] = point_cloud_normals(pc, False)
else:
#compute SDF
assert self.config.voxel_size is not None
sampled_voxel = True
sdf = input.convert('ImplicitSurface',self.config.voxel_size)
input = sdf.convert('PointCloud').getPointCloud().points
#TODO: add mesh vertices
features['signed_distances'] = StandardVSFFeatures.signed_distance(input, sdf)
self.rest_pts = input
if self.config.fps_num is not None or (self.config.voxel_size is not None and not sampled_voxel):
if len(self.config.features) > 0:
raise NotImplementedError("Can't downsample with features yet")
self.downsample()
self.features = {}
for f in self.config.features:
if f in features:
assert len(features[f]) == len(self.rest_pts),"Invalid feature array size"
self.features[f] = features[f]
elif f in StandardVSFFeatures.__dict__:
self.features[f] = getattr(StandardVSFFeatures,f)(self.rest_pts)
else:
raise RuntimeError("Factory configuration requires feature {} but this is not provided in the feature list".format(f))
return self.get()
[docs]
def get(self) -> PointVSF:
"""Retrieves the currently processed VSF"""
for f in self.config.features:
if f not in self.features:
raise RuntimeError("Did not provide feature {} which is required by the factory configuration".format(f))
features_torch = None if len(self.config.features)==0 else {f:torch.tensor(self.features[f]) for f in self.config.features}
return PointVSF(rest_points=torch.tensor(self.rest_pts), features=features_torch)
[docs]
def downsample(self):
"""Downsamples the rest points according to the given parameters.
WARNING: downsampling must be performed before features are assigned.
"""
rest_pcd = o3d.geometry.PointCloud()
rest_pcd.points = o3d.utility.Vector3dVector(self.rest_pts)
if self.config.fps_num is not None:
if self.config.verbose:
print("farthest points downsampling to", self.config.fps_num, "points")
if len(self.rest_pts) < self.config.fps_num:
print("Warning: not enough points to downsample to", self.config.fps_num)
return
# TODO: double check FPS should be deterministic
low_res_pcd = rest_pcd.farthest_point_down_sample(self.config.fps_num)
elif self.config.voxel_size is not None:
if self.config.verbose:
print("voxel downsampling to", self.config.voxel_size, "resolution")
low_res_pcd = rest_pcd.voxel_down_sample(self.voxel_size)
elif self.config.verbose:
print('INFO: no down sample paramters found, use original points')
return
# visualize downsampled point cloud for debugging
if self.config.debug:
o3d.visualization.draw_geometries([low_res_pcd], window_name='downsampled point cloud')
self.rest_pts = np.array(low_res_pcd.points)
[docs]
def set_features(self, name : str, feature_array : np.ndarray = None):
"""Sets the features under name to a given array, or use the default
feature constructor for that feature."""
assert len(feature_array) == len(self.rest_pts),"Invalid feature array size"
if feature_array is None:
if name in StandardVSFFeatures.__dict__:
feature_array = getattr(StandardVSFFeatures,f)(self.rest_pts)
else:
raise RuntimeError("Requested feature {} but this is not provided as a standard feature".format(name))
self.features[name] = feature_array
[docs]
def save_all(self):
self.save_rest_pts()
self.save_features()
[docs]
def load_all(self):
self.load_rest_pts()
self.load_features()
[docs]
def save_rest_pts(self, rest_pts_fn=None):
"""Saves rest points to the standard file in out_dir."""
if rest_pts_fn is None:
rest_pts_fn = str(Path(self.config.out_dir) / 'points.npy')
assert not Path(rest_pts_fn).exists(), f"file {rest_pts_fn} already exists"
np.save(rest_pts_fn, self.rest_pts)
[docs]
def load_rest_pts(self, rest_pts_fn=None):
"""Loads rest points from the standard file in out_dir."""
if rest_pts_fn is None:
rest_pts_fn = str(Path(self.config.out_dir) / 'points.npy')
self.rest_pts = np.load(rest_pts_fn)
[docs]
def save_features(self, features = None, obj_feats_fns = None):
"""Saves features to the standard files in out_dir."""
if features is None:
features = self.config.features
if obj_feats_fns is None:
obj_feats_fns = [Path(self.config.out_dir) / (name+'.npy') for name in features]
for name,fn in zip(features,obj_feats_fns):
if name not in self.features:
raise ValueError("Invalid feature {} or feature not present".format(name))
np.save(str(fn), self.features[name])
[docs]
def load_features(self, features = None, obj_feats_fns = None):
"""Loads features from the standard files in out_dir"""
if features is None:
features = self.config.features
if obj_feats_fns is None:
obj_feats_fns = [Path(self.config.out_dir) / (name+'.npy') for name in features]
for name,fn in zip(features,obj_feats_fns):
ary = np.load(fn)
self.features[name] = ary
@dataclass
class ViewConfig(O3DViewParams):
origin : List[float] = field(default_factory = lambda : [0.0,0.0,0.0]) # the position of the camera in world coordinates
@dataclass
class CleanPCDConfig:
"""Cleans the point cloud before performing processing.
Current methods:
- 'lightness': removes points with lightness values outside a range.
"""
method : str # method for cleaning the point cloud, only currently support 'lightness'
light_min : float # minimum lightness value
light_max : float # maximum lightness value
[docs]
@dataclass
class VSFRGBDCameraFactoryConfig(VSFFactoryConfig):
"""Configuration for a VSF creation process that instantiates a
point-based VSF from an RGBD camera's point cloud.
"""
view : ViewConfig = field(default_factory=lambda : ViewConfig()) # view configuration
bbox : Optional[List[List[float]]] = None # bounding box of the object
downsample_visual : bool = False
clean_visual : Optional[CleanPCDConfig] = None # clean point cloud configuration
depth_extrapolation : float = 1.0 # max amount of depth to extrapolate the volume
depth_sig : float = 0.005 #depth noise standard deviation estimate; used to offset closest points toward camera
def idx_out_bound(idx, vg_shape):
out_min = idx<np.array([0, 0, 0])
out_max = idx>=np.array(vg_shape)
out_axis = np.logical_or(out_min, out_max)
return np.any(out_axis, axis=1)
def pts_out_bound(pts, b_min, b_max):
out_min = pts<b_min
out_max = pts>b_max
out_axis = np.logical_or(out_min, out_max)
return np.any(out_axis, axis=1)
[docs]
class VSFRGBDCameraFactory(VSFFactory):
"""A VSF factory that produces VSF objects from RGBD camera data.
Inherits from VSFFactory.
Operates by:
1. Subsetting an RGBD point cloud by a bounding box selection
1. Creating surface points from the subsetted point cloud
2. Downsampling, if requested
3. Creates a interior points behind the point cloud to fill the volume
4. Extrapolates point cloud features into the volume
The input can be a background point cloud or even just a folder path.
Attributes:
bg_pcd (o3d.geometry.PointCloud): the whole point cloud
vis_pcd (o3d.geometry.PointCloud): the point cloud corresponding to the object, selected by the bounding box
bbox (o3d.geometry.AxisAlignedBoundingBox): the selection bounding box
vg_shape (tuple): the size of the voxel points used for creating interior points
"""
def __init__(self, config : VSFRGBDCameraFactoryConfig):
super().__init__(config)
self.config = config
self.bg_pcd = None
self.cam_pt = self.config.view.origin
self.num_sf_pts = 0
assert self.config.voxel_size is not None
self.b_min, self.b_max = np.array(self.config.bbox)
self.bbox = o3d.geometry.AxisAlignedBoundingBox(min_bound=self.b_min,
max_bound=self.b_max)
# set bbox color to black by default
self.bbox.color = [0, 0, 0]
assert self.config.voxel_size is not None
self.vg_shape = [int(np.ceil((self.b_max[i]-self.b_min[i])/self.config.voxel_size)) for i in range(3)]
self.vg2pc_idx_map = -1*np.ones(self.vg_shape, dtype=int)
[docs]
def process(self, pcd : Union[str,o3d.geometry.PointCloud],
features : Union[List[str],Dict[str,np.ndarray]] = None) -> PointVSF:
"""Performs the entire VSF construction pipeline from a given input.
Args:
pcd: a filename, folder, or PointCloud object. If a folder, will load
from folder/bg_pcd.pcd. If a filename, will load from that file.
features: a dictionary of provided features or a list of filenames in
the given folder. If None, will use the default features.
Returns:
PointVSF: the instantiated VSF.
"""
if isinstance(pcd,str):
if os.path.isdir(pcd):
#load from standard directory
folder = pcd
print("Reading from directory",folder)
print("Reading background point cloud from bg_pcd.pcd")
pcd = os.path.join(pcd,'bg_pcd.pcd')
if features is None:
features = []
for f in self.config.features:
if hasattr(StandardVSFFeatures,f) or f in ['color','depth','fpfh']:
continue
if os.path.exists(os.path.join(folder,f+'.npy')):
features.append(os.path.join(folder,f+'.npy'))
elif os.path.exists(os.path.join(folder,f+'.npz')):
features.append(os.path.join(folder,f+'.npz'))
else:
print("Warning: feature",f,"not standard and not found in directory")
input()
if isinstance(features, list):
# assume features is a list of filenames, either npy or npz
feature_matrices = {}
for f in features:
if hasattr(StandardVSFFeatures,f) or f in ['color','depth','fpfh']:
continue
feature_name = os.path.splitext(os.path.basename(f))[0]
print("Reading feature",feature_name,"from",f)
if f.endswith('.npz'): # sparse matrix
from scipy import sparse
feature_matrices[feature_name] = sparse.load_npz(f).toarray()
else:
feature_matrices[feature_name] = np.load(f)
print(" Value has length",len(feature_matrices[feature_name]))
input()
features = feature_matrices
pcd = o3d.io.read_point_cloud(pcd)
self.bg_pcd = pcd
if self.config.debug:
o3d.visualization.draw_geometries([self.bg_pcd])
if features is None:
features = {}
cam_pt = self.cam_pt
#select vis_pcd from bbox
in_bound_idx = self.bbox.get_point_indices_within_bounding_box(self.bg_pcd.points)
self.vis_pcd = self.bg_pcd.select_by_index(in_bound_idx)
if self.config.verbose:
print("BBox selection: keeping",len(in_bound_idx),"points out of",len(self.bg_pcd.points))
if self.config.clean_visual is not None:
if self.config.clean_visual.method == 'lightness':
if self.config.verbose:
print('cleaning points by lightness')
self.vis_pcd = clean_light_points(self.vis_pcd, light_min=self.config.clean_visual.light_min, light_max=self.config.clean_visual.light_max)
if self.config.debug:
o3d.visualization.draw_geometries([self.bbox, self.vis_pcd])
st_pts_ary = np.array(self.vis_pcd.points)
if self.config.depth_sig is not None:
depth_sig_dir = (cam_pt - st_pts_ary)
st_pts_ary += self.config.depth_sig*np.linalg.norm(depth_sig_dir, axis=1,
keepdims=True)
if self.config.debug:
new_vis_pcd = o3d.geometry.PointCloud()
new_vis_pcd.points = o3d.utility.Vector3dVector(st_pts_ary)
new_vis_pcd.paint_uniform_color([0.0, 0.7, 0.7])
o3d.visualization.draw_geometries([new_vis_pcd, self.vis_pcd])
# NOTE: down sample points after initializing starting points
if self.config.downsample_visual:
if self.config.verbose:
print("downsampling visual point cloud to voxel size", self.config.voxel_size)
self.vis_pcd = self.vis_pcd.voxel_down_sample(voxel_size=self.config.voxel_size)
diff_pts_ary = st_pts_ary-cam_pt
step_ary = np.sign(diff_pts_ary).astype(dtype=int)
pts_diff = st_pts_ary-cam_pt
rel_dir_ary = np.array([pts_diff[:, 1]*pts_diff[:, 2],
pts_diff[:, 0]*pts_diff[:, 2],
pts_diff[:, 0]*pts_diff[:, 1]]).T
st_idx_ary = np.floor((st_pts_ary - self.b_min)/self.config.voxel_size)
st_idx_ary = st_idx_ary.astype(dtype=int)
tDelta_ary = np.abs(self.config.voxel_size*rel_dir_ary)
# NOTE: cb means closest bound
cb_idx_ary = st_idx_ary + (step_ary > 0).astype(int)
cb_pts_ary = self.b_min + cb_idx_ary*self.config.voxel_size
tMax_ary = np.abs((cb_pts_ary - cam_pt)*rel_dir_ary)
idx_ary_lst = []
curr_idx_ary = np.copy(st_idx_ary)
# extrapolate observed points through the volume
max_iters = int(np.ceil(self.config.depth_extrapolation/self.config.voxel_size))
for iter_step in range(max_iters):
if self.config.verbose:
print(f"iter step {iter_step} left pts:",
curr_idx_ary.shape[0])
if curr_idx_ary.size == 0: #all exrrapolated points are out of bounds
break
min_idx_ary = tMax_ary.argmin(axis=1)
arange_idx = np.arange(0, tMax_ary.shape[0])
tMax_ary[arange_idx, min_idx_ary] += tDelta_ary[arange_idx, min_idx_ary]
curr_idx_ary[arange_idx, min_idx_ary] += step_ary[arange_idx, min_idx_ary]
# set free space points
out_bool = idx_out_bound(curr_idx_ary, self.vg_shape)
# remove redundant points
curr_idx_ary = curr_idx_ary[~out_bool, :]
step_ary = step_ary[~out_bool, :]
tMax_ary = tMax_ary[~out_bool, :]
tDelta_ary = tDelta_ary[~out_bool, :]
idx_ary_lst.append(np.copy(curr_idx_ary))
# merge points index
merged_idx_ary = np.concatenate(idx_ary_lst, axis=0)
merged_idx_ary = np.unique(merged_idx_ary, axis=0)
vg_pts = self.b_min + self.config.voxel_size*merged_idx_ary
sf_pts = np.array(self.vis_pcd.points)
self.num_sf_pts = sf_pts.shape[0]
merged_idx_tpl = tuple(merged_idx_ary.T)
vg_rest_idx = self.num_sf_pts + np.arange(merged_idx_ary.shape[0])
self.vg2pc_idx_map[merged_idx_tpl] = vg_rest_idx
if self.config.verbose:
print("number of surface points:", self.num_sf_pts)
print("number of volume points:", vg_rest_idx.shape[0])
self.rest_pts = np.append(sf_pts, vg_pts, axis=0)
# Downsample VSF after creating from the volume
if self.config.fps_num is not None:
if len(self.config.features) > 0:
raise ValueError("Can't downsample with features yet")
self.downsample()
#setup features
for f in self.config.features:
if f in ['color','fpfh','depth'] or f in features:
self.set_vis_features(f,features.get(f,None))
else:
self.set_features(f)
if self.config.debug:
bbox = o3d.geometry.AxisAlignedBoundingBox(self.b_min, self.b_max)
bbox.color = [0, 0, 0]
rest_pcd = o3d.geometry.PointCloud()
rest_pcd.points = o3d.utility.Vector3dVector(self.rest_pts)
o3d.visualization.draw_geometries([self.bbox, rest_pcd])
return self.get()
[docs]
def set_vis_features(self, feat_type : str, values : np.ndarray = None):
surf_pts = np.array(self.vis_pcd.points)
if feat_type == 'color':
self.set_features(feat_type,StandardVSFFeatures.extrapolate(self.rest_pts,surf_pts,np.array(self.vis_pcd.colors)))
elif feat_type == 'fpfh':
v_size = self.config.voxel_size
self.vis_pcd.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=2*v_size, max_nn=30))
kd_params = o3d.geometry.KDTreeSearchParamHybrid(radius=5*v_size, max_nn=100)
pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(self.vis_pcd, kd_params)
print("fpfh feature shape:", pcd_fpfh.dimension())
fpfh = np.array(pcd_fpfh.data).T
self.set_features(feat_type,StandardVSFFeatures.extrapolate(self.rest_pts,surf_pts,fpfh))
elif feat_type == 'depth':
self.set_features(feat_type,StandardVSFFeatures.closest_distance(self.rest_pts,surf_pts))
else:
assert values is not None,"Unknown surface feature type {}".format(feat_type)
self.set_features(feat_type,StandardVSFFeatures.extrapolate(self.rest_pts,surf_pts,values))
[docs]
def load_obj_all(self, bg_pcd_fn=None, vis_pcd_fn=None):
"""Loads from previously saved files"""
if bg_pcd_fn is None:
bg_pcd_fn = str(Path(self.config.out_dir) / 'bg_pcd.pcd')
self.bg_pcd = o3d.io.read_point_cloud(bg_pcd_fn)
if vis_pcd_fn is None:
vis_pcd_fn = str(Path(self.config.out_dir) / 'vis_pcd.pcd')
self.vis_pcd = o3d.io.read_point_cloud(vis_pcd_fn)
self.load_all()
# compute number of surface points
self.num_sf_pts = np.array(self.vis_pcd.points).shape[0]
[docs]
def save_obj_all(self, bg_pcd_fn=None, vis_pcd_fn=None):
if self.bg_pcd is not None:
if bg_pcd_fn is None:
bg_pcd_fn = str(Path(self.config.out_dir) / 'bg_pcd.pcd')
o3d.io.write_point_cloud(bg_pcd_fn, self.bg_pcd)
self.save_vis_pcd(vis_pcd_fn)
self.save_all()
[docs]
def setup_obj_pts(self, bg_pcd_fn=None, cam_pt=None):
if bg_pcd_fn is None:
bg_pcd_fn = str(Path(self.config.out_dir) / 'bg_pcd.pcd')
bg_pcd = o3d.io.read_point_cloud(bg_pcd_fn)
self.process(bg_pcd)
[docs]
def get_vg_pts(self) -> np.ndarray:
"""Returns points that belong to occupied voxel grid cells"""
vg_idx = np.where(self.vg2pc_idx_map != -1)
vg_idx = np.array(vg_idx).T
return self.b_min + vg_idx*self.voxel_size
[docs]
def get_sf_pts(self) -> np.ndarray:
"""Returns points that belong to the visible surface"""
return self.rest_pts[:self.num_sf_pts]
[docs]
def save_vis_pcd(self, vis_pcd_fn=None):
if vis_pcd_fn is None:
vis_pcd_fn = str(Path(self.config.out_dir) / 'vis_pcd.pcd')
o3d.io.write_point_cloud(vis_pcd_fn, self.vis_pcd)
[docs]
def load_vis_pcd(self, vis_pcd_fn=None):
if vis_pcd_fn is None:
vis_pcd_fn = str(Path(self.config.out_dir) / 'vis_pcd.pcd')
self.vis_pcd = o3d.io.read_point_cloud(vis_pcd_fn)