Source code for vsf.core.vsf_factory

import json
from pathlib import Path
import os

import numpy as np
import open3d as o3d
import torch
import klampt
from .point_vsf import PointVSF
from ..utils.data_utils import clean_light_points
from ..visualize.common import O3DViewParams
from dataclasses import dataclass, field, asdict
from typing import Tuple, List, Optional, Union, Dict


[docs] @dataclass class VSFFactoryConfig: """Configuration for the VSF factory process. """ features : List[str] = field(default_factory=list) """the factory will produce features according to this order""" fps_num : Optional[int] = None """if present, performs Farthest Point Sampling to downsample to this number of points""" voxel_size : Optional[float] = 0.005 """performs voxel subsampling to this resolution""" debug : bool = False """whether to visually debug the factory construction process""" verbose : int = 0 """verbosity level"""
class StandardVSFFeatures: """Gives a list of features that can be automatically generated by name. You can also include 'colors' and 'normals' """ @staticmethod def height(pts : np.ndarray) -> np.ndarray: """Feature: height above minimum height.""" zmin = np.min(pts[:,3]) return pts[:,3] - zmin @staticmethod def distance_to_centroid(pts : np.ndarray) -> np.ndarray: """Feature: distance to centroid""" pts_feats = pts.copy() pts_feats -= np.mean(pts_feats, axis=0) max_dist = np.linalg.norm(pts_feats, axis=1).max() pts_feats /= max_dist return pts_feats @staticmethod def signed_distance(pts : np.ndarray, sdf : klampt.Geometry3D): """Feature: signed distance to a geometry""" d = [] for p in pts: d.append(sdf.distance_point(list(p)).d) return np.array(d) @staticmethod def closest_distance(pts: np.ndarray, center_points : np.ndarray, knn : int = 10) -> np.ndarray: """Features are the closest distances to any of the centers in center_indices. """ subset_tsr = o3d.core.Tensor(center_points) subset_knn = o3d.core.nns.NearestNeighborSearch(subset_tsr) subset_knn.knn_index() pts_tsr = o3d.core.Tensor(pts) prox_idx_ary, prox_eud_ary = subset_knn.knn_search(pts_tsr, knn) return prox_eud_ary.numpy() @staticmethod def extrapolate(pts : np.ndarray, center_points : np.ndarray, center_features : np.ndarray, knn : int = 10) -> np.ndarray: """Not a feature itself, but rather extrapolates features from some set of points to all other points. A KNN method is used.""" subset_tsr = o3d.core.Tensor(center_points) subset_knn = o3d.core.nns.NearestNeighborSearch(subset_tsr) subset_knn.knn_index() pts_tsr = o3d.core.Tensor(pts) prox_idx_ary, prox_eud_ary = subset_knn.knn_search(pts_tsr, knn) pts_knn_feats = center_features[prox_idx_ary.numpy()] pts_feats = np.mean(pts_knn_feats, axis=1) return pts_feats
[docs] class VSFFactory: """A VSF factory that produces PointVSF objects according to some configuration. You can downsample, create features, etc. Automatic usage: .. code-block:: python config = VSFFactoryConfig(...) #set up the desired voxel resolution, features, etc. factory = VSFFactory(config) for i,input in enumerate(inputs): #input can be a bounding box, point cloud, or other geometry vsf = factory.process(input) #does everything #save the result path = f'vsf_template{i}/' os.path.mkdir(path) vsf.save(path) """ def __init__(self, config : VSFFactoryConfig): self.config = config self.rest_pts:np.ndarray = np.zeros((0, 3)) self.features:Dict[str,np.ndarray] = {}
[docs] def process(self, input : Union[str,Tuple[np.ndarray,np.ndarray],np.ndarray,o3d.geometry.PointCloud, klampt.Geometry3D], features : Dict[str,np.ndarray]= None) -> PointVSF: """Performs the entire VSF construction pipeline from a given input. Will do the best it can to extract appropriate features. Args: input: a filename, bounding box, set of rest points, PointCloud, or klampt Geometry3D. For a bounding box, the particles will be sampled with resolution config.voxel_size. For a Geometry3D, point clouds will be converted directly. Other geometry types will be converted to a SDF at resolution config.voxel_size. features: a dictionary of provided features. Returns: PointVSF: the instantiated VSF. """ if features is None: features = {} sampled_voxel = False surface_pts = None if isinstance(input,str): geom = klampt.Geometry3D() if not geom.loadFile(input): raise ValueError('Could not load geometry from file '+input) input = geom if isinstance(input,tuple) or isinstance(input,np.ndarray) and input.shape == (2,3): #it's a bounding box, sample densely assert self.config.voxel_size is not None sampled_voxel = True bmin,bmax = input input = np.column_stack(np.meshgrid(*[np.arange(bmin[i],bmax[i],self.config.voxel_size) for i in range(3)])) elif isinstance(input,o3d.geometry.PointCloud): input = input.points if 'colors' not in features: features['colors'] = input.colors if 'normals' not in features: features['normals'] = input.normals elif isinstance(input,klampt.Geometry3D): if input.type() == 'PointCloud': from klampt.model.geometry import point_cloud_normals pc = input.getPointCloud() input = pc.points if 'colors' not in features: features['colors'] = pc.getColors(('r','g','b')) if 'normals' not in features: features['normals'] = point_cloud_normals(pc, False) else: #compute SDF assert self.config.voxel_size is not None sampled_voxel = True sdf = input.convert('ImplicitSurface',self.config.voxel_size) input = sdf.convert('PointCloud').getPointCloud().points #TODO: add mesh vertices features['signed_distances'] = StandardVSFFeatures.signed_distance(input, sdf) self.rest_pts = input if self.config.fps_num is not None or (self.config.voxel_size is not None and not sampled_voxel): if len(self.config.features) > 0: raise NotImplementedError("Can't downsample with features yet") self.downsample() self.features = {} for f in self.config.features: if f in features: assert len(features[f]) == len(self.rest_pts),"Invalid feature array size" self.features[f] = features[f] elif f in StandardVSFFeatures.__dict__: self.features[f] = getattr(StandardVSFFeatures,f)(self.rest_pts) else: raise RuntimeError("Factory configuration requires feature {} but this is not provided in the feature list".format(f)) return self.get()
[docs] def get(self) -> PointVSF: """Retrieves the currently processed VSF""" for f in self.config.features: if f not in self.features: raise RuntimeError("Did not provide feature {} which is required by the factory configuration".format(f)) features_torch = None if len(self.config.features)==0 else {f:torch.tensor(self.features[f]) for f in self.config.features} return PointVSF(rest_points=torch.tensor(self.rest_pts), features=features_torch)
[docs] def downsample(self): """Downsamples the rest points according to the given parameters. WARNING: downsampling must be performed before features are assigned. """ rest_pcd = o3d.geometry.PointCloud() rest_pcd.points = o3d.utility.Vector3dVector(self.rest_pts) if self.config.fps_num is not None: if self.config.verbose: print("farthest points downsampling to", self.config.fps_num, "points") if len(self.rest_pts) < self.config.fps_num: print("Warning: not enough points to downsample to", self.config.fps_num) return # TODO: double check FPS should be deterministic low_res_pcd = rest_pcd.farthest_point_down_sample(self.config.fps_num) elif self.config.voxel_size is not None: if self.config.verbose: print("voxel downsampling to", self.config.voxel_size, "resolution") low_res_pcd = rest_pcd.voxel_down_sample(self.voxel_size) elif self.config.verbose: print('INFO: no down sample paramters found, use original points') return # visualize downsampled point cloud for debugging if self.config.debug: o3d.visualization.draw_geometries([low_res_pcd], window_name='downsampled point cloud') self.rest_pts = np.array(low_res_pcd.points)
[docs] def set_features(self, name : str, feature_array : np.ndarray = None): """Sets the features under name to a given array, or use the default feature constructor for that feature.""" assert len(feature_array) == len(self.rest_pts),"Invalid feature array size" if feature_array is None: if name in StandardVSFFeatures.__dict__: feature_array = getattr(StandardVSFFeatures,f)(self.rest_pts) else: raise RuntimeError("Requested feature {} but this is not provided as a standard feature".format(name)) self.features[name] = feature_array
[docs] def save_all(self): self.save_rest_pts() self.save_features()
[docs] def load_all(self): self.load_rest_pts() self.load_features()
[docs] def save_rest_pts(self, rest_pts_fn=None): """Saves rest points to the standard file in out_dir.""" if rest_pts_fn is None: rest_pts_fn = str(Path(self.config.out_dir) / 'points.npy') assert not Path(rest_pts_fn).exists(), f"file {rest_pts_fn} already exists" np.save(rest_pts_fn, self.rest_pts)
[docs] def load_rest_pts(self, rest_pts_fn=None): """Loads rest points from the standard file in out_dir.""" if rest_pts_fn is None: rest_pts_fn = str(Path(self.config.out_dir) / 'points.npy') self.rest_pts = np.load(rest_pts_fn)
[docs] def save_features(self, features = None, obj_feats_fns = None): """Saves features to the standard files in out_dir.""" if features is None: features = self.config.features if obj_feats_fns is None: obj_feats_fns = [Path(self.config.out_dir) / (name+'.npy') for name in features] for name,fn in zip(features,obj_feats_fns): if name not in self.features: raise ValueError("Invalid feature {} or feature not present".format(name)) np.save(str(fn), self.features[name])
[docs] def load_features(self, features = None, obj_feats_fns = None): """Loads features from the standard files in out_dir""" if features is None: features = self.config.features if obj_feats_fns is None: obj_feats_fns = [Path(self.config.out_dir) / (name+'.npy') for name in features] for name,fn in zip(features,obj_feats_fns): ary = np.load(fn) self.features[name] = ary
@dataclass class ViewConfig(O3DViewParams): origin : List[float] = field(default_factory = lambda : [0.0,0.0,0.0]) # the position of the camera in world coordinates @dataclass class CleanPCDConfig: """Cleans the point cloud before performing processing. Current methods: - 'lightness': removes points with lightness values outside a range. """ method : str # method for cleaning the point cloud, only currently support 'lightness' light_min : float # minimum lightness value light_max : float # maximum lightness value
[docs] @dataclass class VSFRGBDCameraFactoryConfig(VSFFactoryConfig): """Configuration for a VSF creation process that instantiates a point-based VSF from an RGBD camera's point cloud. """ view : ViewConfig = field(default_factory=lambda : ViewConfig()) # view configuration bbox : Optional[List[List[float]]] = None # bounding box of the object downsample_visual : bool = False clean_visual : Optional[CleanPCDConfig] = None # clean point cloud configuration depth_extrapolation : float = 1.0 # max amount of depth to extrapolate the volume depth_sig : float = 0.005 #depth noise standard deviation estimate; used to offset closest points toward camera
def idx_out_bound(idx, vg_shape): out_min = idx<np.array([0, 0, 0]) out_max = idx>=np.array(vg_shape) out_axis = np.logical_or(out_min, out_max) return np.any(out_axis, axis=1) def pts_out_bound(pts, b_min, b_max): out_min = pts<b_min out_max = pts>b_max out_axis = np.logical_or(out_min, out_max) return np.any(out_axis, axis=1)
[docs] class VSFRGBDCameraFactory(VSFFactory): """A VSF factory that produces VSF objects from RGBD camera data. Inherits from VSFFactory. Operates by: 1. Subsetting an RGBD point cloud by a bounding box selection 1. Creating surface points from the subsetted point cloud 2. Downsampling, if requested 3. Creates a interior points behind the point cloud to fill the volume 4. Extrapolates point cloud features into the volume The input can be a background point cloud or even just a folder path. Attributes: bg_pcd (o3d.geometry.PointCloud): the whole point cloud vis_pcd (o3d.geometry.PointCloud): the point cloud corresponding to the object, selected by the bounding box bbox (o3d.geometry.AxisAlignedBoundingBox): the selection bounding box vg_shape (tuple): the size of the voxel points used for creating interior points """ def __init__(self, config : VSFRGBDCameraFactoryConfig): super().__init__(config) self.config = config self.bg_pcd = None self.cam_pt = self.config.view.origin self.num_sf_pts = 0 assert self.config.voxel_size is not None self.b_min, self.b_max = np.array(self.config.bbox) self.bbox = o3d.geometry.AxisAlignedBoundingBox(min_bound=self.b_min, max_bound=self.b_max) # set bbox color to black by default self.bbox.color = [0, 0, 0] assert self.config.voxel_size is not None self.vg_shape = [int(np.ceil((self.b_max[i]-self.b_min[i])/self.config.voxel_size)) for i in range(3)] self.vg2pc_idx_map = -1*np.ones(self.vg_shape, dtype=int)
[docs] def process(self, pcd : Union[str,o3d.geometry.PointCloud], features : Union[List[str],Dict[str,np.ndarray]] = None) -> PointVSF: """Performs the entire VSF construction pipeline from a given input. Args: pcd: a filename, folder, or PointCloud object. If a folder, will load from folder/bg_pcd.pcd. If a filename, will load from that file. features: a dictionary of provided features or a list of filenames in the given folder. If None, will use the default features. Returns: PointVSF: the instantiated VSF. """ if isinstance(pcd,str): if os.path.isdir(pcd): #load from standard directory folder = pcd print("Reading from directory",folder) print("Reading background point cloud from bg_pcd.pcd") pcd = os.path.join(pcd,'bg_pcd.pcd') if features is None: features = [] for f in self.config.features: if hasattr(StandardVSFFeatures,f) or f in ['color','depth','fpfh']: continue if os.path.exists(os.path.join(folder,f+'.npy')): features.append(os.path.join(folder,f+'.npy')) elif os.path.exists(os.path.join(folder,f+'.npz')): features.append(os.path.join(folder,f+'.npz')) else: print("Warning: feature",f,"not standard and not found in directory") input() if isinstance(features, list): # assume features is a list of filenames, either npy or npz feature_matrices = {} for f in features: if hasattr(StandardVSFFeatures,f) or f in ['color','depth','fpfh']: continue feature_name = os.path.splitext(os.path.basename(f))[0] print("Reading feature",feature_name,"from",f) if f.endswith('.npz'): # sparse matrix from scipy import sparse feature_matrices[feature_name] = sparse.load_npz(f).toarray() else: feature_matrices[feature_name] = np.load(f) print(" Value has length",len(feature_matrices[feature_name])) input() features = feature_matrices pcd = o3d.io.read_point_cloud(pcd) self.bg_pcd = pcd if self.config.debug: o3d.visualization.draw_geometries([self.bg_pcd]) if features is None: features = {} cam_pt = self.cam_pt #select vis_pcd from bbox in_bound_idx = self.bbox.get_point_indices_within_bounding_box(self.bg_pcd.points) self.vis_pcd = self.bg_pcd.select_by_index(in_bound_idx) if self.config.verbose: print("BBox selection: keeping",len(in_bound_idx),"points out of",len(self.bg_pcd.points)) if self.config.clean_visual is not None: if self.config.clean_visual.method == 'lightness': if self.config.verbose: print('cleaning points by lightness') self.vis_pcd = clean_light_points(self.vis_pcd, light_min=self.config.clean_visual.light_min, light_max=self.config.clean_visual.light_max) if self.config.debug: o3d.visualization.draw_geometries([self.bbox, self.vis_pcd]) st_pts_ary = np.array(self.vis_pcd.points) if self.config.depth_sig is not None: depth_sig_dir = (cam_pt - st_pts_ary) st_pts_ary += self.config.depth_sig*np.linalg.norm(depth_sig_dir, axis=1, keepdims=True) if self.config.debug: new_vis_pcd = o3d.geometry.PointCloud() new_vis_pcd.points = o3d.utility.Vector3dVector(st_pts_ary) new_vis_pcd.paint_uniform_color([0.0, 0.7, 0.7]) o3d.visualization.draw_geometries([new_vis_pcd, self.vis_pcd]) # NOTE: down sample points after initializing starting points if self.config.downsample_visual: if self.config.verbose: print("downsampling visual point cloud to voxel size", self.config.voxel_size) self.vis_pcd = self.vis_pcd.voxel_down_sample(voxel_size=self.config.voxel_size) diff_pts_ary = st_pts_ary-cam_pt step_ary = np.sign(diff_pts_ary).astype(dtype=int) pts_diff = st_pts_ary-cam_pt rel_dir_ary = np.array([pts_diff[:, 1]*pts_diff[:, 2], pts_diff[:, 0]*pts_diff[:, 2], pts_diff[:, 0]*pts_diff[:, 1]]).T st_idx_ary = np.floor((st_pts_ary - self.b_min)/self.config.voxel_size) st_idx_ary = st_idx_ary.astype(dtype=int) tDelta_ary = np.abs(self.config.voxel_size*rel_dir_ary) # NOTE: cb means closest bound cb_idx_ary = st_idx_ary + (step_ary > 0).astype(int) cb_pts_ary = self.b_min + cb_idx_ary*self.config.voxel_size tMax_ary = np.abs((cb_pts_ary - cam_pt)*rel_dir_ary) idx_ary_lst = [] curr_idx_ary = np.copy(st_idx_ary) # extrapolate observed points through the volume max_iters = int(np.ceil(self.config.depth_extrapolation/self.config.voxel_size)) for iter_step in range(max_iters): if self.config.verbose: print(f"iter step {iter_step} left pts:", curr_idx_ary.shape[0]) if curr_idx_ary.size == 0: #all exrrapolated points are out of bounds break min_idx_ary = tMax_ary.argmin(axis=1) arange_idx = np.arange(0, tMax_ary.shape[0]) tMax_ary[arange_idx, min_idx_ary] += tDelta_ary[arange_idx, min_idx_ary] curr_idx_ary[arange_idx, min_idx_ary] += step_ary[arange_idx, min_idx_ary] # set free space points out_bool = idx_out_bound(curr_idx_ary, self.vg_shape) # remove redundant points curr_idx_ary = curr_idx_ary[~out_bool, :] step_ary = step_ary[~out_bool, :] tMax_ary = tMax_ary[~out_bool, :] tDelta_ary = tDelta_ary[~out_bool, :] idx_ary_lst.append(np.copy(curr_idx_ary)) # merge points index merged_idx_ary = np.concatenate(idx_ary_lst, axis=0) merged_idx_ary = np.unique(merged_idx_ary, axis=0) vg_pts = self.b_min + self.config.voxel_size*merged_idx_ary sf_pts = np.array(self.vis_pcd.points) self.num_sf_pts = sf_pts.shape[0] merged_idx_tpl = tuple(merged_idx_ary.T) vg_rest_idx = self.num_sf_pts + np.arange(merged_idx_ary.shape[0]) self.vg2pc_idx_map[merged_idx_tpl] = vg_rest_idx if self.config.verbose: print("number of surface points:", self.num_sf_pts) print("number of volume points:", vg_rest_idx.shape[0]) self.rest_pts = np.append(sf_pts, vg_pts, axis=0) # Downsample VSF after creating from the volume if self.config.fps_num is not None: if len(self.config.features) > 0: raise ValueError("Can't downsample with features yet") self.downsample() #setup features for f in self.config.features: if f in ['color','fpfh','depth'] or f in features: self.set_vis_features(f,features.get(f,None)) else: self.set_features(f) if self.config.debug: bbox = o3d.geometry.AxisAlignedBoundingBox(self.b_min, self.b_max) bbox.color = [0, 0, 0] rest_pcd = o3d.geometry.PointCloud() rest_pcd.points = o3d.utility.Vector3dVector(self.rest_pts) o3d.visualization.draw_geometries([self.bbox, rest_pcd]) return self.get()
[docs] def set_vis_features(self, feat_type : str, values : np.ndarray = None): surf_pts = np.array(self.vis_pcd.points) if feat_type == 'color': self.set_features(feat_type,StandardVSFFeatures.extrapolate(self.rest_pts,surf_pts,np.array(self.vis_pcd.colors))) elif feat_type == 'fpfh': v_size = self.config.voxel_size self.vis_pcd.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=2*v_size, max_nn=30)) kd_params = o3d.geometry.KDTreeSearchParamHybrid(radius=5*v_size, max_nn=100) pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(self.vis_pcd, kd_params) print("fpfh feature shape:", pcd_fpfh.dimension()) fpfh = np.array(pcd_fpfh.data).T self.set_features(feat_type,StandardVSFFeatures.extrapolate(self.rest_pts,surf_pts,fpfh)) elif feat_type == 'depth': self.set_features(feat_type,StandardVSFFeatures.closest_distance(self.rest_pts,surf_pts)) else: assert values is not None,"Unknown surface feature type {}".format(feat_type) self.set_features(feat_type,StandardVSFFeatures.extrapolate(self.rest_pts,surf_pts,values))
[docs] def load_obj_all(self, bg_pcd_fn=None, vis_pcd_fn=None): """Loads from previously saved files""" if bg_pcd_fn is None: bg_pcd_fn = str(Path(self.config.out_dir) / 'bg_pcd.pcd') self.bg_pcd = o3d.io.read_point_cloud(bg_pcd_fn) if vis_pcd_fn is None: vis_pcd_fn = str(Path(self.config.out_dir) / 'vis_pcd.pcd') self.vis_pcd = o3d.io.read_point_cloud(vis_pcd_fn) self.load_all() # compute number of surface points self.num_sf_pts = np.array(self.vis_pcd.points).shape[0]
[docs] def save_obj_all(self, bg_pcd_fn=None, vis_pcd_fn=None): if self.bg_pcd is not None: if bg_pcd_fn is None: bg_pcd_fn = str(Path(self.config.out_dir) / 'bg_pcd.pcd') o3d.io.write_point_cloud(bg_pcd_fn, self.bg_pcd) self.save_vis_pcd(vis_pcd_fn) self.save_all()
[docs] def setup_obj_pts(self, bg_pcd_fn=None, cam_pt=None): if bg_pcd_fn is None: bg_pcd_fn = str(Path(self.config.out_dir) / 'bg_pcd.pcd') bg_pcd = o3d.io.read_point_cloud(bg_pcd_fn) self.process(bg_pcd)
[docs] def get_vg_pts(self) -> np.ndarray: """Returns points that belong to occupied voxel grid cells""" vg_idx = np.where(self.vg2pc_idx_map != -1) vg_idx = np.array(vg_idx).T return self.b_min + vg_idx*self.voxel_size
[docs] def get_sf_pts(self) -> np.ndarray: """Returns points that belong to the visible surface""" return self.rest_pts[:self.num_sf_pts]
[docs] def save_vis_pcd(self, vis_pcd_fn=None): if vis_pcd_fn is None: vis_pcd_fn = str(Path(self.config.out_dir) / 'vis_pcd.pcd') o3d.io.write_point_cloud(vis_pcd_fn, self.vis_pcd)
[docs] def load_vis_pcd(self, vis_pcd_fn=None): if vis_pcd_fn is None: vis_pcd_fn = str(Path(self.config.out_dir) / 'vis_pcd.pcd') self.vis_pcd = o3d.io.read_point_cloud(vis_pcd_fn)