import math
from typing import Tuple
import numpy as np
def _next_pow2(n):
return 2 ** int(math.ceil(math.log(n) / math.log(2.0)))
def _nearest_pow2(x: float) -> float:
r"""Get nearest power of 2 for a given input x.
Parameters
----------
x : :obj:`float`
Arbitrary number
Returns
-------
y : :obj:`float`
Nearest power of 2 from x
"""
a = math.pow(2, math.ceil(np.log2(x)))
b = math.pow(2, math.floor(np.log2(x)))
if abs(a - x) < abs(b - x):
return a
else:
return b
[docs]def transfer_function(
response: np.ndarray, source: np.ndarray, dt: float, smooth: float
) -> tuple:
r"""
Calculate transfer function and complex coherence between two signals.
The transfer function is calculated from smoothed cross- and non-smoothed
autospectral densities of source and response signal. Smoothing is done by
convolution with a Blackman window.
The complex transfer function, the autospectral densities, and a
corresponding frequency vector are returned.
Parameters
----------
response : :obj:`numpy.ndarray`
Sample data of the response signal.
source : :obj:`numpy.ndarray`
Sample data of the source signal.
dt : :obj:`float`
Sampling interval [s].
smooth : :obj:`float`
Size of the Blackman window used for smoothing [Hz].
Returns
-------
output : 5-:obj:`tuple` of :obj:`numpy.ndarray`
(``freq``, ``XX``, ``YY``, ``Ars``, ``coh``)
``freq``: array of frequencies
``Grr``: autospectral density of response signal,
``Gss``: autospectral density of source signal,
``Ars``: source to response transfer function,
``coh``: smoothed complex coherence between source and response signal
"""
assert response.size == source.size
ndat = response.size
nfft = int(_nearest_pow2(ndat))
nfft *= 2
gr = np.zeros(nfft)
gs = np.zeros(nfft)
gr[:ndat] = response
gs[:ndat] = source
# perform ffts
Gr = np.fft.rfft(gr) * dt
Gs = np.fft.rfft(gs) * dt
freq = np.fft.rfftfreq(nfft, dt)
# calculate autospectral and crossspectral densities
Grs = Gr * Gs.conjugate()
Grr = Gr * Gr.conjugate()
Gss = Gs * Gs.conjugate()
nsmooth = int(round(smooth / (freq[1] - freq[0])))
if nsmooth != 0:
w = np.blackman(nsmooth)
Grs_smooth = np.convolve(Grs, w, mode="same")
Grr_smooth = np.convolve(Grr, w, mode="same")
Gss_smooth = np.convolve(Gss, w, mode="same")
else:
Grs_smooth = Grs
Grr_smooth = Grr
Gss_smooth = Gss
Crs_smooth = Grs_smooth / np.sqrt(Grr_smooth * Gss_smooth)
# calculate transfer function
Ars = Crs_smooth * np.sqrt(Grr / Gss)
return freq, Grr, Gss, Ars, Crs_smooth
[docs]def remove_tilt(
response: np.ndarray,
source: np.ndarray,
dt: float,
fmin: float = None,
fmax: float = None,
parallel: bool = True,
threshold: float = 0.5,
smooth: float = 1.0,
g: float = 9.81,
method: str = "coh",
trans_coh: Tuple[np.ndarray, np.ndarray] = None,
) -> np.ndarray:
r"""
Remove tilt noise from translational accelerometer recordings.
See the ``method`` argument for different correction options.
The correction can optionally be applied only in a selected frequency band.
The method is described in Bernauer et al. (2022) and is based on the work of
Crawford and Webb (2000).
Parameters
----------
response : :obj:`numpy.ndarray`
Data samples of the accelerometer signal [m/s**2].
source : :obj:`numpy.ndarray`
Data samples of the tilt signal [rad].
dt : :obj:`float`
Sampling interval [s].
fmin : :obj:`float`, optional
Minimum frequency for band-limited correction [Hz]. Only applicable in
``'coh'`` and ``'freq'`` methods.
fmax : :obj:`float`, optional
Maximum frequency for band-limited correction [Hz]. Only applicable in
``'coh'`` and ``'freq'`` methods.
parallel : :obj:`bool`, optional, Default=True
Flag to indicate if tilt and acceleration axes are parallel (``True``)
or antiparallel (``False``).
threshold : :obj:`float`, optional, Default=0.5
Correction is applied only where ``abs(coherence) >= threshold``. Only
applicable in ``'coh'`` method.
smooth : :obj:`float`, optional, Defaault=1.0
Size of the Blackman window [Hz] used for smoothing when calculating
the coherence with :py:func:`tilt_utils.transfer_function`. Only
applicable in ``'coh'`` and ``'freq'`` methods.
g : :obj:`float`, optional, Default=9.81
Gravitational acceleration [m/s**2].
method : :obj:`str`, optional, Default='coh'
Correction method to use.
..hint:: 'coh': apply theoretical transfer
function where coherence is significant (via frequency domain),
'freq': Use empirical transfer function estimate (via frequency
domain)
'direct': Apply theoretical transfer function directly (in time domain).
trans_coh : (:obj:`numpy.ndarray`, :obj:`numpy.ndaarray`), optional
If given, previously calculated transfer function and complex coherence
between the tilt and accelerometer signals, used to decide where
to apply the correction. The size of the given arrays must match the
size of the spectra of ``source`` and ``response`` (the same
zero-padding has to be applied). If set to ``None``, it is computed
from ``response`` and ``source`` using
:py:func:`twistpy.tilt.correction.transfer_function`.
Returns
-------
signal_out : :obj:`numpy.ndarray`
Data samples of corrected accelerometer signal [m/s**2].
"""
assert response.size == source.size
assert method in ("direct", "coh", "freq")
sign = 1.0 if parallel else -1.0
if method == "direct":
return response - sign * g * np.sin(source)
ndat = response.size
nfft = int(_nearest_pow2(ndat))
nfft *= 2
if trans_coh is None:
Ars, coh = transfer_function(response, source, dt, smooth)[-2:]
else:
Ars, coh = trans_coh
Gr = np.fft.rfft(response, nfft)
Gs = np.fft.rfft(source, nfft)
freq = np.fft.rfftfreq(nfft, dt)
assert Ars.shape == Gr.shape
assert coh.shape == Gr.shape
mask = np.where(np.abs(coh) >= threshold, 1.0, 0.0)
if fmin is not None:
mask[freq < fmin] = 0.0
if fmax is not None:
mask[freq > fmax] = 0.0
if method == "coh":
corr = sign * g * Gs * mask
elif method == "freq":
corr = sign * np.conjugate(Ars) * Gs
else:
raise ValueError("Invalid `method` argument: %s" % method)
return np.fft.irfft(Gr - corr)[:ndat]