Source code for straxen.plugins.peaks.peak_proximity

import numpy as np
import numba
import strax
import straxen


export, __all__ = strax.exporter()


[docs]@export class PeakProximity(strax.OverlapWindowPlugin): """Look for peaks around a peak to determine how many peaks are in proximity (in time) of a peak.""" __version__ = "0.4.0" depends_on = "peak_basics" dtype = [ ("n_competing", np.int32, "Number of nearby larger or slightly smaller peaks"), ( "n_competing_left", np.int32, "Number of larger or slightly smaller peaks left of the main peak", ), ( "t_to_prev_peak", np.int64, "Time between end of previous peak and start of this peak [ns]", ), ("t_to_next_peak", np.int64, "Time between end of this peak and start of next peak [ns]"), ("t_to_nearest_peak", np.int64, "Smaller of t_to_prev_peak and t_to_next_peak [ns]"), ] + strax.time_fields min_area_fraction = straxen.URLConfig( default=0.5, infer_type=False, help=( "The area of competing peaks must be at least " "this fraction of that of the considered peak" ), ) nearby_window = straxen.URLConfig( default=int(1e7), infer_type=False, help="Peaks starting within this time window (on either side) in ns count as nearby.", ) peak_max_proximity_time = straxen.URLConfig( default=int(1e8), infer_type=False, help="Maximum value for proximity values such as t_to_next_peak [ns]", )
[docs] def get_window_size(self): return self.peak_max_proximity_time
[docs] def compute(self, peaks): windows = strax.touching_windows(peaks, peaks, window=self.nearby_window) n_left, n_tot = self.find_n_competing(peaks, windows, fraction=self.min_area_fraction) t_to_prev_peak = np.ones(len(peaks), dtype=np.int64) * self.peak_max_proximity_time t_to_prev_peak[1:] = peaks["time"][1:] - peaks["endtime"][:-1] t_to_next_peak = t_to_prev_peak.copy() t_to_next_peak[:-1] = peaks["time"][1:] - peaks["endtime"][:-1] return dict( time=peaks["time"], endtime=strax.endtime(peaks), n_competing=n_tot, n_competing_left=n_left, t_to_prev_peak=t_to_prev_peak, t_to_next_peak=t_to_next_peak, t_to_nearest_peak=np.minimum(t_to_prev_peak, t_to_next_peak), )
[docs] @staticmethod @numba.jit(nopython=True, nogil=True, cache=True) def find_n_competing(peaks, windows, fraction): n_left = np.zeros(len(peaks), dtype=np.int32) n_tot = n_left.copy() areas = peaks["area"] for i, peak in enumerate(peaks): left_i, right_i = windows[i] threshold = areas[i] * fraction n_left[i] = np.sum(areas[left_i:i] > threshold) n_tot[i] = n_left[i] + np.sum(areas[i + 1 : right_i] > threshold) return n_left, n_tot