vsf.estimator module

class vsf.estimator.LinearRecursiveEstimator(max_dim: int, x_mu: float | Tensor = 0.0, x_var: float | Tensor = 0.0, latent_mu: float | Tensor = 0.0, latent_var: float | Tensor = 0.0, latent_basis: Tensor | None = None, max_buffer_len: int | None = None)[source]

Bases: object

Base estimator for a high-dimensional stationary state with observations as linear-Gaussian functions of state.

The state x is composed of a heterogeneous factor y and latent factor q, both of which are optional:

\[x = y + A*q \in \mathbb{R}^n\]

where y in \(\mathbb{R}^n\) is a heterogeneous term and q in \(\mathbb{R}^k\) is a latent factor. If the latent factor is included, A is a known basis matrix. Diagonal priors should be defined over y and q,

\[y \sim N(\mu_y,\text{diag}(\Sigma_y)), q \sim N(\mu_q,\text{diag}(\Sigma_q))\]

such that x has the prior N(mu_y + A*mu_q, diag(Sig_y) + A * diag(Sig_q) * A^T). To mark the heterogeneous factor as optional, set Sig_y = 0. To mark the latent factor as optional, set Sig_q = 0 or A = None.

Observations are given by vectors z^i, observation matrices W^i, optional indices ind^i, biases z0^i, and covariances \(Sigma_z^i\) such that

\[z^i = W^i * x[ind^i] + z0^i + \epsilon^i, \epsilon^i \sim N(0,\Sigma_z^i).\]

This class provides shared methods for different instantiations of the estimator. It gives a replay buffer and

Unified interface:

  1. add_observation(obs_model, measurement) Adds an observation model and measurement to the buffer.

  2. update_estimation() Reads the most recent measurements. This function is implementation-dependent.

  3. finalize_estimate() Updates y_mu, y_var, q_mu, q_var so the x mean/variance can be extracted. This function is implementation-dependent.

NOTE: for numerical stability purposes, all torch tensors in this class are double precision.

obs_buffer

the observation buffer, stores (observation model, measurement) pairs

max_buffer_len

the max length of the observation buffer, or None for unlimited buffer.

y_mu, y_var

the mean and variance of the heterogeneous factor y

q_mu, q_var

the mean and variance of the latent factor q

A

the basis matrix for the latent factor q

add_observation(model: ObservationLinearization, measurement: Tensor)[source]

Add an observation to the buffer.

clear_obs_buffer()[source]

Clear the observation buffer, set self.obs_buffer to an empty list.

finalize_estimation()[source]

Finalize the estimation after all observations are added

get_covar(idx=None) Tensor[source]

Returns the full covariance matrix of x.

get_mean(idx=None) Tensor[source]

Returns the estimated x mean.

get_observed_indices() LongTensor[source]

Returns all the indices observed across all observations in the buffer

get_unobserved_indices() LongTensor[source]

Complement of get_observed_indices.

NOTE: converts to CPU

get_var(idx=None) Tensor[source]

Returns the diagonal variance of x.

load_state_dict(state_dict: dict)[source]

Subclasses may override this to load additional state.

num_obs()[source]

Returns the number of observations in the buffer.

predict_observation(obs: ObservationLinearization) Tensor[source]

Predicts the mean observation at the current state.

state_dict()[source]

Subclasses may override this to save additional state.

to(device)[source]
update_estimation()[source]

Update the estimation based on the current observation buffer

class vsf.estimator.NeuralVSFEstimator(config: NeuralVSFEstimatorConfig)[source]

Bases: BaseVSFMaterialEstimator

Neural VSF stiffness estimator. Works in online or batch mode.

batch_estimate(sim: QuasistaticVSFSimulator, vsf: NeuralVSF, dataset: BaseDataset, dataset_metadata: DatasetConfig, calibrators: Dict[str, BaseCalibrator] | None = None, dt=0.1, verbose: bool = False) Tensor[source]

Solver that optimizes the NeuralVSF model to match the observations from a dataset.

Note: for best results, call vsf.to(‘cuda’) to train on GPU before calling this.

TODO: the batch_estimate function currently evaluates sequences sequentially. To do a better job of shuffling training, we would need to store the internal state of the neural VSF simulator for each frame. This is not currently supported.

