from __future__ import absolute_import, division, print_function, unicode_literals
import pickle
from builtins import ValueError
from typing import Tuple
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.patches import Rectangle
from obspy import Trace, Stream
from twistpy.polarization.estimator import EstimatorConfiguration
from twistpy.polarization.machinelearning import SupportVectorMachine
from twistpy.polarization.time import TimeDomainAnalysis6C
[docs]class DispersionAnalysis:
r"""
Single-station six-component surface wave dispersion estimation for Love and Rayleigh waves.
Under the hood, the DispersionAnalysis class runs a :obj:`~twistpy.polarization.time.TimeDomainAnalysis6C`. The data
is filtered to different frequency bands of interest, and the wave parameters are estimated at each frequency
independently.
Parameters
----------
traN : :obj:`~obspy.core.trace.Trace`
North component of translation
traE : :obj:`~obspy.core.trace.Trace`
East component of translation
traZ : :obj:`~obspy.core.trace.Trace`
Vertical component of translation (positive downwards!)
rotN : :obj:`~obspy.core.trace.Trace`
North component of rotation
rotE : :obj:`~obspy.core.trace.Trace`
East component of rotation
rotZ : :obj:`~obspy.core.trace.Trace`
Vertical component of rotation (right-handed, pointing downwards!)
fmin : :obj:`float`
Minimum frequency to be considered in the analysis (in Hz)
fmax : :obj:`float`
Maximum frequency to be considered in the analysis (in Hz)
octaves : :obj:`float`
Width of the frequency band used for bandpass filtering in octaves
window : :obj:`dict`
Window parameters defined as:
| window = {'number_of_periods': :obj:`float`, 'overlap': :obj:`float`}
| Overlap should be on the interval 0 (no overlap between subsequent time windows) and 1
(complete overlap, window is moved by 1 sample only). The window is frequency dependent and extends over
number_of_periods times the dominant period at the current frequency band
scaling_velocity : :obj:`float`, default=1.
Scaling velocity (in m/s) to ensure numerical stability. The scaling velocity is
applied to the translational data only (amplitudes are divided by scaling velocity) and ensures
that both translation and rotation amplitudes are on the same order of magnitude and
dimensionless. Ideally, v_scal is close to the S-Wave velocity at the receiver. After applying
the scaling velocity, translation and rotation signals amplitudes should be similar. svm
verbose : :obj:`bool`, default=True
Run in verbose mode.
Attributes
----------
parameters : :obj:`list` of :obj:`dict`
List containing the estimated surface wave parameters at each frequency.
The parameters for each frequency are saved in a dictionary with the following keys:
| 'xi': Rayleigh wave ellipticity angle (degrees)
| 'phi_r': Rayleigh wave back-azimuth (degrees)
| 'c_r': Rayleigh wave phase-velocity (m/s)
| 'phi_l': Love wave back azimuth (degrees)
| 'c_l': Love wave phase velocity (m/s)
| 'wave_types': Wave type classification labels
| 'dop': Degree of polarization
f : :obj:`numpy.ndarray` of :obj:`float`
Frequency vector for parameters.
"""
def __init__(
self,
traN: Trace = None,
traE: Trace = None,
traZ: Trace = None,
rotN: Trace = None,
rotE: Trace = None,
rotZ: Trace = None,
fmin: float = 1.0,
fmax: float = 20.0,
octaves: float = 0.5,
window: dict = {"number_of_periods": 1, "overlap": 0.0},
scaling_velocity: float = 1.0,
svm: SupportVectorMachine = None,
verbose: bool = True,
):
f_lower = []
f_higher = []
fcenter = fmax
f_center_tot = []
data = Stream()
data += traN
data += traE
data += traZ
data += rotN
data += rotE
data += rotZ
# Compute width of bandpass filters (width in octaves)
while fcenter > fmin:
f_l = fcenter / 2 ** (octaves / 2)
f_h = fcenter * (2 ** (octaves / 2))
f_lower.append(f_l)
f_higher.append(f_h)
f_center_tot.append(fcenter)
fcenter = fcenter / (2**octaves)
self.traN, self.traE, self.traZ, self.rotN, self.rotE, self.rotZ = (
traN,
traE,
traZ,
rotN,
rotE,
rotZ,
)
self.f = f_center_tot
self.f_lower = f_lower
self.f_higher = f_higher
est = EstimatorConfiguration(
wave_types=["L", "R"],
method="ML",
scaling_velocity=scaling_velocity,
use_ml_classification=True,
svm=svm,
)
self.parameters = []
for i in range(len(f_lower)):
if verbose:
print(
f"Estimating surface wave parameters at frequency f = {f_center_tot[i]: 0.2f} Hz. "
f"Frequency step {i + 1} out of {len(f_lower)}."
)
window_f = {
"window_length_seconds": 1
/ f_center_tot[i]
* window["number_of_periods"],
"overlap": window["overlap"],
}
data_f = data.copy()
data_f.filter("lowpass", freq=f_higher[i], zerophase=True)
data_f.filter("highpass", freq=f_lower[i], zerophase=True)
analysis = TimeDomainAnalysis6C(
traN=data_f[0],
traE=data_f[1],
traZ=data_f[2],
rotN=data_f[3],
rotE=data_f[4],
rotZ=data_f[5],
window=window_f,
scaling_velocity=scaling_velocity,
verbose=False,
)
analysis.polarization_analysis(estimator_configuration=est)
parameters_f = {
"xi": analysis.xi,
"phi_r": analysis.phi_r,
"phi_l": analysis.phi_l,
"c_l": analysis.c_l,
"c_r": analysis.c_r,
"wave_types": analysis.classification["0"],
"dop": analysis.dop,
}
self.parameters.append(parameters_f)
[docs] def plot(
self,
nbins: int = 100,
velocity_range: Tuple[float, float] = (0, 2500),
quantiles: Tuple[float, float] = (0.2, 0.8),
dop_min: float = 0.3,
show: bool = True,
) -> None:
r"""Plot estimated Love- and Rayleigh-wave dispersion curves and Rayleigh wave ellipticity angle.
Parameters
----------
nbins: :obj:`int`, default=100
Number of bins to use in the dispersion plot.
velocity_range: :obj:`tuple` of (:obj:`float`, :obj:`float`), default=(0, 2500)
Range of the velocity axis in m/s.
quantiles: :obj:`tuple` of (:obj:`float`, :obj:`float`), default=(0.2, 0.8)
Only show data points that lie within the specified quantile range.
dop_min: :obj:`float`, default=0.3
Only show data points that were extracted in windows with a degree of polarization larger than dop_min.
show: :obj:`bool`, default=True
If True, show the plot interactively after plotting.
"""
counts_r = np.zeros((nbins, len(self.f)))
counts_l = np.zeros((nbins, len(self.f)))
counts_dop_r = np.zeros((nbins, len(self.f)))
counts_dop_l = np.zeros((nbins, len(self.f)))
counts_elli = np.zeros((nbins, len(self.f)))
median_cr = np.zeros(
len(self.f),
)
median_cl = np.zeros(
len(self.f),
)
median_elli = np.zeros(
len(self.f),
)
cr_ndec = np.zeros(
len(self.f),
)
cl_ndec = np.zeros(
len(self.f),
)
for i, f in enumerate(self.f):
dop_f = self.parameters[i]["dop"]
dop_r = dop_f
dop_l = dop_f
c_r_full = self.parameters[i]["c_r"]
dop_r = dop_r[dop_f > dop_min]
c_r_full = c_r_full[dop_f > dop_min]
dop_r = dop_r[~np.isnan(c_r_full)]
c_r_full = c_r_full[~np.isnan(c_r_full)]
if not c_r_full.any():
c_r = np.array(np.nan)
cr_ndec[i] = 0
dop_r = np.array(np.nan)
else:
dop_r = dop_r[c_r_full < np.quantile(c_r_full, quantiles[1])]
c_r = c_r_full[c_r_full < np.quantile(c_r_full, quantiles[1])]
dop_r = dop_r[c_r > np.quantile(c_r_full, quantiles[0])]
c_r = c_r[c_r > np.quantile(c_r_full, quantiles[0])]
dop_r = dop_r[c_r < velocity_range[1]]
c_r = c_r[c_r < velocity_range[1]]
dop_r = dop_r[c_r > velocity_range[0]]
c_r = c_r[c_r > velocity_range[0]]
cr_ndec[i] = len(c_r) / len(self.parameters[i]["c_r"])
counts, values_r = np.histogram(
c_r,
bins=nbins,
range=[velocity_range[0], velocity_range[1]],
density=True,
)
counts_r[:, i] = counts.T
if c_r.any():
median_cr[i] = np.median(c_r)
else:
median_cr[i] = np.array(np.nan)
counts, _ = np.histogram(dop_r, bins=nbins, range=[0, 1], density=True)
counts_dop_r[:, i] = counts
elli_full = self.parameters[i]["xi"]
elli_full = elli_full[dop_f > dop_min]
elli_full = elli_full[~np.isnan(elli_full)]
if not elli_full.any():
elli = np.array(np.nan)
else:
elli = elli_full[elli_full < np.quantile(elli_full, quantiles[1])]
elli = elli[elli > np.quantile(elli_full, quantiles[0])]
counts, values_elli = np.histogram(
elli, bins=nbins, range=[-90, 90], density=True
)
counts_elli[:, i] = counts.T
median_elli[i] = np.median(elli)
c_l_full = self.parameters[i]["c_l"]
dop_l = dop_l[dop_f > dop_min]
c_l_full = c_l_full[dop_f > dop_min]
dop_l = dop_l[~np.isnan(c_l_full)]
c_l_full = c_l_full[~np.isnan(c_l_full)]
if not c_l_full.any():
c_l = np.array(np.nan)
cl_ndec[i] = 0
dop_l = np.array(np.nan)
else:
dop_l = dop_l[c_l_full < np.quantile(c_l_full, quantiles[1])]
c_l = c_l_full[c_l_full < np.quantile(c_l_full, quantiles[1])]
dop_l = dop_l[c_l > np.quantile(c_l_full, quantiles[0])]
c_l = c_l[c_l > np.quantile(c_l_full, quantiles[0])]
dop_l = dop_l[c_l < velocity_range[1]]
c_l = c_l[c_l < velocity_range[1]]
dop_l = dop_l[c_l > velocity_range[0]]
c_l = c_l[c_l > velocity_range[0]]
cl_ndec[i] = len(c_l) / len(self.parameters[i]["c_l"])
counts, values_l = np.histogram(
c_l,
bins=nbins,
range=[velocity_range[0], velocity_range[1]],
density=True,
)
counts_l[:, i] = counts.T
median_cl[i] = np.median(c_l)
counts, _ = np.histogram(dop_l, bins=nbins, range=[0, 1], density=True)
counts_dop_l[:, i] = counts
counts_r[np.isnan(counts_r)] = 0
counts_l[np.isnan(counts_l)] = 0
counts_dop_r[np.isnan(counts_dop_r)] = 0
counts_dop_l[np.isnan(counts_dop_l)] = 0
X, Y = np.meshgrid(
self.f, np.linspace(velocity_range[0], velocity_range[1], nbins)
)
fig1, ((ax2, ax1, ax5), (ax4, ax3, ax6), (ax7, ax8, ax9)) = plt.subplots(
3, 3, gridspec_kw={"height_ratios": [3, 1, 1]}, figsize=(15, 10)
)
ax1.pcolormesh(
X,
Y,
counts_r,
vmin=0.005 * np.max(np.max(counts_r)),
vmax=0.2 * np.max(np.max(counts_r)),
cmap="magma",
)
ax1.set_ylim([velocity_range[0], velocity_range[1]])
ax1.set_ylabel("Phase velocity (ms)")
ax1.set_xlabel("Frequency (Hz)")
ax1.set_title("Rayleigh")
ax2.pcolormesh(
X,
Y,
counts_l,
vmin=0.005 * np.max(np.max(counts_l)),
vmax=0.2 * np.max(np.max(counts_l)),
cmap="magma",
)
ax2.set_ylim([velocity_range[0], velocity_range[1]])
ax2.set_ylabel("Phase velocity (ms)")
ax2.set_xlabel("Frequency (Hz)")
ax2.set_title("Love")
for n in range(len(cl_ndec)):
rect_l = Rectangle(
(self.f_lower[n], 0),
self.f_higher[n] - self.f_lower[n],
cl_ndec[n] * 100,
edgecolor="k",
facecolor="none",
)
rect_r = Rectangle(
(self.f_lower[n], 0),
self.f_higher[n] - self.f_lower[n],
cr_ndec[n] * 100,
edgecolor="k",
facecolor="none",
)
rect_re = Rectangle(
(self.f_lower[n], 0),
self.f_higher[n] - self.f_lower[n],
cr_ndec[n] * 100,
edgecolor="k",
facecolor="none",
)
ax4.add_patch(rect_l)
ax3.add_patch(rect_r)
ax6.add_patch(rect_re)
X, Y = np.meshgrid(self.f, np.linspace(-90, 90, nbins))
ax5.pcolormesh(
X,
Y,
counts_elli,
vmin=0.005 * np.max(np.max(counts_elli)),
vmax=0.6 * np.max(np.max(counts_elli)),
cmap="magma",
)
ax5.set_ylim([-90, 90])
ax5.set_ylabel("Ellipticity angle (degrees)")
ax5.set_xlabel("Frequency (Hz)")
ax5.set_title("Rayleigh")
ax1.plot(np.flip(self.f), np.flip(median_cr), "-.r")
ax5.plot(np.flip(self.f), np.flip(median_elli), "-.r")
ax2.plot(np.flip(self.f), np.flip(median_cl), "-.r")
ax4.set_xlim([self.f_lower[-1], self.f_higher[0]])
ax3.set_xlim([self.f_lower[-1], self.f_higher[0]])
ax6.set_xlim([self.f_lower[-1], self.f_higher[0]])
ax4.set_ylim([0, 100 * 1.2 * np.max([cr_ndec.max(), cl_ndec.max()])])
ax3.set_ylim([0, 100 * 1.2 * np.max([cr_ndec.max(), cl_ndec.max()])])
ax6.set_ylim([0, 100 * 1.2 * np.max([cr_ndec.max(), cl_ndec.max()])])
ax4.set_ylabel("Detection rate (%)")
X, Y = np.meshgrid(self.f, np.linspace(0, 1, nbins))
ax7.pcolormesh(
X,
Y,
counts_dop_l,
vmin=0.005 * np.max(np.max(counts_dop_l)),
vmax=0.6 * np.max(np.max(counts_dop_l)),
cmap="magma",
)
ax7.set_ylim([0, 1])
ax7.set_ylabel("Degree of polarization")
ax7.set_xlabel("Frequency (Hz)")
ax8.pcolormesh(
X,
Y,
counts_dop_r,
vmin=0.005 * np.max(np.max(counts_dop_r)),
vmax=0.6 * np.max(np.max(counts_dop_r)),
cmap="magma",
)
ax8.set_ylim([0, 1])
ax8.set_xlabel("Frequency (Hz)")
ax9.pcolormesh(
X,
Y,
counts_dop_r,
vmin=0.005 * np.max(np.max(counts_dop_r)),
vmax=0.6 * np.max(np.max(counts_dop_r)),
cmap="magma",
)
ax9.set_ylim([0, 1])
ax7.set_xlim(ax2.get_xlim())
ax8.set_xlim(ax1.get_xlim())
ax9.set_xlim(ax1.get_xlim())
ax9.set_xlabel("Frequency (Hz)")
if show:
plt.show()
[docs] def plot_baz(self, freq: float = None, nbins: int = 90, show: bool = True) -> None:
r"""Plot the back-azimuth of Love and Rayleigh wave sources as a polar plot.
Parameters
----------
freq: :obj:`float`
If specified, the back-azimuth is only plotted for sources detected at the specified frequency.
Needs to be a frequency belonging to the vector in attribute f of the DispersionAnalysis object.
nbins: :obj:`int`, default=92
Number of bins used to split the circle.
show: :obj:`bool`, default=True
If True, show the plot interactively after plotting.
"""
if freq is not None:
indx = np.argmin(np.abs(np.asarray(self.f) - freq))
phi_r = self.parameters[indx]["phi_r"]
phi_l = self.parameters[indx]["phi_l"]
# Add Pi to convert from propagation direction to back azimuth
phi_r = np.radians(phi_r[~np.isnan(phi_r)]) + np.pi
phi_l = np.radians(phi_l[~np.isnan(phi_l)]) + np.pi
phi_r[phi_r > 2 * np.pi] -= 2 * np.pi
phi_l[phi_l > 2 * np.pi] -= 2 * np.pi
width = (2 * np.pi) / nbins
theta = np.linspace(0.0, 2 * np.pi, nbins, endpoint=False)
height_phi_r, _ = np.histogram(
phi_r, nbins, range=(0, 2 * np.pi), density=True
)
height_phi_r /= height_phi_r.max() * 1.2
height_phi_l, _ = np.histogram(
phi_l, nbins, range=(0, 2 * np.pi), density=True
)
height_phi_l /= height_phi_l.max() * 1.2
bottom = 0
fig, ax = plt.subplots(
1,
2,
sharex=True,
sharey=True,
figsize=(15, 6),
subplot_kw=dict(polar=True),
)
for axis in ax:
axis.set_theta_direction(-1)
axis.set_theta_offset(np.pi / 2.0)
bars_r = ax[1].bar(theta, height_phi_r, width=width, bottom=bottom)
bars_l = ax[0].bar(theta, height_phi_l, width=width, bottom=bottom)
for bar_r, bar_l in zip(bars_r, bars_l):
bar_l.set_facecolor("darkgreen")
bar_r.set_facecolor("yellow")
bar_r.set_edgecolor((0, 0, 0))
bar_l.set_edgecolor((0, 0, 0))
ax[0].set_title("Love waves")
ax[1].set_title("Rayleigh waves")
plt.suptitle(f"Back-azimuth at {self.f[indx]: .2f} Hz")
else:
for indx in range(len(self.f)):
phi_r = self.parameters[indx]["phi_r"]
phi_l = self.parameters[indx]["phi_l"]
phi_r = np.radians(phi_r[~np.isnan(phi_r)])
phi_l = np.radians(phi_l[~np.isnan(phi_l)])
width = (2 * np.pi) / nbins
theta = np.linspace(0.0, 2 * np.pi, nbins, endpoint=False)
height_phi_r, _ = np.histogram(
phi_r, nbins, range=(0, 2 * np.pi), density=True
)
height_phi_l, _ = np.histogram(
phi_l, nbins, range=(0, 2 * np.pi), density=True
)
bottom = 0
fig, ax = plt.subplots(
1,
2,
sharex=True,
sharey=True,
figsize=(15, 6),
subplot_kw=dict(polar=True),
)
for axis in ax:
axis.set_theta_direction(-1)
axis.set_theta_offset(np.pi / 2.0)
bars_r = ax[1].bar(theta, height_phi_r, width=width, bottom=bottom)
bars_l = ax[0].bar(theta, height_phi_l, width=width, bottom=bottom)
for bar_r, bar_l in zip(bars_r, bars_l):
bar_l.set_facecolor("darkgreen")
bar_r.set_facecolor("yellow")
bar_r.set_edgecolor((0, 0, 0))
bar_l.set_edgecolor((0, 0, 0))
ax[0].set_title("Love waves")
ax[1].set_title("Rayleigh waves")
plt.suptitle(f"Back-azimuth at {self.f[indx]: .2f} Hz")
if show:
plt.show()
[docs] def save(self, name: str) -> None:
"""Save the current DispersionAnalysis object to a file on the disk in the current working directory.
Include absolute path if you want to save in a different directory.
name : :obj:`str`
File name
"""
if not isinstance(name, str):
raise ValueError("Name must be a string!")
fid = open(name, "wb")
pickle.dump(self, fid, pickle.HIGHEST_PROTOCOL)
fid.close()