Source code for straxen.plugins.events.local_minimum_info

import numpy as np
import numba
import strax
import straxen
from strax.processing.peak_splitting import natural_breaks_gof

export, __all__ = strax.exporter()


@export
class LocalMinimumInfo(strax.LoopPlugin):
    """Looks for the main S2 peak in an event, finds the local minimum (if there is one), and
    computes figures of merit such as the maximum goodness of split, the width of the valley,
    and the height of the valley."""

    depends_on = ("event_basics", "peaks")
    data_kind = "events"
    provides = "event_local_min_info"
    parallel = "process"
    compressor = "zstd"
    __version__ = "0.1.1"

    dtype = strax.time_fields + [
        (("Maximum Goodness of Split", "s2_max_gos"), np.float32),
        (("Number of local maxima of the smoothed peak", "s2_num_loc_max"), np.int16),
        (
            (
                "Full gap at p% of the valley height of the deepest valley [ns], by default p = 90",
                "s2_valley_gap",
            ),
            np.float32,
        ),
        (
            ("Valley depth over max height of the deepest valley", "s2_valley_height_ratio"),
            np.float32,
        ),
    ]

    divide_90p_width_localmin = straxen.URLConfig(
        default=7.0,
        type=float,
        help=(
            "The peak is smoothed by dividing the 90p width by "
            "this number and converting it into a number of samples. "
            "This is then the 'n' used in the smoothing kernel "
            "shown below."
        ),
    )

    smoothing_power_localmin = straxen.URLConfig(
        default=3.0,
        type=float,
        help=(
            "The power used in the smoothing filter with a kernel of "
            "(1-(x/n)^p)^p, where p is the power"
        ),
    )

    percentage_threshold_localmin = straxen.URLConfig(
        default=0.1,
        type=float,
        help=(
            "The height threshold for the peak as a percentage "
            "of the maximum, used to reject the low parts "
            "of the peak in order to find the local extrema."
        ),
    )

    percent_valley_height = straxen.URLConfig(
        default=0.9,
        type=float,
        help=(
            "The percentage of the valley height of the deepest "
            "valley at which to calculate the valley width"
        ),
    )

    def compute_loop(self, event, peaks):
        """Find the local maxima and minima of the main S2 peak and compute the related
        information: the number of local maxima, the depth of the deepest local minimum over the
        maximum height of the peak (s2_valley_height_ratio), and the width of the deepest valley
        at 90% of the valley height.

        :param event: The event
        :param peaks: The peaks belonging to the event; only the main S2 peak is considered. If
            there is none, the local-minimum fields are returned as NaN (and zero local maxima).
        :return: A dictionary containing all of the fields above for each main S2 peak, as well
            as the timing information.

        """
        max_gos = np.nan
        num_loc_maxes = 0
        valley_gap = np.nan
        valley_height_ratio = np.nan

        if event["s2_area"] > 0:
            p = peaks[event["s2_index"]]
            # width[9] is the 90% area width in ns; convert it into a number of samples
            # and divide by divide_90p_width_localmin to set the smoothing kernel size.
            smoothing_number = p["width"][9] / p["dt"]
            smoothing_number = np.ceil(smoothing_number / self.divide_90p_width_localmin)
            smoothed_peak = power_smooth(
                p["data"][: p["length"]], int(smoothing_number), self.smoothing_power_localmin
            )

            if len(smoothed_peak) > 0:
                # Find the index bounds above the percentage threshold,
                # rejecting the low tails on both sides of the peak
                left, right = bounds_above_percentage_height(
                    smoothed_peak, self.percentage_threshold_localmin
                )

                # Maximum GOS calculation for data above the percentage threshold
                max_gos = np.max(natural_breaks_gof(p["data"][left:right], p["dt"]))

                # Local minimum based information
                maxes, mins = identify_local_extrema(smoothed_peak)
                maxes = maxes[np.logical_and(maxes >= left, maxes < right)]
                mins = mins[np.logical_and(mins >= left, mins < right)]
                num_loc_maxes = len(maxes)
                valley_gap, valley = full_gap_percent_valley(
                    smoothed_peak, maxes, mins, self.percent_valley_height, p["dt"]
                )
                valley_height_ratio = valley / np.max(smoothed_peak)

        return {
            "time": event["time"],
            "endtime": event["endtime"],
            "s2_max_gos": max_gos,
            "s2_num_loc_max": num_loc_maxes,
            "s2_valley_gap": valley_gap,
            "s2_valley_height_ratio": valley_height_ratio,
        }
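
# ----------------------------------------------------------------------------
# Usage sketch (not part of the original plugin): a minimal, hypothetical way
# to register this plugin and load its output through a straxen context. The
# context constructor and the run id below are placeholder assumptions and
# depend on your deployment.
#
#   st = straxen.contexts.xenonnt_online()
#   st.register(LocalMinimumInfo)
#   info = st.get_array("run_id", "event_local_min_info")
#   # info["s2_max_gos"], info["s2_num_loc_max"], info["s2_valley_gap"], ...
# ----------------------------------------------------------------------------
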
@numba.njit()
def full_gap_percent_valley(smoothp, max_loc, min_loc, pv, dt):
    """Full gap at percent valley: the width of the valley at "pv" of the valley height.

    Only do this for peaks which have (number of maxima) - (number of minima) = 1, since
    otherwise this local-minimum finding does not make sense. This also rejects peaks which
    start at some high value, likely because they are the tails of another peak.

    :param smoothp: The smoothed peak
    :param max_loc: Location of every local maximum of the peak
    :param min_loc: Location of every local minimum of the peak
    :param pv: "Percent value", the percent of the valley height at which to calculate the gap
    :param dt: The time of one sample in ns
    :return: The gap in ns, the depth in PE

    """
    n_gap = len(min_loc)
    p_length = len(smoothp)
    if ~np.logical_and((len(max_loc) - n_gap == 1), (len(min_loc) > 0)):
        return 0, 0
    else:
        gaps = np.zeros(n_gap)
        gap_heights = np.zeros(len(min_loc))
        for j in range(n_gap):
            # Valley height: the lower of the two neighbouring maxima minus the minimum
            gh = np.min(smoothp[max_loc[j : j + 2]])
            gh -= smoothp[min_loc[j]]
            height_pv = smoothp[min_loc[j]] + gh * pv

            # Left crossing: first sample at or after the left maximum that is at or
            # below height_pv
            above_hpv = smoothp > height_pv
            above_hpv |= np.arange(p_length) < max_loc[j]
            left_crossing = np.argmin(above_hpv)

            # Right crossing: first sample at or after the minimum that rises above height_pv
            above_hpv = smoothp > height_pv
            above_hpv &= np.arange(p_length) >= min_loc[j]
            right_crossing = np.argmax(above_hpv)

            gaps[j] = right_crossing - left_crossing
            gap_heights[j] = gh

        max_gap = gaps[np.argmax(gap_heights)]
        valley_depth = gap_heights[np.argmax(gap_heights)]
        return max_gap * dt, valley_depth


def bounds_above_percentage_height(p, percent):
    """Find the index bounds of the peak above a given percentage of its maximum height.

    :param p: The peak
    :param percent: The percentage of the maximum height at which to cut the peak; this is to
        reject the tails and noise before and after the bulk of the peak
    :return: The left and right (exclusive) index of samples above the percentage.

    """
    percent_height = np.max(p) * percent
    above_percent_height = np.where(p >= percent_height)[0]
    return above_percent_height[0], above_percent_height[-1] + 1


def identify_local_extrema(smoothp):
    """Identify local minima and maxima by comparing each point to the one before and after it.

    :param smoothp: smoothed peak
    :return: The locations of the maxima, the locations of the minima.

    """
    larger_than_next = smoothp > np.pad(smoothp[:-1], (1, 0))
    larger_than_next = larger_than_next.astype("int")
    max_loc = np.where(np.diff(larger_than_next) < 0)[0]
    min_loc = np.where(np.diff(larger_than_next) > 0)[0]
    return max_loc, min_loc


def power_smooth(origindata, cover_num, power):
    """A smoothing filter to remove noise from peaks, so that we do not find too many spurious
    local extrema.

    :param origindata: Original peak
    :param cover_num: The cover number for smoothing; higher cover numbers mean smoothing over
        a larger region
    :param power: The power of the smoothing, which tunes how box-like the smoothing kernel is
    :return: The smoothed waveform.

    """
    x_zeroed = np.arange(-cover_num, cover_num + 1)
    weight = (1 - np.abs(x_zeroed / cover_num) ** power) ** power
    weight = weight / np.sum(weight)
    smoothed_data = np.convolve(origindata, weight)[cover_num:-cover_num]
    return smoothed_data
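
# ----------------------------------------------------------------------------
# Illustrative self-test (not part of the original module): a minimal sketch
# showing what the helper functions above do on a synthetic double-peaked
# waveform. The waveform, kernel size, and dt below are arbitrary assumptions
# chosen only for demonstration.
if __name__ == "__main__":
    x = np.arange(200)
    # Two Gaussian bumps separated by a valley
    waveform = np.exp(-0.5 * ((x - 70) / 10) ** 2) + 0.6 * np.exp(-0.5 * ((x - 130) / 10) ** 2)

    # Smooth with a (1 - |x/n|^p)^p kernel, here n = 5 samples and p = 3
    smoothed = power_smooth(waveform, 5, 3.0)

    # Restrict to the part of the peak above 10% of its maximum height
    left, right = bounds_above_percentage_height(smoothed, 0.1)

    # Find local extrema and keep only those inside the bounds
    maxes, mins = identify_local_extrema(smoothed)
    maxes = maxes[np.logical_and(maxes >= left, maxes < right)]
    mins = mins[np.logical_and(mins >= left, mins < right)]

    # Width of the deepest valley at 90% of its height, assuming dt = 10 ns per sample
    gap_ns, depth = full_gap_percent_valley(smoothed, maxes, mins, 0.9, 10)
    print(f"{len(maxes)} local maxima, valley gap {gap_ns:.0f} ns, valley depth {depth:.2f}")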