online_finalize()[source]

If the online estimator doesn’t update the VSF in-place, this operation should be called to update the VSF’s material parameters.

online_init(sim: QuasistaticVSFSimulator, vsf: NeuralVSF)[source]

Note: for best results, call vsf.to(‘cuda’) to train on GPU before calling this.

online_reset(sim: QuasistaticVSFSimulator)[source]

Finishes a trial and resets the online estimation version of this estimator to a new trial. The sim is assumed to have been reset. (The material parameters of the VSF are kept the same.)

online_update(sim: QuasistaticVSFSimulator, dt, observations: Dict[str, ndarray]) float[source]

Given a simulator advanced by dt to a new time step and the true observations received at that time step, perform an update of the material estimation.

The discrepancy between sim.measurements() and observation will be used to update the material parameters.

class vsf.estimator.PointVSFEstimator(config: PointVSFEstimatorConfig, prior: BaseVSFPriorFactory, meta_prior: BaseVSFStructuredPriorFactory | None = None)[source]

Bases: BaseVSFMaterialEstimator

A class that estimates a PointVSF stiffness field.

It can accept a prior factory and a meta-prior to help guide the estimation.

Very important!!! This class has VSF object and simulators, but they ARE NOT attached to the estimator, it is only a pointer to the recent VSF. When you would to estimate a new VSF, you need to call the online_init function to refresh the pointer to the VSF. Batch estimation will refresh the VSF pointer automatically when using the batch_estimate function.

config

Configuration for the estimator.

Type:

PointVSFEstimatorConfig

prior_factory

Prior factory for the estimator.

Type:

BaseVSFPriorFactory

struct_prior_factory

Structured prior factory for the estimator.

Type:

BaseVSFStructuredPriorFactory

vsf

Temporary reference to the VSF object being estimated.

Type:

PointVSF

vsf_sim

Temporary reference to the simulator object used for estimation.

Type:

QuasistaticVSFSimulator

estimator

The optimizer used for estimation.

batch_estimate(sim, vsf: PointVSF, dataset: BaseDataset, dataset_metadata: DatasetConfig, calibrators: Dict[str, BaseCalibrator] | None = None, dt=0.1, sim_out_dir: str | None = None)[source]

Solves the VSF stiffness using from raw control and observation data. The solving procedure has the following steps: 1. Simulate the sensor state using the simulator, caching the simulated states; this dataset can be optionally saved on the disk to avoid re-simulation. 2. Solve VSF stiffness given the simulated sensor states.

batch_estimate_sim_dataset(sim: QuasistaticVSFSimulator, vsf: PointVSF, sim_dataset: SimStateCache, verbose=False)[source]

A utility function that solves VSF stiffness given simulated sensor states. This function is not expected to be called directly by the user, which should be used by the solve function only

Args: - sim_dataset: A cache of the simulation state and sensor states.

linearize_dataset(sim_dataset: SimStateCache, flatten=True, verbose=False) List[Tuple[ObservationLinearization, ndarray]][source]

Reduces multiple sequences of point VSF simulation data to linear observation data.

Parameters:
  • sim_dataset (SimStateCache) – The cache containing multiple sequences of simulation states.

  • flatten (bool, optional) – If True, all sequences are concatenated into a single sequence of (observation_model, obs) tuples, and all None models are ignored. Defaults to True.

  • verbose (bool, optional) – If True, prints additional debugging information. Defaults to False.

Returns:

A list of sequences. Each frame in a sequence is a dictionary mapping sensor names to:

  • (None, obs): If the observation model is ignored.

  • (linear_observation_model, obs): If the observation model is included.

If flatten=True, all sequences are concatenated into a single sequence of (observation_model, obs) tuples, and all None models are ignored.

Return type:

list

linearize_observation(sim_state: SimState, sensor_name: str) ObservationLinearization | None[source]

For the given sensor, linearize the observation and return the observation matrix and the observation index.

Returns:

the linearized model as a function of the object stiffness state variable.

Return type:

ObservationLinearization

online_finalize(vsf: PointVSF)[source]

Finalizes the estimation. Run post-processing steps to update the vsf’s stiffness value, ‘K_std’ and ‘N_obs’ features.

