vsf.estimator module
- class vsf.estimator.LinearRecursiveEstimator(max_dim: int, x_mu: float | Tensor = 0.0, x_var: float | Tensor = 0.0, latent_mu: float | Tensor = 0.0, latent_var: float | Tensor = 0.0, latent_basis: Tensor | None = None, max_buffer_len: int | None = None)[source]
Bases: object
Base estimator for a high-dimensional stationary state with observations as linear-Gaussian functions of state.
The state x is composed of a heterogeneous factor y and latent factor q, both of which are optional:
\[x = y + A*q \in \mathbb{R}^n\]
where y in \(\mathbb{R}^n\) is a heterogeneous term and q in \(\mathbb{R}^k\) is a latent factor. If the latent factor is included, A is a known basis matrix. Diagonal priors should be defined over y and q,
\[y \sim N(\mu_y,\text{diag}(\Sigma_y)), \quad q \sim N(\mu_q,\text{diag}(\Sigma_q))\]
such that x has the prior N(mu_y + A*mu_q, diag(Sig_y) + A * diag(Sig_q) * A^T). To mark the heterogeneous factor as optional, set Sig_y = 0. To mark the latent factor as optional, set Sig_q = 0 or A = None.
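As a rough illustration of the prior above, the following NumPy sketch builds the mean and covariance of x from diagonal priors on y and q. The dimensions and numeric values are made up for the example, and none of the variable names come from the vsf package.

```python
import numpy as np

n, k = 4, 2                          # illustrative state and latent dimensions
mu_y = np.zeros(n)                   # prior mean of the heterogeneous factor y
sig_y = np.full(n, 0.5)              # diagonal prior variance of y
mu_q = np.array([1.0, 2.0])          # prior mean of the latent factor q
sig_q = np.array([0.1, 0.2])         # diagonal prior variance of q
A = np.array([[1.0, 0.0],
              [1.0, 0.0],
              [0.0, 1.0],
              [0.0, 1.0]])           # known basis matrix, shape (n, k)

# Prior on x = y + A q: mean mu_y + A mu_q, covariance diag(Sig_y) + A diag(Sig_q) A^T
x_mean = mu_y + A @ mu_q
x_cov = np.diag(sig_y) + A @ np.diag(sig_q) @ A.T
```

Entries of x that share a latent basis column become correlated through the A diag(Sig_q) A^T term, while the diag(Sig_y) term only adds independent variance.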
Observations are given by vectors z^i, observation matrices W^i, optional indices ind^i, biases z0^i, and covariances \(\Sigma_z^i\) such that
\[z^i = W^i * x[ind^i] + z0^i + \epsilon^i, \quad \epsilon^i \sim N(0,\Sigma_z^i).\]
This class provides shared methods for different instantiations of the estimator. It provides a replay buffer and a unified interface:
add_observation(obs_model, measurement) Adds an observation model and measurement to the buffer.
update_estimation() Reads the most recent measurements. This function is implementation-dependent.
finalize_estimate() Updates y_mu, y_var, q_mu, q_var so the x mean/variance can be extracted. This function is implementation-dependent.
NOTE: for numerical stability purposes, all torch tensors in this class are double precision.
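Since update_estimation() is implementation-dependent, the sketch below only shows the generic textbook conjugate update for a single linear-Gaussian observation of the form above. It is not the package's implementation: it assumes only the heterogeneous factor is present (no latent factor), and the function name `gaussian_update` is hypothetical.

```python
import numpy as np

def gaussian_update(mu, var, W, ind, z, z0, Sigma_z):
    """Condition a diagonal Gaussian prior on x[ind] on one observation
    z = W x[ind] + z0 + eps, eps ~ N(0, Sigma_z).
    Returns the posterior mean and covariance of x[ind]."""
    prior_precision = np.diag(1.0 / var[ind])
    Sz_inv = np.linalg.inv(Sigma_z)
    # Information-form update: precisions add, then invert once
    post_cov = np.linalg.inv(prior_precision + W.T @ Sz_inv @ W)
    post_mean = post_cov @ (prior_precision @ mu[ind] + W.T @ Sz_inv @ (z - z0))
    return post_mean, post_cov

# Double precision throughout, mirroring the note above
mu = np.zeros(5, dtype=np.float64)
var = np.ones(5, dtype=np.float64)
ind = np.array([1, 3])               # the observation touches x[1] and x[3]
W = np.array([[1.0, 1.0]])           # observes their sum
z = np.array([2.0])
z0 = np.array([0.0])
Sigma_z = np.array([[1.0]])

post_mean, post_cov = gaussian_update(mu, var, W, ind, z, z0, Sigma_z)
```

Observing only the sum of two entries leaves them negatively correlated in the posterior, which is why the returned covariance is no longer diagonal.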
- obs_buffer
the observation buffer, stores (observation model, measurement) pairs
- max_buffer_len
the max length of the observation buffer, or None for unlimited buffer.
- y_mu, y_var
the mean and variance of the heterogeneous factor y
- q_mu, q_var
the mean and variance of the latent factor q
- A
the basis matrix for the latent factor q
- add_observation(model: ObservationLinearization, measurement: Tensor)[source]
Add an observation to the buffer.
- get_observed_indices() → LongTensor[source]
Returns all the indices observed across all observations in the buffer
- get_unobserved_indices() → LongTensor[source]
Complement of get_observed_indices.
NOTE: converts to CPU
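The relationship between the two index queries can be sketched with NumPy set operations. This assumes each buffered observation carries an explicit index array and that the state has max_dim entries; the helper names are hypothetical and the real methods return torch LongTensors.

```python
import numpy as np

def observed_indices(index_sets):
    """Sorted union of all indices touched by any observation."""
    if not index_sets:
        return np.empty(0, dtype=np.int64)
    return np.unique(np.concatenate(index_sets))

def unobserved_indices(index_sets, max_dim):
    """Complement of the observed indices within range(max_dim)."""
    return np.setdiff1d(np.arange(max_dim), observed_indices(index_sets))

index_sets = [np.array([3, 1]), np.array([1, 4])]
observed = observed_indices(index_sets)
unobserved = unobserved_indices(index_sets, max_dim=6)
```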
- class vsf.estimator.NeuralVSFEstimator(config: NeuralVSFEstimatorConfig)[source]
Bases: BaseVSFMaterialEstimator
Neural VSF stiffness estimator. Works in online or batch mode.
- batch_estimate(sim: QuasistaticVSFSimulator, vsf: NeuralVSF, dataset: BaseDataset, dataset_metadata: DatasetConfig, calibrators: Dict[str, BaseCalibrator] | None = None, dt=0.1, verbose: bool = False) → Tensor[source]
Solver that optimizes the NeuralVSF model to match the observations from a dataset.
Note: for best results, call vsf.to('cuda') to train on GPU before calling this.
TODO: the batch_estimate function currently evaluates sequences sequentially. To do a better job of shuffling training, we would need to store the internal state of the neural VSF simulator for each frame. This is not currently supported.
- online_finalize()[source]
If the online estimator doesn’t update the VSF in-place, this operation should be called to update the VSF’s material parameters.
- online_init(sim: QuasistaticVSFSimulator, vsf: NeuralVSF)[source]
Note: for best results, call vsf.to('cuda') to train on GPU before calling this.
- online_reset(sim: QuasistaticVSFSimulator)[source]
Finishes a trial and resets the online estimation version of this estimator to a new trial. The sim is assumed to have been reset. (The material parameters of the VSF are kept the same.)
- online_update(sim: QuasistaticVSFSimulator, dt, observations: Dict[str, ndarray]) → float[source]
Given a simulator advanced by dt to a new time step and the true observations received at that time step, perform an update of the material estimation.
The discrepancy between sim.measurements() and observations will be used to update the material parameters.
- class vsf.estimator.PointVSFEstimator(config: PointVSFEstimatorConfig, prior: BaseVSFPriorFactory, meta_prior: BaseVSFStructuredPriorFactory | None = None)[source]
Bases: BaseVSFMaterialEstimator
A class that estimates a PointVSF stiffness field.
It can accept a prior factory and a meta-prior to help guide the estimation.
Important: this class holds a VSF object and simulators, but they are not owned by the estimator; it only keeps a pointer to the most recently used VSF. To estimate a new VSF, call online_init to refresh the pointer to the VSF. Batch estimation through batch_estimate refreshes the VSF pointer automatically.
- config
Configuration for the estimator.
- Type:
PointVSFEstimatorConfig
- prior_factory
Prior factory for the estimator.
- Type:
BaseVSFPriorFactory
- struct_prior_factory
Structured prior factory for the estimator.
- Type:
BaseVSFStructuredPriorFactory
- vsf_sim
Temporary reference to the simulator object used for estimation.
- Type:
- estimator
The optimizer used for estimation.
- batch_estimate(sim, vsf: PointVSF, dataset: BaseDataset, dataset_metadata: DatasetConfig, calibrators: Dict[str, BaseCalibrator] | None = None, dt=0.1, sim_out_dir: str | None = None)[source]
Solves for the VSF stiffness from raw control and observation data. The solving procedure has the following steps:
1. Simulate the sensor state using the simulator, caching the simulated states; this dataset can optionally be saved to disk to avoid re-simulation.
2. Solve for the VSF stiffness given the simulated sensor states.
- batch_estimate_sim_dataset(sim: QuasistaticVSFSimulator, vsf: PointVSF, sim_dataset: SimStateCache, verbose=False)[source]
A utility function that solves for the VSF stiffness given simulated sensor states. It is not expected to be called directly by the user; it should be used by the solve function only.
Args:
- sim_dataset: A cache of the simulation state and sensor states.
- linearize_dataset(sim_dataset: SimStateCache, flatten=True, verbose=False) → List[Tuple[ObservationLinearization, ndarray]][source]
Reduces multiple sequences of point VSF simulation data to linear observation data.
- Parameters:
sim_dataset (SimStateCache) – The cache containing multiple sequences of simulation states.
flatten (bool, optional) – If True, all sequences are concatenated into a single sequence of (observation_model, obs) tuples, and all None models are ignored. Defaults to True.
verbose (bool, optional) – If True, prints additional debugging information. Defaults to False.
- Returns:
A list of sequences. Each frame in a sequence is a dictionary mapping sensor names to:
(None, obs): If the observation model is ignored.
(linear_observation_model, obs): If the observation model is included.
If flatten=True, all sequences are concatenated into a single sequence of (observation_model, obs) tuples, and all None models are ignored.
- Return type:
list
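The effect of flatten=True described above can be sketched in plain Python. The nested layout (a list of sequences, each a list of per-frame dictionaries mapping sensor names to (model, obs) pairs) follows the return description; the function name `flatten_sequences` and the placeholder values are hypothetical.

```python
def flatten_sequences(sequences):
    """Concatenate all frames of all sequences into one list of
    (observation_model, obs) tuples, skipping entries whose model is None."""
    flat = []
    for sequence in sequences:
        for frame in sequence:
            for sensor_name, (model, obs) in frame.items():
                if model is not None:
                    flat.append((model, obs))
    return flat

# Strings stand in for ObservationLinearization objects in this toy example
sequences = [
    [{"tactile": ("model_0", [1.0]), "torque": (None, [0.2])}],
    [{"tactile": ("model_1", [2.0])}],
]
flat = flatten_sequences(sequences)
```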
- linearize_observation(sim_state: SimState, sensor_name: str) → ObservationLinearization | None[source]
For the given sensor, linearize the observation and return the observation matrix and the observation index.
- Returns:
the linearized model as a function of the object stiffness state variable.
- Return type:
ObservationLinearization
- online_finalize(vsf: PointVSF)[source]
Finalizes the estimation. Runs post-processing steps to update the VSF's stiffness value and its 'K_std' and 'N_obs' features.
- online_init(sim: QuasistaticVSFSimulator, vsf: PointVSF)[source]
Initializes the online estimation version of this estimator.
Any internal state and material priors should be instantiated here.
- Parameters:
sim – the simulator that will be used for online estimation.
vsf – the VSF that will be estimated.
- online_reset(sim: QuasistaticVSFSimulator)[source]
Clears the estimator state to prepare for the next online estimation.
Unlike online_init, which fully resets the estimator, this function only clears the observation buffer.
- online_update(sim: QuasistaticVSFSimulator, dt: float, measurements: Dict[str, ndarray], perfer: PerfRecorder = <vsf.utils.perf.DummyRecorder object>, verbose=False)[source]
Update point VSF stiffness given updated simulator state and sensor measurements.
NOTE: this function assumes controls are externally applied to sim by the user.
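One plausible call order for the online methods documented above is sketched below. The ordering (init, repeated updates, finalize, then reset) is an assumption drawn from the method descriptions rather than a documented requirement. `run_online_trial`, `advance_sim`, `frames`, and `RecordingEstimator` are hypothetical names; the stub only logs calls so the example runs without the vsf package.

```python
def run_online_trial(estimator, sim, vsf, frames, dt, advance_sim):
    """Drive one online estimation trial using the documented method names."""
    estimator.online_init(sim, vsf)
    for controls, measurements in frames:
        # Controls are applied externally by the user, as the note above states;
        # advance_sim is a user-supplied callback, not part of the documented API.
        advance_sim(sim, controls, dt)
        estimator.online_update(sim, dt, measurements)
    estimator.online_finalize(vsf)   # write the estimate back to the VSF
    estimator.online_reset(sim)      # clear the buffer before the next trial


class RecordingEstimator:
    """Stand-in that records the call order; not the real PointVSFEstimator."""
    def __init__(self):
        self.calls = []
    def online_init(self, sim, vsf):
        self.calls.append("online_init")
    def online_update(self, sim, dt, measurements):
        self.calls.append("online_update")
    def online_finalize(self, vsf):
        self.calls.append("online_finalize")
    def online_reset(self, sim):
        self.calls.append("online_reset")


estimator = RecordingEstimator()
frames = [(None, {"tactile": [0.0]}), (None, {"tactile": [0.1]})]
run_online_trial(estimator, sim=object(), vsf=object(), frames=frames,
                 dt=0.1, advance_sim=lambda sim, controls, dt: None)
```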
- class vsf.estimator.QuadProgOptimizer(max_dim: int, x_mu: float | Tensor = 0.0, x_var: float | Tensor = 0.0, latent_mu: float | Tensor = 0.0, latent_var: float | Tensor = 0.0, latent_basis: Tensor | None = None, max_buffer_len: int | None = None)[source]
Bases: LinearRecursiveEstimator
Estimator that uses quadratic programming to solve for a linear estimation problem.
This estimator assumes a set of linear observations are made on the state x, which are corrupted by Gaussian noise. The maximum likelihood estimate of x is obtained by solving a quadratic programming problem.
The state x is composed of a heterogeneous factor y and latent factor q, both of which are optional:
x = y + A*q in R^n
where y in R^n is a heterogeneous term and q in R^k is a latent factor. If the latent factor is included, A is a known basis matrix. Diagonal priors should be defined over y and q,
y ~ N(mu_y,diag(Sig_y)), q ~ N(mu_q,diag(Sig_q))
such that x has the prior N(mu_y + A*mu_q, diag(Sig_y) + A * diag(Sig_q) * A^T). To mark the heterogeneous factor as optional, set Sig_y = 0. To mark the latent factor as optional, set Sig_q = 0 or A = None.
Observations are given by vectors z^i, observation matrices W^i, optional indices ind^i, biases z0^i, and covariances Sig_z^i such that
z^i = W^i * x[ind^i] + z0^i + eps^i, eps^i ~ N(0,Sig_z^i).
It is assumed that both y and q are non-negative. The optimization problem is formulated as a quadratic program with linear constraints. The objective is to maximize the likelihood of the state given the observations and prior.
- finalize_estimation()[source]
Finalize the estimation after all observations are added.
Converts all observation indices into a unified subset of indices, then calls solve().
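A minimal sketch of what such an index conversion might look like, assuming the goal is to collect the union of observed indices and re-express each observation's indices as positions within that union. The helper name `unify_indices` is hypothetical and the package may do this differently.

```python
import numpy as np

def unify_indices(index_sets):
    """Return the sorted union of all indices and, for each observation,
    its indices remapped to positions within that union."""
    unified = np.unique(np.concatenate(index_sets))
    # unified is sorted and contains every index, so searchsorted gives exact positions
    local = [np.searchsorted(unified, idx) for idx in index_sets]
    return unified, local

index_sets = [np.array([10, 42]), np.array([42, 7])]
unified, local = unify_indices(index_sets)
```

Working in the reduced index space keeps the optimization variable as small as the number of entries that were actually observed.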
- setup_opt_var(obs_idx: ndarray, verbose=False)[source]
Set up the CVXPY optimization variables.
Args:
- obs_idx: The indices of the stiffness values to be estimated.
- verbose: Flag to print information during the setup.
- solve(unified_x_indices: ndarray, obs_models: List[ObservationLinearization], measurements: List[Tensor], reg_weight: float = 1.0, solver='OSQP', verbose=False)[source]
Solve a quadratic programming optimization problem to obtain the optimal estimate of y_mu and q_mu.
The estimation process involves:
1. Setting up CVXPY optimization variables.
2. Computing the observation loss as a cp.Expression.
3. Computing the regularization loss as a cp.Expression.
4. Solving the optimization problem using the specified solver.
Args:
- unified_x_indices (np.ndarray): Indices of the x variable to be estimated. All indices in the observation models must be a subset of these.
- obs_models (list[ObservationLinearization]): List of observation models.
- measurements (list[Tensor]): List of actual observations.
- reg_weight (float): Weight for the regularization term. Should be 1 if your priors are set correctly.
- solver (str): Solver used for the optimization problem (default: 'OSQP').
- verbose (bool): If True, prints additional information during optimization.
- Updates:
The estimated value of y_mu, q_mu.
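To make the structure of the problem concrete, here is a small NumPy sketch of a non-negative quadratic program with an observation loss and a prior regularization loss. It deliberately swaps the CVXPY/OSQP solver for plain projected gradient descent so that it needs only NumPy. It also assumes only the heterogeneous factor is present, strictly positive prior variances, and unique indices within each observation. The function name `solve_nonneg_qp` and the tuple layout of `observations` are hypothetical.

```python
import numpy as np

def solve_nonneg_qp(mu, var, observations, reg_weight=1.0, iters=2000):
    """Minimize  sum_i ||W_i x[ind_i] + z0_i - z_i||^2 (weighted by Sigma_z_i^-1)
               + reg_weight * sum((x - mu)^2 / var)   subject to x >= 0,
    using projected gradient descent (a stand-in for a real QP solver)."""
    # Write the objective as x^T H x - 2 g^T x + const
    H = reg_weight * np.diag(1.0 / var)
    g = reg_weight * mu / var
    for W, ind, z, z0, Sigma_z in observations:
        Sz_inv = np.linalg.inv(Sigma_z)
        H[np.ix_(ind, ind)] += W.T @ Sz_inv @ W
        g[ind] += W.T @ Sz_inv @ (z - z0)
    step = 1.0 / np.linalg.eigvalsh(H).max()   # safe step size for this quadratic
    x = np.maximum(mu, 0.0)
    for _ in range(iters):
        # Gradient step on the quadratic, then projection onto x >= 0
        x = np.maximum(x - step * (H @ x - g), 0.0)
    return x

mu = np.ones(3)
var = np.ones(3)
W = np.array([[1.0, 1.0]])
ind = np.array([0, 1])
z0 = np.array([0.0])
Sigma_z = np.array([[1.0]])

# A measurement that pulls x[0] + x[1] negative: the constraint becomes active
x_clamped = solve_nonneg_qp(mu, var, [(W, ind, np.array([-4.0]), z0, Sigma_z)])
# A measurement consistent with positive values: the constraint is inactive
x_free = solve_nonneg_qp(mu, var, [(W, ind, np.array([5.0]), z0, Sigma_z)])
```

In the first case the unconstrained minimizer would be negative in the observed entries, so the non-negativity constraint pins them at zero while the unobserved entry stays at its prior mean.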