import warnings
import numpy as np
from obspy import UTCDateTime
from obspy.core import Stream
from scipy.signal import find_peaks
from . import RTMWarning, _proj_from_grid
[docs]
def get_peak_coordinates(S, global_max=True, height=None, min_time=None,
prominence=None, unproject=False):
"""
Find the values of the coordinates corresponding to the maxima (peaks) in
a stack function :math:`S`. Function will return all peaks above the
`height` and separated by greater than `min_time` in the stack function.
Returns just global max if there are less than three time segments.
Optionally "unprojects" UTM coordinates to (latitude, longitude) for
projected grids.
Args:
S (:class:`~xarray.DataArray`): The stack function :math:`S`
global_max (bool): Only return values for the max of the stack function
(default: `True`)
height (int or float): Minimum threshold for the value of a detection
(peak) in :math:`S` (default: `None`). Only used if
`global_max=False`.
min_time (int or float): Minimum time (distance) between peaks in
:math:`S` [s] (default: `None`). Only used if `global_max=False`.
prominence (int or float): Minimum peak prominence. Represents the
vertical distance between the peak and its lowest contour line.
Only used if `global_max=False`.
unproject (bool): If `True` and if `S` is a projected grid, unprojects
the UTM coordinates to (latitude, longitude) (default: `False`)
Returns:
tuple: Tuple containing:
- **time_max** (:class:`~obspy.core.utcdatetime.UTCDateTime`) – Time(s)
corresponding to peak(s) in :math:`S`
- **y_max** – [deg lat. or m N] :math:`y`-coordinate(s) corresponding
to peak(s) in :math:`S`
- **x_max** – [deg lon. or m E] :math:`x`-coordinate(s) corresponding
to peak(s) in :math:`S`
- **peaks** (:class:`numpy.ndarray`) – Peak indices
- **props** – Dictionary containing peak properties
"""
# Create peak stack function over time
s_peak = S.max(axis=(1, 2)).data
# If there are less than three values, use global_max as find_peaks fails
if len(s_peak) < 3:
print('Stack function contains < 3 time samples, using global_max!')
global_max = True
s_peak = np.hstack((0, 0, s_peak, 0))
# Return just the global max or desired peaks. Check for multiple maxima
# along each dimension and across the stack function
if global_max:
print('Returning just global max!')
# Get global max
stack_maximum = S.where(S == S.max(), drop=True)
# Warn if we have multiple maxima along any dimension
for dim in stack_maximum.coords:
num_dim_maxima = stack_maximum[dim].size
if num_dim_maxima != 1:
warnings.warn(f'Multiple maxima ({num_dim_maxima}) present in S '
f'along the {dim} dimension.', RTMWarning)
max_indices = np.argwhere(~np.isnan(stack_maximum.data))
num_global_maxima = max_indices.shape[0]
if num_global_maxima != 1:
warnings.warn(f'Multiple global maxima ({num_global_maxima}) present '
'in S. Using first occurrence.', RTMWarning)
# Find time index and values of first occurence
first_max = np.where(stack_maximum[tuple(max_indices[0])]['time'] == S['time'])[0]
peaks = np.array(first_max)
props = {'peak_heights': stack_maximum[tuple(max_indices[0])].data}
npeaks = len(peaks)
time_max = [UTCDateTime(stack_maximum[tuple(max_indices[0])]['time'].values.astype(str))]
x_max = [stack_maximum[tuple(max_indices[0])]['x'].values.tolist()]
y_max = [stack_maximum[tuple(max_indices[0])]['y'].values.tolist()]
else:
if (height is None) or (min_time is None):
raise ValueError('height and min_time must be supplied for peak '
'detection when global_max=False!')
# [s] Time sampling interval of S
peak_dt = (S.time.data[1] - S.time.data[0]) / np.timedelta64(1, 's')
# Find all peaks based on set thresholds
peaks, props = find_peaks(s_peak, height, distance=min_time/peak_dt,
prominence=prominence)
npeaks = len(peaks)
if prominence:
print(f'Found {npeaks} peaks in stack for height > {height:.1f}, '
f'min_time > {min_time:.1f} s, and prominence > {prominence}.')
else:
print(f'Found {npeaks} peaks in stack for height > {height:.1f} and '
f'min_time > {min_time:.1f} s.')
time_max = [UTCDateTime(S['time'][i].values.astype(str)) for i in peaks]
x_max = [S[i].where(S[i] == S[i].max(),
drop=True)['x'].values[0].tolist() for i in peaks]
y_max = [S[i].where(S[i] == S[i].max(),
drop=True)['y'].values[0].tolist() for i in peaks]
if unproject:
# If the grid is projected
if S.UTM:
print('Unprojecting coordinates from UTM to (latitude, longitude).')
# Convert UTM coordinates to lat/lon
proj = _proj_from_grid(S)
for i in range(0, npeaks):
y_max[i], x_max[i] = proj.transform(x_max[i], y_max[i], direction='INVERSE')
# If the grid is already in lat/lon
else:
print('unproject=True is set but coordinates are already in '
'(latitude, longitude). Doing nothing.')
# Return just a single float for global_max
if global_max:
time_max = time_max[0]
x_max = x_max[0]
y_max = y_max[0]
return time_max, y_max, x_max, peaks, props
[docs]
def calculate_semblance(data_in):
"""
Calculates the semblance, a measure of multi-channel coherence, following
the definition of Neidell & Taner (1971). Assumes data are already
time-shifted to construct the beam.
Args:
data_in: Time-shifted :class:`~obspy.core.stream.Stream` or
time-shifted :class:`numpy.ndarray`
Returns:
:class:`numpy.ndarray`: Multi-channel coherence, defined on
:math:`[0, 1]`
"""
if isinstance(data_in, Stream):
# check that all traces have the same length
if len(set([len(tr) for tr in data_in])) != 1:
raise ValueError('Traces in stream must have same length!')
n = len(data_in)
beam = np.sum([tr.data for tr in data_in], axis=0) / n
beampower = n * np.sum(beam**2)
avg_power = np.sum(np.sum([tr.data**2 for tr in data_in], axis=0))
elif isinstance(data_in, np.ndarray):
n = data_in.shape[0]
beam = np.sum(data_in, axis=0) / n
beampower = n * np.sum(beam**2)
avg_power = np.sum(np.sum([data_in**2], axis=0))
semblance = beampower / avg_power
return semblance