# -*- coding: utf-8 -*-
"""Smoothing-based techniques for fitting baselines to experimental data.
Created on March 7, 2021
@author: Donald Erb
"""
import warnings
import numpy as np
from scipy.ndimage import median_filter, uniform_filter1d
from scipy.signal import savgol_coeffs
from ._algorithm_setup import _Algorithm, _class_wrapper
from ._compat import jit, trapezoid
from ._validation import _check_half_window, _check_scalar
from .utils import (
ParameterWarning, _get_edges, gaussian, gaussian_kernel, optimize_window,
pad_edges, padded_convolve, relative_difference
)
class _Smooth(_Algorithm):
"""A base class for all smoothing algorithms."""
[docs]
@_Algorithm._register
def snip(self, data, max_half_window=None, decreasing=False, smooth_half_window=None,
filter_order=2, pad_kwargs=None, **kwargs):
"""
Statistics-sensitive Non-linear Iterative Peak-clipping (SNIP).
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
max_half_window : int or Sequence(int, int), optional
The maximum number of iterations. Should be set such that
`max_half_window` is approxiamtely ``(w-1)/2``, where ``w`` is the index-based
width of a feature or peak. `max_half_window` can also be a sequence of
two integers for asymmetric peaks, with the first item corresponding to
the `max_half_window` of the peak's left edge, and the second item
for the peak's right edge [3]_. Default is None, which will use the output
from :func:`.optimize_window`, which is an okay starting value.
decreasing : bool, optional
If False (default), will iterate through window sizes from 1 to
`max_half_window`. If True, will reverse the order and iterate from
`max_half_window` to 1, which gives a smoother baseline according to [3]_
and [4]_.
smooth_half_window : int, optional
The half window to use for smoothing the data. If `smooth_half_window`
is greater than 0, will perform a moving average smooth on the data for
each window, which gives better results for noisy data [3]_. Default is
None, which will not perform any smoothing.
filter_order : {2, 4, 6, 8}, optional
If the measured data has a more complicated baseline consisting of other
elements such as Compton edges, then a higher `filter_order` should be
selected [3]_. Default is 2, which works well for approximating a linear
baseline.
pad_kwargs : dict, optional
A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding
the edges of the data to prevent edge effects from smoothing. Default is None.
**kwargs
.. deprecated:: 1.2.0
Passing additional keyword arguments is deprecated and will be removed in version
1.4.0. Pass keyword arguments using `pad_kwargs`.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
dict
An empty dictionary, just to match the output of all other algorithms.
Raises
------
ValueError
Raised if `filter_order` is not 2, 4, 6, or 8.
Warns
-----
UserWarning
Raised if max_half_window is greater than (len(data) - 1) // 2.
Notes
-----
Algorithm initially developed by [1]_, and this specific version of the
algorithm is adapted from [2]_, [3]_, and [4]_.
If data covers several orders of magnitude, better results can be obtained
by first transforming the data using log-log-square transform before
using SNIP [2]_::
transformed_data = np.log(np.log(np.sqrt(data + 1) + 1) + 1)
and then baseline can then be reverted back to the original scale using inverse::
baseline = -1 + (np.exp(np.exp(snip(transformed_data)) - 1) - 1)**2
References
----------
.. [1] Ryan, C.G., et al. SNIP, A Statistics-Sensitive Background Treatment
For The Quantitative Analysis Of Pixe Spectra In Geoscience Applications.
Nuclear Instruments and Methods in Physics Research B, 1988, 934, 396-402.
.. [2] Morháč, M., et al. Background elimination methods for multidimensional
coincidence γ-ray spectra. Nuclear Instruments and Methods in Physics
Research A, 1997, 401, 113-132.
.. [3] Morháč, M., et al. Peak Clipping Algorithms for Background Estimation in
Spectroscopic Data. Applied Spectroscopy, 2008, 62(1), 91-106.
.. [4] Morháč, M. An algorithm for determination of peak regions and baseline
elimination in spectroscopic data. Nuclear Instruments and Methods in
Physics Research A, 2009, 60, 478-487.
"""
# TODO potentially add adaptive window sizes from [4]_, or at least allow inputting
# an array of max_half_windows; would need to have a separate function for array
# windows since it would no longer be able to be vectorized
if filter_order not in {2, 4, 6, 8}:
raise ValueError('filter_order must be 2, 4, 6, or 8')
if max_half_window is None:
max_half_window = optimize_window(data)
half_windows = _check_half_window(max_half_window, two_d=True)
for i, half_window in enumerate(half_windows):
if half_window > (self._size - 1) // 2:
warnings.warn(
'max_half_window values greater than (len(data) - 1) / 2 have no effect.',
ParameterWarning, stacklevel=2
)
half_windows[i] = (self._size - 1) // 2
max_of_half_windows = np.max(half_windows)
if decreasing:
range_args = (max_of_half_windows, 0, -1)
else:
range_args = (1, max_of_half_windows + 1, 1)
y = self._setup_smooth(data, max_of_half_windows, pad_kwargs=pad_kwargs, **kwargs)[0]
num_y = self._size + 2 * max_of_half_windows
smooth = smooth_half_window is not None and smooth_half_window > 0
if smooth:
smooth_window = 2 * _check_half_window(smooth_half_window) + 1
baseline = y.copy()
for i in range(*range_args):
i_left = min(i, half_windows[0])
i_right = min(i, half_windows[1])
filters = (
baseline[i - i_left:num_y - i - i_left] + baseline[i + i_right:num_y - i + i_right]
) / 2
if filter_order > 2:
filters_new = (
- (
baseline[i - i_left:num_y - i - i_left]
+ baseline[i + i_right:num_y - i + i_right]
)
+ 4 * (
baseline[i - i_left // 2:-i - i_left // 2]
+ baseline[i + i_right // 2:-i + i_right // 2]
)
) / 6
filters = np.maximum(filters, filters_new)
if filter_order > 4:
filters_new = (
baseline[i - i_left:num_y - i - i_left]
+ baseline[i + i_right:num_y - i + i_right]
- 6 * (
baseline[i - 2 * i_left // 3:-i - 2 * i_left // 3]
+ baseline[i + 2 * i_right // 3:-i + 2 * i_right // 3]
)
+ 15 * (
baseline[i - i_left // 3:-i - i_left // 3]
+ baseline[i + i_right // 3:-i + i_right // 3]
)
) / 20
filters = np.maximum(filters, filters_new)
if filter_order > 6:
filters_new = (
- (
baseline[i - i_left:num_y - i - i_left]
+ baseline[i + i_right:num_y - i + i_right]
)
+ 8 * (
baseline[i - 3 * i_left // 4:-i - 3 * i_left // 4]
+ baseline[i + 3 * i_right // 4:-i + 3 * i_right // 4]
)
- 28 * (
baseline[i - i_left // 2:-i - i_left // 2]
+ baseline[i + i_right // 2:-i + i_right // 2]
)
+ 56 * (
baseline[i - i_left // 4:-i - i_left // 4]
+ baseline[i + i_right // 4:-i + i_right // 4]
)
) / 70
filters = np.maximum(filters, filters_new)
if smooth:
previous_baseline = uniform_filter1d(baseline, smooth_window)[i:-i]
else:
previous_baseline = baseline[i:-i]
baseline[i:-i] = np.where(baseline[i:-i] > filters, filters, previous_baseline)
return baseline[max_of_half_windows:-max_of_half_windows], {}
[docs]
@_Algorithm._register
def swima(self, data, min_half_window=3, max_half_window=None, smooth_half_window=None,
pad_kwargs=None, **kwargs):
"""
Small-window moving average (SWiMA) baseline.
Computes an iterative moving average to smooth peaks and obtain the baseline.
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
min_half_window : int, optional
The minimum half window value that must be reached before the exit criteria
is considered. Can be increased to reduce the calculation time. Default is 3.
max_half_window : int, optional
The maximum number of iterations. Default is None, which will use
(N - 1) / 2. Typically does not need to be specified.
smooth_half_window : int, optional
The half window to use for smoothing the input data with a moving average.
Default is None, which will use N / 50. Use a value of 0 or less to not
smooth the data. See Notes below for more details.
pad_kwargs : dict, optional
A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding
the edges of the data to prevent edge effects from smoothing. Default is None.
**kwargs
.. deprecated:: 1.2.0
Passing additional keyword arguments is deprecated and will be removed in version
1.4.0. Pass keyword arguments using `pad_kwargs`.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
dict
A dictionary with the following items:
* 'half_window': list(int)
A list of the half windows at which the exit criteria was reached.
Has a length of 1 if the main exit criteria was intially reached,
otherwise has a length of 2.
* 'converged': list(bool or None)
A list of the convergence status. Has a length of 1 if the main
exit criteria was intially reached, otherwise has a length of 2.
Each convergence status is True if the main exit criteria was
reached, False if the second exit criteria was reached, and None
if `max_half_window` is reached before either exit criteria.
Notes
-----
This algorithm requires the input data to be fairly smooth (noise-free), so it
is recommended to either smooth the data beforehand, or specify a
`smooth_half_window` value. Non-smooth data can cause the exit criteria to be
reached prematurely (can be avoided by setting a larger `min_half_window`), while
over-smoothed data can cause the exit criteria to be reached later than optimal.
The half-window at which convergence occurs is roughly close to the index-based
full-width-at-half-maximum of a peak or feature, but can vary. Therfore, it is
better to set a `min_half_window` that is smaller than expected to not miss the
exit criteria.
If the main exit criteria is not reached on the initial fit, a gaussian baseline
(which is well handled by this algorithm) is added to the data, and it is re-fit.
References
----------
Schulze, H., et al. A Small-Window Moving Average-Based Fully Automated
Baseline Estimation Method for Raman Spectra. Applied Spectroscopy, 2012,
66(7), 757-764.
"""
if max_half_window is None:
max_half_window = (self._size - 1) // 2
min_half_window = _check_half_window(min_half_window, allow_zero=True)
y = self._setup_smooth(data, max_half_window, pad_kwargs=pad_kwargs, **kwargs)[0]
len_y = self._size + 2 * max_half_window # includes padding of max_half_window at each side
data_slice = slice(max_half_window, -max_half_window)
if smooth_half_window is None:
smooth_half_window = max(1, (len_y - 2 * max_half_window) // 50)
if smooth_half_window > 0:
y = uniform_filter1d(y, 2 * _check_half_window(smooth_half_window) + 1)
*_, pseudo_inverse = self._setup_polynomial(
y, None, poly_order=3, calc_vander=True, calc_pinv=True
)
baseline, converged, half_window = _swima_loop(
y, self._polynomial.vandermonde, pseudo_inverse, data_slice, max_half_window,
min_half_window
)
converges = [converged]
half_windows = [half_window]
if not converged:
residual = y - baseline
gaussian_bkg = gaussian(
np.arange(len_y), np.max(residual), len_y / 2, len_y / 6
)
baseline_2, converged, half_window = _swima_loop(
residual + gaussian_bkg, self._polynomial.vandermonde, pseudo_inverse, data_slice,
max_half_window, 3
)
baseline += baseline_2 - gaussian_bkg
converges.append(converged)
half_windows.append(half_window)
return baseline[data_slice], {'half_window': half_windows, 'converged': converges}
[docs]
@_Algorithm._register
def ipsa(self, data, half_window=None, max_iter=500, tol=None, roi=None,
original_criteria=False, pad_kwargs=None, **kwargs):
"""
Iterative Polynomial Smoothing Algorithm (IPSA).
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
half_window : int
The half-window to use for the smoothing each iteration. Should be
approximately equal to the full-width-at-half-maximum of the peaks or features
in the data. Default is None, which will use 4 times the output of
:func:`.optimize_window`, which is not always a good value, but at least scales
with the number of data points and gives a starting point for tuning the parameter.
max_iter : int, optional
The maximum number of iterations. Default is 500.
tol : float, optional
The exit criteria. Default is None, which uses 1e-3 if `original_criteria` is
False, and ``1 / (max(data) - min(data))`` if `original_criteria` is True.
roi : slice or array-like, shape(N,)
The region of interest, such that ``np.asarray(data)[roi]`` gives the values
for calculating the tolerance if `original_criteria` is True. Not used if
`original_criteria` is True. Default is None, which uses all values in `data`.
original_criteria : bool, optional
Whether to use the original exit criteria from the reference, which is difficult
to use since it requires knowledge of how high the peaks should be after baseline
correction. If False (default), then compares ``norm(old, new) / norm(old)``, where
`old` is the previous iteration's baseline, and `new` is the current iteration's
baseline.
pad_kwargs : dict, optional
A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding
the edges of the data to prevent edge effects from smoothing. Default is None.
**kwargs
.. deprecated:: 1.2.0
Passing additional keyword arguments is deprecated and will be removed in version
1.4.0. Pass keyword arguments using `pad_kwargs`.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
params : dict
A dictionary with the following items:
* 'tol_history': numpy.ndarray
An array containing the calculated tolerance values for
each iteration. The length of the array is the number of iterations
completed. If the last value in the array is greater than the input
`tol` value, then the function did not converge.
References
----------
Wang, T., et al. Background Subtraction of Raman Spectra Based on Iterative
Polynomial Smoothing. Applied Spectroscopy. 2017, 71(6), 1169-1179.
"""
y, output_half_window = self._setup_smooth(
data, half_window, pad_type='full', window_multiplier=4, pad_kwargs=pad_kwargs,
**kwargs
)
window_size = 2 * output_half_window + 1
y0 = y
data_slice = slice(window_size, -window_size)
if original_criteria:
if roi is None:
roi = slice(None)
elif not isinstance(roi, slice):
roi = np.asarray(roi)
if tol is None:
if original_criteria:
# guess what the desired height should be; not a great guess, but it's
# something
tol = 1 / np.ptp(y[data_slice][roi])
else:
tol = 1e-3
savgol_coef = savgol_coeffs(window_size, 2)
tol_history = np.empty(max_iter + 1)
old_baseline = y[data_slice]
for i in range(max_iter + 1):
baseline = padded_convolve(y, savgol_coef, 'edge')
if original_criteria:
residual = (y0 - baseline)[data_slice][roi]
calc_tol = abs(residual.min() / residual.max())
else:
calc_tol = relative_difference(old_baseline, baseline[data_slice])
tol_history[i] = calc_tol
if calc_tol < tol:
break
y = np.minimum(y0, baseline)
old_baseline = baseline[data_slice]
return baseline[data_slice], {'tol_history': tol_history[:i + 1]}
[docs]
@_Algorithm._register
def ria(self, data, half_window=None, max_iter=500, tol=1e-2, side='both',
width_scale=0.1, height_scale=1., sigma_scale=1 / 12, pad_kwargs=None, **kwargs):
"""
Range Independent Algorithm (RIA).
Adds additional data to the left and/or right of the input data, and then
iteratively smooths until the area of the additional data is removed.
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
half_window : int, optional
The half-window to use for the smoothing each iteration. Should be
approximately equal to the full-width-at-half-maximum of the peaks or features
in the data. Default is None, which will use the output of :func:`.optimize_window`,
which is not always a good value, but at least scales with the number of data points
and gives a starting point for tuning the parameter.
max_iter : int, optional
The maximum number of iterations. Default is 500.
tol : float, optional
The exit criteria. Default is 1e-2.
side : {'both', 'left', 'right'}, optional
The side of the measured data to extend. Default is 'both'.
width_scale : float, optional
The number of data points added to each side is `width_scale` * N. Default
is 0.1.
height_scale : float, optional
The height of the added Gaussian peak(s) is calculated as
`height_scale` * max(`data`). Default is 1.
sigma_scale : float, optional
The sigma value for the added Gaussian peak(s) is calculated as
`sigma_scale` * `width_scale` * N. Default is 1/12, which will make
the Gaussian span +- 6 sigma, making its total width about half of the
added length.
pad_kwargs : dict, optional
A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding
the edges of the data when adding the extended left and/or right sections.
**kwargs
.. deprecated:: 1.2.0
Passing additional keyword arguments is deprecated and will be removed in version
1.4.0. Pass keyword arguments using `pad_kwargs`.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
params : dict
A dictionary with the following items:
* 'tol_history': numpy.ndarray
An array containing the calculated tolerance values for
each iteration. The length of the array is the number of iterations
completed. If the last value in the array is greater than the input
`tol` value, then the function did not converge (if the array length
is equal to `max_iter`) or the areas of the smoothed extended regions
exceeded their initial areas (if the array length is < `max_iter`).
Raises
------
ValueError
Raised if `side` is not 'left', 'right', or 'both'.
References
----------
Krishna, H., et al. Range-independent background subtraction algorithm for
recovery of Raman spectra of biological tissue. J Raman Spectroscopy. 2012,
43(12), 1884-1894.
"""
side = side.lower()
if side not in ('left', 'right', 'both'):
raise ValueError('side must be "left", "right", or "both"')
# note: only pass pad_kwargs to setup_smooth to validate pad_kwargs and kwargs; no
# padding is done
y, half_window = self._setup_smooth(
data, half_window, pad_type=None, pad_kwargs=pad_kwargs, **kwargs
)
min_x, max_x = self.x_domain
x_range = max_x - min_x
added_window = int(self._size * width_scale)
lower_bound = 0
upper_bound = 0
known_area = 0.
# TODO should make this a function that could be used by
# optimizers.optimize_extended_range too
pad_kwargs = pad_kwargs if pad_kwargs is not None else {}
added_left, added_right = _get_edges(y, added_window, **pad_kwargs, **kwargs)
added_gaussian = gaussian(
np.linspace(-added_window / 2, added_window / 2, added_window),
height_scale * abs(y.max()), 0, added_window * sigma_scale
)
if side in ('left', 'both'):
added_x_left = np.linspace(
min_x - x_range * (width_scale / 2), min_x, added_window + 1
)[:-1]
added_y_left = added_gaussian + added_left
lower_bound = added_window
known_area += trapezoid(added_gaussian, added_x_left)
else:
added_x_left = []
added_y_left = []
if side in ('right', 'both'):
added_x_right = np.linspace(
max_x, max_x + x_range * (width_scale / 2), added_window + 1
)[1:]
added_y_right = added_gaussian + added_right
upper_bound = added_window
known_area += trapezoid(added_gaussian, added_x_right)
else:
added_x_right = []
added_y_right = []
fit_x_data = np.concatenate((added_x_left, self.x, added_x_right))
fit_data = np.concatenate((added_y_left, y, added_y_right))
upper_max = fit_data.shape[0] - upper_bound
tol_history = np.empty(max_iter)
window_size = 2 * half_window + 1
smoother_array = pad_edges(fit_data, window_size, extrapolate_window=2)
data_slice = slice(window_size, -window_size)
for i in range(max_iter):
# only smooth fit_data so that the outer section remains unchanged by
# smoothing and edge effects are ignored
smoother_array[data_slice] = uniform_filter1d(smoother_array, window_size)[data_slice]
residual = fit_data - smoother_array[data_slice]
calc_area = trapezoid(residual[:lower_bound], fit_x_data[:lower_bound])
if upper_bound:
calc_area += trapezoid(residual[-upper_bound:], fit_x_data[-upper_bound:])
calc_difference = relative_difference(known_area, calc_area)
tol_history[i] = calc_difference
if calc_difference < tol or calc_area > known_area:
break
smoother_array[data_slice] = np.minimum(fit_data, smoother_array[data_slice])
baseline = smoother_array[data_slice][lower_bound:upper_max]
return baseline, {'tol_history': tol_history[:i + 1]}
[docs]
@_Algorithm._register
def peak_filling(self, data, half_window=None, sections=None, max_iter=5, lam_smooth=None):
"""
The 4S (Smooth, Subsample, Suppress, Stretch) Peak Filling algorithm.
Smooths and truncates the input. Each value is then replaced in-place by the minimum of
the value or the average of the moving window, with the half-window size decreasing
exponentially from the input `half_window` to 1. The result is then interpolated back
into the original data size.
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
half_window : int, optional
The index-based size to use for the moving average window. The total window
size will range from [-half_window, ..., half_window] with size
``2 * half_window + 1``. Default is None, which will use two or three times the
output from func:`.optimize_window`, which is an okay starting value.
sections : int or Sequence[int, ...], optional
If the input is an integer, it sets the number of equally sized
segments the data will be split into. If the input is a sequence, each integer
in the sequence will be the index that splits two segments, which allows
constructing unequally sized segments. The minimum of each section will be used
to represent the input data for determining the baseline. Higher `sections` values
are needed for baselines with higher curvature. Default is None, which will use
``N // 10``.
max_iter : int, optional
The number of iterations to perform smoothing. Each iteration, the size of the
window used for the moving average will shrink logarithmically, starting at
``2 * half_window + 1`` and ending at 3. Default is 5.
lam_smooth : float or None, optional
The parameter for smoothing the input using Whittaker smoothing.
Set to 0 or None (default) to skip smoothing.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
params : dict
A dictionary with the following items:
* 'x_fit': numpy.ndarray, shape (P,)
The truncated x-values used for fitting and interpolating the baseline.
* 'baseline_fit': numpy.ndarray, shape (P,)
The truncated baseline values used to interpolate the final baseline.
Raises
------
TypeError
Raised if `sections` is an integer not between 1 and ``N``, or if `sections`
is a sequence with any value not between 0 and ``N - 1``.
Notes
-----
The input parameter `sections` will determine the necessary `half_window` and `max_iter`
values required to correctly fit the baseline. Likewise, `max_iter` is highly correlated
with `half_window`.
References
----------
Liland, K. 4S Peak Filling - baseline estimation by iterative mean suppression. MethodsX.
2015, 2, 135-140.
"""
if sections is None:
sections = self._size // 10
scalar_sections = True
else:
sections, scalar_sections = _check_scalar(
sections, None, coerce_0d=False, dtype=np.intp
)
if scalar_sections and (sections < 1 or sections > self._size):
raise ValueError(
f'There must be between 1 and {self._size} sections for peak_filling'
)
elif (
not scalar_sections
and (np.any(sections < 0) or np.any(sections > self._size - 1))
):
raise ValueError(
f'Section indices must be between 0 and {self._size - 1} for peak_filling'
)
if scalar_sections:
y_truncated = np.empty(sections)
x_truncated = np.empty(sections)
indices = np.linspace(0, self._size, sections + 1, dtype=np.intp)
else:
# np.unique already sorts so do not need to check order
indices = np.unique(np.concatenate(([0], sections, [self._size])))
len_arrays = len(indices) - 1
y_truncated = np.empty(len_arrays)
x_truncated = np.empty(len_arrays)
if lam_smooth is not None and lam_smooth > 0:
_, _, whittaker_system = self._setup_whittaker(data, lam_smooth, diff_order=2)
data = whittaker_system.solve(whittaker_system.add_diagonal(1.), data)
for i, (left_idx, right_idx) in enumerate(zip(indices[:-1], indices[1:])):
y_truncated[i] = data[left_idx:right_idx].min()
x_truncated[i] = self.x[left_idx:right_idx].mean()
# include first and last values to prevent edge effects if they are not already included
left_pad = 1 if x_truncated[0] != self.x[0] else 0
right_pad = 1 if x_truncated[-1] != self.x[-1] else 0
x_truncated = np.pad(
x_truncated, [left_pad, right_pad], 'constant',
constant_values=([self.x[0], self.x[-1]])
)
y_truncated = np.pad(
y_truncated, [left_pad, right_pad], 'constant', constant_values=([data[0], data[-1]],)
)
_, half_win = self._setup_smooth(
y_truncated, half_window, pad_type=None, window_multiplier=3 if max_iter < 3 else 2
)
if half_win > (sections - 1) // 2:
if half_window is not None: # only emit warning if user input half window
warnings.warn(
'half_window values greater than (sections - 1) // 2 have no effect.',
ParameterWarning, stacklevel=2
)
half_win = (sections - 1) // 2
# logspace still works when max_iter=1; use ceil rather than using dtype=int
# in logspace since the int casting will floor the result and cause several half
# windows of 1
half_windows = np.ceil(np.logspace(np.log10(half_win), 0, max_iter)).astype(int)
half_windows[0] = half_win # rounding issues can shift initial half window +- 1
for half_win in half_windows:
y_truncated = _directional_min_moving_avg(y_truncated, sections, half_win)
y_truncated = _directional_min_moving_avg(y_truncated[::-1], sections, half_win)[::-1]
baseline = np.interp(self.x, x_truncated, y_truncated)
return baseline, {'x_fit': x_truncated, 'baseline_fit': y_truncated}
_smooth_wrapper = _class_wrapper(_Smooth)
[docs]
@_smooth_wrapper
def snip(data, max_half_window=None, decreasing=False, smooth_half_window=None,
filter_order=2, x_data=None, pad_kwargs=None, **kwargs):
"""
Statistics-sensitive Non-linear Iterative Peak-clipping (SNIP).
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
max_half_window : int or Sequence(int, int), optional
The maximum number of iterations. Should be set such that
`max_half_window` is approxiamtely ``(w-1)/2``, where ``w`` is the index-based
width of a feature or peak. `max_half_window` can also be a sequence of
two integers for asymmetric peaks, with the first item corresponding to
the `max_half_window` of the peak's left edge, and the second item
for the peak's right edge [3]_. Default is None, which will use the output
from :func:`.optimize_window`, which is an okay starting value.
decreasing : bool, optional
If False (default), will iterate through window sizes from 1 to
`max_half_window`. If True, will reverse the order and iterate from
`max_half_window` to 1, which gives a smoother baseline according to [3]_
and [4]_.
smooth_half_window : int, optional
The half window to use for smoothing the data. If `smooth_half_window`
is greater than 0, will perform a moving average smooth on the data for
each window, which gives better results for noisy data [3]_. Default is
None, which will not perform any smoothing.
filter_order : {2, 4, 6, 8}, optional
If the measured data has a more complicated baseline consisting of other
elements such as Compton edges, then a higher `filter_order` should be
selected [3]_. Default is 2, which works well for approximating a linear
baseline.
x_data : array-like, optional
The x-values. Not used by this function, but input is allowed for consistency
with other functions.
pad_kwargs : dict, optional
A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding
the edges of the data to prevent edge effects from smoothing. Default is None.
**kwargs
.. deprecated:: 1.2.0
Passing additional keyword arguments is deprecated and will be removed in version
1.4.0. Pass keyword arguments using `pad_kwargs`.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
dict
An empty dictionary, just to match the output of all other algorithms.
Raises
------
ValueError
Raised if `filter_order` is not 2, 4, 6, or 8.
Warns
-----
UserWarning
Raised if max_half_window is greater than (len(data) - 1) // 2.
Notes
-----
Algorithm initially developed by [1]_, and this specific version of the
algorithm is adapted from [2]_, [3]_, and [4]_.
If data covers several orders of magnitude, better results can be obtained
by first transforming the data using log-log-square transform before
using SNIP [2]_::
transformed_data = np.log(np.log(np.sqrt(data + 1) + 1) + 1)
and then baseline can then be reverted back to the original scale using inverse::
baseline = -1 + (np.exp(np.exp(snip(transformed_data)) - 1) - 1)**2
References
----------
.. [1] Ryan, C.G., et al. SNIP, A Statistics-Sensitive Background Treatment
For The Quantitative Analysis Of Pixe Spectra In Geoscience Applications.
Nuclear Instruments and Methods in Physics Research B, 1988, 934, 396-402.
.. [2] Morháč, M., et al. Background elimination methods for multidimensional
coincidence γ-ray spectra. Nuclear Instruments and Methods in Physics
Research A, 1997, 401, 113-132.
.. [3] Morháč, M., et al. Peak Clipping Algorithms for Background Estimation in
Spectroscopic Data. Applied Spectroscopy, 2008, 62(1), 91-106.
.. [4] Morháč, M. An algorithm for determination of peak regions and baseline
elimination in spectroscopic data. Nuclear Instruments and Methods in
Physics Research A, 2009, 60, 478-487.
"""
def _swima_loop(y, vander, pseudo_inverse, data_slice, max_half_window, min_half_window=3):
"""
Computes an iterative moving average to smooth peaks and obtain the baseline.
The internal loop of the small-window moving average (SWiMA) algorithm.
Parameters
----------
y : numpy.ndarray, shape (N + 2 * max_half_window,)
The array of the measured data with N data points padded at each edge with
`max_half_window` extra data points.
vander : numpy.ndarray, shape (N - 1, 4)
The Vandermonde matrix for computing the 3rd order polynomial fit of the
differential of the residual. Used for the alternate exit criteria.
pseudo_inverse : numpy.ndarray, shape (4, N - 1)
The pseudo-inverse of the Vandermonde matrix for computing the 3rd order
polynomial fit of the differential of the residual. Used for the alternate
exit criteria.
data_slice : slice
The slice used for separating the actual values of `y` from the extended y
array.
max_half_window : int
The maximum allowable half window.
min_half_window : int, optional
The minimum half window that must be reached before exit criteria are
considered. Default is 3.
Returns
-------
baseline : numpy.ndarray, shape (N + 2 * max_half_window,)
The baseline with the padded edges.
converged : bool or None
Whether the main exit criteria was achieved. True if it was, False
if the alternate exit criteria was achieved, and None if `max_half_window`
was reached before either exit criteria.
half_window : int
The half window at which the exit criteria was reached.
Notes
-----
Uses a moving average rather than a 0-degree Savitzky-Golay filter since
they are equivalent and the moving average is faster.
The second exit criteria is based on Figure 2 in the reference, since the
slightly different definition of criteria two stated in the text was always
reached before the main exit criteria, which is not the desired outcome.
References
----------
Schulze, H., et al. A Small-Window Moving Average-Based Fully Automated
Baseline Estimation Method for Raman Spectra. Applied Spectroscopy, 2012,
66(7), 757-764.
"""
actual_y = y[data_slice]
baseline = y
min_half_window_check = min_half_window - 2
area_current = -1
area_old = -1
converged = None
for half_window in range(1, max_half_window + 1):
baseline_new = np.minimum(baseline, uniform_filter1d(baseline, 2 * half_window + 1))
# only begin calculating the area when near the lowest allowed half window
if half_window > min_half_window_check:
area_new = trapezoid(baseline[data_slice] - baseline_new[data_slice])
# exit criteria 1
if area_new > area_current and area_current < area_old:
converged = True
# subtract 1 since minimum area was reached the previous iteration
half_window -= 1
break
if half_window > min_half_window:
diff_current = np.gradient(actual_y - baseline_new[data_slice])
poly_diff_current = trapezoid(abs(vander @ (pseudo_inverse @ diff_current)))
# exit criteria 2, means baseline is not well fit
if poly_diff_current > 0.15 * trapezoid(abs(diff_current)):
converged = False
break
area_old = area_current
area_current = area_new
baseline = baseline_new
return baseline, converged, half_window
[docs]
@_smooth_wrapper
def swima(data, min_half_window=3, max_half_window=None, smooth_half_window=None,
x_data=None, pad_kwargs=None, **kwargs):
"""
Small-window moving average (SWiMA) baseline.
Computes an iterative moving average to smooth peaks and obtain the baseline.
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
min_half_window : int, optional
The minimum half window value that must be reached before the exit criteria
is considered. Can be increased to reduce the calculation time. Default is 3.
max_half_window : int, optional
The maximum number of iterations. Default is None, which will use
(N - 1) / 2. Typically does not need to be specified.
smooth_half_window : int, optional
The half window to use for smoothing the input data with a moving average.
Default is None, which will use N / 50. Use a value of 0 or less to not
smooth the data. See Notes below for more details.
x_data : array-like, optional
The x-values. Not used by this function, but input is allowed for consistency
with other functions.
pad_kwargs : dict, optional
A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding
the edges of the data to prevent edge effects from smoothing. Default is None.
**kwargs
.. deprecated:: 1.2.0
Passing additional keyword arguments is deprecated and will be removed in version
1.4.0. Pass keyword arguments using `pad_kwargs`.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
dict
A dictionary with the following items:
* 'half_window': list(int)
A list of the half windows at which the exit criteria was reached.
Has a length of 1 if the main exit criteria was intially reached,
otherwise has a length of 2.
* 'converged': list(bool or None)
A list of the convergence status. Has a length of 1 if the main
exit criteria was intially reached, otherwise has a length of 2.
Each convergence status is True if the main exit criteria was
reached, False if the second exit criteria was reached, and None
if `max_half_window` is reached before either exit criteria.
Notes
-----
This algorithm requires the input data to be fairly smooth (noise-free), so it
is recommended to either smooth the data beforehand, or specify a
`smooth_half_window` value. Non-smooth data can cause the exit criteria to be
reached prematurely (can be avoided by setting a larger `min_half_window`), while
over-smoothed data can cause the exit criteria to be reached later than optimal.
The half-window at which convergence occurs is roughly close to the index-based
full-width-at-half-maximum of a peak or feature, but can vary. Therfore, it is
better to set a `min_half_window` that is smaller than expected to not miss the
exit criteria.
If the main exit criteria is not reached on the initial fit, a gaussian baseline
(which is well handled by this algorithm) is added to the data, and it is re-fit.
References
----------
Schulze, H., et al. A Small-Window Moving Average-Based Fully Automated
Baseline Estimation Method for Raman Spectra. Applied Spectroscopy, 2012,
66(7), 757-764.
"""
[docs]
@_smooth_wrapper
def ipsa(data, half_window=None, max_iter=500, tol=None, roi=None,
original_criteria=False, x_data=None, pad_kwargs=None, **kwargs):
"""
Iterative Polynomial Smoothing Algorithm (IPSA).
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
half_window : int
The half-window to use for the smoothing each iteration. Should be
approximately equal to the full-width-at-half-maximum of the peaks or features
in the data. Default is None, which will use 4 times the output of
:func:`.optimize_window`, which is not always a good value, but at least scales
with the number of data points and gives a starting point for tuning the parameter.
max_iter : int, optional
The maximum number of iterations. Default is 500.
tol : float, optional
The exit criteria. Default is None, which uses 1e-3 if `original_criteria` is
False, and ``1 / (max(data) - min(data))`` if `original_criteria` is True.
roi : slice or array-like, shape(N,)
The region of interest, such that ``np.asarray(data)[roi]`` gives the values
for calculating the tolerance if `original_criteria` is True. Not used if
`original_criteria` is True. Default is None, which uses all values in `data`.
original_criteria : bool, optional
Whether to use the original exit criteria from the reference, which is difficult
to use since it requires knowledge of how high the peaks should be after baseline
correction. If False (default), then compares ``norm(old, new) / norm(old)``, where
`old` is the previous iteration's baseline, and `new` is the current iteration's
baseline.
x_data : array-like, optional
The x-values. Not used by this function, but input is allowed for consistency
with other functions.
pad_kwargs : dict, optional
A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding
the edges of the data to prevent edge effects from smoothing. Default is None.
**kwargs
.. deprecated:: 1.2.0
Passing additional keyword arguments is deprecated and will be removed in version
1.4.0. Pass keyword arguments using `pad_kwargs`.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
params : dict
A dictionary with the following items:
* 'tol_history': numpy.ndarray
An array containing the calculated tolerance values for
each iteration. The length of the array is the number of iterations
completed. If the last value in the array is greater than the input
`tol` value, then the function did not converge.
References
----------
Wang, T., et al. Background Subtraction of Raman Spectra Based on Iterative
Polynomial Smoothing. Applied Spectroscopy. 2017, 71(6), 1169-1179.
"""
[docs]
@_smooth_wrapper
def ria(data, x_data=None, half_window=None, max_iter=500, tol=1e-2, side='both',
width_scale=0.1, height_scale=1., sigma_scale=1. / 12., pad_kwargs=None, **kwargs):
"""
Range Independent Algorithm (RIA).
Adds additional data to the left and/or right of the input data, and then
iteratively smooths until the area of the additional data is removed.
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
x_data : array-like, shape (N,), optional
The x-values of the measured data. Default is None, which will create an
array from -1 to 1 with N points.
half_window : int, optional
The half-window to use for the smoothing each iteration. Should be
approximately equal to the full-width-at-half-maximum of the peaks or features
in the data. Default is None, which will use the output of :func:`.optimize_window`,
which is not always a good value, but at least scales with the number of data points
and gives a starting point for tuning the parameter.
max_iter : int, optional
The maximum number of iterations. Default is 500.
tol : float, optional
The exit criteria. Default is 1e-2.
side : {'both', 'left', 'right'}, optional
The side of the measured data to extend. Default is 'both'.
width_scale : float, optional
The number of data points added to each side is `width_scale` * N. Default
is 0.1.
height_scale : float, optional
The height of the added Gaussian peak(s) is calculated as
`height_scale` * max(`data`). Default is 1.
sigma_scale : float, optional
The sigma value for the added Gaussian peak(s) is calculated as
`sigma_scale` * `width_scale` * N. Default is 1/12, which will make
the Gaussian span +- 6 sigma, making its total width about half of the
added length.
pad_kwargs : dict, optional
A dictionary of keyword arguments to pass to :func:`.pad_edges` for padding
the edges of the data when adding the extended left and/or right sections.
**kwargs
.. deprecated:: 1.2.0
Passing additional keyword arguments is deprecated and will be removed in version
1.4.0. Pass keyword arguments using `pad_kwargs`.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
params : dict
A dictionary with the following items:
* 'tol_history': numpy.ndarray
An array containing the calculated tolerance values for
each iteration. The length of the array is the number of iterations
completed. If the last value in the array is greater than the input
`tol` value, then the function did not converge (if the array length
is equal to `max_iter`) or the areas of the smoothed extended regions
exceeded their initial areas (if the array length is < `max_iter`).
Raises
------
ValueError
Raised if `side` is not 'left', 'right', or 'both'.
References
----------
Krishna, H., et al. Range-independent background subtraction algorithm for
recovery of Raman spectra of biological tissue. J Raman Spectroscopy. 2012,
43(12), 1884-1894.
"""
@jit(nopython=True, cache=True)
def _directional_min_moving_avg(y, data_len, half_window):
"""
Calculates the miniumum of a moving average and current value and modifies in-place.
Since the data is modified in-place, the smoothing has a directional
effect that is canceled out by calling this function a second time with
the reversed output and then reversing that output.
Parameters
----------
y : numpy.ndarray
The array of data to smooth.
data_len : int
The length of the array `y`. Used to prevent calling ``len(y)`` each
time this function is called.
half_window : int
The half window used for the moving average.
Returns
-------
y : numpy.ndarray
The smoothed input. The input `y` is also modified in-place.
Notes
-----
Increases and decreases the window width on the edges because otherwise the data
becomes shifted; to prevent shifting, the window must always be centered.
Uses a shrinking window rather than padding the data since [1]_ states (and is
readily observed when using typical moving minimums) that the output can otherwise
cause too low of a value when data is steep near the ends.
Calculates the rolling mean using Welford's method [2]_ to improve calculation speed over
calling ``numpy.mean`` on each subarray, which scales quite poorly compared to Welford's
method with increasing half-window sizes.
References
----------
.. [1] Liland, K. 4S Peak Filling - baseline estimation by iterative mean suppression.
MethodsX. 2 (2015) 135-140.
.. [2] https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
"""
if half_window > (data_len - 1) // 2:
half_window = (data_len - 1) // 2
window_size = 2 * half_window + 1
mean = y[0]
# fill the first window; have to go to half_window + 1 since the final calculation
# needs to adjust for the window finally filling up
last_window = 1
for i in range(1, half_window + 1):
new_window = last_window + 2 # 2 * i + 1 = 2 * (i - 1) + 1 + 2 == last_window + 2
# advance and grow window to recenter new window at index i
for j in range(last_window, new_window):
mean += (y[j] - mean) / (j + 1)
last_window = new_window
val = y[i]
if mean < val:
# adjust mean for new value
y[i] = mean
mean += (mean - val) / new_window
for i in range(half_window + 1, data_len - half_window):
mean += (y[i + half_window] - y[i - half_window - 1]) / window_size
val = y[i]
if mean < val:
y[i] = mean
mean += (mean - val) / window_size
# finally shrink window on right edge
last_window = window_size
for i in range(data_len - half_window, data_len - 1):
new_window = last_window - 2 # 2 * (i - 1) + 1 = 2 * i - 1 == last_window - 2
for j in range(data_len - last_window, data_len - new_window):
last_window -= 1
mean += (mean - y[j]) / last_window
val = y[i]
if mean < val:
y[i] = mean
mean += (mean - val) / new_window
return y
[docs]
@_smooth_wrapper
def peak_filling(data, x_data=None, half_window=None, sections=None, max_iter=5, lam_smooth=None):
"""
The 4S (Smooth, Subsample, Suppress, Stretch) Peak Filling algorithm.
Smooths and truncates the input. Each value is then replaced in-place by the minimum of
the value or the average of the moving window, with the half-window size decreasing
exponentially from the input `half_window` to 1. The result is then interpolated back
into the original data size.
Parameters
----------
data : array-like, shape (N,)
The y-values of the measured data, with N data points.
x_data : array-like, shape (N,), optional
The x-values of the measured data. Default is None, which will create an
array from -1 to 1 with N points. Not used within this function.
half_window : int, optional
The index-based size to use for the moving average window. The total window
size will range from [-half_window, ..., half_window] with size
``2 * half_window + 1``. Default is None, which will use two or three times the
output from func:`.optimize_window`, which is an okay starting value.
sections : int, optional
The number of sections to divide the input data into for subsampling. The
minimum of each section will be used to represent the input data for determining
the baseline. Higher `sections` values are needed for baselines with higher
curvature. Default is None, which will use ``N // 10``.
max_iter : int, optional
The number of iterations to perform smoothing. Each iteration, the size of the
window used for the moving average will shrink logarithmically, starting at
``2 * half_window + 1`` and ending at 3. Default is 5.
lam_smooth : float or None, optional
The parameter for smoothing the input using Whittaker smoothing.
Set to 0 or None (default) to skip smoothing.
Returns
-------
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
params : dict
A dictionary with the following items:
* 'x_fit': numpy.ndarray, shape (P,)
The truncated x-values used for fitting the baseline.
* 'baseline_fit': numpy.ndarray, shape (P,)
The truncated y-values used for fitting the baseline.
Raises
------
TypeError
Raised if `sections` is not an integer.
Notes
-----
The input parameter `sections` will determine the necessary `half_window` and `max_iter`
values required to correctly fit the baseline. Likewise, `max_iter` is highly correlated
with `half_window`.
References
----------
Liland, K. 4S Peak Filling - baseline estimation by iterative mean suppression. MethodsX.
2015, 2, 135-140.
"""