Source code for twistpy.polarization.timefrequency

from __future__ import absolute_import, division, print_function, unicode_literals

import pickle
from builtins import ValueError
from typing import List, Dict, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from matplotlib.cm import ScalarMappable
from matplotlib.colors import ListedColormap, Normalize
from obspy import Trace, Stream
from obspy.core.utcdatetime import UTCDateTime
from scipy.ndimage import uniform_filter1d

from twistpy.polarization import EstimatorConfiguration
from twistpy.polarization.machinelearning import SupportVectorMachine
from twistpy.utils import stransform, istransform


[docs]class TimeFrequencyAnalysis6C: """Time-frequency domain six-component polarization analysis. Single-station six degree-of-freedom polarization analysis on time-frequency decomposed signals. The time-frequency representation is obtained via the S-transform (:func:`twistpy.utils.s_transform`) [1], [2]. .. hint:: [1] Sollberger et al. (2018). *6-C polarization analysis using point measurements of translational and rotational ground-motion: theory and applications*, Geophysical Journal International, 213(1), https://doi.org/10.1093/gji/ggx542 [2] Sollberger et al. (2020). *Seismological processing of six degree-of-freedom ground-motion data*, Sensors, 20(23), https://doi.org/10.3390/s20236904 Parameters ---------- traN : :obj:`~obspy.core.trace.Trace` North component of translation traE : :obj:`~obspy.core.trace.Trace` East component of translation traZ : :obj:`~obspy.core.trace.Trace` Vertical component of translation (positive downwards!) rotN : :obj:`~obspy.core.trace.Trace` North component of rotation rotE : :obj:`~obspy.core.trace.Trace` East component of rotation rotZ : :obj:`~obspy.core.trace.Trace` Vertical component of rotation (right-handed, pointing downwards!) window : :obj:`dict` 2-D time-frequency window within which the covariance matrices are averaged: | window = {'number_of_periods': :obj:`float`, 'frequency_extent': :obj:`float`} | The width of the window in time is frequency-dependent and stretches over 'number_of_periods' periods, where the period is defined as 1/f. In frequency, the window stretches over the specified 'frequency_extent' in Hz. dsfact : :obj:`int`, default=1 Down-sampling factor of the time-axis prior to the computation of polarization attributes. dsfacf : :obj:`int`, default=1 Down-sampling factor of the frequency-axis prior to the computation of polarization attributes. .. warning:: Downsampling of the frequency axis (dsfacf > 1) prevents the accurate computation of the inverse S-transform! Down-sampling should be avoided for filtering applications. frange : :obj:`tuple`, (f_min, f_max) Limit the analysis to the specified frequency band in Hz trange : :obj:`tuple`, (t_min, t_max) Limit the analysis to the specified time window. t_min and t_max should either be specified as a tuple of two :obj:`~obspy.core.utcdatetime.UTCDateTime` objects (if timeaxis='utc'), or as a tuple of :obj:`float` (if timeaxis='rel'). k : :obj:`float`, default=1. k-value used in the S-transform, corresponding to a scaling factor that controls the number of oscillations in the Gaussian window. When k increases, the frequency resolution increases, with a corresponding loss of time resolution. .. seealso:: :func:`twistpy.utils.s_transform` scaling_velocity : :obj:`float`, default=1. Scaling velocity (in m/s) to ensure numerical stability. The scaling velocity is applied to the translational data only (amplitudes are divided by scaling velocity) and ensures that both translation and rotation amplitudes are on the same order of magnitude and dimensionless. Ideally, v_scal is close to the S-Wave velocity at the receiver. After applying the scaling velocity, translation and rotation signals amplitudes should be similar. free_surface : :obj:`bool`, default=True Specify whether the recording station is located at the free surface in order to use the corresponding polarization models. verbose : :obj:`bool`, default=True Run in verbose mode. timeaxis : :obj:'str', default='utc' Specify whether the time axis of plots is shown in UTC (timeaxis='utc') or in seconds relative to the first sample (timeaxis='rel'). Attributes ---------- classification : :obj:`dict` Dictionary containing the labels of classified wave types at each position of the analysis window in time and frequency. The dictionary has six entries corresponding to classifications for each eigenvector. | classification = {'0': list_with_classifications_of_first_eigenvector, '1': list_with_classification_of_second_eigenvector, ... , '5': list_with_classification_of_last_eigenvector} wave_parameters : :obj:`dict` Dictionary containing the estimated wave parameters. t_pol : :obj:`list` of :obj:`~obspy.core.utcdatetime.UTCDateTime` Time axis of the computed polarization attributes. f_pol : :obj:`~numpy.ndarray` of :obj:`float` Frequency axis of the computed polarization attributes. C : :obj:`~numpy.ndarray` of :obj:`~numpy.complex128` Complex covariance matrices at each window position dop : :obj:`~numpy.ndarray` of :obj:`float` Degree of polarization as a function of time- and frequency. time : :obj:`list` of :obj:`~obspy.core.utcdatetime.UTCDateTime` Time axis of the input traces delta : :obj:`float` Sampling interval of the input data in seconds """ def __init__( self, traN: Trace, traE: Trace, traZ: Trace, rotN: Trace, rotE: Trace, rotZ: Trace, window: dict, dsfacf: int = 1, dsfact: int = 1, frange: Tuple[float, float] = None, trange: Tuple[UTCDateTime, UTCDateTime] = None, k: float = 1, scaling_velocity: float = 1.0, free_surface: bool = True, verbose: bool = True, timeaxis: "str" = "utc", ) -> None: self.dop = None self.traN, self.traE, self.traZ, self.rotN, self.rotE, self.rotZ = ( traN, traE, traZ, rotN, rotE, rotZ, ) # Assert that input traces are ObsPy Trace objects assert ( isinstance(self.traN, Trace) and isinstance(self.traE, Trace) and isinstance(self.traZ, Trace) and isinstance(self.rotN, Trace) and isinstance(self.rotE, Trace) and isinstance(self.rotZ, Trace) ), ("Input data " "must be " "objects of " "class " "obspy.core" ".Trace()! ") # Assert that all traces have the same number of samples assert ( self.traN.stats.npts == self.traE.stats.npts and self.traN.stats.npts == self.traZ.stats.npts and self.traN.stats.npts == self.rotN.stats.npts and self.traN.stats.npts == self.rotE.stats.npts and self.traN.stats.npts == self.rotZ.stats.npts ), "All six traces must have the same number of samples!" self.window = window self.dsfact = dsfact self.dsfacf = dsfacf self.k = k self.timeaxis = timeaxis if self.timeaxis == "utc": self.time = self.traN.times(type="matplotlib") else: self.time = self.traN.times() self.scaling_velocity = scaling_velocity self.free_surface = free_surface self.verbose = verbose self.delta = self.traN.stats.delta self.classification: Dict[str, List[str]] = { "0": None, "1": None, "2": None, "3": None, "4": None, "5": None, } self.wave_parameters = { "P": {"vp": None, "vp_to_vs": None, "theta": None, "phi": None, "lh": None}, "SV": { "vp": None, "vp_to_vs": None, "theta": None, "phi": None, "lh": None, }, "SH": {"vs": None, "theta": None, "phi": None, "lh": None}, "R": {"vr": None, "phi": None, "xi": None, "lh": None}, "L": {"vl": None, "phi": None, "lh": None}, } self.frange = frange self.trange = trange N = np.max(traN.data.shape) N_half = int(np.floor(N / 2)) self.f_pol = (np.concatenate([np.arange(N_half + 1)]) / N)[ ::dsfacf ] / self.delta # Compute extent of window in no. of samples in the time and frequency direction df = self.f_pol[1] - self.f_pol[0] periods = 1.0 / self.f_pol[1:] # Exclude DC periods = np.append(float(self.time[-1] - self.time[0]), periods) window_t_samples = np.array( self.window["number_of_periods"] * periods / self.delta, dtype=int ) window_t_samples[window_t_samples > len(self.time)] = len(self.time) window_t_samples[window_t_samples == 0] = 1 window_f_samples = np.max([1, int(self.window["frequency_extent"] / df)]) # Compute the S-transform of the input signal u: np.ndarray = np.moveaxis( np.array( [ stransform(traN.data, dsfacf=dsfacf, k=k)[0], stransform(traE.data, dsfacf=dsfacf, k=k)[0], stransform(traZ.data, dsfacf=dsfacf, k=k)[0], stransform(rotN.data, dsfacf=dsfacf, k=k)[0], stransform(rotE.data, dsfacf=dsfacf, k=k)[0], stransform(rotZ.data, dsfacf=dsfacf, k=k)[0], ] ), 0, 2, ) self.signal_amplitudes_st = np.sqrt( np.abs(u[:, :, 0]) ** 2 + np.abs(u[:, :, 1]) ** 2 + np.abs(u[:, :, 2]) ** 2 + np.abs(u[:, :, 3]) ** 2 + np.abs(u[:, :, 4]) ** 2 + np.abs(u[:, :, 5]) ** 2 ) if self.trange is not None: indx_t = [ ( np.abs( self.traN.times(type="utcdatetime").astype("float") - float(self.trange[0]) ) ).argmin(), ( np.abs( self.traN.times(type="utcdatetime").astype("float") - float(self.trange[1]) ) ).argmin(), ] else: indx_t = [0, u.shape[1]] if self.frange is not None: indx_f = [ (np.abs(self.f_pol - self.frange[0])).argmin(), (np.abs(self.f_pol - self.frange[1])).argmin(), ] else: indx_f = [0, u.shape[0]] self.t_pol = self.time[indx_t[0] : indx_t[1]] self.t_pol = self.t_pol[::dsfact] self.f_pol = self.f_pol[indx_f[0] : indx_f[1]] self.signal_amplitudes_st = self.signal_amplitudes_st[ indx_f[0] : indx_f[1], indx_t[0] : indx_t[1] : dsfact ] # Compute covariance matrices if self.verbose: print("Computing covariance matrices...") C: np.ndarray = np.einsum("...i,...j->...ij", np.conj(u), u).astype("complex") for j in range(C.shape[2]): for k in range(C.shape[3]): C[..., j, k] = uniform_filter1d( C[..., j, k].real, size=window_f_samples, axis=0 ) + 1j * uniform_filter1d( C[..., j, k].imag, size=window_f_samples, axis=0 ) for i in range(C.shape[0]): C[i, :, j, k] = uniform_filter1d( C[i, :, j, k].real, size=window_t_samples[i] ) + 1j * uniform_filter1d( C[i, :, j, k].imag, size=window_t_samples[i] ) self.C = np.reshape( C[indx_f[0] : indx_f[1], indx_t[0] : indx_t[1] : dsfact, :, :], (len(self.t_pol) * len(self.f_pol), 6, 6), ) # flatten the # time and frequency dimension if self.verbose: print("Covariance matrices computed!")
[docs] def classify( self, svm: SupportVectorMachine, eigenvector_to_classify: int = 0, compute_dop: bool = True, ) -> None: """Classify wave types using a support vector machine Parameters ---------- svm : :obj:`~twistpy.MachineLearning.SupportVectorMachine` Trained support vector machine used for wave type classification eigenvector_to_classify : :obj:`int`, optional, default=0 Integer value identifying the eigenvector that will be classified. The eigenvectors are sorted in descending order of their corresponding eigenvalue | If 0: first eigenvector, corresponding to the dominant signal in the time window (associated with the largest eigenvalue). compute_dop : :obj:`bool`, optional, default = True Specify whether the degree of polarization is computed. """ if self.classification[str(eigenvector_to_classify)] is not None: print( f"Wave types are already classified for eigenvector with number '{eigenvector_to_classify}'! " f"Classification will not be run a second time. To access the previous classification, " f"check the attribute TimeFrequencyAnalysis6C.classification['{eigenvector_to_classify}']" ) return if self.verbose: print("Performing eigen-decomposition of covariance matrices...") eigenvalues, eigenvectors = np.linalg.eigh(self.C) if compute_dop: self.dop = np.reshape( ( (eigenvalues[:, 0] - eigenvalues[:, 1]) ** 2 + (eigenvalues[:, 0] - eigenvalues[:, 2]) ** 2 + (eigenvalues[:, 0] - eigenvalues[:, 3]) ** 2 + (eigenvalues[:, 0] - eigenvalues[:, 4]) ** 2 + (eigenvalues[:, 0] - eigenvalues[:, 5]) ** 2 + (eigenvalues[:, 1] - eigenvalues[:, 2]) ** 2 + (eigenvalues[:, 1] - eigenvalues[:, 3]) ** 2 + (eigenvalues[:, 1] - eigenvalues[:, 4]) ** 2 + (eigenvalues[:, 1] - eigenvalues[:, 5]) ** 2 + (eigenvalues[:, 2] - eigenvalues[:, 3]) ** 2 + (eigenvalues[:, 2] - eigenvalues[:, 4]) ** 2 + (eigenvalues[:, 2] - eigenvalues[:, 5]) ** 2 + (eigenvalues[:, 3] - eigenvalues[:, 4]) ** 2 + (eigenvalues[:, 3] - eigenvalues[:, 5]) ** 2 + (eigenvalues[:, 4] - eigenvalues[:, 5]) ** 2 ) / (5 * np.sum(eigenvalues, axis=-1) ** 2), (len(self.f_pol), len(self.t_pol)), ) # The eigenvectors are initially arbitrarily oriented in the complex plane, here we ensure that # the real and imaginary parts are orthogonal. See Samson (1980): Some comments on the descriptions of the # polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eqs. (3)-(5) u1 = eigenvectors[ :, :, -(eigenvector_to_classify + 1) ] # Select eigenvector for classification gamma = np.arctan2( 2 * np.einsum("ij,ij->j", u1.real.T, u1.imag.T, optimize=True), np.einsum("ij,ij->j", u1.real.T, u1.real.T, optimize=True) - np.einsum("ij,ij->j", u1.imag.T, u1.imag.T, optimize=True), ) phi = -0.5 * gamma eigenvectors = np.tile(np.exp(1j * phi), (6, 6, 1)).T * eigenvectors if self.verbose: print("Eigenvectors and eigenvalues have been computed!") df = pd.DataFrame( np.concatenate( ( eigenvectors[:, :, -(eigenvector_to_classify + 1)].real, eigenvectors[:, :, -(eigenvector_to_classify + 1)].imag, ), axis=1, ), columns=[ "t1_real", "t2_real", "t3_real", "r1_real", "r2_real", "r3_real", "t1_imag", "t2_imag", "t3_imag", "r1_imag", "r2_imag", "r3_imag", ], ) model = svm.load_model() if self.verbose: print("Wave type classification in progress...") wave_types = model.predict(df) self.classification[str(eigenvector_to_classify)] = np.reshape( wave_types, (len(self.f_pol), len(self.t_pol)) ) if self.verbose: print("Wave types have been classified!")
[docs] def polarization_analysis( self, estimator_configuration: EstimatorConfiguration = None, plot: bool = False ): r"""Perform polarization analysis. Parameters ---------- plot : :obj:`bool`, default=False Plot the r estimator_configuration : :obj:`~twistpy.polarization.estimator.EstimatorConfiguration` Estimator Configuration to be used in the polarization analysis """ if estimator_configuration is None: raise ValueError( "Please provide an EstimatorConfiguration for polarization analysis!" ) # Classify wave types if this has not been done already. if ( estimator_configuration.use_ml_classification and self.classification[str(estimator_configuration.eigenvector)] is None ): self.classify( estimator_configuration.svm, estimator_configuration.eigenvector ) if self.verbose: print("Computing wave parameters...") eigenvalues, eigenvectors = np.linalg.eigh(self.C) # The eigenvectors are initially arbitrarily oriented in the complex plane, here we ensure that # the real and imaginary parts are orthogonal. See Samson (1980): Some comments on the descriptions of the # polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eqs. (3)-(5) u1 = eigenvectors[ :, :, -(estimator_configuration.eigenvector + 1) ] # Select eigenvector for classification gamma = np.arctan2( 2 * np.einsum("ij,ij->j", u1.real.T, u1.imag.T, optimize=True), np.einsum("ij,ij->j", u1.real.T, u1.real.T, optimize=True) - np.einsum("ij,ij->j", u1.imag.T, u1.imag.T, optimize=True), ) phi = -0.5 * gamma eigenvectors = np.tile(np.exp(1j * phi), (6, 6, 1)).T * eigenvectors # Compute degree of polarization after Samson (1980): Some comments on the descriptions of the # polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eq. (18) self.dop = ( (eigenvalues[:, 0] - eigenvalues[:, 1]) ** 2 + (eigenvalues[:, 0] - eigenvalues[:, 2]) ** 2 + (eigenvalues[:, 0] - eigenvalues[:, 3]) ** 2 + (eigenvalues[:, 0] - eigenvalues[:, 4]) ** 2 + (eigenvalues[:, 0] - eigenvalues[:, 5]) ** 2 + (eigenvalues[:, 1] - eigenvalues[:, 2]) ** 2 + (eigenvalues[:, 1] - eigenvalues[:, 3]) ** 2 + (eigenvalues[:, 1] - eigenvalues[:, 4]) ** 2 + (eigenvalues[:, 1] - eigenvalues[:, 5]) ** 2 + (eigenvalues[:, 2] - eigenvalues[:, 3]) ** 2 + (eigenvalues[:, 2] - eigenvalues[:, 4]) ** 2 + (eigenvalues[:, 2] - eigenvalues[:, 5]) ** 2 + (eigenvalues[:, 3] - eigenvalues[:, 4]) ** 2 + (eigenvalues[:, 3] - eigenvalues[:, 5]) ** 2 + (eigenvalues[:, 4] - eigenvalues[:, 5]) ** 2 ) / (5 * np.sum(eigenvalues, axis=-1) ** 2) # Estimate wave parameters directly from the specified eigenvector if method=='ML' if estimator_configuration.method == "ML": for wave_type in estimator_configuration.wave_types: indices = ( self.classification[ str(estimator_configuration.eigenvector) ].ravel() == wave_type ) eigenvector_wtype = eigenvectors[ indices, :, -(estimator_configuration.eigenvector + 1) ] if wave_type == "L": self.phi_l = np.empty_like(self.t_windows) self.phi_l[:] = np.nan self.c_l = np.empty_like(self.t_windows) self.c_l[:] = np.nan eigenvector_wtype[ np.linalg.norm(np.abs(eigenvector_wtype[:, 0:2].real), axis=1) < np.linalg.norm(np.abs(eigenvector_wtype[:, 0:2].imag), axis=1) ] = ( eigenvector_wtype[ np.linalg.norm( np.abs(eigenvector_wtype[:, 0:2].real), axis=1 ) < np.linalg.norm( np.abs(eigenvector_wtype[:, 0:2].imag), axis=1 ) ].conj() * 1j ) phi_love = np.arctan2( np.real(eigenvector_wtype[:, 1]), np.real(eigenvector_wtype[:, 0]), ) a_t = ( np.cos(phi_love) * eigenvector_wtype[:, 0] + np.sin(phi_love) * eigenvector_wtype[:, 1] ) phi_love += np.pi / 2 # Resolve 180 degrees ambiguity by checking the sign of the vertical rotation and the transverse # acceleration phi_love[np.sign(a_t) == np.sign(eigenvector_wtype[:, 5].real)] = ( phi_love[np.sign(a_t) == np.sign(eigenvector_wtype[:, 5].real)] + np.pi ) phi_love[phi_love > 2 * np.pi] -= 2 * np.pi phi_love[phi_love < 0] += 2 * np.pi # Love wave velocity: transverse acceleration divided by 2*vertical rotation c_love = ( self.scaling_velocity * np.abs(a_t.real) / np.abs(eigenvector_wtype[:, 5].real) / 2 ) self.c_l[indices] = c_love self.phi_l[indices] = np.degrees(phi_love) elif wave_type == "R": self.xi = np.empty_like(self.t_windows) self.xi[:] = np.nan self.c_r = np.empty_like(self.t_windows) self.c_r[:] = np.nan self.phi_r = np.empty_like(self.t_windows) self.phi_r[:] = np.nan eigenvector_wtype[ np.linalg.norm(np.abs(eigenvector_wtype[:, 0:2].real), axis=1) > np.linalg.norm(np.abs(eigenvector_wtype[:, 0:2].imag), axis=1) ] = ( eigenvector_wtype[ np.linalg.norm( np.abs(eigenvector_wtype[:, 0:2].real), axis=1 ) > np.linalg.norm( np.abs(eigenvector_wtype[:, 0:2].imag), axis=1 ) ].conj() * 1j ) phi_rayleigh = np.arctan2( np.imag(eigenvector_wtype[:, 1]), np.imag(eigenvector_wtype[:, 0]), ) # Compute radial translational component t_r and transverse rotational component r_t r_t = ( -np.sin(phi_rayleigh) * eigenvector_wtype[:, 3].real + np.cos(phi_rayleigh) * eigenvector_wtype[:, 4].real ) t_r = ( np.cos(phi_rayleigh) * eigenvector_wtype[:, 0].imag + np.sin(phi_rayleigh) * eigenvector_wtype[:, 1].imag ) # Account for 180 degree ambiguity by evaluating signs phi_rayleigh[np.sign(t_r) < np.sign(r_t)] += np.pi phi_rayleigh[(np.sign(t_r) > 0) & (np.sign(r_t) > 0)] += np.pi phi_rayleigh[phi_rayleigh < 0] += 2 * np.pi phi_rayleigh[phi_rayleigh > 2 * np.pi] -= 2 * np.pi phi_rayleigh[ np.sign(eigenvector_wtype[:, 2].real) < np.sign(t_r) ] -= np.pi phi_rayleigh[phi_rayleigh < 0.0] += 2 * np.pi # Compute Rayleigh wave ellipticity angle elli_rayleigh = np.arctan(t_r / eigenvector_wtype[:, 2].real) elli_rayleigh[np.sign(r_t) < np.sign(t_r)] *= -1 elli_rayleigh[ np.sign(eigenvector_wtype[:, 2].real) < np.sign(t_r) ] *= -1 # Compute Rayleigh wave phase velocity c_rayleigh = ( self.scaling_velocity * np.abs(eigenvector_wtype[:, 2].real) / np.abs(r_t) ) self.xi[indices] = np.degrees(elli_rayleigh) self.c_r[indices] = c_rayleigh self.phi_r[indices] = np.degrees(phi_rayleigh) else: for wave_type in estimator_configuration.wave_types: if self.verbose: print(f"Estimating wave parameters for {wave_type}-Waves...") if estimator_configuration.use_ml_classification: indices = ( self.classification[ str(estimator_configuration.eigenvector) ].ravel() == wave_type ) else: indices = np.ones((self.C.shape[0],)).astype("bool") steering_vectors = estimator_configuration.compute_steering_vectors( wave_type ) if estimator_configuration.method == "MUSIC": noise_space_vectors = eigenvectors[ indices, :, : 6 - estimator_configuration.music_signal_space_dimension, ] noise_space_vectors_H = np.transpose( noise_space_vectors.conj(), axes=(0, 2, 1) ) noise_space = np.einsum( "...ij, ...jk->...ik", noise_space_vectors, noise_space_vectors_H, optimize=True, ) P = ( 1 / np.einsum( "...sn,...nk,...sk->...s", steering_vectors.conj().T, noise_space, steering_vectors.T, optimize=True, ).real ) elif estimator_configuration.method == "MVDR": P = ( 1 / np.einsum( "...sn,...nk,...sk->...s", steering_vectors.conj().T, np.linalg.pinv( self.C[indices, :, :] / np.moveaxis( np.tile( np.linalg.norm( self.C[indices, :, :], axis=(1, 2) ), (6, 6, 1), ), 2, 0, ), rcond=1e-6, hermitian=True, ), steering_vectors.T, optimize=True, ).real ) elif estimator_configuration.method == "BARTLETT": P = np.einsum( "...sn,...nk,...sk->...s", steering_vectors.conj().T, self.C[indices, :, :] / np.moveaxis( np.tile( np.linalg.norm(self.C[indices, :, :], axis=(1, 2)), (6, 6, 1), ), 2, 0, ), steering_vectors.T, optimize=True, ).real elif estimator_configuration.method == "DOT": P = np.einsum( "ij,kj->ik", steering_vectors.conj().T, eigenvectors[ indices, :, 5 - estimator_configuration.eigenvector ], optimize=True, ).T P = np.exp(-np.abs(np.arccos(np.around(np.real(np.abs(P)), 12)))) indx_max = np.nanargmax(P, axis=1) if wave_type == "R": self.wave_parameters["R"]["lh"] = np.reshape( P.max(axis=1), (len(self.f_pol), len(self.t_pol)) ) indx = np.unravel_index( indx_max, ( estimator_configuration.vr_n, estimator_configuration.phi_n, estimator_configuration.xi_n, ), ) self.wave_parameters["R"]["vr"] = np.reshape( estimator_configuration.vr[0] + indx[0] * estimator_configuration.vr[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["R"]["phi"] = np.reshape( estimator_configuration.phi[0] + indx[1] * estimator_configuration.phi[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["R"]["xi"] = np.reshape( estimator_configuration.xi[0] + indx[2] * estimator_configuration.xi[2], (len(self.f_pol), len(self.t_pol)), ) elif wave_type == "L": indx = np.unravel_index( indx_max, (estimator_configuration.vl_n, estimator_configuration.phi_n), ) self.wave_parameters["L"]["lh"] = np.reshape( P.max(axis=1), (len(self.f_pol), len(self.t_pol)) ) self.wave_parameters["L"]["vl"] = np.reshape( estimator_configuration.vl[0] + indx[0] * estimator_configuration.vl[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["L"]["phi"] = np.reshape( estimator_configuration.phi[0] + indx[1] * estimator_configuration.phi[2], (len(self.f_pol), len(self.t_pol)), ) elif wave_type == "P": indx = np.unravel_index( indx_max, ( estimator_configuration.vp_n, estimator_configuration.vp_to_vs_n, estimator_configuration.theta_n, estimator_configuration.phi_n, ), ) self.wave_parameters["P"]["lh"] = np.reshape( P.max(axis=1), (len(self.f_pol), len(self.t_pol)) ) self.wave_parameters["P"]["vp"] = np.reshape( estimator_configuration.vp[0] + indx[0] * estimator_configuration.vp[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["P"]["vp_to_vs"] = np.reshape( estimator_configuration.vp_to_vs[0] + indx[1] * estimator_configuration.vp_to_vs[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["P"]["theta"] = np.reshape( estimator_configuration.theta[0] + indx[2] * estimator_configuration.theta[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["P"]["phi"] = np.reshape( estimator_configuration.phi[0] + indx[3] * estimator_configuration.phi[2], (len(self.f_pol), len(self.t_pol)), ) elif wave_type == "SV": indx = np.unravel_index( indx_max, ( estimator_configuration.vp_n, estimator_configuration.vp_to_vs_n, estimator_configuration.theta_n, estimator_configuration.phi_n, ), ) self.wave_parameters["SV"]["lh"] = np.reshape( np.nanmax(P, axis=1), (len(self.f_pol), len(self.t_pol)) ) self.wave_parameters["SV"]["vp"] = np.reshape( estimator_configuration.vp[0] + indx[0] * estimator_configuration.vp[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["SV"]["vp_to_vs"] = np.reshape( estimator_configuration.vp_to_vs[0] + indx[1] * estimator_configuration.vp_to_vs[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["SV"]["theta"] = np.reshape( estimator_configuration.theta[0] + indx[2] * estimator_configuration.theta[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["SV"]["phi"] = np.reshape( estimator_configuration.phi[0] + indx[3] * estimator_configuration.phi[2], (len(self.f_pol), len(self.t_pol)), ) elif wave_type == "SH": indx = np.unravel_index( indx_max, ( estimator_configuration.vs_n, estimator_configuration.theta_n, estimator_configuration.phi_n, ), ) self.wave_parameters["SH"]["lh"] = np.reshape( P.max(axis=1), (len(self.f_pol), len(self.t_pol)) ) self.wave_parameters["SH"]["vs"] = np.reshape( estimator_configuration.vs[0] + indx[0] * estimator_configuration.vs[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["SH"]["theta"] = np.reshape( estimator_configuration.theta[0] + indx[1] * estimator_configuration.theta[2], (len(self.f_pol), len(self.t_pol)), ) self.wave_parameters["SH"]["phi"] = np.reshape( estimator_configuration.phi[0] + indx[1] * estimator_configuration.phi[2], (len(self.f_pol), len(self.t_pol)), ) if self.verbose: print("Done!") if plot: self.plot_wave_parameters(estimator_configuration=estimator_configuration)
[docs] def plot_wave_parameters( self, estimator_configuration: EstimatorConfiguration, lh_min: float, lh_max: float, ) -> None: r"""Plot estimated wave parameters Parameters ---------- estimator_configuration : :obj:`~twistpy.polarization.EstimatorConfiguration` Estimator configuration object lh_min : :obj:`float` Minimum likelihood for which wave parameters are plotted lh_max : :obj:`float` Maximum likelihood for which wave parameters are plotted """ wave_types = estimator_configuration.wave_types method = estimator_configuration.method if isinstance(wave_types, str): wave_types = [wave_types] for wave_type in wave_types: if method in ["DOT", "MUSIC", "BARTlETT", "MVDR"]: if wave_type == "P": if self.wave_parameters[wave_type]["lh"] is None: raise ValueError( f"Wave parameters for '{wave_type}'-waves have not been computed yet!" ) fig, ax = plt.subplots(5, 1, sharex=True) alpha = Normalize(vmin=lh_min, vmax=lh_max)( self.wave_parameters["P"]["lh"] ) alpha[alpha < 0.0] = 0.0 alpha[alpha > 1.0] = 1.0 ax[0].imshow( self.wave_parameters["P"]["lh"], origin="lower", cmap="inferno", aspect="auto", vmin=lh_min, vmax=lh_max, extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], ) ax[0].set_title("P-wave model fit") cbarmap = ScalarMappable( Normalize(vmin=lh_min, vmax=lh_max), cmap="inferno" ) cbar = plt.colorbar(cbarmap, ax=ax[0], extend="max") cbar.set_label("Estimator power") ax[1].imshow( self.wave_parameters["P"]["vp"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.vp[0], vmax=estimator_configuration.vp[1], ) ax[1].set_title("P-wave velocity") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.vp[0], vmax=estimator_configuration.vp[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[1], extend="max") cbar.set_label("m/s") ax[2].imshow( self.wave_parameters["P"]["vp_to_vs"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.vp_to_vs[0], vmax=estimator_configuration.vp_to_vs[1], ) ax[2].set_title("Vp/Vs Ratio") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.vp_to_vs[0], vmax=estimator_configuration.vp_to_vs[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[2], extend="max") cbar.set_label("Vp/Vs") ax[3].imshow( self.wave_parameters["P"]["theta"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.theta[0], vmax=estimator_configuration.theta[1], ) ax[3].set_title("Incidence angle") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.theta[0], vmax=estimator_configuration.theta[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[3], extend="max") cbar.set_label("Degrees") ax[4].imshow( self.wave_parameters["P"]["phi"], origin="lower", cmap="hsv", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=0, vmax=360, ) ax[4].set_title("Propagation Azimuth") cbarmap = ScalarMappable(Normalize(vmin=0, vmax=360), cmap="hsv") cbar = plt.colorbar(cbarmap, ax=ax[4], extend="max") cbar.set_label("Degrees") for axis in ax: axis.set_ylabel("Frequency (Hz)") if self.timeaxis == "utc": ax[0].xaxis_date() ax[1].xaxis_date() ax[2].xaxis_date() ax[3].xaxis_date() ax[4].xaxis_date() ax[4].set_xlabel("Time (UTC)") else: ax[4].set_xlabel("Time (s)") elif wave_type == "SV": if self.wave_parameters[wave_type]["lh"] is None: raise ValueError( f"Wave parameters for '{wave_type}'-waves have not been computed yet!" ) fig, ax = plt.subplots(5, 1, sharex=True) alpha = Normalize(vmin=lh_min, vmax=lh_max)( self.wave_parameters["SV"]["lh"] ) alpha[alpha < 0.0] = 0.0 alpha[alpha > 1.0] = 1.0 ax[0].imshow( self.wave_parameters["SV"]["lh"], origin="lower", cmap="inferno", aspect="auto", vmin=lh_min, vmax=lh_max, extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], ) ax[0].set_title("SV-wave model fit") cbarmap = ScalarMappable( Normalize(vmin=lh_min, vmax=lh_max), cmap="inferno" ) cbar = plt.colorbar(cbarmap, ax=ax[0], extend="max") cbar.set_label("Estimator power") ax[1].imshow( self.wave_parameters["SV"]["vp"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.vp[0], vmax=estimator_configuration.vp[1], ) ax[1].set_title("P-wave velocity") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.vp[0], vmax=estimator_configuration.vp[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[1], extend="max") cbar.set_label("m/s") ax[2].imshow( self.wave_parameters["SV"]["vp_to_vs"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.vp_to_vs[0], vmax=estimator_configuration.vp_to_vs[1], ) ax[2].set_title("Vp/Vs Ratio") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.vp_to_vs[0], vmax=estimator_configuration.vp_to_vs[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[2], extend="max") cbar.set_label("Vp/Vs") ax[3].imshow( self.wave_parameters["SV"]["theta"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.theta[0], vmax=estimator_configuration.theta[1], ) ax[3].set_title("Incidence angle") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.theta[0], vmax=estimator_configuration.theta[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[3], extend="max") cbar.set_label("Degrees") ax[4].imshow( self.wave_parameters["SV"]["phi"], origin="lower", cmap="hsv", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=0, vmax=360, ) ax[4].set_title("Propagation Azimuth") cbarmap = ScalarMappable(Normalize(vmin=0, vmax=360), cmap="hsv") cbar = plt.colorbar(cbarmap, ax=ax[4], extend="max") cbar.set_label("Degrees") for axis in ax: axis.set_ylabel("Frequency (Hz)") if self.timeaxis == "utc": ax[0].xaxis_date() ax[1].xaxis_date() ax[2].xaxis_date() ax[3].xaxis_date() ax[4].xaxis_date() ax[4].set_xlabel("Time (UTC)") else: ax[4].set_xlabel("Time (s)") elif wave_type == "R": if self.wave_parameters[wave_type]["lh"] is None: raise ValueError( f"Wave parameters for '{wave_type}'-waves have not been computed yet!" ) fig, ax = plt.subplots(4, 1, sharex=True) alpha = Normalize(vmin=lh_min, vmax=lh_max)( self.wave_parameters["R"]["lh"] ) alpha[alpha < 0.0] = 0.0 alpha[alpha > 1.0] = 1.0 ax[0].imshow( self.wave_parameters["R"]["lh"], origin="lower", cmap="inferno", aspect="auto", vmin=lh_min, vmax=lh_max, extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], ) ax[0].set_title("Rayleigh-wave model fit") cbarmap = ScalarMappable( Normalize(vmin=lh_min, vmax=lh_max), cmap="inferno" ) cbar = plt.colorbar(cbarmap, ax=ax[0], extend="max") cbar.set_label("Estimator power") ax[1].imshow( self.wave_parameters["R"]["vr"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.vr[0], vmax=estimator_configuration.vr[1], ) ax[1].set_title("Rayleigh-wave phase velocity") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.vr[0], vmax=estimator_configuration.vr[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[1], extend="max") cbar.set_label("m/s") ax[2].imshow( self.wave_parameters["R"]["phi"], origin="lower", cmap="hsv", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=0, vmax=360, ) ax[2].set_title("Propagation Azimuth") cbarmap = ScalarMappable(Normalize(vmin=0, vmax=360), cmap="hsv") cbar = plt.colorbar(cbarmap, ax=ax[2], extend="max") cbar.set_label("Degrees") ax[3].imshow( self.wave_parameters["R"]["xi"], origin="lower", cmap="Spectral", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=-90, vmax=90, ) ax[3].set_title("Ellipticity angle") cbarmap = ScalarMappable( Normalize(vmin=-90, vmax=90), cmap="Spectral" ) cbar = plt.colorbar(cbarmap, ax=ax[3], extend="max") cbar.set_label("Degrees") for axis in ax: axis.set_ylabel("Frequency (Hz)") if self.timeaxis == "utc": ax[0].xaxis_date() ax[1].xaxis_date() ax[2].xaxis_date() ax[3].xaxis_date() ax[3].set_xlabel("Time (UTC)") else: ax[3].set_xlabel("Time (s)") elif wave_type == "L": if self.wave_parameters[wave_type]["lh"] is None: raise ValueError( f"Wave parameters for '{wave_type}'-waves have not been computed yet!" ) fig, ax = plt.subplots(3, 1, sharex=True) alpha = Normalize(vmin=lh_min, vmax=lh_max)( self.wave_parameters["L"]["lh"] ) alpha[alpha < 0.0] = 0.0 alpha[alpha > 1.0] = 1.0 ax[0].imshow( self.wave_parameters["L"]["lh"], origin="lower", cmap="inferno", aspect="auto", vmin=lh_min, vmax=lh_max, extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], ) ax[0].set_title("Love-wave model fit") cbarmap = ScalarMappable( Normalize(vmin=lh_min, vmax=lh_max), cmap="inferno" ) cbar = plt.colorbar(cbarmap, ax=ax[0], extend="max") cbar.set_label("Estimator power") ax[1].imshow( self.wave_parameters["L"]["vl"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.vl[0], vmax=estimator_configuration.vl[1], ) ax[1].set_title("Love-wave velocity") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.vl[0], vmax=estimator_configuration.vl[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[1], extend="max") cbar.set_label("m/s") ax[2].imshow( self.wave_parameters["L"]["phi"], origin="lower", cmap="hsv", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.phi[0], vmax=estimator_configuration.phi[1], ) ax[2].set_title("Propagation Azimuth") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.phi[0], vmax=estimator_configuration.phi[1], ), cmap="hsv", ) cbar = plt.colorbar(cbarmap, ax=ax[2], extend="max") cbar.set_label("Degrees") for axis in ax: axis.set_ylabel("Frequency (Hz)") if self.timeaxis == "utc": ax[0].xaxis_date() ax[1].xaxis_date() ax[2].xaxis_date() ax[2].set_xlabel("Time (UTC)") else: ax[2].set_xlabel("Time (s)") elif wave_type == "SH": if self.wave_parameters[wave_type]["lh"] is None: raise ValueError( f"Wave parameters for '{wave_type}'-waves have not been computed yet!" ) fig, ax = plt.subplots(3, 1, sharex=True) alpha = Normalize(vmin=lh_min, vmax=lh_max)( self.wave_parameters["SH"]["lh"] ) alpha[alpha < 0.0] = 0.0 alpha[alpha > 1.0] = 1.0 ax[0].imshow( self.wave_parameters["SH"]["lh"], origin="lower", cmap="inferno", aspect="auto", vmin=lh_min, vmax=lh_max, extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], ) ax[0].set_title("SH-wave model fit") cbarmap = ScalarMappable( Normalize(vmin=lh_min, vmax=lh_max), cmap="inferno" ) cbar = plt.colorbar(cbarmap, ax=ax[0], extend="max") cbar.set_label("Estimator power") ax[1].imshow( self.wave_parameters["SH"]["vs"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.vs[0], vmax=estimator_configuration.vs[1], ) ax[1].set_title("S-wave velocity") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.vs[0], vmax=estimator_configuration.vs[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[1], extend="max") cbar.set_label("m/s") ax[2].imshow( self.wave_parameters["SH"]["theta"], origin="lower", cmap="inferno", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.theta[0], vmax=estimator_configuration.theta[1], ) ax[2].set_title("Incidence angle") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.theta[0], vmax=estimator_configuration.theta[1], ), cmap="inferno", ) cbar = plt.colorbar(cbarmap, ax=ax[2], extend="max") cbar.set_label("Degrees") ax[3].imshow( self.wave_parameters["SH"]["phi"], origin="lower", cmap="hsv", aspect="auto", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], alpha=alpha, vmin=estimator_configuration.phi[0], vmax=estimator_configuration.phi[1], ) ax[3].set_title("Propagation Azimuth") cbarmap = ScalarMappable( Normalize( vmin=estimator_configuration.phi[0], vmax=estimator_configuration.phi[1], ), cmap="hsv", ) cbar = plt.colorbar(cbarmap, ax=ax[3], extend="max") cbar.set_label("Degrees") for axis in ax: axis.set_ylabel("Frequency (Hz)") if self.timeaxis == "utc": ax[0].xaxis_date() ax[1].xaxis_date() ax[2].xaxis_date() ax[3].xaxis_date() ax[3].set_xlabel("Time (UTC)") else: ax[3].set_xlabel("Time (s)") plt.show()
[docs] def filter( self, svm: SupportVectorMachine, wave_types: List = ["P", "SV", "R"], no_of_eigenvectors: int = 1, suppress: bool = False, ): r"""Wavefield separation Parameters ---------- svm : :obj:`~twistpy.polarization.machinelearning.SupportVectorMachine` Support vector machine for wave type classification wave_types : :obj:`list`, default=['P', 'SV', 'R'] List of wave types to separate no_of_eigenvectors : :obj:`int`, default=1 Number of eigenvectors to classify and use in the separated data suppress: :obj:`bool`, default=False If set to True, the wave types in listed in wave_types are suppressed from the data instead of being isolated """ if self.dsfacf != 1 or self.dsfact != 1: raise Exception( "For now, filtering is only supported if the polarization attributes have been computed" "at each time-frequency pixel. To do so, recompute the polarization attributes and set " "dsfacf=1 and dsfact=1." ) for eigenvector in range(no_of_eigenvectors): self.classify(svm=svm, eigenvector_to_classify=eigenvector) data_filtered = {} # d = {'P': 0, 'S': 1, 'SV': 1, 'SH': 2, 'R': 3, 'Noise': 4} # Compute eigenvectors for projection _, eigenvectors = np.linalg.eigh(self.C) # Compute S-transform of each component traZ_stran, f_stran = stransform(self.traZ.data, k=self.k) traN_stran, _ = stransform(self.traN.data, k=self.k) traE_stran, _ = stransform(self.traE.data, k=self.k) rotZ_stran, _ = stransform(self.rotZ.data, k=self.k) rotN_stran, _ = stransform(self.rotN.data, k=self.k) rotE_stran, _ = stransform(self.rotE.data, k=self.k) data_st = np.array( [ traN_stran.ravel(), traE_stran.ravel(), traZ_stran.ravel(), rotN_stran.ravel(), rotE_stran.ravel(), rotZ_stran.ravel(), ] ).T # Project data into coordinate frame spanned by eigenvectors data_proj = np.einsum( "...i, ...ij -> ...j", data_st, eigenvectors, optimize=True ) d_projected_filt = data_proj.copy() # Populate filter mask (will be 1 where signal is kept and 0 everywhere else) for wtype in wave_types: data_filtered[wtype] = np.zeros((len(self.t_pol), 6), dtype="float") if suppress: for eigenvector in range(no_of_eigenvectors): mask = self.classification[str(eigenvector)].ravel() == wtype d_projected_filt[mask, -(eigenvector + 1)] *= 0 else: d_projected_filt = data_proj.copy() d_projected_filt *= 0.0 for eigenvector in range(no_of_eigenvectors): mask = self.classification[str(eigenvector)].ravel() == wtype d_projected_filt[mask, -(eigenvector + 1)] = data_proj[ mask, -(eigenvector + 1) ] data_filt = np.einsum( "...i, ...ij -> ...j", d_projected_filt, np.transpose(eigenvectors.conj(), axes=(0, 2, 1)), optimize=True, ) data_filt = data_filt.reshape(len(self.f_pol), len(self.t_pol), 6) for trace in range(6): data_filtered[wtype][:, trace] = istransform( data_filt[:, :, trace], f=f_stran, k=self.k ) if suppress: data_filt = np.einsum( "...i, ...ij -> ...j", d_projected_filt, np.transpose(eigenvectors.conj(), axes=(0, 2, 1)), optimize=True, ) data_filt = data_filt.reshape(len(self.f_pol), len(self.t_pol), 6) for trace in range(6): for wtype in wave_types: data_filtered[wtype][:, trace] = istransform( data_filt[:, :, trace], f=f_stran, k=self.k ) return data_filtered
[docs] def plot_classification( self, ax: Axes, classified_eigenvector: int = 0, clip: float = 0.05 ) -> None: """Plot wave type classification labels as a function of time and frequency. Parameters ---------- ax : :obj:`~matplotlib.axes.Axes` Matplotlib Axes instance where the classification should be plotted classified_eigenvector : :obj:`int`, default = 0 Specify for which eigenvector the classification is plotted clip : :obj:`float`, default=0.05 (between 0 and 1) Results are only plotted at time-frequency points where the signal amplitudes exceed a value of amplitudes * maximum amplitude in the signal (given by the l2-norm of all three components). """ if self.classification[str(classified_eigenvector)] is not None: cmap = ListedColormap(["blue", "red", "green", "yellow", "white"]) map = ScalarMappable(Normalize(vmin=0, vmax=4), cmap=cmap) d = {"P": 0, "SV": 1, "SH": 2, "L": 2, "R": 3, "Noise": 4} classification = ( self.classification[str(classified_eigenvector)] ).flatten() # Convert classification label from string to numbers for plotting classification = np.array([d[wave_type] for wave_type in classification]) classification = np.reshape( classification, self.classification[str(classified_eigenvector)].shape ) classification[ self.signal_amplitudes_st < clip * self.signal_amplitudes_st.max().max() ] = 4 if classified_eigenvector == 0: ax.imshow( classification, origin="lower", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], cmap=cmap, aspect="auto", interpolation="nearest", ) else: # The following lines ensure that the classification of second eigenvector is not plotted at positions # where the first eigenvector was classified as Noise, as there should not be any signal there. This # avoids plotting noise samples that randomly align with one of the polarization vectors. if self.classification["0"] is not None: classification0 = (self.classification["0"]).flatten() # Convert classification label from string to numbers for plotting classification0 = np.array( [d[wave_type] for wave_type in classification0] ) classification0 = np.reshape( classification0, self.classification["0"].shape ) classification0[ self.signal_amplitudes_st < clip * self.signal_amplitudes_st.max().max() ] = 4 ax.imshow( classification, origin="lower", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], cmap=cmap, aspect="auto", alpha=(classification0 != 4).astype("float"), interpolation="nearest", ) else: ax.imshow( classification, origin="lower", extent=[ float(self.t_pol[0]), float(self.t_pol[-1]), self.f_pol[0], self.f_pol[-1], ], cmap=cmap, aspect="auto", interpolation="nearest", ) cbar = plt.colorbar(map, ax=ax, extend="max") cbar.set_ticks([0.4, 1.2, 2.0, 2.8, 3.6]) cbar.set_ticklabels(["P", "SV", "SH", "R", "Noise"]) cbar.set_label("Wave type") if self.timeaxis == "utc": ax.xaxis_date() ax.set_xlabel("Time (UTC)") else: ax.set_xlabel("Time (s)") else: raise Exception( f"No wave types have been classified for this eigenvector yet (eigenvector: " f"{classified_eigenvector})!" )
def _plot_seismograms(self, ax: plt.Axes): if self.timeaxis == "utc": time = self.traN.times(type="matplotlib") else: time = self.traN.times() ax.plot(time, self.traN.data, "k:", label="traN") ax.plot(time, self.traE.data, "k--", label="traE") ax.plot(time, self.traZ.data, "k", label="traZ") ax.plot(time, self.rotZ.data, "r:", label="rotN") ax.plot(time, self.rotE.data, "r--", label="rotE") ax.plot(time, self.rotZ.data, "r", label="rotZ") if self.timeaxis == "utc": ax.xaxis_date() ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))
[docs] def save(self, name: str) -> None: """Save the current TimeFrequencyAnalaysis6C object to a file on the disk in the current working directory. Include absolute path if you want to save in a different directory. name : :obj:`str` File name """ if not isinstance(name, str): raise ValueError("Name must be a string!") fid = open(name, "wb") pickle.dump(self, fid, pickle.HIGHEST_PROTOCOL) fid.close()
[docs]class TimeFrequencyAnalysis3C: r"""Time-frequency domain three-component polarization analysis. Single-station three-component polarization analysis on time-frequency decomposed signals. The time-frequency representation is obtained via the S-transform (:func:`twistpy.utils.s_transform`). Parameters ---------- N : :obj:`~obspy.core.trace.Trace` North component seismogram E : :obj:`~obspy.core.trace.Trace` East component seismogram Z : :obj:`~obspy.core.trace.Trace` Vertical component seismogram window : :obj:`dict` 2-D time-frequency window within which the covariance matrices are averaged: | window = {'number_of_periods': :obj:`float`, 'frequency_extent': :obj:`float`} | The width of the window in time is frequency-dependent and stretches over 'number_of_periods' periods, where the period is defined as 1/f. In frequency, the window stretches over the specified 'frequency_extent' in Hz. dsfact : :obj:`int`, default=1 Down-sampling factor of the time-axis prior to the computation of polarization attributes. dsfacf : :obj:`int`, default=1 Down-sampling factor of the frequency-axis prior to the computation of polarization attributes. .. warning:: Downsampling of the frequency axis (dsfacf > 1) prevents the accurate computation of the inverse S-transform! Down-sampling should be avoided for filtering applications. frange : :obj:`tuple`, (f_min, f_max) Limit the analysis to the specified frequency band in Hz trange : :obj:`tuple`, (t_min, t_max) Limit the analysis to the specified time window. t_min and t_max should either be specified as a tuple of two :obj:`~obspy.core.utcdatetime.UTCDateTime` objects (if timeaxis='utc'), or as a tuple of :obj:`float` (if timeaxis='rel'). k : :obj:`float`, default=1. k-value used in the S-transform, corresponding to a scaling factor that controls the number of oscillations in the Gaussian window. When k increases, the frequency resolution increases, with a corresponding loss of time resolution. .. seealso:: :func:`twistpy.utils.s_transform` verbose : :obj:`bool`, default=True Run in verbose mode. timeaxis : :obj:'str', default='utc' Specify whether the time axis of plots is shown in UTC (timeaxis='utc') or in seconds relative to the first sample (timeaxis='rel'). Attributes ---------- t_pol : :obj:`list` of :obj:`~obspy.core.utcdatetime.UTCDateTime` Time axis of the computed polarization attributes. f_pol : :obj:`~numpy.ndarray` of :obj:`float` Frequency axis of the computed polarization attributes. C : :obj:`~numpy.ndarray` of :obj:`~numpy.complex128` Complex covariance matrices at each window position time : :obj:`list` of :obj:`~obspy.core.utcdatetime.UTCDateTime` Time axis of the input traces delta : :obj:`float` Sampling interval of the input data in seconds dop : 2D :obj:`numpy.ndarray` of :obj:`float` Degree of polarization estimated at each window position in time and frequency. .. hint:: The definition of the degree of polarization follows the one from Samson & Olson (1980): *Some comments on the descriptions of the polarization states of waves*, Geophysical Journal of the Royal Astronomical Society, https://doi.org/10.1111/j.1365-246X.1980.tb04308.x. It is defined as: .. math:: P^2=\sum_{j,k=1}^{n}(\lambda_j-\lambda_k)^2/[2(n-1)(\sum_{j=1}^{n}(\lambda_j)^2)] where :math:`P^2` is the degree of polarization and :math:`\lambda_j` are the eigenvalues of the covariance matrix. elli : 2D :obj:`numpy.ndarray` of :obj:`float` Ellipticity estimated at each window position inc1 : 2D :obj:`numpy.ndarray` of :obj:`float` Inclination of the major semi-axis of the polarization ellipse estimated at each window position inc2 : 2D :obj:`numpy.ndarray` of :obj:`float` Inclination of the minor semi-axis of the polarization ellipse estimated at each window position azi1 : 2D :obj:`numpy.ndarray` of :obj:`float` Azimuth of the major semi-axis of the polarization ellipse estimated at each window position azi2 : 2D :obj:`numpy.ndarray` of :obj:`float` Azimuth of the minor semi-axis of the polarization ellipse estimated at each window position """ def __init__( self, N: Trace, E: Trace, Z: Trace, window: dict, dsfacf: int = 1, dsfact: int = 1, frange: Tuple[float, float] = None, trange: Tuple[UTCDateTime, UTCDateTime] = None, k: float = 1, verbose: bool = True, timeaxis: "str" = "utc", ) -> None: self.dop = None self.elli = None self.inc1 = None self.inc2 = None self.azi1 = None self.azi2 = None self.N, self.E, self.Z = N, E, Z # Assert that input traces are ObsPy Trace objects assert ( isinstance(self.N, Trace) and isinstance(self.E, Trace) and isinstance(self.Z, Trace) ), ("Input data " "must be " "objects of " "class " "obspy.core" ".Trace()! ") # Assert that all traces have the same number of samples assert ( self.N.stats.npts == self.E.stats.npts and self.N.stats.npts == self.Z.stats.npts ), "All six traces must have the same number of samples!" self.window = window self.dsfact = dsfact self.dsfacf = dsfacf self.k = k self.timeaxis = timeaxis if self.timeaxis == "utc": self.time = self.N.times(type="matplotlib") else: self.time = self.N.times() self.verbose = verbose self.delta = self.N.stats.delta self.frange = frange self.trange = trange N_full = np.max(N.data.shape) N_half = int(np.floor(N_full / 2)) self.f_pol = (np.concatenate([np.arange(N_half + 1)]) / N_full)[ ::dsfacf ] / self.delta # Compute extent of window in no. of samples in the time and frequency direction df = self.f_pol[1] - self.f_pol[0] periods = 1.0 / self.f_pol[1:] # Exclude DC periods = np.append(float(self.time[-1] - self.time[0]), periods) window_t_samples = np.array( self.window["number_of_periods"] * periods / self.delta, dtype=int ) window_t_samples[window_t_samples == 0] = 1 window_t_samples[window_t_samples > len(self.time)] = len(self.time) window_f_samples = np.max([1, int(self.window["frequency_extent"] / df)]) self.window_t_samples = window_t_samples self.window_f_samples = window_f_samples # Compute the S-transform of the input signal u: np.ndarray = np.moveaxis( np.array( [ stransform(N.data, dsfacf=dsfacf, k=k)[0], stransform(E.data, dsfacf=dsfacf, k=k)[0], stransform(Z.data, dsfacf=dsfacf, k=k)[0], ] ), 0, 2, ) self.signal_amplitudes_st = np.sqrt( np.abs(u[:, :, 0]) ** 2 + np.abs(u[:, :, 1]) ** 2 + np.abs(u[:, :, 2]) ** 2 ) if self.trange is not None: indx_t = [ ( np.abs( self.N.times(type="utcdatetime").astype("float") - float(self.trange[0]) ) ).argmin(), ( np.abs( self.N.times(type="utcdatetime").astype("float") - float(self.trange[1]) ) ).argmin(), ] else: indx_t = [0, u.shape[1]] if self.frange is not None: indx_f = [ (np.abs(self.f_pol - self.frange[0])).argmin(), (np.abs(self.f_pol - self.frange[1])).argmin(), ] else: indx_f = [0, u.shape[0]] self.t_pol = self.time[indx_t[0] : indx_t[1]] self.t_pol = self.t_pol[::dsfact] self.signal_amplitudes_st = self.signal_amplitudes_st[ indx_f[0] : indx_f[1], indx_t[0] : indx_t[1] : dsfact ] # Compute covariance matrices if self.verbose: print("Computing covariance matrices...") C: np.ndarray = np.einsum("...i,...j->...ij", np.conj(u), u).astype("complex") for j in range(C.shape[2]): for k in range(C.shape[3]): C[..., j, k] = uniform_filter1d( C[..., j, k].real, size=window_f_samples, axis=0 ) + 1j * uniform_filter1d( C[..., j, k].imag, size=window_f_samples, axis=0 ) for i in range(C.shape[0]): C[i, :, j, k] = uniform_filter1d( C[i, :, j, k].real, size=window_t_samples[i] ) + 1j * uniform_filter1d( C[i, :, j, k].imag, size=window_t_samples[i] ) self.C = np.reshape( C[indx_f[0] : indx_f[1], indx_t[0] : indx_t[1] : dsfact, :, :], (len(self.t_pol) * len(self.f_pol), 3, 3), ) # flatten the # time and frequency dimension if self.verbose: print("Covariance matrices computed!")
[docs] def polarization_analysis(self) -> None: r"""Perform polarization analysis.""" if self.verbose: print("Computing polarization attributes...") eigenvalues, eigenvectors = np.linalg.eigh(self.C) # The eigenvectors are initially arbitrarily oriented in the complex plane, here we ensure that # the real and imaginary parts are orthogonal. See Samson (1980): Some comments on the descriptions of the # polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eqs. (3)-(5) u1 = eigenvectors[:, :, -1] # Select principal eigenvector gamma = np.arctan2( 2 * np.einsum("ij,ij->j", u1.real.T, u1.imag.T, optimize=True), np.einsum("ij,ij->j", u1.real.T, u1.real.T, optimize=True) - np.einsum("ij,ij->j", u1.imag.T, u1.imag.T, optimize=True), ) phi = -0.5 * gamma eigenvectors = np.tile(np.exp(1j * phi), (3, 3, 1)).T * eigenvectors # Compute degree of polarization after Samson (1980): Some comments on the descriptions of the # polarization states of waves, Geophysical Journal of the Royal Astronomical Society, Eq. (18) self.dop = ( (eigenvalues[:, 0] - eigenvalues[:, 1]) ** 2 + (eigenvalues[:, 0] - eigenvalues[:, 2]) ** 2 + (eigenvalues[:, 1] - eigenvalues[:, 2]) ** 2 ) / (2 * np.sum(eigenvalues, axis=-1) ** 2) self.dop = np.reshape(self.dop, (len(self.f_pol), len(self.t_pol))) self.azi1 = np.degrees( np.pi / 2 - np.arctan2(eigenvectors[:, 0, -1].real, eigenvectors[:, 1, -1].real) ) self.azi2 = np.degrees( np.pi / 2 - np.arctan2(eigenvectors[:, 0, -1].imag, eigenvectors[:, 1, -1].imag) ) self.azi1[self.azi1 < 0] += 180 self.azi2[self.azi2 < 0] += 180 self.azi1[self.azi1 > 180] -= 180 self.azi2[self.azi2 > 180] -= 180 self.azi1 = np.reshape(self.azi1, (len(self.f_pol), len(self.t_pol))) self.azi2 = np.reshape(self.azi2, (len(self.f_pol), len(self.t_pol))) self.inc1 = np.degrees( np.arctan2( np.linalg.norm(eigenvectors[:, 0:2, -1].real, axis=1), np.abs(eigenvectors[:, 2, -1].real), ) ) self.inc2 = np.degrees( np.arctan2( np.linalg.norm(eigenvectors[:, 0:2, -1].imag, axis=1), np.abs(eigenvectors[:, 2, -1].imag), ) ) self.inc1 = np.reshape(self.inc1, (len(self.f_pol), len(self.t_pol))) self.inc2 = np.reshape(self.inc2, (len(self.f_pol), len(self.t_pol))) self.elli = np.linalg.norm( eigenvectors[:, :, -1].imag, axis=1 ) / np.linalg.norm(eigenvectors[:, :, -1].real, axis=1) self.elli = np.reshape(self.elli, (len(self.f_pol), len(self.t_pol))) if self.verbose: print("Polarization attributes have been computed!")
[docs] def filter( self, plot_filtered_attributes: bool = False, suppress: bool = False, smooth_mask: bool = False, return_mask: bool = False, clip: float = 0.05, log_frequency: bool = False, fmin: float = None, fmax: float = None, no_of_eigenvectors=1, **kwargs, ) -> Tuple: r"""Filter data based on polarization attributes. Parameters ---------- **kwargs : For example elli=[0, 0.3] Polarization attributes used for filtering. The filter parameters are specified as a list with two entries, specifying the range of the polarization attributes that are kept by the filter. In the above example the filter would only retain all signal with an ellipticity between 0 and 0.3 and suppress all signal with an ellipticity larger than 0.3. Multiple polarization attributes can be specified. .. hint:: Supported polariziation attributes are: dop (degree of polarization) elli (Ellipticity) inc1 (Incidence angle of major semi axis) inc2 (Incidence angle of minor semi axis) azi1 (Azimuth of the major semi axis) azi2 (Azimuth of the minor semi axis) plot_filtered_attributes : :obj:`bool`, default = False If set to True, a plot will be generated that shows the polarization attributes after filtering suppress : :obj:`bool`, default = False If set to True the polarization states that fulfill the filtering criterion will be suppressed from the data while preserving any underlying signal that is orthogonal to the polarization state. no_of_eigenvectors : :obj:`int`, optional Number of eigenvectors to be isolate or suppress from the signal. For example, no_of_eigenvectors=2 and suppress=True will suppress all signal that is aligned with the first two principal eigenvectors of the data covariance matrix. smooth_mask : :obj:`bool`, default = False If set to True, the filter mask will be smoothed within the same time-frequency window that was used to compute the polarization attributes return_mask : :obj:´bool´, default = False If set to True, the filter mask will be returned after the function is called. clip : :obj:`float`, default = 0.05 Only used if plot_filtered_attributes = True. Results are only plotted at time-frequency points where the signal amplitudes exceed a value of amplitudes * maximum amplitude in the signal (given by the l2-norm of all three components). log_frequency : :obj:`bool` = False Only used if plot_filtered_attributes = True. Plot polarization attributes on a logarithmic frequency axis. fmin : :obj:`float`, optional Only used if plot_filtered_attributes = True. Set the lower limit of the frequency axis. fmax : :obj:`float`, optional Only used if plot_filtered_attributes = True. Set the upper limit of the frequency axis. Returns ------- data_filtered : :obj:`~obspy.core.stream.Stream` Filtered data. The order of traces in the stream is Z, N, E. """ if self.dsfacf != 1 or self.dsfact != 1: raise Exception( "Polarization filtering is only supported if the polarization attributes have been computed" "at each time-frequency pixel. To do so, recompute the polarization attributes and set " "dsfacf=1 and dsfact=1." ) if no_of_eigenvectors not in [1, 2]: raise ValueError( "The parameter no_of_eigenvectors must be set to either 1 or 2" ) params = {} for k in kwargs: if kwargs[k] is not None: params[k] = kwargs[k] if not params: raise Exception("No values provided") if self.elli is None: raise Exception("No polarization attributes computed yet!") # Compute eigenvectors for projection _, eigenvectors = np.linalg.eigh(self.C) # Compute S-transform of each component Z_stran, f_stran = stransform(self.Z.data, k=self.k) N_stran, _ = stransform(self.N.data, k=self.k) E_stran, _ = stransform(self.E.data, k=self.k) # Initialize filter mask mask = np.ones((len(self.f_pol), len(self.t_pol))) # Populate filter mask (will be 1 where signal is kept and 0 everywhere else) for parameter in params: pol_attr = np.real(getattr(self, parameter)) alpha = ( (pol_attr >= params[parameter][0]) & (pol_attr <= params[parameter][1]) ).astype("int") mask *= alpha if smooth_mask: mask = uniform_filter1d(mask, size=self.window_f_samples, axis=0) for i in range(mask.shape[0]): mask[i, :] = uniform_filter1d(mask[i, :], size=self.window_t_samples[i]) indx = mask > 0 if suppress: Z_sep = Z_stran N_sep = N_stran E_sep = E_stran else: Z_sep = np.zeros_like(Z_stran).astype("complex") N_sep = np.zeros_like(N_stran).astype("complex") E_sep = np.zeros_like(E_stran).astype("complex") data_st = np.array( [N_stran[indx].ravel(), E_stran[indx].ravel(), Z_stran[indx].ravel()] ).T # Project data into coordinate frame spanned by eigenvectors data_proj = np.einsum( "...i, ...ij -> ...j", data_st, eigenvectors[indx.ravel(), :, :], optimize=True, ) # Only keep or suppress data that is aligned with the principal eigenvector if suppress: data_proj[:, -no_of_eigenvectors:] = 0 else: data_proj[:, 0 : 2 - (no_of_eigenvectors - 1)] = 0 # Back-projection into original coordinate frame data_filt = np.einsum( "...i, ...ij -> ...j", data_proj, np.transpose(eigenvectors[indx.ravel(), :, :].conj(), axes=(0, 2, 1)), optimize=True, ) Z_sep[indx] = data_filt[:, 2] N_sep[indx] = data_filt[:, 0] E_sep[indx] = data_filt[:, 1] if suppress: Z_sep = istransform(Z_sep, f=f_stran, k=self.k) N_sep = istransform(N_sep, f=f_stran, k=self.k) E_sep = istransform(E_sep, f=f_stran, k=self.k) else: Z_sep = istransform(mask * Z_sep, f=f_stran, k=self.k) N_sep = istransform(mask * N_sep, f=f_stran, k=self.k) E_sep = istransform(mask * E_sep, f=f_stran, k=self.k) # data_filtered = Stream(traces=[self.Z.copy(), self.N.copy(), self.E.copy()]) data_filtered[0].data = Z_sep data_filtered[1].data = N_sep data_filtered[2].data = E_sep if plot_filtered_attributes: self.plot( show=False, alpha=mask, seismograms=data_filtered, clip=clip, log_frequency=log_frequency, fmin=fmin, fmax=fmax, ) if return_mask: return data_filtered, mask else: return data_filtered
[docs] def plot( self, clip: float = 0.05, major_semi_axis: bool = True, show: bool = True, alpha: np.ndarray = None, seismograms: Stream = None, log_frequency: bool = False, fmin: float = None, fmax: float = None, ) -> None: """Plot polarization analysis. Parameters ---------- clip : :obj:`float`, default=0.05 (between 0 and 1) Results are only plotted at time-frequency points where the signal amplitudes exceed a value of amplitudes * maximum amplitude in the signal (given by the l2-norm of all three components). major_semi_axis : obj:`bool`, default=True If True, the inclination and azimuth of the major semi-axis is plotted. Otherwise (major_semi_axis=False), The inclination and azimuth are plotted for the minor semi-axis. show : :obj:`bool`, default=True Display the plot directly after plotting. If set to False, the plot will only show up once :func:`~matplotlib.pyplot.show` is called. alpha : :obj:`numpy.ndarray`, default = None A mask (values between zero and 1) of the same dimension as the polarization attributes, that will be plotted on the alpha channel. seismograms : :obj:`obspy.core.Stream`, default = None Manually provide seismograms to be plotted in the first panel. By default, the input data is plotted. log_frequency : :obj:`bool` Optional, Default = False Plot polarization attributes on a logarithmic frequency axis. fmin : :obj:`float`, optional Set the lower limit of the frequency axis. fmax : :obj:`float`, optional Set the upper limit of the frequency axis. """ plt.rcParams["axes.grid"] = False assert ( self.elli is not None ), "No polarization attributes have been computed so far!" fig, ax = plt.subplots(5, 1, sharex=True, figsize=(10, 10)) if seismograms is None: self._plot_seismograms(ax[0]) else: self._plot_seismograms(ax[0], seismograms=seismograms) ax[0].set_ylabel("Amplitude") alpha_channel = np.ones(self.dop.shape) if alpha is not None: alpha_channel *= alpha alpha_channel[ self.signal_amplitudes_st < clip * self.signal_amplitudes_st.max().max() ] = 0 alpha_channel[alpha_channel > 1] = 1 alpha_channel[alpha_channel < 0] = 0 if log_frequency: ax[1].set_yscale("log") ax[1].pcolorfast( self.t_pol, self.f_pol, self.elli, alpha=alpha_channel, cmap="inferno", vmin=0, vmax=1, ) if fmin is None and fmax is None: ax[1].set_ylim(bottom=self.f_pol[1], top=self.f_pol[-1]) elif fmin is None: ax[1].set_ylim(bottom=self.f_pol[1], top=fmax) elif fmax is None: ax[1].set_ylim(bottom=fmin, top=self.f_pol[-1]) else: ax[1].set_ylim(bottom=fmin, top=fmax) map_elli = ScalarMappable(Normalize(vmin=0, vmax=1), cmap="inferno") cbar_elli = plt.colorbar(map_elli, ax=ax[1], extend="max") cbar_elli.set_label("Ellipticity") ax[1].set_title("Ellipticity") ax[1].set_ylabel("Frequency (Hz)") if major_semi_axis: if log_frequency: ax[2].set_yscale("log") ax[2].pcolorfast( self.t_pol, self.f_pol, self.inc1, alpha=alpha_channel, cmap="inferno", vmin=0, vmax=90, ) if fmin is None and fmax is None: ax[2].set_ylim(bottom=self.f_pol[1], top=self.f_pol[-1]) elif fmin is None: ax[2].set_ylim(bottom=self.f_pol[1], top=fmax) elif fmax is None: ax[2].set_ylim(bottom=fmin, top=self.f_pol[-1]) else: ax[2].set_ylim(bottom=fmin, top=fmax) map_inc1 = ScalarMappable(Normalize(vmin=0, vmax=90), cmap="inferno") cbar_inc1 = plt.colorbar(map_inc1, ax=ax[2], extend="max") cbar_inc1.set_label("Inclination (Degrees)") ax[2].set_title("Inclination of major semi-axis") else: if log_frequency: ax[2].set_yscale("log") ax[2].pcolorfast( self.t_pol, self.f_pol, self.inc2, alpha=alpha_channel, cmap="inferno", vmin=0, vmax=90, ) if fmin is None and fmax is None: ax[2].set_ylim(bottom=self.f_pol[1], top=self.f_pol[-1]) elif fmin is None: ax[2].set_ylim(bottom=self.f_pol[1], top=fmax) elif fmax is None: ax[2].set_ylim(bottom=fmin, top=self.f_pol[-1]) else: ax[2].set_ylim(bottom=fmin, top=fmax) map_inc2 = ScalarMappable(Normalize(vmin=0, vmax=90), cmap="inferno") cbar_inc2 = plt.colorbar(map_inc2, ax=ax[2], extend="max") cbar_inc2.set_label("Inclination (Degrees)") ax[2].set_title("Inclination of minor semi-axis") ax[2].set_ylabel("Frequency (Hz)") if major_semi_axis: if log_frequency: ax[3].set_yscale("log") ax[3].pcolorfast( self.t_pol, self.f_pol, self.azi1, alpha=alpha_channel, cmap="twilight", vmin=0, vmax=180, ) if fmin is None and fmax is None: ax[3].set_ylim(bottom=self.f_pol[1], top=self.f_pol[-1]) elif fmin is None: ax[3].set_ylim(bottom=self.f_pol[1], top=fmax) elif fmax is None: ax[3].set_ylim(bottom=fmin, top=self.f_pol[-1]) else: ax[3].set_ylim(bottom=fmin, top=fmax) map_azi1 = ScalarMappable(Normalize(vmin=0, vmax=180), cmap="twilight") cbar_azi1 = plt.colorbar(map_azi1, ax=ax[3], extend="max") cbar_azi1.set_label("Azimuth (Degrees)") ax[3].set_title("Azimuth of major semi-axis") else: if log_frequency: ax[3].set_yscale("log") ax[3].pcolorfast( self.t_pol, self.f_pol, self.azi2, alpha=alpha_channel, cmap="twilight", vmin=0, vmax=180, ) if fmin is None and fmax is None: ax[3].set_ylim(bottom=self.f_pol[1], top=self.f_pol[-1]) elif fmin is None: ax[3].set_ylim(bottom=self.f_pol[1], top=fmax) elif fmax is None: ax[3].set_ylim(bottom=fmin, top=self.f_pol[-1]) else: ax[3].set_ylim(bottom=fmin, top=fmax) map_azi2 = ScalarMappable(Normalize(vmin=0, vmax=180), cmap="twilight") cbar_azi2 = plt.colorbar(map_azi2, ax=ax[3], extend="max") cbar_azi2.set_label("Azimuth (Degrees)") ax[3].set_title("Azimuth of minor semi-axis") ax[3].set_ylabel("Frequency (Hz)") if log_frequency: ax[4].set_yscale("log") ax[4].pcolorfast( self.t_pol, self.f_pol, self.dop, alpha=alpha_channel, cmap="inferno", vmin=0, vmax=1, ) if fmin is None and fmax is None: ax[4].set_ylim(bottom=self.f_pol[1], top=self.f_pol[-1]) elif fmin is None: ax[4].set_ylim(bottom=self.f_pol[1], top=fmax) elif fmax is None: ax[4].set_ylim(bottom=fmin, top=self.f_pol[-1]) else: ax[4].set_ylim(bottom=fmin, top=fmax) map_dop = ScalarMappable(Normalize(vmin=0, vmax=1), cmap="inferno") cbar_dop = plt.colorbar(map_dop, ax=ax[4], extend="max") cbar_dop.set_label("DOP") ax[4].set_title("Degree of polarization") ax[4].set_ylabel("Frequency (Hz)") # Ensure that all three subplots have the same width plt.style.use("ggplot") pos = ax[4].get_position() pos0 = ax[0].get_position() ax[0].set_position([pos0.x0, pos0.y0, pos.width, pos.height]) if self.timeaxis == "utc": ax[1].xaxis_date() ax[2].xaxis_date() ax[3].xaxis_date() ax[4].xaxis_date() ax[4].set_xlabel("Time (UTC)") else: ax[4].set_xlabel("Time (s)") if show: plt.show()
def _plot_seismograms(self, ax: plt.Axes, seismograms: Stream = None): if seismograms is None: if self.timeaxis == "utc": time = self.N.times(type="matplotlib") else: time = self.N.times() ax.plot(time, self.N.data, "k:", label="N") ax.plot(time, self.E.data, "k--", label="E") ax.plot(time, self.Z.data, "k", label="Z") if self.timeaxis == "utc": ax.xaxis_date() ax.legend(loc="center left", bbox_to_anchor=(1, 0.5)) else: if self.timeaxis == "utc": time = seismograms[0].times(type="matplotlib") else: time = seismograms[0].times() ax.plot(time, seismograms[1].data, "k:", label="N") ax.plot(time, seismograms[2].data, "k--", label="E") ax.plot(time, seismograms[0].data, "k", label="Z") if self.timeaxis == "utc": ax.xaxis_date() ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))
[docs] def save(self, name: str) -> None: """Save the current TimeFrequencyAnalaysis3C object to a file on the disk in the current working directory. Include absolute path if you want to save in a different directory. name : :obj:`str` File name """ if not isinstance(name, str): raise ValueError("Name must be a string!") fid = open(name, "wb") pickle.dump(self, fid, pickle.HIGHEST_PROTOCOL) fid.close()