Source code for vsf.sensor.base_calibrator

from .base_sensor import BaseSensor
import numpy as np
from typing import List, Dict
from .base_sensor import SimState

[docs] class BaseCalibrator: """A base class for sensor calibration. For example, a sensor can be tared or calibrated to a specific range using this class. """
[docs] def compatible(self, sensor : BaseSensor) -> bool: """Returns true if the calibrator is compatible with the sensor.""" raise NotImplementedError()
[docs] def calibrate(self, sensor : BaseSensor, sim, command_sequence : List[Dict[str,np.ndarray]], observation_sequence : List[Dict[str,np.ndarray]]) -> int: """Calibrates the sensor based on the data sequence. The implementer will use the `sensor.set_calibration` method, with the appropriate calibration data structure, to set the calibration. Note that a calibrator could theoretically run the simulation on the entire sequence. However, this is not recommended as it would be using results from the future to calibrate the sensor. It is recommended that only a small fraction of the sequence be used for calibration. Args: sensor (BaseSensor): The sensor to calibrate. sim (QuasistaticVSFSimulator): The simulation object. command_sequence (List[Dict[str,np.ndarray]]): The command sequence. observation_sequence (List[Dict[str,np.ndarray]]): The observation sequence. Returns: int: The number of time steps used for calibration. Ideally these should be skipped for estimation and prediction. """ raise NotImplementedError()
[docs] class TareCalibrator(BaseCalibrator): """A simple calibrator that tares the sensor based on the average of some initial set of observations. NOTE: the average is not average observation, but the average of the observation residual after subtracting simulation with empty VSF model. The calibration has the form {output_key: average_observation} """ def __init__(self, num_samples : int = 10, output_key : str = 'tare'): self.num_samples = num_samples self.output_key = output_key
[docs] def compatible(self, sensor): return True
[docs] def calibrate(self, sensor, sim, command_sequence, observation_sequence) -> int: nsamples = min(self.num_samples, len(observation_sequence)) if nsamples == 0: return average = np.zeros(observation_sequence[0][sensor.name].shape) tare_sim_avg = np.zeros(observation_sequence[0][sensor.name].shape) # Dynamically set the tare based on whether the command is moving and whether contacts are present for i in range(nsamples): sim.step(command_sequence[i], dt=0.1) sim_state : SimState = sim.state() # Contact happened, stop the calibration if len(sim_state.contacts) > 0: if i == 0: # Make sure at least one sample is used for calibration import warnings warnings.warn('Contact happens at the first sample, please check the command sequence') else: nsamples = i break # Here measurements are not caused by the contact average += observation_sequence[i][sensor.name] tare_sim = sim.measurements()[sensor.name].detach().cpu().numpy() tare_sim_avg += tare_sim average /= nsamples tare_sim_avg /= nsamples sensor.set_calibration({self.output_key:average, 'tare_sim_avg':tare_sim_avg}) return nsamples