Source code for vsf.estimator.quad_prog_optimizer

import numpy as np
import torch
from typing import List, Union, Optional
import cvxpy as cp
import time
from ..utils.data_utils import convert_to_tensor, convert_to_numpy
from .recursive_optimizer import ObservationLinearization, LinearRecursiveEstimator


def gaussian_nll(x: cp.Variable, mu: np.ndarray = None, lam: np.ndarray = None, constant=False) -> cp.Expression:
    """
    Computes the Gaussian negative log likelihood for a CVXPY variable.

    The Gaussian is defined by the mean `mu` and the precision matrix `lam`.
    We calculate the negative log-likelihood of the Gaussian prior,
    evaluated at the value of `x`.

    Args:
    - x (cp.Variable): The location to evaluate.
    - mu (np.ndarray): Mean of the Gaussian prior.
    - lam (np.ndarray): Precision matrix of the Gaussian prior.
    - constant (bool) whether to include the constant offset 
    """

    if constant:
        raise NotImplementedError("TODO: NLL constants?")
    if (mu is not None) and (lam is not None):
        if lam.ndim == 1:
            res_squares = cp.power(x - mu, 2)
            return 0.5*cp.multiply(res_squares, lam).sum()
        else:
            return 0.5*cp.quad_form(x - mu, cp.psd_wrap(lam))
    else:
        return 0.0
    