online_init(sim: QuasistaticVSFSimulator, vsf: PointVSF)[source]

Initializes online estimation version of this estimator.

Any internal state and material priors should be instantiated here.

Parameters:
  • estimation. (sim is the simulator that will be used for online)

  • estimated. (vsf is the VSF that will be)

online_reset(sim: QuasistaticVSFSimulator)[source]

Clear the estimator state to prepare for next online estimation.

Different from online_init that fully reset the estimator, this function only clears the observation buffer.

online_update(sim: ~vsf.sim.quasistatic_sim.QuasistaticVSFSimulator, dt: float, measurements: ~typing.Dict[str, ~numpy.ndarray], perfer: ~vsf.utils.perf.PerfRecorder = <vsf.utils.perf.DummyRecorder object>, verbose=False)[source]

Update point VSF stiffness given updated simulator state and sensor measurements.

NOTE: this function assumes controls are externally applied to sim by the user.

prior_predict(vsf: PointVSF) DiagGaussianDistribution[source]

Predicts the prior stiffness distribution for the given VSF.

setup_optimizer(vsf: PointVSF, initialize_vsf_features=True, verbose=True)[source]

Setup the recursive/batched optimizer for the given VSF.

class vsf.estimator.QuadProgOptimizer(max_dim: int, x_mu: float | Tensor = 0.0, x_var: float | Tensor = 0.0, latent_mu: float | Tensor = 0.0, latent_var: float | Tensor = 0.0, latent_basis: Tensor | None = None, max_buffer_len: int | None = None)[source]

Bases: LinearRecursiveEstimator

Estimator that uses quadratic programming to solve for a linear estimation problem.

This estimator assumes a set of linear observations are made on the state x, which are corrupted by Gaussian noise. The maximum likelihood estimate of x is obtained by solving a quadratic programming problem.

The state x is composed of a heterogeneous factor y and latent factor q, both of which are optional:

x = y + A*q in R^n

where y in R^n is a heterogeneous term and q in R^k is a latent factor. If the latent factor is included, A is a known basis matrix. Diagonal priors should be defined over y and q,

y ~ N(mu_y,diag(Sig_y)), q ~ N(mu_q,diag(Sig_q))

such that x has the prior N(mu_y + A*mu_q, diag(Sig_y) + A * diag(Sig_q) * A^T). To mark the heterogeneous factor as optional, set Sig_y = 0. To mark the latent factor as optional, set Sig_q = 0 or A = None.

Observations are given by vectors z^i, observation matrices W^i, optional indices ind^i, biases z0^i, and covariances Sig_z^i such that

z^i = W^i * x[ind^i] + z0^i + eps^i, eps^i ~ N(0,Sig_z^i).

It is assumed that both y and q are non-negative. The optimization problem is formulated as a quadratic program with linear constraints. The objective is to maximize the likelihood of the state given the observations and prior.

finalize_estimation()[source]

Finalize the estimation after all observations are added.

Converts all observation indices into a subset of indices. Then calls solve()

setup_opt_var(obs_idx: ndarray, verbose=False)[source]

Setup the CVXPY optimization variables.

Args: - obs_idx: The indices of the stiffness values to be estimated. - verbose: Flag to print the information during the setup.

solve(unified_x_indices: ndarray, obs_models: List[ObservationLinearization], measurements: List[Tensor], reg_weight: float = 1.0, solver='OSQP', verbose=False)[source]

Solve a quadratic programming optimization problem to get optimal estimate of y_mu and q_mu.

The estimation process involves: 1. Setting up CVXPY optimization variables. 2. Computing the observation loss as a cp.Expression. 3. Computing the regularization loss as cp.Expression. 4. Solving the optimization problem using the specified solver.

Args: - unified_x_indices (np.ndarray): Indices of the x variable to be estimated. All indices in the observation models must be a subset of these - obs_models (list[ObservationLinearization]): List of observation models. - measurements (list[Tensor]): List of actual observations - reg_weight (float): Weight for the regularization term. Should be 1 if your priors are set correctly. - solver (str): Solver used for the optimization problem (default: ‘OSQP’). - verbose (bool): If True, prints additional information during optimization.

Updates:

The estimated value of y_mu, q_mu.

update_estimation()[source]

Note: this estimator is non recursive