from __future__ import absolute_import, division, print_function, unicode_literals
import pickle
from builtins import ValueError
from typing import List, Dict
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from obspy import Trace, Stream
from scipy.ndimage import uniform_filter1d
from scipy.signal import hilbert
from twistpy.polarization.estimator import EstimatorConfiguration
from twistpy.polarization.machinelearning import SupportVectorMachine
[docs]class TimeDomainAnalysis6C:
"""Time domain six-component polarization analysis.
Single-station six degree-of-freedom polarization analysis in the time domain. Polarization analysis is performed
in a sliding time window using the complex analytic signal [1].
[1] Sollberger et al. (2018). *6-C polarization analysis using point measurements of translational and rotational
ground-motion: theory and applications*, Geophysical Journal International, 213(1),
https://doi.org/10.1093/gji/ggx542
.. note:: It is recommended to bandpass filter the data to a narrow frequency band before attempting a time-domain
polarization analysis in order to avoid that dispersion effects and wave type interference affect the
polarization.
Parameters
----------
traN : :obj:`~obspy.core.trace.Trace`
North component of translation
traE : :obj:`~obspy.core.trace.Trace`
East component of translation
traZ : :obj:`~obspy.core.trace.Trace`
Vertical component of translation (positive downwards!)
rotN : :obj:`~obspy.core.trace.Trace`
North component of rotation
rotE : :obj:`~obspy.core.trace.Trace`
East component of rotation
rotZ : :obj:`~obspy.core.trace.Trace`
Vertical component of rotation (right-handed, pointing downwards!)
window : :obj:`dict`
Window parameters defined as:
| window = {'window_length_seconds': :obj:`float`, 'overlap': :obj:`float`}
| Overlap should be on the interval 0 (no overlap between subsequent time windows) and 1
(complete overlap, window is moved by 1 sample only).
scaling_velocity : :obj:`float`, default=1.
Scaling velocity (in m/s) to ensure numerical stability. The scaling velocity is
applied to the translational data only (amplitudes are divided by scaling velocity) and ensures
that both translation and rotation amplitudes are on the same order of magnitude and
dimensionless. Ideally, v_scal is close to the S-Wave velocity at the receiver. After applying
the scaling velocity, translation and rotation signals amplitudes should be similar.
free_surface : :obj:`bool`, default=True
Specify whether the recording station is located at the
free surface in order to use the corresponding polarization models.
verbose : :obj:`bool`, default=True
Run in verbose mode.
timeaxis : :obj:`str`, default='utc'
Specify whether the time axis of plots is shown in UTC (timeaxis='utc') or in seconds relative to the first
sample (timeaxis='rel').
Attributes
----------
classification : :obj:`dict`
Dictionary containing the labels of classified wave types at each position of the sliding time window. The
dictionary has up to six entries corresponding to classifications for each eigenvector.
| classification = {'0': list_with_classifications_of_first_eigenvector, '1':
list_with_classification_of_second_eigenvector, ... , '5': list_with_classification_of_last_eigenvector}
t_windows : :obj:`list` of :obj:`~obspy.core.utcdatetime.UTCDateTime`
Window positions of the sliding time window on the time axis (center point of the window)
C : :obj:`~numpy.ndarray` of :obj:`~numpy.complex128`
Complex covariance matrices at each window position
time : :obj:`list` of :obj:`~obspy.core.utcdatetime.UTCDateTime`
Time axis of the input traces
delta : :obj:`float`
Sampling interval of the input data in seconds
window_length_samples : :obj:`int`
Window length in samples
dop : :obj:`numpy.ndarray` of :obj:`float`
Degree of polarization estimated at each window position.
.. hint::
The definition of the degree of polarization follows the one from Samson & Olson (1980): *Some comments
on the descriptions of the polarization states of waves*, Geophysical Journal of the Royal Astronomical
Society, https://doi.org/10.1111/j.1365-246X.1980.tb04308.x. It is defined as:
.. math::
P^2=\sum_{j,k=1}^{n}(\lambda_j-\lambda_k)^2/[2(n-1)(\sum_{j=1}^{n}(\lambda_j)^2)]
where :math:`P^2` is the degree of polarization and :math:`\lambda_j` are the eigenvalues of the
covariance matrix.
"""
def __init__(
self,
traN: Trace,
traE: Trace,
traZ: Trace,
rotN: Trace,
rotE: Trace,
rotZ: Trace,
window: dict,
scaling_velocity: float = 1.0,
free_surface: bool = True,
verbose: bool = True,
timeaxis: str = "utc",
) -> None:
self.dop = None
self.phi_r = None
self.c_r = None
self.xi = None
self.phi_l = None
self.c_l = None
self.P = {"P": None, "SV": None, "R": None, "L": None, "SH": None}
self.traN, self.traE, self.traZ, self.rotN, self.rotE, self.rotZ = (
traN,
traE,
traZ,
rotN,
rotE,
rotZ,
)
self.timeaxis = timeaxis
# Assert that input traces are ObsPy Trace objects
assert (
isinstance(self.traN, Trace)
and isinstance(self.traE, Trace)
and isinstance(self.traZ, Trace)
and isinstance(self.rotN, Trace)
and isinstance(self.rotE, Trace)
and isinstance(self.rotZ, Trace)
), "Input data must be objects of class obspy.core.Trace()!"
# Assert that all traces have the same number of samples
assert (
self.traN.stats.npts == self.traE.stats.npts
and self.traN.stats.npts == self.traZ.stats.npts
and self.traN.stats.npts == self.rotN.stats.npts
and self.traN.stats.npts == self.rotE.stats.npts
and self.traN.stats.npts == self.rotZ.stats.npts
), "All six traces must have the same number of samples!"
self.window = window
if self.timeaxis == "utc":
self.time = self.traN.times(type="matplotlib")
else:
self.time = self.traN.times()
self.scaling_velocity = scaling_velocity
self.free_surface = free_surface
self.verbose = verbose
self.delta = self.traN.stats.delta
self.window_length_samples = 2 * int(
(self.window["window_length_seconds"] / self.delta) / 2
)
self.classification: Dict[str, List[str]] = {
"0": None,
"1": None,
"2": None,
"3": None,
"4": None,
"5": None,
}
start, stop, incr = [
int(self.window_length_samples / 2),
-1 - int(self.window_length_samples / 2),
np.max([1, int((1 - self.window["overlap"]) * self.window_length_samples)]),
]
self.t_windows = self.time[start:stop:incr]
# Compute the analytic signal
u: np.ndarray = np.array(
[
hilbert(self.traN).T,
hilbert(self.traE).T,
hilbert(self.traZ).T,
hilbert(self.rotN).T,
hilbert(self.rotE).T,
hilbert(self.rotZ).T,
]
).T
# Compute covariance matrices
if self.verbose:
print("Computing covariance matrices...")
C: np.ndarray = np.einsum("...i,...j->...ij", np.conj(u), u).astype("complex")
for j in range(C.shape[2]):
for k in range(C.shape[1]):
C[..., j, k] = uniform_filter1d(
C[..., j, k].real, size=self.window_length_samples
) + 1j * uniform_filter1d(
C[..., j, k].imag, size=self.window_length_samples
)
self.C = C[start:stop:incr, :, :]
if self.verbose:
print("Covariance matrices computed!")
[docs] def classify(
self,
svm: SupportVectorMachine,
eigenvector_to_classify: int = 0,
compute_dop: bool = True,
) -> None:
"""Classify wave types using a support vector machine
Parameters
----------
svm : :obj:`~twistpy.MachineLearning.SupportVectorMachine`
Trained support vector machine used for wave type classification
eigenvector_to_classify : :obj:`int`, optional, default=0
Integer value identifying the eigenvector that will be classified. The eigenvectors are
sorted in descending order of their corresponding eigenvalue
| If 0: first eigenvector, corresponding to the dominant signal in
the time window (associated with the largest eigenvalue).
compute_dop : :obj:`bool`, optional, default = True
Specify whether the degree of polarization is computed.
"""
if self.classification[str(eigenvector_to_classify)] is not None:
print(
f"Wave types are already classified for eigenvector with number '{eigenvector_to_classify}'! "
f"Classification will not be run a second time. To access the previous classification, "
f"check the attribute TimeDomainAnalysis6C.classification['{eigenvector_to_classify}']"
)
return
if self.verbose:
print("Performing eigen-decomposition of covariance matrices...")
eigenvalues, eigenvectors = np.linalg.eigh(self.C)
if compute_dop:
self.dop = (
(eigenvalues[:, 0] - eigenvalues[:, 1]) ** 2
+ (eigenvalues[:, 0] - eigenvalues[:, 2]) ** 2
+ (eigenvalues[:, 0] - eigenvalues[:, 3]) ** 2
+ (eigenvalues[:, 0] - eigenvalues[:, 4]) ** 2
+ (eigenvalues[:, 0] - eigenvalues[:, 5]) ** 2
+ (eigenvalues[:, 1] - eigenvalues[:, 2]) ** 2
+ (eigenvalues[:, 1] - eigenvalues[:, 3]) ** 2
+ (eigenvalues[:, 1] - eigenvalues[:, 4]) ** 2
+ (eigenvalues[:, 1] - eigenvalues[:, 5]) ** 2
+ (eigenvalues[:, 2] - eigenvalues[:, 3]) ** 2
+ (eigenvalues[:, 2] - eigenvalues[:, 4]) ** 2
+ (eigenvalues[:, 2] - eigenvalues[:, 5]) ** 2
+ (eigenvalues[:, 3] - eigenvalues[:, 4]) ** 2
+ (eigenvalues[:, 3] - eigenvalues[:, 5]) ** 2
+ (eigenvalues[:, 4] - eigenvalues[:, 5]) ** 2
) / (5 * np.sum(eigenvalues, axis=-1) ** 2)
# The eigenvectors are initially arbitrarily oriented in the complex plane, here we ensure that
# the real and imaginary parts are orthogonal. See Samson (1980): Some comments on the descriptions of the
# polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eqs. (3)-(5)
u1 = eigenvectors[
:, :, -(eigenvector_to_classify + 1)
] # Select eigenvector for classification
gamma = np.arctan2(
2 * np.einsum("ij,ij->j", u1.real.T, u1.imag.T, optimize=True),
np.einsum("ij,ij->j", u1.real.T, u1.real.T, optimize=True)
- np.einsum("ij,ij->j", u1.imag.T, u1.imag.T, optimize=True),
)
phi = -0.5 * gamma
eigenvectors = np.tile(np.exp(1j * phi), (6, 6, 1)).T * eigenvectors
if self.verbose:
print("Eigenvectors and eigenvalues have been computed!")
df = pd.DataFrame(
np.concatenate(
(
eigenvectors[:, :, -(eigenvector_to_classify + 1)].real,
eigenvectors[:, :, -(eigenvector_to_classify + 1)].imag,
),
axis=1,
),
columns=[
"t1_real",
"t2_real",
"t3_real",
"r1_real",
"r2_real",
"r3_real",
"t1_imag",
"t2_imag",
"t3_imag",
"r1_imag",
"r2_imag",
"r3_imag",
],
)
model = svm.load_model()
if self.verbose:
print("Wave type classification in progress...")
wave_types = model.predict(df)
self.classification[str(eigenvector_to_classify)] = wave_types
if self.verbose:
print("Wave types have been classified!")
[docs] def polarization_analysis(
self, estimator_configuration: EstimatorConfiguration = None, plot: bool = False
):
r"""Perform polarization analysis.
Parameters
----------
estimator_configuration : :obj:`~twistpy.polarization.estimator.EstimatorConfiguration`
Estimator Configuration to be used in the polarization analysis
"""
if estimator_configuration is None:
raise ValueError(
"Please provide an EstimatorConfiguration for polarization analysis!"
)
# Classify wave types if this has not been done already.
if (
estimator_configuration.use_ml_classification
and self.classification[str(estimator_configuration.eigenvector)] is None
):
self.classify(
estimator_configuration.svm, estimator_configuration.eigenvector
)
if self.verbose:
print("Computing wave parameters...")
eigenvalues, eigenvectors = np.linalg.eigh(self.C)
# The eigenvectors are initially arbitrarily oriented in the complex plane, here we ensure that
# the real and imaginary parts are orthogonal. See Samson (1980): Some comments on the descriptions of the
# polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eqs. (3)-(5)
u1 = eigenvectors[
:, :, -(estimator_configuration.eigenvector + 1)
] # Select eigenvector for classification
gamma = np.arctan2(
2 * np.einsum("ij,ij->j", u1.real.T, u1.imag.T, optimize=True),
np.einsum("ij,ij->j", u1.real.T, u1.real.T, optimize=True)
- np.einsum("ij,ij->j", u1.imag.T, u1.imag.T, optimize=True),
)
phi = -0.5 * gamma
eigenvectors = np.tile(np.exp(1j * phi), (6, 6, 1)).T * eigenvectors
# Compute degree of polarization after Samson (1980): Some comments on the descriptions of the
# polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eq. (18)
self.dop = (
(eigenvalues[:, 0] - eigenvalues[:, 1]) ** 2
+ (eigenvalues[:, 0] - eigenvalues[:, 2]) ** 2
+ (eigenvalues[:, 0] - eigenvalues[:, 3]) ** 2
+ (eigenvalues[:, 0] - eigenvalues[:, 4]) ** 2
+ (eigenvalues[:, 0] - eigenvalues[:, 5]) ** 2
+ (eigenvalues[:, 1] - eigenvalues[:, 2]) ** 2
+ (eigenvalues[:, 1] - eigenvalues[:, 3]) ** 2
+ (eigenvalues[:, 1] - eigenvalues[:, 4]) ** 2
+ (eigenvalues[:, 1] - eigenvalues[:, 5]) ** 2
+ (eigenvalues[:, 2] - eigenvalues[:, 3]) ** 2
+ (eigenvalues[:, 2] - eigenvalues[:, 4]) ** 2
+ (eigenvalues[:, 2] - eigenvalues[:, 5]) ** 2
+ (eigenvalues[:, 3] - eigenvalues[:, 4]) ** 2
+ (eigenvalues[:, 3] - eigenvalues[:, 5]) ** 2
+ (eigenvalues[:, 4] - eigenvalues[:, 5]) ** 2
) / (5 * np.sum(eigenvalues, axis=-1) ** 2)
# Estimate wave parameters directly from the specified eigenvector if method=='ML'
if estimator_configuration.method == "ML":
for wave_type in estimator_configuration.wave_types:
indices = (
self.classification[str(estimator_configuration.eigenvector)]
== wave_type
)
eigenvector_wtype = eigenvectors[
indices, :, -(estimator_configuration.eigenvector + 1)
]
if wave_type == "L":
self.phi_l = np.empty_like(self.t_windows)
self.phi_l[:] = np.nan
self.c_l = np.empty_like(self.t_windows)
self.c_l[:] = np.nan
eigenvector_wtype[
np.linalg.norm(np.abs(eigenvector_wtype[:, 0:2].real), axis=1)
< np.linalg.norm(np.abs(eigenvector_wtype[:, 0:2].imag), axis=1)
] = (
eigenvector_wtype[
np.linalg.norm(
np.abs(eigenvector_wtype[:, 0:2].real), axis=1
)
< np.linalg.norm(
np.abs(eigenvector_wtype[:, 0:2].imag), axis=1
)
].conj()
* 1j
)
phi_love = np.arctan2(
np.real(eigenvector_wtype[:, 1]),
np.real(eigenvector_wtype[:, 0]),
)
a_t = (
np.cos(phi_love) * eigenvector_wtype[:, 0].real
+ np.sin(phi_love) * eigenvector_wtype[:, 1].real
)
phi_love += np.pi / 2
# Resolve 180 degrees ambiguity by checking the sign of the vertical rotation and the transverse
# acceleration
phi_love[np.sign(a_t) == np.sign(eigenvector_wtype[:, 5].real)] = (
phi_love[np.sign(a_t) == np.sign(eigenvector_wtype[:, 5].real)]
+ np.pi
)
phi_love[phi_love > 2 * np.pi] -= 2 * np.pi
phi_love[phi_love < 0] += 2 * np.pi
# Love wave velocity: transverse acceleration divided by 2*vertical rotation
c_love = (
self.scaling_velocity
* np.abs(a_t.real)
/ np.abs(eigenvector_wtype[:, 5].real)
/ 2
)
self.c_l[indices] = c_love
self.phi_l[indices] = np.degrees(phi_love)
elif wave_type == "R":
self.xi = np.empty_like(self.t_windows)
self.xi[:] = np.nan
self.c_r = np.empty_like(self.t_windows)
self.c_r[:] = np.nan
self.phi_r = np.empty_like(self.t_windows)
self.phi_r[:] = np.nan
eigenvector_wtype[
np.linalg.norm(np.abs(eigenvector_wtype[:, 0:2].real), axis=1)
> np.linalg.norm(np.abs(eigenvector_wtype[:, 0:2].imag), axis=1)
] = (
eigenvector_wtype[
np.linalg.norm(
np.abs(eigenvector_wtype[:, 0:2].real), axis=1
)
> np.linalg.norm(
np.abs(eigenvector_wtype[:, 0:2].imag), axis=1
)
].conj()
* 1j
)
phi_rayleigh = np.arctan2(
np.imag(eigenvector_wtype[:, 1]),
np.imag(eigenvector_wtype[:, 0]),
)
# Compute radial translational component t_r and transverse rotational component r_t
r_t = (
-np.sin(phi_rayleigh) * eigenvector_wtype[:, 3].real
+ np.cos(phi_rayleigh) * eigenvector_wtype[:, 4].real
)
t_r = (
np.cos(phi_rayleigh) * eigenvector_wtype[:, 0].imag
+ np.sin(phi_rayleigh) * eigenvector_wtype[:, 1].imag
)
# Account for 180 degree ambiguity by evaluating signs
phi_rayleigh[np.sign(t_r) < np.sign(r_t)] += np.pi
phi_rayleigh[(np.sign(t_r) > 0) & (np.sign(r_t) > 0)] += np.pi
phi_rayleigh[phi_rayleigh < 0] += 2 * np.pi
phi_rayleigh[phi_rayleigh > 2 * np.pi] -= 2 * np.pi
phi_rayleigh[
np.sign(eigenvector_wtype[:, 2].real) < np.sign(t_r)
] -= np.pi
phi_rayleigh[phi_rayleigh < 0.0] += 2 * np.pi
# Compute Rayleigh wave ellipticity angle
elli_rayleigh = np.arctan(t_r / eigenvector_wtype[:, 2].real)
elli_rayleigh[np.sign(r_t) < np.sign(t_r)] *= -1
elli_rayleigh[
np.sign(eigenvector_wtype[:, 2].real) < np.sign(t_r)
] *= -1
# Compute Rayleigh wave phase velocity
c_rayleigh = (
self.scaling_velocity
* np.abs(eigenvector_wtype[:, 2].real)
/ np.abs(r_t)
)
self.xi[indices] = np.degrees(elli_rayleigh)
self.c_r[indices] = c_rayleigh
self.phi_r[indices] = np.degrees(phi_rayleigh)
else:
for wave_type in estimator_configuration.wave_types:
if estimator_configuration.use_ml_classification:
indices = (
self.classification[str(estimator_configuration.eigenvector)]
== wave_type
)
else:
indices = np.ones_like(self.t_windows).astype("bool")
steering_vectors = estimator_configuration.compute_steering_vectors(
wave_type
)
if estimator_configuration.method == "MUSIC":
hdf5_file = h5py.File("estimator.h5", "w")
noise_space_vectors = eigenvectors[
indices,
:,
: 6 - estimator_configuration.music_signal_space_dimension,
]
noise_space_vectors_H = np.transpose(
noise_space_vectors.conj(), axes=(0, 2, 1)
)
noise_space = np.einsum(
"...ij, ...jk->...ik",
noise_space_vectors,
noise_space_vectors_H,
optimize=True,
)
P = (
1
/ np.einsum(
"...sn,...nk,...sk->...s",
steering_vectors.conj().T,
noise_space,
steering_vectors.T,
optimize=True,
).real
)
hdf5_file.create_dataset(wave_type, data=P)
hdf5_file.close()
elif estimator_configuration.method == "MVDR":
hdf5_file = h5py.File("estimator.h5", "w")
P = (
1
/ np.einsum(
"...sn,...nk,...sk->...s",
steering_vectors.conj().T,
np.linalg.pinv(
self.C[indices, :, :], rcond=1e-6, hermitian=True
),
steering_vectors.T,
optimize=True,
).real
)
hdf5_file.create_dataset(wave_type, data=P)
hdf5_file.close()
elif estimator_configuration.method == "BARTLETT":
hdf5_file = h5py.File("estimator.h5", "w")
P = np.einsum(
"...sn,...nk,...sk->...s",
steering_vectors.conj().T,
self.C[indices, :, :],
steering_vectors.T,
optimize=True,
).real
hdf5_file.create_dataset(wave_type, data=P)
hdf5_file.close()
if plot:
self.plot(estimator_configuration=estimator_configuration)
[docs] def plot(
self, estimator_configuration: EstimatorConfiguration, dop_clip: float = 0
):
wave_types = estimator_configuration.wave_types
method = estimator_configuration.method
if isinstance(wave_types, str):
wave_types = [wave_types]
for wave_type in wave_types:
if wave_type == "L":
assert (
self.c_l is not None
), "No polarization attributes for Love waves have been computed so far!"
if method == "ML":
fig, ax = plt.subplots(4, 1, sharex=True, figsize=(10, 10))
self._plot_seismograms(ax[0])
phi_l = self.phi_l
phi_l[self.dop < dop_clip] = np.nan
c_l = self.c_l
c_l[self.dop < dop_clip] = np.nan
dop = self.dop
ax[1].plot(self.t_windows, phi_l, "k.")
ax[1].set_ylabel("Degrees")
ax[1].set_title("Back-azimuth")
ax[2].plot(self.t_windows, c_l, "k.")
ax[2].set_title("Phase velocity")
ax[2].set_ylabel("m/s")
ax[3].plot(self.t_windows, dop, "k.")
ax[3].set_title("Degree of polarization")
ax[3].set_ylabel("DOP")
if self.timeaxis == "utc":
ax[1].xaxis_date()
ax[2].xaxis_date()
ax[3].xaxis_date()
ax[3].set_xlabel("Time (UTC)")
else:
ax[3].set_xlabel("Time (s)")
fig.suptitle("Love wave analysis")
elif wave_type == "R":
assert (
self.c_r is not None
), "No polarization attributes for Rayleigh waves have been computed so far!"
if method == "ML":
fig, ax = plt.subplots(5, 1, sharex=True, figsize=(10, 10))
self._plot_seismograms(ax[0])
phi_r = self.phi_r
phi_r[self.dop < dop_clip] = np.nan
c_r = self.c_r
c_r[self.dop < dop_clip] = np.nan
xi = self.xi
xi[self.dop < dop_clip] = np.nan
dop = self.dop
ax[1].plot(self.t_windows, phi_r, "k.")
ax[1].set_ylabel("Degrees")
ax[1].set_title("Back-azimuth")
ax[2].plot(self.t_windows, c_r, "k.")
ax[2].set_title("Phase velocity")
ax[2].set_ylabel("m/s")
ax[3].plot(self.t_windows, xi, "k.")
ax[3].set_ylabel("Degrees")
ax[3].set_title("Ellipticity angle")
ax[4].plot(self.t_windows, dop, "k.")
ax[4].set_ylabel("DOP")
ax[4].set_title("Degree of polarization")
if self.timeaxis == "utc":
ax[1].xaxis_date()
ax[2].xaxis_date()
ax[3].xaxis_date()
ax[4].xaxis_date()
ax[4].set_xlabel("Time (UTC)")
else:
ax[4].set_xlabel("Time (s)")
fig.suptitle("Rayleigh wave analysis")
plt.show()
def _plot_seismograms(self, ax: plt.Axes):
if self.timeaxis == "utc":
time = self.traN.times(type="matplotlib")
else:
time = self.traN.times()
ax.plot(time, self.traN.data, "k:", label="traN")
ax.plot(time, self.traE.data, "k--", label="traE")
ax.plot(time, self.traZ.data, "k", label="traZ")
ax.plot(time, self.rotZ.data, "r:", label="rotN")
ax.plot(time, self.rotE.data, "r--", label="rotE")
ax.plot(time, self.rotZ.data, "r", label="rotZ")
if self.timeaxis == "utc":
ax.xaxis_date()
ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))
[docs] def save(self, name: str) -> None:
"""Save the current TimeDomainAnalysis6C object to a file on the disk in the current working directory.
Include absolute path if you want to save in a different directory.
name : :obj:`str`
File name
"""
if not isinstance(name, str):
raise ValueError("Name must be a string!")
fid = open(name, "wb")
pickle.dump(self, fid, pickle.HIGHEST_PROTOCOL)
fid.close()
[docs]class TimeDomainAnalysis3C:
"""Time domain three-component polarization analysis.
Single-station three-component polarization analysis in the time domain. Polarization analysis is performed
in a sliding time window using the complex analytic signal [1].
[1] Vidale, J. E. (1986). *Complex polarization analysis of particle motion*, BSSA, 76(5),
https://doi.org/10.1785/BSSA0760051393
.. note:: It is recommended to bandpass filter the data to a narrow frequency band before attempting a time-domain
polarization analysis in order to avoid that dispersion effects and wave type interference affect the
polarization.
Parameters
----------
N : :obj:`~obspy.core.trace.Trace`
North component seismogram
E : :obj:`~obspy.core.trace.Trace`
East component seismogram
Z : :obj:`~obspy.core.trace.Trace`
Vertical component seismogram
window : :obj:`dict`
Window parameters defined as:
| window = {'window_length_seconds': :obj:`float`, 'overlap': :obj:`float`}
| Overlap should be on the interval 0 (no overlap between subsequent time windows) and 1
(complete overlap, window is moved by 1 sample only).
verbose : :obj:`bool`, default=True
Run in verbose mode.
timeaxis : :obj:`str`, default='utc'
Specify whether the time axis of plots is shown in UTC (timeaxis='utc') or in seconds relative to the first
sample (timeaxis='rel').
Attributes
----------
t_windows : :obj:`list` of :obj:`~obspy.core.utcdatetime.UTCDateTime`
Window positions of the sliding time window on the time axis (center point of the window)
C : :obj:`~numpy.ndarray` of :obj:`~numpy.complex128`
Complex covariance matrices at each window position
time : :obj:`list` of :obj:`~obspy.core.utcdatetime.UTCDateTime`
Time axis of the input traces
delta : :obj:`float`
Sampling interval of the input data in seconds
window_length_samples : :obj:`int`
Window length in samples.
dop : :obj:`numpy.ndarray` of :obj:`float`
Degree of polarization estimated at each window position.
.. hint::
The definition of the degree of polarization follows the one from Samson & Olson (1980): *Some comments
on the descriptions of the polarization states of waves*, Geophysical Journal of the Royal Astronomical
Society, https://doi.org/10.1111/j.1365-246X.1980.tb04308.x. It is defined as:
.. math::
P^2=\sum_{j,k=1}^{n}(\lambda_j-\lambda_k)^2/[2(n-1)(\sum_{j=1}^{n}(\lambda_j)^2)]
where :math:`P^2` is the degree of polarization and :math:`\lambda_j` are the eigenvalues of the
covariance matrix.
elli : :obj:`numpy.ndarray` of :obj:`float`
Ellipticity estimated at each window position
inc1 : :obj:`numpy.ndarray` of :obj:`float`
Inclination of the major semi-axis of the polarization ellipse estimated at each window position
inc2 : :obj:`numpy.ndarray` of :obj:`float`
Inclination of the minor semi-axis of the polarization ellipse estimated at each window position
azi1 : :obj:`numpy.ndarray` of :obj:`float`
Azimuth of the major semi-axis of the polarization ellipse estimated at each window position
azi2 : :obj:`numpy.ndarray` of :obj:`float`
Azimuth of the minor semi-axis of the polarization ellipse estimated at each window position
"""
def __init__(
self,
N: Trace,
E: Trace,
Z: Trace,
window: dict,
verbose: bool = True,
timeaxis: str = "utc",
) -> None:
self.dop = None
self.elli = None
self.inc1 = None
self.inc2 = None
self.azi1 = None
self.azi2 = None
self.N, self.E, self.Z = N, E, Z
self.timeaxis = timeaxis
# Assert that input traces are ObsPy Trace objects
assert (
isinstance(self.N, Trace)
and isinstance(self.E, Trace)
and isinstance(self.Z, Trace)
), "Input data must be objects of class obspy.core.Trace()!"
# Assert that all traces have the same number of samples
assert (
self.N.stats.npts == self.E.stats.npts
and self.N.stats.npts == self.Z.stats.npts
), "All three traces must have the same number of samples!"
self.window = window
if self.timeaxis == "utc":
self.time = self.N.times(type="matplotlib")
else:
self.time = self.N.times()
self.verbose = verbose
self.delta = self.N.stats.delta
self.window_length_samples = 2 * int(
(self.window["window_length_seconds"] / self.delta) / 2
)
start, stop, incr = [
int(self.window_length_samples / 2),
-1 - int(self.window_length_samples / 2),
np.max([1, int((1 - self.window["overlap"]) * self.window_length_samples)]),
]
self.t_windows = self.time[start:stop:incr]
# Compute the analytic signal
u: np.ndarray = np.array(
[hilbert(self.N).T, hilbert(self.E).T, hilbert(self.Z).T]
).T
# Compute covariance matrices
if self.verbose:
print("Computing covariance matrices...")
C: np.ndarray = np.einsum("...i,...j->...ij", np.conj(u), u).astype("complex")
for j in range(C.shape[2]):
for k in range(C.shape[1]):
C[..., j, k] = uniform_filter1d(
C[..., j, k].real, size=self.window_length_samples
) + 1j * uniform_filter1d(
C[..., j, k].imag, size=self.window_length_samples
)
self.C = C[start:stop:incr, :, :]
if self.verbose:
print("Covariance matrices computed!")
self.polarization_analysis()
[docs] def polarization_analysis(self):
r"""Perform polarization analysis."""
if self.verbose:
print("Computing polarization attributes...")
eigenvalues, eigenvectors = np.linalg.eigh(self.C)
# The eigenvectors are initially arbitrarily oriented in the complex plane, here we ensure that
# the real and imaginary parts are orthogonal. See Samson (1980): Some comments on the descriptions of the
# polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eqs. (3)-(5)
u1 = eigenvectors[:, :, -1] # Select principal eigenvector
gamma = np.arctan2(
2 * np.einsum("ij,ij->j", u1.real.T, u1.imag.T, optimize=True),
np.einsum("ij,ij->j", u1.real.T, u1.real.T, optimize=True)
- np.einsum("ij,ij->j", u1.imag.T, u1.imag.T, optimize=True),
)
phi = -0.5 * gamma
eigenvectors = np.tile(np.exp(1j * phi), (3, 3, 1)).T * eigenvectors
# Compute degree of polarization after Samson (1980): Some comments on the descriptions of the
# polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eq. (18)
self.dop = (
(eigenvalues[:, 0] - eigenvalues[:, 1]) ** 2
+ (eigenvalues[:, 0] - eigenvalues[:, 2]) ** 2
+ (eigenvalues[:, 1] - eigenvalues[:, 2]) ** 2
) / (2 * np.sum(eigenvalues, axis=-1) ** 2)
self.azi1 = np.degrees(
np.pi / 2
- np.arctan2(eigenvectors[:, 0, -1].real, eigenvectors[:, 1, -1].real)
)
self.azi2 = np.degrees(
np.pi / 2
- np.arctan2(eigenvectors[:, 0, -1].imag, eigenvectors[:, 1, -1].imag)
)
self.azi1[self.azi1 < 0] += 180
self.azi2[self.azi2 < 0] += 180
self.azi1[self.azi1 > 180] -= 180
self.azi2[self.azi2 > 180] -= 180
self.inc1 = np.degrees(
np.arctan2(
np.linalg.norm(eigenvectors[:, 0:2, -1].real, axis=1),
np.abs(eigenvectors[:, 2, -1].real),
)
)
self.inc2 = np.degrees(
np.arctan2(
np.linalg.norm(eigenvectors[:, 0:2, -1].imag, axis=1),
np.abs(eigenvectors[:, 2, -1].imag),
)
)
self.elli = np.linalg.norm(
eigenvectors[:, :, -1].imag, axis=1
) / np.linalg.norm(eigenvectors[:, :, -1].real, axis=1)
if self.verbose:
print("Polarization attributes have been computed!")
[docs] def filter(
self,
suppress: bool = False,
smooth_mask: bool = False,
plot_filtered_attributes: bool = False,
**kwargs,
):
r"""Filter data based on polarization attributes.
Parameters
----------
**kwargs : For example elli=[0, 0.3]
Polarization attributes used for filtering. The filter parameters are specified as a list with two entries,
specifying the range of the polarization attributes that are kept by the filter. In the above example the
filter would only retain all signal with an ellipticity between 0 and 0.3 and suppress all signal
with an ellipticity larger than 0.3. Multiple polarization attributes can be specified.
.. hint:: Supported polariziation attributes are:
dop (degree of polarization)
elli (Ellipticity)
inc1 (Incidence angle of major semi axis)
inc2 (Incidence angle of minor semi axis)
azi1 (Azimuth of the major semi axis)
azi2 (Azimuth of the minor semi axis)
suppress : :obj:`bool`, default = False
If set to True the polarization states that fulfill the filtering criterion will be suppressed from the data
while preserving any underlying signal that is orthogonal to the polarization state.
smooth_mask : :obj:`bool`, default = False
If set to True, the filter mask will be additionally smoothed.
plot_filtered_attributes : :obj:`bool`, default = False
If set to True, a plot will be generated that shows the polarization attributes after filtering
Returns
-------
data_filtered : :obj:`~obspy.core.stream.Stream`
Filtered data. The order of traces in the stream is N, E, Z.
"""
if self.window["overlap"] != 1.0:
raise Exception(
"Polarization filtering is only supported if the polarization attributes have been computed"
"at each sample. To do so, recompute the polarization attributes by setting the overlap of"
"adjacent time windows to 1. (window['overlap']=1.)"
)
params = {}
for k in kwargs:
if kwargs[k] is not None:
params[k] = kwargs[k]
if not params:
raise Exception("No filter values provided!")
if self.elli is None:
raise Exception("No polarization attributes computed yet!")
# Compute eigenvectors for projection
start, stop = int(self.window_length_samples / 2), -1 - int(
self.window_length_samples / 2
)
eigenvectors = np.zeros((self.N.stats.npts, 3, 3)).astype("complex")
_, eigenvectors[start:stop, :, :] = np.linalg.eigh(self.C)
# Initialize filter mask
mask = np.ones((self.N.stats.npts,))
# Populate filter mask (will be 1 where signal is kept and 0 everywhere else)
for parameter in params:
pol_attr = np.empty_like(mask)
pol_attr[:] = np.nan
pol_attr[start:stop] = np.real(getattr(self, parameter))
if (
parameter == "dop"
or parameter == "elli"
and params[parameter][1] == 1.0
):
alpha = pol_attr >= params[parameter][0]
else:
alpha = (
(pol_attr >= params[parameter][0])
& (pol_attr <= params[parameter][1])
).astype("int")
mask *= alpha
if smooth_mask:
mask = uniform_filter1d(mask, size=self.window_length_samples, axis=0)
indx = mask > 0
if suppress:
Z_sep = self.Z.data.copy()
N_sep = self.N.data.copy()
E_sep = self.E.data.copy()
else:
Z_sep = np.zeros_like(self.Z.data)
N_sep = np.zeros_like(self.N.data)
E_sep = np.zeros_like(self.E.data)
N_hilbert, E_hilbert, Z_hilbert = (
hilbert(self.N.data),
hilbert(self.E.data),
hilbert(self.Z.data),
)
data_hilbert = np.array(
[N_hilbert[indx].ravel(), E_hilbert[indx].ravel(), Z_hilbert[indx].ravel()]
).T
# Project data into coordinate frame spanned by eigenvectors
data_proj = np.einsum(
"...i, ...ij -> ...j", data_hilbert, eigenvectors[indx, :, :], optimize=True
)
if suppress:
# Only keep data that is orthogonal to the principal eigenvector
data_proj[:, 2] = 0
else:
# Only keep data that is aligned with the principal eigenvector
data_proj[:, 0:2] = 0
# Back-projection into original coordinate frame
data_filt = np.einsum(
"...i, ...ij -> ...j",
data_proj,
np.transpose(eigenvectors[indx.ravel(), :, :].conj(), axes=(0, 2, 1)),
optimize=True,
)
Z_sep[indx] = data_filt[:, 2].real
N_sep[indx] = data_filt[:, 0].real
E_sep[indx] = data_filt[:, 1].real
if not suppress:
Z_sep = mask * Z_sep
N_sep = mask * N_sep
E_sep = mask * E_sep
#
data_filtered = Stream(traces=[self.Z.copy(), self.N.copy(), self.E.copy()])
data_filtered[0].data = Z_sep
data_filtered[1].data = N_sep
data_filtered[2].data = E_sep
if plot_filtered_attributes:
self.plot(show=False, alpha=mask[start:stop], seismograms=data_filtered)
return data_filtered
[docs] def plot(
self, show: bool = True, alpha: np.ndarray = None, seismograms: Stream = None
) -> None:
"""Plot polarization analysis.
Parameters
----------
show : :obj:`bool`, default=True
Display the plot directly after plotting. If set to False, the plot will only show up once
:func:`~matplotlib.pyplot.show` is called.
alpha : :obj:`numpy.ndarray`, default = None
A mask (values between zero and 1) of the same dimension as the polarization attributes, that will be
plotted on the alpha channel.
seismograms : :obj:`obspy.core.Stream`, default = None
Manually provide seismograms to be plotted in the first panel. By default, the input data is plotted.
"""
assert (
self.elli is not None
), "No polarization attributes for Love waves have been computed so far!"
fig, ax = plt.subplots(5, 1, sharex=True, figsize=(10, 10))
if seismograms is None:
self._plot_seismograms(ax[0])
else:
self._plot_seismograms(ax[0], seismograms=seismograms)
filter_mask = np.ones_like(self.dop)
if alpha is not None:
filter_mask *= alpha
ax[1].plot(self.t_windows, filter_mask * self.elli, "k.")
ax[1].set_ylabel("Ellipticity")
ax[1].set_title("Ellipticity")
ax[2].plot(self.t_windows, filter_mask * self.inc1, "k.", label="Major")
ax[2].plot(self.t_windows, filter_mask * self.inc2, "r.", label="Minor")
ax[2].set_title("Inclination of major and minor semi-axis")
ax[2].legend()
ax[2].set_ylabel("Degrees")
ax[3].plot(self.t_windows, filter_mask * self.azi1, "k.", label="Major")
ax[3].plot(self.t_windows, filter_mask * self.azi2, "r.", label="Minor")
ax[3].legend()
ax[3].set_ylabel("Degrees")
ax[3].set_title("Azimuth of major and minor semi-axis")
ax[4].plot(self.t_windows, filter_mask * self.dop, "k.")
ax[4].set_title("Degree of polarization")
ax[4].set_ylabel("DOP")
if self.timeaxis == "utc":
ax[1].xaxis_date()
ax[2].xaxis_date()
ax[3].xaxis_date()
ax[4].xaxis_date()
ax[4].set_xlabel("Time (UTC)")
else:
ax[4].set_xlabel("Time (s)")
if show:
plt.show()
def _plot_seismograms(self, ax: plt.Axes, seismograms: Stream = None):
if seismograms is None:
if self.timeaxis == "utc":
time = self.N.times(type="matplotlib")
else:
time = self.N.times()
ax.plot(time, self.N.data, "k:", label="N")
ax.plot(time, self.E.data, "k--", label="E")
ax.plot(time, self.Z.data, "k", label="Z")
if self.timeaxis == "utc":
ax.xaxis_date()
ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))
else:
if self.timeaxis == "utc":
time = seismograms[0].times(type="matplotlib")
else:
time = seismograms[0].times()
ax.plot(time, seismograms[1].data, "k:", label="N")
ax.plot(time, seismograms[2].data, "k--", label="E")
ax.plot(time, seismograms[0].data, "k", label="Z")
if self.timeaxis == "utc":
ax.xaxis_date()
ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))
[docs] def save(self, name: str) -> None:
"""Save the current TimeDomainAnalysis3C object to a file on the disk in the current working directory.
Include absolute path if you want to save in a different directory.
name : :obj:`str`
File name
"""
if not isinstance(name, str):
raise ValueError("Name must be a string!")
fid = open(name, "wb")
pickle.dump(self, fid, pickle.HIGHEST_PROTOCOL)
fid.close()