Source code for pybaselines.two_d.optimizers

# -*- coding: utf-8 -*-
"""High level functions for making better use of baseline algorithms.

Functions in this module make use of other baseline algorithms in
pybaselines to provide better results or optimize parameters.

Created on January 14, 2024
@author: Donald Erb

"""

from collections import defaultdict
from functools import partial
import itertools
from math import ceil

import numpy as np

from . import morphological, polynomial, spline, whittaker
from .._validation import _check_optional_array, _get_row_col_values
from ..api import Baseline
from ..utils import _check_scalar, _sort_array2d
from ._algorithm_setup import _Algorithm2D


class _Optimizers(_Algorithm2D):
    """A base class for all optimizer algorithms."""

[docs] @_Algorithm2D._register(ensure_2d=False, skip_sorting=True) def collab_pls(self, data, average_dataset=True, method='asls', method_kwargs=None): """ Collaborative Penalized Least Squares (collab-PLS). Averages the data or the fit weights for an entire dataset to get more optimal results. Uses any Whittaker-smoothing-based or weighted spline algorithm. Parameters ---------- data : array-like, shape (L, M, N) An array with shape (L, M, N) where L is the number of entries in the dataset and (M, N) is the shape of each data entry. average_dataset : bool, optional If True (default) will average the dataset before fitting to get the weighting. If False, will fit each individual entry in the dataset and then average the weights to get the weighting for the dataset. method : str, optional A string indicating the Whittaker-smoothing-based or weighted spline method to use for fitting the baseline. Default is 'asls'. method_kwargs : dict, optional A dictionary of keyword arguments to pass to the selected `method` function. Default is None, which will use an empty dictionary. Returns ------- baselines : np.ndarray, shape (L, M, N) An array of all of the baselines. params : dict A dictionary with the following items: * 'average_weights': numpy.ndarray, shape (M, N) The weight array used to fit all of the baselines. * 'average_alpha': numpy.ndarray, shape (M, N) Only returned if `method` is 'aspls'. The `alpha` array used to fit all of the baselines for the :meth:`~.Baseline2D.aspls`. * 'method_params': dict[str, list] A dictionary containing the output parameters for each individual fit. Keys will depend on the selected method and will have a list of values, with each item corresponding to a fit. Notes ----- If `method` is 'aspls', `collab_pls` will also calculate the `alpha` array for the entire dataset in the same manner as the weights. References ---------- Chen, L., et al. Collaborative Penalized Least Squares for Background Correction of Multiple Raman Spectra. Journal of Analytical Methods in Chemistry, 2018, 2018. """ dataset, baseline_func, _, method_kws, _ = self._setup_optimizer( data, method, (whittaker, morphological, spline), method_kwargs, True ) data_shape = dataset.shape if len(data_shape) != 3: raise ValueError(( 'the input data must have a shape of (number of measurements, number of x points,' f' number of y points), but instead has a shape of {data_shape}' )) method = method.lower() # if using aspls or pspline_aspls, also need to calculate the alpha array # for the entire dataset calc_alpha = method in ('aspls', 'pspline_aspls') # step 1: calculate weights for the entire dataset if average_dataset: _, fit_params = baseline_func(np.mean(dataset, axis=0), **method_kws) method_kws['weights'] = fit_params['weights'] if calc_alpha: method_kws['alpha'] = fit_params['alpha'] else: weights = np.empty(data_shape) if calc_alpha: alpha = np.empty(data_shape) for i, entry in enumerate(dataset): _, fit_params = baseline_func(entry, **method_kws) weights[i] = fit_params['weights'] if calc_alpha: alpha[i] = fit_params['alpha'] method_kws['weights'] = np.mean(weights, axis=0) if calc_alpha: method_kws['alpha'] = np.mean(alpha, axis=0) # step 2: use the dataset weights from step 1 (stored in method_kws['weights']) # to fit each individual data entry; set tol to infinity so that only one # iteration is done and new weights are not calculated method_kws['tol'] = np.inf if method in ('brpls', 'pspline_brpls'): method_kws['tol_2'] = np.inf baselines = np.empty(data_shape) params = {'average_weights': method_kws['weights'], 'method_params': defaultdict(list)} if calc_alpha: params['average_alpha'] = method_kws['alpha'] if method == 'fabc': # set weights as mask so it just fits the data method_kws['weights_as_mask'] = True for i, entry in enumerate(dataset): baselines[i], param = baseline_func(entry, **method_kws) for key, value in param.items(): params['method_params'][key].append(value) return baselines, params
[docs] @_Algorithm2D._register(skip_sorting=True) def adaptive_minmax(self, data, poly_order=None, method='modpoly', weights=None, constrained_fraction=0.01, constrained_weight=1e5, estimation_poly_order=2, method_kwargs=None): """ Fits polynomials of different orders and uses the maximum values as the baseline. Each polynomial order fit is done both unconstrained and constrained at the endpoints. Parameters ---------- data : array-like, shape (M, N) The y-values of the measured data. poly_order : int or Sequence[int, int] or None, optional The two polynomial orders to use for fitting. If a single integer is given, then will use the input value and one plus the input value. Default is None, which will do a preliminary fit using a polynomial of order `estimation_poly_order` and then select the appropriate polynomial orders according to [1]_. method : {'modpoly', 'imodpoly'}, optional The method to use for fitting each polynomial. Default is 'modpoly'. weights : array-like, shape (M, N), optional The weighting array. If None (default), then will be an array with shape equal to (M, N) and all values set to 1. constrained_fraction : float or Sequence[float, float], optional The fraction of points at the left and right edges to use for the constrained fit. Default is 0.01. If `constrained_fraction` is a sequence, the first item is the fraction for the left edge and the second is the fraction for the right edge. constrained_weight : float or Sequence[float, float], optional The weighting to give to the endpoints. Higher values ensure that the end points are fit, but can cause large fluctuations in the other sections of the polynomial. Default is 1e5. If `constrained_weight` is a sequence, the first item is the weight for the left edge and the second is the weight for the right edge. estimation_poly_order : int, optional The polynomial order used for estimating the baseline-to-signal ratio to select the appropriate polynomial orders if `poly_order` is None. Default is 2. method_kwargs : dict, optional Additional keyword arguments to pass to :meth:`~.Baseline2D.modpoly` or :meth:`~.Baseline2D.imodpoly`. These include `tol`, `max_iter`, `use_original`, `mask_initial_peaks`, and `num_std`. Returns ------- numpy.ndarray, shape (M, N) The calculated baseline. params : dict A dictionary with the following items: * 'weights': numpy.ndarray, shape (M, N) The weight array used for fitting the data. * 'constrained_weights': numpy.ndarray, shape (M, N) The weight array used for the endpoint-constrained fits. * 'poly_order': numpy.ndarray, shape (2,) An array of the two polynomial orders used for the fitting. * 'method_params': dict[str, list] A dictionary containing the output parameters for each individual fit. Keys will depend on the selected method and will have a list of values, with each item corresponding to a fit. References ---------- .. [1] Cao, A., et al. A robust method for automated background subtraction of tissue fluorescence. Journal of Raman Spectroscopy, 2007, 38, 1199-1205. """ y, baseline_func, _, method_kws, _ = self._setup_optimizer( data, method, [polynomial], method_kwargs, False ) sort_weights = weights is not None weight_array = _check_optional_array( self._shape, weights, check_finite=self._check_finite, ensure_1d=False, axis=slice(None) ) if poly_order is None: poly_orders = _determine_polyorders( y, estimation_poly_order, weight_array, baseline_func, **method_kws ) else: poly_orders, scalar_poly_order = _check_scalar(poly_order, 2, True, dtype=int) if scalar_poly_order: poly_orders[1] += 1 # add 1 since they are initially equal if scalar input # use high weighting rather than Lagrange multipliers to constrain the points # to better work with noisy data weightings = _get_row_col_values(constrained_weight) constrained_fractions = _get_row_col_values(constrained_fraction) if np.any(constrained_fractions < 0) or np.any(constrained_fractions > 1): raise ValueError('constrained_fraction must be between 0 and 1') # have to temporarily sort weights to match x- and y-ordering so that left and right edges # are correct if sort_weights: weight_array = _sort_array2d(weight_array, self._sort_order) constrained_weights = weight_array.copy() constrained_weights[:ceil(self._shape[0] * constrained_fractions[0])] = weightings[0] constrained_weights[:, :ceil(self._shape[1] * constrained_fractions[2])] = weightings[2] constrained_weights[ self._shape[0] - ceil(self._shape[0] * constrained_fractions[1]): ] = weightings[1] constrained_weights[ :, self._shape[1] - ceil(self._shape[1] * constrained_fractions[3]): ] = weightings[3] # and now change back to original ordering if sort_weights: weight_array = _sort_array2d(weight_array, self._inverted_order) constrained_weights = _sort_array2d(constrained_weights, self._inverted_order) params = { 'weights': weight_array, 'constrained_weights': constrained_weights, 'poly_order': poly_orders, 'method_params': defaultdict(list) } # order of inputs is (poly_orders[0], weight_array), (poly_orders[0], constrained_weights), # (poly_orders[1], weight_array), (poly_orders[1], constrained_weights) baselines = np.empty((4, *self._shape)) for i, (p_order, weight) in enumerate( itertools.product(poly_orders, (weight_array, constrained_weights)) ): baselines[i], method_params = baseline_func( data=y, poly_order=p_order, weights=weight, **method_kws ) for key, value in method_params.items(): params['method_params'][key].append(value) return np.maximum.reduce(baselines), params
[docs] @_Algorithm2D._register(skip_sorting=True) def individual_axes(self, data, axes=(0, 1), method='asls', method_kwargs=None): """ Applies a one dimensional baseline correction method along each row and/or column. Parameters ---------- data : array-like, shape (M, N) The y-values of the measured data. axes : (0, 1) or (1, 0) or 0 or 1, optional The axes along which to apply baseline correction. The order dictates along which axis baseline correction is first applied. Default is (0, 1), which applies baseline correction along the rows first and then the columns. method : str, optional A string indicating the algorithm to use for fitting the baseline of each row and/or column; can be any one dimensional algorithm in pybaselines. Default is 'asls'. method_kwargs : Sequence[dict] or dict, optional A sequence of dictionaries of keyword arguments to pass to the selected `method` function for each axis in `axes`. A single dictionary designates that the same keyword arguments will be used for each axis. Default is None, which will use an empty dictionary. Returns ------- numpy.ndarray, shape (M, N) The calculated baseline. params : dict A dictionary with the following items: * 'params_rows': dict[str, list] Only if 0 is in `axes`. A dictionary of the parameters for each fit along the rows. The items within the dictionary will depend on the selected method. * 'params_columns': dict[str, list] Only if 1 is in `axes`. A dictionary of the parameters for each fit along the columns. The items within the dictionary will depend on the selected method. * 'baseline_rows': numpy.ndarray, shape (M, N) Only if 0 is in `axes`. The fit baseline along the rows. * 'baseline_columns': numpy.ndarray, shape (M, N) Only if 1 is in `axes`. The fit baseline along the columns. Raises ------ ValueError Raised if `method_kwargs` is a sequence with length greater than `axes` or if the values in `axes` are duplicates. Notes ----- If using array-like inputs within `method_kwargs`, they must correspond to their one-dimensional counterparts. For example, `weights` must be one-dimensional and have a length of `M` or `N` when used for fitting the rows or columns, respectively. Correctness of this is NOT verified within this method. """ axes, scalar_axes = _check_scalar(axes, 2, fill_scalar=False, dtype=int) if scalar_axes: axes = [axes] num_axes = 1 else: if axes[0] == axes[1]: raise ValueError('Fitting the same axis twice is not allowed') num_axes = 2 if ( method_kwargs is None or (not isinstance(method_kwargs, dict) and len(method_kwargs) == 0) ): method_kwargs = [{}] * num_axes elif isinstance(method_kwargs, dict): method_kwargs = [method_kwargs] * num_axes elif len(method_kwargs) == 1: method_kwargs = [method_kwargs[0]] * num_axes elif len(method_kwargs) != num_axes: raise ValueError('Method kwargs must have the same length as the input axes') keys = ('rows', 'columns') baseline = np.zeros(self._shape) params = {} for i, axis in enumerate(axes): fitter = Baseline( (self.x, self.z)[axis], check_finite=self._check_finite, assume_sorted=True, output_dtype=self._dtype ) fitter.banded_solver = self.banded_solver baseline_func = fitter._get_method(method) params[f'params_{keys[axis]}'] = defaultdict(list) func = partial( _update_params, baseline_func, params[f'params_{keys[axis]}'], **method_kwargs[i] ) partial_baseline = np.apply_along_axis(func, axis, data - baseline) baseline += partial_baseline params[f'baseline_{keys[axis]}'] = partial_baseline return baseline, params
def _update_params(func, params, data, **kwargs): """ A partial function to allow updating a params dictionary using NumPy's apply_aplong_axis. Parameters ---------- func : Callable The baseline method to use. params : dict[str, list] The dictionary of parameters to be updated. data : numpy.ndarray The data to be baseline corrected. Returns ------- baseline : numpy.ndarray The calculated basline. """ baseline, baseline_params = func(data, **kwargs) for key, val in baseline_params.items(): params[key].append(val) return baseline def _determine_polyorders(y, poly_order, weights, fit_function, **fit_kwargs): """ Selects the appropriate polynomial orders based on the baseline-to-signal ratio. Parameters ---------- y : numpy.ndarray The array of y-values. poly_order : int The polynomial order for fitting. weights : numpy.ndarray The weight array for fitting. fit_function : Callable The function to use for the polynomial fit. **fit_kwargs Additional keyword arguments to pass to `fit_function`. Returns ------- orders : numpy.ndarray, shape (2,) The two polynomial orders to use based on the baseline to signal ratio according to the reference. References ---------- Cao, A., et al. A robust method for automated background subtraction of tissue fluorescence. Journal of Raman Spectroscopy, 2007, 38, 1199-1205. """ baseline = fit_function(y, poly_order=poly_order, weights=weights, **fit_kwargs)[0] signal = y - baseline baseline_to_signal = (baseline.max() - baseline.min()) / (signal.max() - signal.min()) # Table 2 in reference # TODO in 2D does this need changed? if baseline_to_signal < 0.2: orders = (1, 2) elif baseline_to_signal < 0.75: orders = (2, 3) elif baseline_to_signal < 8.5: orders = (3, 4) elif baseline_to_signal < 55: orders = (4, 5) elif baseline_to_signal < 240: orders = (5, 6) elif baseline_to_signal < 517: orders = (6, 7) else: orders = (6, 8) # not a typo, use 6 and 8 rather than 7 and 8 return np.array(orders)