Source code for straxen.plugins.peaks.peak_ambience

import numpy as np
import numba
import strax
import straxen


export, __all__ = strax.exporter()


[docs]@export class PeakAmbience(strax.OverlapWindowPlugin): """Calculate Ambience of peaks. Features are the number of lonehits, small S0, S1, S2 in a time window before peaks, and the number of small S2 in circle near the S2 peak in a time window. References: * v0.0.7 reference: xenon:xenonnt:ac:prediction:shadow_ambience """ __version__ = "0.0.8" depends_on = ("lone_hits", "peak_basics", "peak_positions") provides = "peak_ambience" data_kind = "peaks" save_when = strax.SaveWhen.EXPLICIT ambience_time_window_backward = straxen.URLConfig( default=int(2e6), type=int, track=True, help="Search for ambience in this time window [ns]" ) ambience_divide_t = straxen.URLConfig( default=False, type=bool, track=True, help="Whether to divide area by time difference of ambience creating peak to current peak", ) ambience_divide_r = straxen.URLConfig( default=False, type=bool, track=True, help="Whether to divide area by radial distance of ambience creating peak to current peak", ) ambient_radius = straxen.URLConfig( default=6.7, type=float, track=True, help="Search for ambience in this radius [cm]" ) ambience_area_parameters = straxen.URLConfig( default=(5, 60, 60), type=(list, tuple), track=True, help="The upper limit of S0, S1, S2 area to be counted", )
[docs] def get_window_size(self): return 10 * self.ambience_time_window_backward
@property def origin_dtype(self): return ["lh_before", "s0_before", "s1_before", "s2_before", "s2_near"]
[docs] def infer_dtype(self): dtype = [] for ambience in self.origin_dtype: dtype += [ ( (f"Number of small {' '.join(ambience.split('_'))} a peak", f"n_{ambience}"), np.int16, ), ( (f"Area sum of small {' '.join(ambience.split('_'))} a peak", f"s_{ambience}"), np.float32, ), ] dtype += strax.time_fields return dtype
[docs] def compute(self, lone_hits, peaks): argsort = np.argsort(peaks["center_time"], kind="mergesort") _peaks = np.sort(peaks, order="center_time") result = np.zeros(len(peaks), self.dtype) _quick_assign(argsort, result, self.compute_ambience(lone_hits, peaks, _peaks)) return result
[docs] def compute_ambience(self, lone_hits, peaks, current_peak): # 1. Initialization result = np.zeros(len(current_peak), self.dtype) # 2. Define time window for each peak, # we will find small peaks & lone hits within these time windows roi = np.zeros(len(current_peak), dtype=strax.time_fields) roi["time"] = current_peak["center_time"] - self.ambience_time_window_backward roi["endtime"] = current_peak["center_time"] # 3. Calculate number and area sum of lonehits before a peak touching_windows = strax.touching_windows(lone_hits, roi) # Calculating ambience self.lonehits_ambience( current_peak, lone_hits, touching_windows, result["n_lh_before"], result["s_lh_before"], self.ambience_divide_t, ) # 4. Calculate number and area sum of small S0, S1, S2 before a peak radius = -1 for stype, area in zip([0, 1, 2], self.ambience_area_parameters): mask_pre = (peaks["type"] == stype) & (peaks["area"] < area) touching_windows = strax.touching_windows(peaks[mask_pre], roi) # Calculating ambience self.peaks_ambience( current_peak, peaks[mask_pre], touching_windows, radius, result[f"n_s{stype}_before"], result[f"s_s{stype}_before"], self.ambience_divide_t, self.ambience_divide_r, ) # 5. Calculate number and area sum of small S2 near(in (x,y) space) a S2 peak mask_pre = (peaks["type"] == 2) & (peaks["area"] < self.ambience_area_parameters[2]) touching_windows = strax.touching_windows(peaks[mask_pre], roi) # Calculating ambience self.peaks_ambience( current_peak, peaks[mask_pre], touching_windows, self.ambient_radius, result["n_s2_near"], result["s_s2_near"], self.ambience_divide_t, self.ambience_divide_r, ) # 6. Set time and endtime for peaks result["time"] = current_peak["time"] result["endtime"] = strax.endtime(current_peak) return result
[docs] @staticmethod @numba.njit def lonehits_ambience( peaks, pre_hits, touching_windows, num_array, sum_array, ambience_divide_t ): # Function to find lonehits before a peak # creating_hit is the lonehit creating ambience # suspicious_peak is the suspicious peak in the ambience created by creating_hit for p_i, suspicious_peak in enumerate(peaks): indices = touching_windows[p_i] for idx in range(indices[0], indices[1]): creating_hit = pre_hits[idx] dt = suspicious_peak["center_time"] - creating_hit["time"] if (dt <= 0) or (creating_hit["area"] <= 0): continue num_array[p_i] += 1 # Sometimes we may interested in sum of area / dt if ambience_divide_t: sum_array[p_i] += creating_hit["area"] / dt else: sum_array[p_i] += creating_hit["area"]
[docs] @staticmethod @numba.njit def peaks_ambience( peaks, pre_peaks, touching_windows, ambient_radius, num_array, sum_array, ambience_divide_t, ambience_divide_r, ): # Function to find S0, S1, S2 before or near a peak # creating_peak is the peak creating ambience # suspicious_peak is the suspicious peak in the ambience created by creating_peak for p_i, suspicious_peak in enumerate(peaks): indices = touching_windows[p_i] for idx in range(indices[0], indices[1]): creating_peak = pre_peaks[idx] r = distance_in_xy(suspicious_peak, creating_peak) dt = suspicious_peak["center_time"] - creating_peak["center_time"] if dt <= 0: continue if (ambient_radius < 0) or (r <= ambient_radius): num_array[p_i] += 1 # Sometimes we may interested in sum of area / dt if ambience_divide_t: sum_array[p_i] += creating_peak["area"] / dt else: sum_array[p_i] += creating_peak["area"] # Sometimes we may interested in sum of area / r^2 if ambience_divide_r and ambient_radius > 0: sum_array[p_i] /= r**2
@numba.njit def distance_in_xy(peak_a, peak_b): """Distance between S2s in (x,y)""" return np.sqrt((peak_a["x"] - peak_b["x"]) ** 2 + (peak_a["y"] - peak_b["y"]) ** 2) @numba.njit def _quick_assign(indices, results, inputs): for i, r in zip(indices, inputs): results[i] = r