[docs] class QuadProgOptimizer(LinearRecursiveEstimator): """ Estimator that uses quadratic programming to solve for a linear estimation problem. This estimator assumes a set of linear observations are made on the state x, which are corrupted by Gaussian noise. The maximum likelihood estimate of x is obtained by solving a quadratic programming problem. The state x is composed of a heterogeneous factor y and latent factor q, both of which are optional: x = y + A*q in R^n where y in R^n is a heterogeneous term and q in R^k is a latent factor. If the latent factor is included, A is a known basis matrix. Diagonal priors should be defined over y and q, y ~ N(mu_y,diag(Sig_y)), q ~ N(mu_q,diag(Sig_q)) such that x has the prior N(mu_y + A*mu_q, diag(Sig_y) + A * diag(Sig_q) * A^T). To mark the heterogeneous factor as optional, set Sig_y = 0. To mark the latent factor as optional, set Sig_q = 0 or A = None. Observations are given by vectors z^i, observation matrices W^i, optional indices ind^i, biases z0^i, and covariances Sig_z^i such that z^i = W^i * x[ind^i] + z0^i + eps^i, eps^i ~ N(0,Sig_z^i). It is assumed that both y and q are non-negative. The optimization problem is formulated as a quadratic program with linear constraints. The objective is to maximize the likelihood of the state given the observations and prior. """ def __init__(self, max_dim:int, x_mu:Union[float,torch.Tensor]=0.0, x_var:Union[float,torch.Tensor]=0.0, latent_mu:Union[float,torch.Tensor]=0.0, latent_var:Union[float,torch.Tensor]=0.0, latent_basis:Optional[torch.Tensor]=None, max_buffer_len:int=None) -> None: """ Initialize the estimator with standard LinearRecursiveEstimator parameters.""" super().__init__(max_dim,x_mu,x_var,latent_mu,latent_var,latent_basis,max_buffer_len) self.include_heterogeneous = (isinstance(x_var,torch.Tensor) or x_var != 0.0) self.include_latent = self.A is not None and (isinstance(latent_var,torch.Tensor) or latent_var != 0.0) if self.include_heterogeneous: self.y_lam = 1.0 / self.y_var if self.include_latent: self.q_lam = 1.0 / self.q_lam self.y_variable: cp.Variable = None self.q_variable: cp.Variable = None self.x_variable: cp.Variable = None self.A_subset = None
[docs] def setup_opt_var(self, obs_idx: np.ndarray, verbose=False): """ Setup the CVXPY optimization variables. Args: - obs_idx: The indices of the stiffness values to be estimated. - verbose: Flag to print the information during the setup. """ K_dim = obs_idx.shape[0] if self.include_heterogeneous: self.y_variable = cp.Variable((K_dim,)) if self.include_latent: self.q_variable = cp.Variable((self.num_basis,)) self.A_subset = self.A[obs_idx, :] if self.include_heterogeneous and self.include_latent: self.x_variable = self.y_variable + self.A_subset @ self.q_variable elif self.include_heterogeneous: self.x_variable = self.y_variable else: self.x_variable = self.A_subset.dot(self.q_variable)
[docs] def solve(self, unified_x_indices: np.ndarray, obs_models:List[ObservationLinearization], measurements:List[torch.Tensor], reg_weight: float = 1.0, solver='OSQP', verbose=False): """ Solve a quadratic programming optimization problem to get optimal estimate of y_mu and q_mu. The estimation process involves: 1. Setting up CVXPY optimization variables. 2. Computing the observation loss as a cp.Expression. 3. Computing the regularization loss as cp.Expression. 4. Solving the optimization problem using the specified `solver`. Args: - unified_x_indices (np.ndarray): Indices of the x variable to be estimated. All indices in the observation models must be a subset of these - obs_models (list[ObservationLinearization]): List of observation models. - measurements (list[Tensor]): List of actual observations - reg_weight (float): Weight for the regularization term. Should be 1 if your priors are set correctly. - solver (str): Solver used for the optimization problem (default: 'OSQP'). - verbose (bool): If True, prints additional information during optimization. Updates: The estimated value of `y_mu`, `q_mu`. """ self.setup_opt_var(unified_x_indices, verbose=verbose) constraints = [] obs_loss = 0.0 for obs_model, meas in zip(obs_models, measurements): obs_mat = obs_model.matrix.cpu().numpy() if obs_model.state_indices is not None: pred = obs_mat @ self.x_variable[obs_model.state_indices.cpu().numpy()] else: pred = obs_mat @ self.x_variable[:] assert pred.ndim == 1 assert pred.shape[0] == len(meas) if obs_model.bias is not None: pred += obs_model.bias.cpu().numpy() assert pred.ndim == 1 assert pred.shape[0] == len(meas) assert len(meas) == len(obs_model.var) obs_loss += gaussian_nll(pred, meas.cpu().numpy(), obs_model.var.cpu().numpy()) reg_loss = 0 if self.include_heterogeneous: constraints.append(self.y_variable >= 0) assert self.y_variable.ndim == 1 assert self.y_variable.shape[0] == len(unified_x_indices) assert self.y_mu[unified_x_indices].ndim == 1 assert self.y_lam[unified_x_indices].ndim == 1 reg_loss += gaussian_nll(self.y_variable, self.y_mu[unified_x_indices], self.y_lam[unified_x_indices]) if self.include_latent: constraints.append(self.q_variable >= 0) reg_loss += gaussian_nll(self.q_variable, self.q_mu, self.q_lam) prob = cp.Problem( cp.Minimize( obs_loss + reg_weight * reg_loss), constraints) print("QuadProgOptimizer: start solving cvxpy problem...") start_time = time.time() prob.solve(verbose=verbose, solver=solver) solve_time = time.time() - start_time print(f"QuadProgOptimizer: cvxpy solve time = {solve_time}") # Update variables if self.include_heterogeneous: self.y_mu[unified_x_indices] = torch.from_numpy(self.y_variable.value).to(self.y_mu.device).to(self.y_mu.dtype) if self.include_latent: self.q_mu[:] = torch.from_numpy(self.q_variable.value).to(self.q_mu.device).to(self.q_mu.dtype)
[docs] def update_estimation(self): """Note: this estimator is non recursive""" import warnings warnings.warn("QuadProgOptimizer does not operate in recursive form")
[docs] def finalize_estimation(self): """Finalize the estimation after all observations are added. Converts all observation indices into a subset of indices. Then calls solve() """ all_observed_indices = self.get_observed_indices() print("Observed indices:",len(all_observed_indices)) #reshape all observation indices observed_to_idx = torch.full((len(self.y_mu),),-1,dtype=int,device=all_observed_indices.device) observed_to_idx[all_observed_indices] = torch.arange(0,len(all_observed_indices),dtype=int,device=all_observed_indices.device) measurements = [] new_obs_models = [] for (obs_model,meas) in self.obs_buffer: new_indices = all_observed_indices if obs_model.state_indices is None else observed_to_idx[obs_model.state_indices] new_obs_models.append(ObservationLinearization(matrix=obs_model.matrix,var=obs_model.var,bias=obs_model.bias, state_indices=new_indices)) measurements.append(meas) self.solve(all_observed_indices, new_obs_models, measurements)