Source code for pybaselines.classification

# -*- coding: utf-8 -*-
"""Techniques that rely on classifying peak and/or baseline segments for fitting baselines.

Created on July 3, 2021
@author: Donald Erb


Several functions were adapted from SciPy
(https://github.com/scipy/scipy, accessed December 28, 2023), which was
licensed under the BSD-3-Clause below.

Copyright (c) 2001-2002 Enthought, Inc.  2003-2023, SciPy Developers.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

1. Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above
   copyright notice, this list of conditions and the following
   disclaimer in the documentation and/or other materials provided
   with the distribution.

3. Neither the name of the copyright holder nor the names of its
   contributors may be used to endorse or promote products derived
   from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""

from math import ceil
import warnings

import numpy as np
from scipy.ndimage import (
    binary_dilation, binary_erosion, binary_opening, grey_dilation, grey_erosion, uniform_filter1d
)
from scipy.optimize import curve_fit
from scipy.signal import convolve
from scipy.spatial import ConvexHull

from ._algorithm_setup import _Algorithm, _class_wrapper
from ._compat import jit, trapezoid
from ._validation import _check_scalar, _check_scalar_variable
from .utils import (
    _MIN_FLOAT, ParameterWarning, _convert_coef, _interp_inplace, gaussian, optimize_window,
    pad_edges, relative_difference
)


class _Classification(_Algorithm):
    """A base class for all classification algorithms."""

[docs] @_Algorithm._register(sort_keys=('mask',), require_unique_x=True) def golotvin(self, data, half_window=None, num_std=2.0, sections=32, smooth_half_window=None, interp_half_window=5, weights=None, min_length=2, pad_kwargs=None, **kwargs): """ Golotvin's method for identifying baseline regions. Divides the data into sections and takes the minimum standard deviation of all sections as the noise standard deviation for the entire data. Then classifies any point where the rolling max minus min is less than ``num_std * noise standard deviation`` as belonging to the baseline. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. half_window : int, optional The half-window to use for the rolling maximum and rolling minimum calculations. Should be approximately equal to the full-width-at-half-maximum of the peaks or features in the data. Default is None, which will use half of the value from :func:`.optimize_window`, which is not always a good value, but at least scales with the number of data points and gives a starting point for tuning the parameter. num_std : float, optional The number of standard deviations to include when thresholding. Higher values will assign more points as baseline. Default is 3.0. sections : int, optional The number of sections to divide the input data into for finding the minimum standard deviation. Default is 32. smooth_half_window : int, optional The half window to use for smoothing the interpolated baseline with a moving average. Default is None, which will use `half_window`. Set to 0 to not smooth the baseline. interp_half_window : int, optional When interpolating between baseline segments, will use the average of ``data[i-interp_half_window:i+interp_half_window+1]``, where `i` is the index of the peak start or end, to fit the linear segment. Default is 5. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. References ---------- Golotvin, S., et al. Improved Baseline Recognition and Modeling of FT NMR Spectra. Journal of Magnetic Resonance. 2000, 146, 122-125. """ y, weight_array = self._setup_classification(data, weights, **kwargs) if half_window is None: # optimize_window(y) / 2 gives an "okay" estimate that at least scales # with data size half_window = ceil(optimize_window(y) / 2) if smooth_half_window is None: smooth_half_window = half_window min_sigma = np.inf indices = np.linspace(0, self._size, sections + 1, dtype=np.intp) for (left_idx, right_idx) in zip(indices[:-1], indices[1:]): # use ddof=1 since sampling subsets of the data min_sigma = min(min_sigma, np.std(y[left_idx:right_idx], ddof=1)) mask = ( grey_dilation(y, 2 * half_window + 1) - grey_erosion(y, 2 * half_window + 1) ) < num_std * min_sigma mask = _refine_mask(mask, min_length) np.logical_and(mask, weight_array, out=mask) rough_baseline = _averaged_interp(self.x, y, mask, interp_half_window) pad_kwargs = pad_kwargs if pad_kwargs is not None else {} baseline = uniform_filter1d( pad_edges(rough_baseline, smooth_half_window, **pad_kwargs, **kwargs), 2 * smooth_half_window + 1 )[smooth_half_window:self._size + smooth_half_window] return baseline, {'mask': mask}
[docs] @_Algorithm._register(sort_keys=('mask',), require_unique_x=True) def dietrich(self, data, smooth_half_window=None, num_std=3.0, interp_half_window=5, poly_order=5, max_iter=50, tol=1e-3, weights=None, return_coef=False, min_length=2, pad_kwargs=None, **kwargs): """ Dietrich's method for identifying baseline regions. Calculates the power spectrum of the data as the squared derivative of the data. Then baseline points are identified by iteratively removing points where the mean of the power spectrum is less than `num_std` times the standard deviation of the power spectrum. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. smooth_half_window : int, optional The half window to use for smoothing the input data with a moving average. Default is None, which will use N / 256. Set to 0 to not smooth the data. num_std : float, optional The number of standard deviations to include when thresholding. Higher values will assign more points as baseline. Default is 3.0. interp_half_window : int, optional When interpolating between baseline segments, will use the average of ``data[i-interp_half_window:i+interp_half_window+1]``, where `i` is the index of the peak start or end, to fit the linear segment. Default is 5. poly_order : int, optional The polynomial order for fitting the identified baseline. Default is 5. max_iter : int, optional The maximum number of iterations for fitting a polynomial to the identified baseline. If `max_iter` is 0, the returned baseline will be just the linear interpolation of the baseline segments. Default is 50. tol : float, optional The exit criteria for fitting a polynomial to the identified baseline points. Default is 1e-3. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. return_coef : bool, optional If True, will convert the polynomial coefficients for the fit baseline to a form that fits the input `x_data` and return them in the params dictionary. Default is False, since the conversion takes time. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. * 'coef': numpy.ndarray, shape (poly_order,) Only if `return_coef` is True and `max_iter` is greater than 0. The array of polynomial coefficients for the baseline, in increasing order. Can be used to create a polynomial using :class:`numpy.polynomial.polynomial.Polynomial`. * 'tol_history': numpy.ndarray Only if `max_iter` is greater than 1. An array containing the calculated tolerance values for each iteration. The length of the array is the number of iterations completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. Notes ----- When choosing parameters, first choose a `smooth_half_window` that appropriately smooths the data, and then reduce `num_std` until no peak regions are included in the baseline. If no value of `num_std` works, change `smooth_half_window` and repeat. If `max_iter` is 0, the baseline is simply a linear interpolation of the identified baseline points. Otherwise, a polynomial is iteratively fit through the baseline points, and the interpolated sections are replaced each iteration with the polynomial fit. References ---------- Dietrich, W., et al. Fast and Precise Automatic Baseline Correction of One- and Two-Dimensional NMR Spectra. Journal of Magnetic Resonance. 1991, 91, 1-11. """ y, weight_array = self._setup_classification(data, weights, **kwargs) if smooth_half_window is None: smooth_half_window = ceil(self._size / 256) pad_kwargs = pad_kwargs if pad_kwargs is not None else {} smooth_y = uniform_filter1d( pad_edges(y, smooth_half_window, **pad_kwargs, **kwargs), 2 * smooth_half_window + 1 )[smooth_half_window:self._size + smooth_half_window] power = np.gradient(smooth_y)**2 mask = _refine_mask(_iter_threshold(power, num_std), min_length) np.logical_and(mask, weight_array, out=mask) rough_baseline = _averaged_interp(self.x, y, mask, interp_half_window) params = {'mask': mask} baseline = rough_baseline if max_iter > 0: *_, pseudo_inverse = self._setup_polynomial( y, poly_order=poly_order, calc_vander=True, calc_pinv=True ) old_coef = coef = pseudo_inverse @ rough_baseline baseline = self._polynomial.vandermonde @ coef if max_iter > 1: tol_history = np.empty(max_iter - 1) for i in range(max_iter - 1): rough_baseline[mask] = baseline[mask] coef = pseudo_inverse @ rough_baseline baseline = self._polynomial.vandermonde @ coef calc_difference = relative_difference(old_coef, coef) tol_history[i] = calc_difference if calc_difference < tol: break old_coef = coef params['tol_history'] = tol_history[:i + 1] if return_coef: params['coef'] = _convert_coef(coef, self.x_domain) return baseline, params
[docs] @_Algorithm._register(sort_keys=('mask',), require_unique_x=True) def std_distribution(self, data, half_window=None, interp_half_window=5, fill_half_window=3, num_std=1.1, smooth_half_window=None, weights=None, pad_kwargs=None, **kwargs): """ Identifies baseline segments by analyzing the rolling standard deviation distribution. The rolling standard deviations are split into two distributions, with the smaller distribution assigned to noise. Baseline points are then identified as any point where the rolling standard deviation is less than a multiple of the median of the noise's standard deviation distribution. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. half_window : int, optional The half-window to use for the rolling standard deviation calculation. Should be approximately equal to the full-width-at-half-maximum of the peaks or features in the data. Default is None, which will use half of the value from :func:`.optimize_window`, which is not always a good value, but at least scales with the number of data points and gives a starting point for tuning the parameter. interp_half_window : int, optional When interpolating between baseline segments, will use the average of ``data[i-interp_half_window:i+interp_half_window+1]``, where `i` is the index of the peak start or end, to fit the linear segment. Default is 5. fill_half_window : int, optional When a point is identified as a peak point, all points +- `fill_half_window` are likewise set as peak points. Default is 3. num_std : float, optional The number of standard deviations to include when thresholding. Higher values will assign more points as baseline. Default is 1.1. smooth_half_window : int, optional The half window to use for smoothing the interpolated baseline with a moving average. Default is None, which will use `half_window`. Set to 0 to not smooth the baseline. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. References ---------- Wang, K.C., et al. Distribution-Based Classification Method for Baseline Correction of Metabolomic 1D Proton Nuclear Magnetic Resonance Spectra. Analytical Chemistry. 2013, 85, 1231-1239. """ y, weight_array = self._setup_classification(data, weights, **kwargs) if half_window is None: # optimize_window(y) / 2 gives an "okay" estimate that at least scales # with data size half_window = ceil(optimize_window(y) / 2) if smooth_half_window is None: smooth_half_window = half_window # use dof=1 since sampling a subset of the data std = _padded_rolling_std(y, half_window, 1) median = np.median(std) median_2 = np.median(std[std < 2 * median]) # TODO make the 2 an input? while median_2 / median < 0.999: # TODO make the 0.999 an input? median = median_2 median_2 = np.median(std[std < 2 * median]) noise_std = median_2 # use ~ to convert from peak==1, baseline==0 to peak==0, baseline==1; if done before, # would have to do ~binary_dilation(~mask) or binary_erosion(np.hstack((1, mask, 1))[1:-1] mask = np.logical_and( ~binary_dilation(std > num_std * noise_std, np.ones(2 * fill_half_window + 1)), weight_array ) rough_baseline = _averaged_interp(self.x, y, mask, interp_half_window) pad_kwargs = pad_kwargs if pad_kwargs is not None else {} baseline = uniform_filter1d( pad_edges(rough_baseline, smooth_half_window, **pad_kwargs, **kwargs), 2 * smooth_half_window + 1 )[smooth_half_window:self._size + smooth_half_window] return baseline, {'mask': mask}
[docs] @_Algorithm._register(sort_keys=('mask',), require_unique_x=True) def fastchrom(self, data, half_window=None, threshold=None, min_fwhm=None, interp_half_window=5, smooth_half_window=None, weights=None, max_iter=100, min_length=2, pad_kwargs=None, **kwargs): """ Identifies baseline segments by thresholding the rolling standard deviation distribution. Baseline points are identified as any point where the rolling standard deviation is less than the specified threshold. Peak regions are iteratively interpolated until the baseline is below the data. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. half_window : int, optional The half-window to use for the rolling standard deviation calculation. Should be approximately equal to the full-width-at-half-maximum of the peaks or features in the data. Default is None, which will use half of the value from :func:`.optimize_window`, which is not always a good value, but at least scales with the number of data points and gives a starting point for tuning the parameter. threshold : float or Callable, optional All points in the rolling standard deviation below `threshold` will be considered as baseline. Higher values will assign more points as baseline. Default is None, which will set the threshold as the 15th percentile of the rolling standard deviation. If `threshold` is Callable, it should take the rolling standard deviation as the only argument and output a float. min_fwhm : int, optional After creating the interpolated baseline, any region where the baseline is greater than the data for `min_fwhm` consecutive points will have an additional baseline point added and reinterpolated. Should be set to approximately the index-based full-width-at-half-maximum of the smallest peak. Default is None, which uses 2 * `half_window`. interp_half_window : int, optional When interpolating between baseline segments, will use the average of ``data[i-interp_half_window:i+interp_half_window+1]``, where `i` is the index of the peak start or end, to fit the linear segment. Default is 5. smooth_half_window : int, optional The half window to use for smoothing the interpolated baseline with a moving average. Default is None, which will use `half_window`. Set to 0 to not smooth the baseline. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. max_iter : int, optional The maximum number of iterations to attempt to fill in regions where the baseline is greater than the input data. Default is 100. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. Notes ----- Only covers the baseline correction from FastChrom, not its peak finding and peak grouping capabilities. References ---------- Johnsen, L., et al. An automated method for baseline correction, peak finding and peak grouping in chromatographic data. Analyst. 2013, 138, 3502-3511. """ y, weight_array = self._setup_classification(data, weights, **kwargs) if half_window is None: # optimize_window(y) / 2 gives an "okay" estimate that at least scales # with data size half_window = ceil(optimize_window(y) / 2) if smooth_half_window is None: smooth_half_window = half_window if min_fwhm is None: min_fwhm = 2 * half_window # use dof=1 since sampling a subset of the data std = _padded_rolling_std(y, half_window, 1) if threshold is None: # scales fairly well with y and gaurantees baseline segments are created; # picked 15% since it seems to work better than 10% threshold_val = np.percentile(std, 15) elif callable(threshold): threshold_val = threshold(std) else: threshold_val = threshold # reference did not mention removing single points, but do so anyway to # be more thorough mask = _refine_mask(std < threshold_val, min_length) np.logical_and(mask, weight_array, out=mask) rough_baseline = _averaged_interp(self.x, y, mask, interp_half_window) mask_sum = mask.sum() # only try to fix peak regions if there actually are peak and baseline regions if mask_sum and mask_sum != self._size: peak_starts, peak_ends = _find_peak_segments(mask) for _ in range(max_iter): modified_baseline = False for start, end in zip(peak_starts, peak_ends): baseline_section = rough_baseline[start:end + 1] data_section = y[start:end + 1] # mask should be baseline_section > data_section, but use the # inverse since _find_peak_segments looks for 0s, not 1s section_mask = baseline_section < data_section seg_starts, seg_ends = _find_peak_segments(section_mask) if np.any(seg_ends - seg_starts > min_fwhm): modified_baseline = True # designate lowest point as baseline # TODO should surrounding points also be classified as baseline? mask[np.argmin(data_section - baseline_section) + start] = 1 if modified_baseline: # TODO probably faster to just re-interpolate changed sections rough_baseline = _averaged_interp(self.x, y, mask, interp_half_window) else: break # reference did not discuss smoothing, but include to be consistent with # other classification functions pad_kwargs = pad_kwargs if pad_kwargs is not None else {} baseline = uniform_filter1d( pad_edges(rough_baseline, smooth_half_window, **pad_kwargs, **kwargs), 2 * smooth_half_window + 1 )[smooth_half_window:self._size + smooth_half_window] return baseline, {'mask': mask}
[docs] @_Algorithm._register(sort_keys=('mask',)) def cwt_br(self, data, poly_order=5, scales=None, num_std=1.0, min_length=2, max_iter=50, tol=1e-3, symmetric=False, weights=None, pad_kwargs=None, **kwargs): """ Continuous wavelet transform baseline recognition (CWT-BR) algorithm. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. poly_order : int, optional The polynomial order for fitting the baseline. Default is 5. scales : array-like, optional The scales at which to perform the continuous wavelet transform. Default is None, num_std : float, optional The number of standard deviations to include when thresholding. Default is 1.0. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. max_iter : int, optional The maximum number of iterations. Default is 50. tol : float, optional The exit criteria. Default is 1e-3. symmetric : bool, optional When fitting the identified baseline points with a polynomial, if `symmetric` is False (default), will add any point `i` as a baseline point where the fit polynomial is greater than the input data for ``N/100`` consecutive points on both sides of point `i`. If `symmetric` is True, then it means that both positive and negative peaks exist and baseline points are not modified during the polynomial fitting. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from convolution for the continuous wavelet transform. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. * 'tol_history': numpy.ndarray An array containing the calculated tolerance values for each iteration. The length of the array is the number of iterations completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. * 'best_scale' : scalar The scale at which the Shannon entropy of the continuous wavelet transform of the data is at a minimum. Notes ----- Uses the standard deviation for determining outliers during polynomial fitting rather than the standard error as used in the reference since the number of standard errors to include when thresholding varies with data size while the number of standard deviations is independent of data size. References ---------- Bertinetto, C., et al. Automatic Baseline Recognition for the Correction of Large Sets of Spectra Using Continuous Wavelet Transform and Iterative Fitting. Applied Spectroscopy, 2014, 68(2), 155-164. """ y, weight_array = self._setup_classification(data, weights, **kwargs) self._setup_polynomial(y, weight_array, poly_order=poly_order, calc_vander=True) # scale y between -1 and 1 so that the residual fit is more numerically stable y_domain = np.polynomial.polyutils.getdomain(y) y = np.polynomial.polyutils.mapdomain(y, y_domain, np.array([-1., 1.])) if scales is None: # avoid low scales since their cwt is fairly noisy min_scale = max(2, self._size // 500) max_scale = self._size // 4 scales = range(min_scale, max_scale) else: scales = np.atleast_1d(scales).reshape(-1) max_scale = scales.max() shannon_old = -np.inf shannon_current = -np.inf half_window = max_scale * 2 # TODO is x2 enough padding to prevent edge effects from cwt? pad_kwargs = pad_kwargs if pad_kwargs is not None else {} padded_y = pad_edges(y, half_window, **pad_kwargs, **kwargs) for scale in scales: wavelet_cwt = _cwt(padded_y, _ricker, [scale])[0, half_window:-half_window] abs_wavelet = np.abs(wavelet_cwt) inner = abs_wavelet / abs_wavelet.sum(axis=0) # was not stated in the reference to use abs(wavelet) for the Shannon entropy, # but otherwise the Shannon entropy vs wavelet scale curve does not look like # Figure 2 in the reference; masking out non-positive values also gives an # incorrect entropy curve shannon_entropy = -np.sum(inner * np.log(inner + _MIN_FLOAT), 0) if shannon_current < shannon_old and shannon_entropy > shannon_current: break shannon_old = shannon_current shannon_current = shannon_entropy best_scale_ptp_multiple = 8 * abs(wavelet_cwt.max() - wavelet_cwt.min()) num_bins = 200 histogram, bin_edges = np.histogram(wavelet_cwt, num_bins) bins = 0.5 * (bin_edges[1:] + bin_edges[:-1]) fit_params = [histogram.max(), np.log10(0.2 * np.std(wavelet_cwt))] # use 10**sigma so that sigma is not actually bounded gaussian_fit = lambda x, height, sigma: gaussian(x, height, 0, 10**sigma) # TODO should the number of iterations, the height cutoff for the histogram, # and the exit tol be parameters? The number of iterations is never greater than # 2 or 3, matching the reference. The height maybe should be since the masking # depends on the histogram scale dilation_structure = np.ones(5, bool) for _ in range(10): # dilate the mask to ensure at least five points are included for the fitting fit_mask = binary_dilation(histogram > histogram.max() / 5, dilation_structure) # histogram[~fit_mask] = 0 TODO use this instead? does it help fitting? fit_params = curve_fit( gaussian_fit, bins[fit_mask], histogram[fit_mask], fit_params, check_finite=False )[0] sigma_opt = 10**fit_params[1] new_num_bins = ceil(best_scale_ptp_multiple / sigma_opt) if relative_difference(num_bins, new_num_bins) < 0.05: break num_bins = new_num_bins histogram, bin_edges = np.histogram(wavelet_cwt, num_bins) bins = 0.5 * (bin_edges[1:] + bin_edges[:-1]) gaussian_mask = np.abs(bins) < 3 * sigma_opt gaus_area = trapezoid(histogram[gaussian_mask], bins[gaussian_mask]) num_sigma = 0.6 + 10 * ((trapezoid(histogram, bins) - gaus_area) / gaus_area) wavelet_mask = _refine_mask(abs_wavelet < num_sigma * sigma_opt, min_length) np.logical_and(wavelet_mask, weight_array, out=wavelet_mask) check_window = np.ones(2 * (self._size // 200) + 1, bool) # TODO make window size a param? baseline_old = y mask = wavelet_mask.copy() tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): coef = np.linalg.lstsq(self._polynomial.vandermonde[mask], y[mask], None)[0] baseline = self._polynomial.vandermonde @ coef residual = y - baseline mask[residual > num_std * np.std(residual)] = False # TODO is this necessary? It improves fits where the initial fit didn't # include enough points, but ensures that negative peaks are not allowed; # maybe make it a param called symmetric, like for mixture_model, and only # do if not symmetric; also probably only need to do it the first iteration # since after that the masking above will not remove negative residuals coef = np.linalg.lstsq(self._polynomial.vandermonde[mask], y[mask], None)[0] baseline = self._polynomial.vandermonde @ coef calc_difference = relative_difference(baseline_old, baseline) tol_history[i] = calc_difference if calc_difference < tol: break baseline_old = baseline if not symmetric: np.logical_or(mask, binary_erosion(y < baseline, check_window), out=mask) # TODO should include wavelet_mask in params; maybe called 'initial_mask'? params = { 'mask': mask, 'tol_history': tol_history[:i + 1], 'best_scale': scale } baseline = np.polynomial.polyutils.mapdomain(baseline, np.array([-1., 1.]), y_domain) return baseline, params
[docs] @_Algorithm._register(sort_keys=('mask', 'weights')) def fabc(self, data, lam=1e6, scale=None, num_std=3.0, diff_order=2, min_length=2, weights=None, weights_as_mask=False, pad_kwargs=None, **kwargs): """ Fully automatic baseline correction (fabc). Similar to Dietrich's method, except that the derivative is estimated using a continuous wavelet transform and the baseline is calculated using Whittaker smoothing through the identified baseline points. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. lam : float, optional The smoothing parameter. Larger values will create smoother baselines. Default is 1e6. scale : int, optional The scale at which to calculate the continuous wavelet transform. Should be approximately equal to the index-based full-width-at-half-maximum of the peaks or features in the data. Default is None, which will use half of the value from :func:`.optimize_window`, which is not always a good value, but at least scales with the number of data points and gives a starting point for tuning the parameter. num_std : float, optional The number of standard deviations to include when thresholding. Higher values will assign more points as baseline. Default is 3.0. diff_order : int, optional The order of the differential matrix. Must be greater than 0. Default is 2 (second order differential matrix). Typical values are 2 or 1. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. weights_as_mask : bool, optional If True, signifies that the input `weights` is the mask to use for fitting, which skips the continuous wavelet calculation and just smooths the input data. Default is False. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from convolution for the continuous wavelet transform. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. * 'weights': numpy.ndarray, shape (N,) The weight array used for fitting the data. Notes ----- The classification of baseline points is similar to :meth:`~.Baseline.dietrich`, except that this method approximates the first derivative using a continous wavelet transform with the Haar wavelet, which is more robust than the numerical derivative in Dietrich's method. References ---------- Cobas, J., et al. A new general-purpose fully automatic baseline-correction procedure for 1D and 2D NMR data. Journal of Magnetic Resonance, 2006, 183(1), 145-151. """ self._deprecate_pad_kwargs(**kwargs) if weights_as_mask: y, whittaker_weights, whittaker_system = self._setup_whittaker( data, lam, diff_order, weights ) mask = whittaker_weights.astype(bool) else: y, weight_array = self._setup_classification(data, weights) if scale is None: # optimize_window(y) / 2 gives an "okay" estimate that at least scales # with data size scale = ceil(optimize_window(y) / 2) # TODO is 2*scale enough padding to prevent edge effects from cwt? half_window = scale * 2 pad_kwargs = pad_kwargs if pad_kwargs is not None else {} wavelet_cwt = _cwt(pad_edges(y, half_window, **pad_kwargs, **kwargs), _haar, [scale]) power = wavelet_cwt[0, half_window:-half_window]**2 mask = _refine_mask(_iter_threshold(power, num_std), min_length) np.logical_and(mask, weight_array, out=mask) _, whittaker_weights, whittaker_system = self._setup_whittaker(y, lam, diff_order, mask) if self._sort_order is not None: whittaker_weights = whittaker_weights[self._inverted_order] whittaker_weights = whittaker_weights.astype(float) baseline = whittaker_system.solve( whittaker_system.add_diagonal(whittaker_weights), whittaker_weights * y, overwrite_b=True, overwrite_ab=True ) params = {'mask': mask, 'weights': whittaker_weights} return baseline, params
[docs] @_Algorithm._register(sort_keys=('mask',), require_unique_x=True) def rubberband(self, data, segments=1, lam=None, diff_order=2, weights=None, smooth_half_window=None, pad_kwargs=None, **kwargs): """ Identifies baseline points by fitting a convex hull to the bottom of the data. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. segments : int or Sequence[int, ...], optional Used to fit multiple convex hulls to the data to negate the effects of concave data. If the input is an integer, it sets the number of equally sized segments the data will be split into. If the input is a sequence, each integer in the sequence will be the index that splits two segments, which allows constructing unequally sized segments. Default is 1, which fits a single convex hull to the data. lam : float or None, optional The smoothing parameter for interpolating the baseline points using Whittaker smoothing. Set to 0 or None to use linear interpolation instead. Default is None, which does not smooth. diff_order : int, optional The order of the differential matrix if using Whittaker smoothing. Must be greater than 0. Default is 2 (second order differential matrix). Typical values are 2 or 1. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered potential baseline points. If None (default), then will be an array with size equal to N and all values set to 1. smooth_half_window : int or None, optional The half window to use for smoothing the input data with a moving average before calculating the convex hull, which gives much better results for noisy data. Set to None (default) or 0 to not smooth the data. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. Raises ------ ValueError Raised if the number of segments is less than 1 or too large to allow interpolation. """ sections, scalar_sections = _check_scalar(segments, None, coerce_0d=False, dtype=np.intp) if scalar_sections and (sections < 1 or self._size / sections < 3): raise ValueError( f'There must be between 1 and {self._size // 3} segments for the rubberband fit' ) elif not scalar_sections and (np.any(sections < 0) or np.any(sections > self._size)): raise ValueError( f'Segment indices must be between 0 and {self._size} for the rubberband fit' ) y, weight_array = self._setup_classification(data, weights) if smooth_half_window is not None and smooth_half_window > 0: pad_kwargs = pad_kwargs if pad_kwargs is not None else {} y = self._setup_smooth(y, smooth_half_window, pad_kwargs=pad_kwargs, **kwargs)[0] y = uniform_filter1d( y, 2 * smooth_half_window + 1 )[smooth_half_window:-smooth_half_window] else: # still ensure deprecation of kwargs if no smoothing was done self._deprecate_pad_kwargs(**kwargs) if scalar_sections: total_sections = np.linspace(0, self._size, sections + 1, dtype=np.intp) else: # np.unique already sorts so do not need to check order total_sections = np.unique(np.concatenate(([0], sections, [self._size]))) for i, section in enumerate(total_sections[:-1]): if total_sections[i + 1] - section < 3: raise ValueError('Each segment must have at least 3 points.') hull_data = np.vstack((self.x, y)).T total_vertices = [] for i, left_idx in enumerate(total_sections[:-1]): vertices = ConvexHull(hull_data[left_idx:total_sections[i + 1]]).vertices min_idx = vertices.argmin() max_idx = vertices.argmax() + 1 if max_idx > min_idx: vertices = vertices[min_idx:max_idx] else: vertices = np.concatenate((vertices[min_idx:], vertices[:max_idx])) total_vertices.extend(vertices + left_idx) mask = np.zeros(self._shape, dtype=bool) mask[np.unique(total_vertices)] = True np.logical_and(mask, weight_array, out=mask) if lam is not None and lam != 0: _, _, whittaker_system = self._setup_whittaker(y, lam, diff_order, mask) baseline = whittaker_system.solve( whittaker_system.add_diagonal(mask), mask * y, overwrite_b=True, overwrite_ab=True ) else: baseline = np.interp(self.x, self.x[mask], y[mask]) return baseline, {'mask': mask}
_classification_wrapper = _class_wrapper(_Classification) def _refine_mask(mask, min_length=2): """ Removes small consecutive True values and lone False values from a boolean mask. Parameters ---------- mask : numpy.ndarray The boolean array designating baseline points as True and peak points as False. min_length : int, optional The minimum consecutive length of True values needed for a section to remain True. Lengths of True values less than `min_length` are converted to False. Default is 2, which removes all lone True values. Returns ------- numpy.ndarray The input mask after removing lone True and False values. Notes ----- Removes the lone True values first since True values designate the baseline. That way, the approach is more conservative with assigning baseline points. Examples -------- >>> mask = np.array([1, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1]) >>> _refine_mask(mask, 3).astype(int) array([1, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1]) >>> _refine_mask(mask, 5).astype(int) array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1]) """ min_length = max(min_length, 1) # has to be at least 1 for the binary opening half_length = min_length // 2 # do not use border_value=1 since that automatically makes the borders True and # extends the True section by half_window on each side output = binary_opening( np.pad(mask, half_length, 'constant', constant_values=True), np.ones(min_length, bool) )[half_length:len(mask) + half_length] # convert lone False values to True np.logical_or(output, binary_erosion(output, [1, 0, 1]), out=output) # TODO should there be an erosion step here, using another parameter (erode_hw)? # that way, can control both the minimum length and then remove edges of baselines # independently, allowing more control over the output mask return output def _find_peak_segments(mask): """ Identifies the peak starts and ends from a boolean mask. Parameters ---------- mask : numpy.ndarray The boolean mask with peak points as 0 or False and baseline points as 1 or True. Returns ------- peak_starts : numpy.ndarray The array identifying the indices where each peak begins. peak_ends : numpy.ndarray The array identifying the indices where each peak ends. """ extended_mask = np.concatenate(([True], mask, [True])) peak_starts = extended_mask[1:-1] < extended_mask[:-2] peak_starts = np.flatnonzero(peak_starts) if len(peak_starts): peak_starts[1 if peak_starts[0] == 0 else 0:] -= 1 peak_ends = extended_mask[1:-1] < extended_mask[2:] peak_ends = np.flatnonzero(peak_ends) if len(peak_ends): peak_ends[:-1 if peak_ends[-1] == mask.shape[0] - 1 else None] += 1 return peak_starts, peak_ends def _averaged_interp(x, y, mask, interp_half_window=0): """ Averages each anchor point and then interpolates between segments. Parameters ---------- x : numpy.ndarray The x-values. y : numpy.ndarray The y-values. mask : numpy.ndarray A boolean array with 0 or False designating peak points and 1 or True designating baseline points. interp_half_window : int, optional The half-window to use for averaging around the anchor points before interpolating. Default is 0, which uses just the anchor point value. Returns ------- output : numpy.ndarray A copy of the input `y` array with peak values in `mask` calculcated using linear interpolation. """ interp_hw = _check_scalar_variable( interp_half_window, allow_zero=True, variable_name='interp_half_window', dtype=np.intp ) output = y.copy() mask_sum = mask.sum() if not mask_sum: # all points belong to peaks # will just interpolate between first and last points warnings.warn('there were no baseline points found', ParameterWarning, stacklevel=2) elif mask_sum == mask.shape[0]: # all points belong to baseline warnings.warn('there were no peak points found', ParameterWarning, stacklevel=2) return output peak_starts, peak_ends = _find_peak_segments(mask) num_y = y.shape[0] for start, end in zip(peak_starts, peak_ends): left_mean = np.mean(y[max(0, start - interp_hw):min(start + interp_hw + 1, num_y)]) right_mean = np.mean(y[max(0, end - interp_hw):min(end + interp_hw + 1, num_y)]) _interp_inplace(x[start:end + 1], output[start:end + 1], left_mean, right_mean) return output
[docs] @_classification_wrapper def golotvin(data, x_data=None, half_window=None, num_std=2.0, sections=32, smooth_half_window=None, interp_half_window=5, weights=None, min_length=2, pad_kwargs=None, **kwargs): """ Golotvin's method for identifying baseline regions. Divides the data into sections and takes the minimum standard deviation of all sections as the noise standard deviation for the entire data. Then classifies any point where the rolling max minus min is less than ``num_std * noise standard deviation`` as belonging to the baseline. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. x_data : array-like, shape (N,), optional The x-values of the measured data. Default is None, which will create an array from -1 to 1 with N points. half_window : int, optional The half-window to use for the rolling maximum and rolling minimum calculations. Should be approximately equal to the full-width-at-half-maximum of the peaks or features in the data. Default is None, which will use half of the value from :func:`.optimize_window`, which is not always a good value, but at least scales with the number of data points and gives a starting point for tuning the parameter. num_std : float, optional The number of standard deviations to include when thresholding. Higher values will assign more points as baseline. Default is 3.0. sections : int, optional The number of sections to divide the input data into for finding the minimum standard deviation. Default is 32. smooth_half_window : int, optional The half window to use for smoothing the interpolated baseline with a moving average. Default is None, which will use `half_window`. Set to 0 to not smooth the baseline. interp_half_window : int, optional When interpolating between baseline segments, will use the average of ``data[i-interp_half_window:i+interp_half_window+1]``, where `i` is the index of the peak start or end, to fit the linear segment. Default is 5. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. References ---------- Golotvin, S., et al. Improved Baseline Recognition and Modeling of FT NMR Spectra. Journal of Magnetic Resonance. 2000, 146, 122-125. """
def _iter_threshold(power, num_std=3.0): """ Iteratively thresholds a power spectrum based on the mean and standard deviation. Any values greater than the mean of the power spectrum plus a multiple of the standard deviation are masked out to create a new power spectrum. The process is performed iteratively until no further points are masked out. Parameters ---------- power : numpy.ndarray, shape (N,) The power spectrum to threshold. num_std : float, optional The number of standard deviations to include when thresholding. Default is 3.0. Returns ------- mask : numpy.ndarray, shape (N,) The boolean mask with True values where any point in the input power spectrum was less than References ---------- Dietrich, W., et al. Fast and Precise Automatic Baseline Correction of One- and Two-Dimensional NMR Spectra. Journal of Magnetic Resonance. 1991, 91, 1-11. """ mask = power < np.mean(power) + num_std * np.std(power, ddof=1) old_mask = np.ones_like(mask) while not np.array_equal(mask, old_mask): old_mask = mask masked_power = power[mask] if masked_power.size < 2: # need at least 2 points for std calculation warnings.warn( 'not enough baseline points found; "num_std" is likely too low', ParameterWarning, stacklevel=2 ) break mask = power < np.mean(masked_power) + num_std * np.std(masked_power, ddof=1) return mask
[docs] @_classification_wrapper def dietrich(data, x_data=None, smooth_half_window=None, num_std=3.0, interp_half_window=5, poly_order=5, max_iter=50, tol=1e-3, weights=None, return_coef=False, min_length=2, pad_kwargs=None, **kwargs): """ Dietrich's method for identifying baseline regions. Calculates the power spectrum of the data as the squared derivative of the data. Then baseline points are identified by iteratively removing points where the mean of the power spectrum is less than `num_std` times the standard deviation of the power spectrum. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. x_data : array-like, shape (N,), optional The x-values of the measured data. Default is None, which will create an array from -1 to 1 with N points. smooth_half_window : int, optional The half window to use for smoothing the input data with a moving average. Default is None, which will use N / 256. Set to 0 to not smooth the data. num_std : float, optional The number of standard deviations to include when thresholding. Higher values will assign more points as baseline. Default is 3.0. interp_half_window : int, optional When interpolating between baseline segments, will use the average of ``data[i-interp_half_window:i+interp_half_window+1]``, where `i` is the index of the peak start or end, to fit the linear segment. Default is 5. poly_order : int, optional The polynomial order for fitting the identified baseline. Default is 5. max_iter : int, optional The maximum number of iterations for fitting a polynomial to the identified baseline. If `max_iter` is 0, the returned baseline will be just the linear interpolation of the baseline segments. Default is 50. tol : float, optional The exit criteria for fitting a polynomial to the identified baseline points. Default is 1e-3. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. return_coef : bool, optional If True, will convert the polynomial coefficients for the fit baseline to a form that fits the input `x_data` and return them in the params dictionary. Default is False, since the conversion takes time. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. * 'coef': numpy.ndarray, shape (poly_order,) Only if `return_coef` is True and `max_iter` is greater than 0. The array of polynomial coefficients for the baseline, in increasing order. Can be used to create a polynomial using :class:`numpy.polynomial.polynomial.Polynomial`. * 'tol_history': numpy.ndarray Only if `max_iter` is greater than 1. An array containing the calculated tolerance values for each iteration. The length of the array is the number of iterations completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. Notes ----- When choosing parameters, first choose a `smooth_half_window` that appropriately smooths the data, and then reduce `num_std` until no peak regions are included in the baseline. If no value of `num_std` works, change `smooth_half_window` and repeat. If `max_iter` is 0, the baseline is simply a linear interpolation of the identified baseline points. Otherwise, a polynomial is iteratively fit through the baseline points, and the interpolated sections are replaced each iteration with the polynomial fit. References ---------- Dietrich, W., et al. Fast and Precise Automatic Baseline Correction of One- and Two-Dimensional NMR Spectra. Journal of Magnetic Resonance. 1991, 91, 1-11. """
@jit(nopython=True, cache=True) def _rolling_std(data, half_window, ddof): """ Computes the rolling standard deviation of an array. Parameters ---------- data : numpy.ndarray The array for the calculation. Should be padded on the left and right edges by `half_window`. half_window : int The half-window the rolling calculation. The full number of points for each window is ``half_window * 2 + 1``. ddof : int The delta degrees of freedom for the calculation. Usually 0 (numpy default) or 1. Returns ------- numpy.ndarray The array of the rolling standard deviation for each window. Notes ----- This implementation is a version of Welford's method [1]_, modified for a fixed-length window [2]_. It is slightly modified from the version in [2]_ in that it assumes the data is padded on the left and right. Other deviations from [2]_ are noted within the function. References ---------- .. [1] https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance .. [2] Chmielowiec, A. Algorithm for error-free determination of the variance of all contiguous subsequences and fixed-length contiguous subsequences for a sequence of industrial measurement data. Computational Statistics. 2021, 1-28. """ window_size = half_window * 2 + 1 num_y = data.shape[0] squared_diff = np.zeros(num_y) mean = data[0] # fill the first window for i in range(1, window_size): val = data[i] size_factor = i / (i + 1) squared_diff[i] = squared_diff[i - 1] + 2 * size_factor * (val - mean)**2 mean = mean * size_factor + val / (i + 1) # at this point, mean == np.mean(data[:window_size]) # update squared_diff[half_window] with squared_diff[window_size - 1] / 2; if # this isn't done, all values within [half_window:-half_window] in the output are # off; no idea why... but it works squared_diff[half_window] = squared_diff[window_size - 1] / 2 for j in range(half_window + 1, num_y - half_window): old_val = data[j - half_window - 1] new_val = data[j + half_window] val_diff = new_val - old_val # reference divided by window_size here new_mean = mean + val_diff / window_size squared_diff[j] = squared_diff[j - 1] + val_diff * (old_val + new_val - mean - new_mean) mean = new_mean # empty the last half-window # TODO need to double-check this; not high priority since last half-window # is discarded currently size = window_size for k in range(num_y - half_window + 1, num_y): val = data[k] size_factor = size / (size - 1) squared_diff[k] = squared_diff[k - 1] + 2 * size_factor * (val - mean)**2 mean = mean * size_factor + val / (size - 1) size -= 1 return np.sqrt(squared_diff / (window_size - ddof)) def _padded_rolling_std(data, half_window, ddof=0): """ Convenience function that pads data before performing rolling standard deviation calculation. Parameters ---------- data : array-like The array for the calculation. half_window : int The half-window the rolling calculation. The full number of points for each window is ``half_window * 2 + 1``. ddof : int, optional The delta degrees of freedom for the calculation. Default is 0. Returns ------- numpy.ndarray The array of the rolling standard deviation for each window. Notes ----- Reflect the data since the standard deviation calculation requires noisy data to work. """ padded_data = np.pad(data, half_window, 'reflect') rolling_std = _rolling_std(padded_data, half_window, ddof)[half_window:-half_window] return rolling_std
[docs] @_classification_wrapper def std_distribution(data, x_data=None, half_window=None, interp_half_window=5, fill_half_window=3, num_std=1.1, smooth_half_window=None, weights=None, pad_kwargs=None, **kwargs): """ Identifies baseline segments by analyzing the rolling standard deviation distribution. The rolling standard deviations are split into two distributions, with the smaller distribution assigned to noise. Baseline points are then identified as any point where the rolling standard deviation is less than a multiple of the median of the noise's standard deviation distribution. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. x_data : array-like, shape (N,), optional The x-values of the measured data. Default is None, which will create an array from -1 to 1 with N points. half_window : int, optional The half-window to use for the rolling standard deviation calculation. Should be approximately equal to the full-width-at-half-maximum of the peaks or features in the data. Default is None, which will use half of the value from :func:`.optimize_window`, which is not always a good value, but at least scales with the number of data points and gives a starting point for tuning the parameter. interp_half_window : int, optional When interpolating between baseline segments, will use the average of ``data[i-interp_half_window:i+interp_half_window+1]``, where `i` is the index of the peak start or end, to fit the linear segment. Default is 5. fill_half_window : int, optional When a point is identified as a peak point, all points +- `fill_half_window` are likewise set as peak points. Default is 3. num_std : float, optional The number of standard deviations to include when thresholding. Higher values will assign more points as baseline. Default is 1.1. smooth_half_window : int, optional The half window to use for smoothing the interpolated baseline with a moving average. Default is None, which will use `half_window`. Set to 0 to not smooth the baseline. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. References ---------- Wang, K.C., et al. Distribution-Based Classification Method for Baseline Correction of Metabolomic 1D Proton Nuclear Magnetic Resonance Spectra. Analytical Chemistry. 2013, 85, 1231-1239. """
[docs] @_classification_wrapper def fastchrom(data, x_data=None, half_window=None, threshold=None, min_fwhm=None, interp_half_window=5, smooth_half_window=None, weights=None, max_iter=100, min_length=2, pad_kwargs=None, **kwargs): """ Identifies baseline segments by thresholding the rolling standard deviation distribution. Baseline points are identified as any point where the rolling standard deviation is less than the specified threshold. Peak regions are iteratively interpolated until the baseline is below the data. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. x_data : array-like, shape (N,), optional The x-values of the measured data. Default is None, which will create an array from -1 to 1 with N points. half_window : int, optional The half-window to use for the rolling standard deviation calculation. Should be approximately equal to the full-width-at-half-maximum of the peaks or features in the data. Default is None, which will use half of the value from :func:`.optimize_window`, which is not always a good value, but at least scales with the number of data points and gives a starting point for tuning the parameter. threshold : float or Callable, optional All points in the rolling standard deviation below `threshold` will be considered as baseline. Higher values will assign more points as baseline. Default is None, which will set the threshold as the 15th percentile of the rolling standard deviation. If `threshold` is Callable, it should take the rolling standard deviation as the only argument and output a float. min_fwhm : int, optional After creating the interpolated baseline, any region where the baseline is greater than the data for `min_fwhm` consecutive points will have an additional baseline point added and reinterpolated. Should be set to approximately the index-based full-width-at-half-maximum of the smallest peak. Default is None, which uses ``2 * half_window``. interp_half_window : int, optional When interpolating between baseline segments, will use the average of ``data[i-interp_half_window:i+interp_half_window+1]``, where `i` is the index of the peak start or end, to fit the linear segment. Default is 5. smooth_half_window : int, optional The half window to use for smoothing the interpolated baseline with a moving average. Default is None, which will use `half_window`. Set to 0 to not smooth the baseline. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. max_iter : int, optional The maximum number of iterations to attempt to fill in regions where the baseline is greater than the input data. Default is 100. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. Notes ----- Only covers the baseline correction from FastChrom, not its peak finding and peak grouping capabilities. References ---------- Johnsen, L., et al. An automated method for baseline correction, peak finding and peak grouping in chromatographic data. Analyst. 2013, 138, 3502-3511. """
[docs] @_classification_wrapper def cwt_br(data, x_data=None, poly_order=5, scales=None, num_std=1.0, min_length=2, max_iter=50, tol=1e-3, symmetric=False, weights=None, pad_kwargs=None, **kwargs): """ Continuous wavelet transform baseline recognition (CWT-BR) algorithm. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. x_data : array-like, shape (N,), optional The x-values of the measured data. Default is None, which will create an array from -1 to 1 with N points. poly_order : int, optional The polynomial order for fitting the baseline. Default is 5. scales : array-like, optional The scales at which to perform the continuous wavelet transform. Default is None, num_std : float, optional The number of standard deviations to include when thresholding. Default is 1.0. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. max_iter : int, optional The maximum number of iterations. Default is 50. tol : float, optional The exit criteria. Default is 1e-3. symmetric : bool, optional When fitting the identified baseline points with a polynomial, if `symmetric` is False (default), will add any point `i` as a baseline point where the fit polynomial is greater than the input data for ``N/100`` consecutive points on both sides of point `i`. If `symmetric` is True, then it means that both positive and negative peaks exist and baseline points are not modified during the polynomial fitting. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from convolution for the continuous wavelet transform. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. * 'tol_history': numpy.ndarray An array containing the calculated tolerance values for each iteration. The length of the array is the number of iterations completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. * 'best_scale' : scalar The scale at which the Shannon entropy of the continuous wavelet transform of the data is at a minimum. Notes ----- Uses the standard deviation for determining outliers during polynomial fitting rather than the standard error as used in the reference since the number of standard errors to include when thresholding varies with data size while the number of standard deviations is independent of data size. References ---------- Bertinetto, C., et al. Automatic Baseline Recognition for the Correction of Large Sets of Spectra Using Continuous Wavelet Transform and Iterative Fitting. Applied Spectroscopy, 2014, 68(2), 155-164. """
def _haar(num_points, scale=2): """ Creates a Haar wavelet. Parameters ---------- num_points : int The number of points for the wavelet. Note that if `num_points` is odd and `scale` is even, or if `num_points` is even and `scale` is odd, then the length of the output wavelet will be `num_points` + 1 to ensure the symmetry of the wavelet. scale : int, optional The scale at which the wavelet is evaluated. Default is 2. Returns ------- wavelet : numpy.ndarray The Haar wavelet. Notes ----- This implementation is only designed to work for integer scales. Matches pywavelets's Haar implementation after applying patches from pywavelets issue #365 and pywavelets pull request #580. Raises ------ TypeError Raised if `scale` is not an integer. References ---------- https://wikipedia.org/wiki/Haar_wavelet """ if not isinstance(scale, int): raise TypeError('scale must be an integer for the Haar wavelet') # to maintain symmetry, even scales should have even windows and odd # scales have odd windows odd_scale = scale % 2 odd_window = num_points % 2 if (odd_scale and not odd_window) or (not odd_scale and odd_window): num_points += 1 # center at 0 rather than 1/2 to make calculation easier # from [-scale/2 to 0), wavelet = 1; [0, scale/2), wavelet = -1 x_vals = np.arange(num_points) - (num_points - 1) / 2 wavelet = np.zeros(num_points) if not odd_scale: wavelet[(x_vals >= -scale / 2) & (x_vals < 0)] = 1 wavelet[(x_vals < scale / 2) & (x_vals >= 0)] = -1 else: # set such that wavelet[x_vals == 0] = 0 wavelet[(x_vals > -scale / 2) & (x_vals < 0)] = 1 wavelet[(x_vals < scale / 2) & (x_vals > 0)] = -1 # the 1/sqrt(scale) is a normalization return wavelet / (np.sqrt(scale)) # adapted from scipy (scipy/signal/_wavelets.py/ricker); see license above def _ricker(points, a): """ Return a Ricker wavelet, also known as the "Mexican hat wavelet". It models the function: ``A * (1 - (x/a)**2) * exp(-0.5*(x/a)**2)``, where ``A = 2/(sqrt(3*a)*(pi**0.25))``. Parameters ---------- points : int Number of points in `vector`. Will be centered around 0. a : scalar Width parameter of the wavelet. Returns ------- vector : (N,) ndarray Array of length `points` in shape of ricker curve. Notes ----- This function was deprecated from scipy.signal in version 1.12. """ A = 2 / (np.sqrt(3 * a) * (np.pi**0.25)) wsq = a**2 vec = np.arange(0, points) - (points - 1.0) / 2 xsq = vec**2 mod = (1 - xsq / wsq) gauss = np.exp(-xsq / (2 * wsq)) total = A * mod * gauss return total # adapted from scipy (scipy/signal/_wavelets.py/cwt); see license above def _cwt(data, wavelet, widths, dtype=None, **kwargs): """ Continuous wavelet transform. Performs a continuous wavelet transform on `data`, using the `wavelet` function. A CWT performs a convolution with `data` using the `wavelet` function, which is characterized by a width parameter and length parameter. The `wavelet` function is allowed to be complex. Parameters ---------- data : array-like, shape (N,) Data on which to perform the transform. wavelet : Callable Wavelet function, which should take 2 arguments. The first argument is the number of points that the returned vector will have (len(wavelet(length,width)) == length). The second is a width parameter, defining the size of the wavelet (e.g. standard deviation of a gaussian). See `ricker`, which satisfies these requirements. widths : Sequence[scalar, ...] Widths to use for transform with length `M`. dtype : type or numpy.dtype, optional The desired data type of output. Defaults to ``float64`` if the output of `wavelet` is real and ``complex128`` if it is complex. **kwargs Keyword arguments passed to wavelet function. Returns ------- cwt: numpy.ndarray, shape (M, N) Will have shape of (len(widths), len(data)). Notes ----- This function was deprecated from scipy.signal in version 1.12. References ---------- S. Mallat, "A Wavelet Tour of Signal Processing (3rd Edition)", Academic Press, 2009. """ # Determine output type if dtype is None: if np.asarray(wavelet(1, widths[0], **kwargs)).dtype.char in 'FDG': dtype = np.complex128 else: dtype = np.float64 output = np.empty((len(widths), len(data)), dtype=dtype) for ind, width in enumerate(widths): N = np.min([10 * width, len(data)]) wavelet_data = np.conj(wavelet(N, width, **kwargs)[::-1]) output[ind] = convolve(data, wavelet_data, mode='same') return output
[docs] @_classification_wrapper def fabc(data, lam=1e6, scale=None, num_std=3.0, diff_order=2, min_length=2, weights=None, weights_as_mask=False, x_data=None, pad_kwargs=None, **kwargs): """ Fully automatic baseline correction (fabc). Similar to Dietrich's method, except that the derivative is estimated using a continuous wavelet transform and the baseline is calculated using Whittaker smoothing through the identified baseline points. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. lam : float, optional The smoothing parameter. Larger values will create smoother baselines. Default is 1e6. scale : int, optional The scale at which to calculate the continuous wavelet transform. Should be approximately equal to the index-based full-width-at-half-maximum of the peaks or features in the data. Default is None, which will use half of the value from :func:`.optimize_window`, which is not always a good value, but at least scales with the number of data points and gives a starting point for tuning the parameter. num_std : float, optional The number of standard deviations to include when thresholding. Higher values will assign more points as baseline. Default is 3.0. diff_order : int, optional The order of the differential matrix. Must be greater than 0. Default is 2 (second order differential matrix). Typical values are 2 or 1. min_length : int, optional Any region of consecutive baseline points less than `min_length` is considered to be a false positive and all points in the region are converted to peak points. A higher `min_length` ensures less points are falsely assigned as baseline points. Default is 2, which only removes lone baseline points. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered baseline points. If None (default), then will be an array with size equal to N and all values set to 1. weights_as_mask : bool, optional If True, signifies that the input `weights` is the mask to use for fitting, which skips the continuous wavelet calculation and just smooths the input data. Default is False. x_data : array-like, optional The x-values. Not used by this function, but input is allowed for consistency with other functions. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from convolution for the continuous wavelet transform. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. params : dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. * 'weights': numpy.ndarray, shape (N,) The weight array used for fitting the data. Notes ----- The classification of baseline points is similar to :meth:`~.Baseline.dietrich`, except that this method approximates the first derivative using a continous wavelet transform with the Haar wavelet, which is more robust than the numerical derivative in Dietrich's method. References ---------- Cobas, J., et al. A new general-purpose fully automatic baseline-correction procedure for 1D and 2D NMR data. Journal of Magnetic Resonance, 2006, 183(1), 145-151. """
[docs] @_classification_wrapper def rubberband(data, x_data=None, segments=1, lam=None, diff_order=2, weights=None, smooth_half_window=None, pad_kwargs=None, **kwargs): """ Identifies baseline points by fitting a convex hull to the bottom of the data. Parameters ---------- data : array-like, shape (N,) The y-values of the measured data, with N data points. x_data : array-like, shape (N,), optional The x-values of the measured data. Default is None, which will create an array from -1 to 1 with N points. segments : int or array-like[int], optional Used to fit multiple convex hulls to the data to negate the effects of concave data. If the input is an integer, it sets the number of equally sized segments the data will be split into. If the input is an array-like, each integer in the array will be the index that splits two segments, which allows constructing unequally sized segments. Default is 1, which fits a single convex hull to the data. lam : float or None, optional The smoothing parameter for interpolating the baseline points using Whittaker smoothing. Set to 0 or None to use linear interpolation instead. Default is None, which does not smooth. diff_order : int, optional The order of the differential matrix if using Whittaker smoothing. Must be greater than 0. Default is 2 (second order differential matrix). Typical values are 2 or 1. weights : array-like, shape (N,), optional The weighting array, used to override the function's baseline identification to designate peak points. Only elements with 0 or False values will have an effect; all non-zero values are considered potential baseline points. If None (default), then will be an array with size equal to N and all values set to 1. smooth_half_window : int or None, optional The half window to use for smoothing the input data with a moving average before calculating the convex hull, which gives much better results for noisy data. Set to None (default) or 0 to not smooth the data. pad_kwargs : dict, optional A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding the edges of the data to prevent edge effects from smoothing. Default is None. **kwargs .. deprecated:: 1.2.0 Passing additional keyword arguments is deprecated and will be removed in version 1.4.0. Pass keyword arguments using `pad_kwargs`. Returns ------- baseline : numpy.ndarray, shape (N,) The calculated baseline. dict A dictionary with the following items: * 'mask': numpy.ndarray, shape (N,) The boolean array designating baseline points as True and peak points as False. Raises ------ ValueError Raised if the number of segments is less than 1 or too large to allow interpolation. """