Source code for straxen.plugins.peaks.peak_shadow

import numpy as np
import numba
from scipy.stats import halfcauchy
from .peak_ambience import distance_in_xy, _quick_assign
import strax
import straxen

export, __all__ = strax.exporter()


[docs]@export class PeakShadow(strax.OverlapWindowPlugin): """This plugin can find and calculate the time & position shadow from previous peaks in time. It also gives the area and (x,y) of the previous peaks. References: * v0.1.5 reference: xenon:xenonnt:ac:prediction:shadow_ambience """ __version__ = "0.1.6" depends_on = ("peak_basics", "peak_positions") provides = "peak_shadow" save_when = strax.SaveWhen.EXPLICIT shadow_time_window_backward = straxen.URLConfig( default=int(1e9), type=int, track=True, help="Search for peaks casting time & position shadow in this time window [ns]", ) shadow_threshold = straxen.URLConfig( default={"s1_time_shadow": 1e3, "s2_time_shadow": 1e4, "s2_position_shadow": 1e4}, type=dict, track=True, help="Only take S1/S2s larger than this into account when calculating Shadow [PE]", ) shadow_deltatime_exponent = straxen.URLConfig( default=-1.0, type=float, track=True, help="The exponent of delta t when calculating shadow" ) shadow_sigma_and_baseline = straxen.URLConfig( default=[15.220, 0.036], type=list, track=True, help=( "Fitted position correlation sigma[cm*PE^0.5] and baseline[cm] using in position shadow" ), )
[docs] def get_window_size(self): return 10 * self.shadow_time_window_backward
[docs] def infer_dtype(self): s1_time_shadow_dtype = [] s2_time_shadow_dtype = [] s2_position_shadow_dtype = [] nearest_dtype = [] # We have time shadow(S2/dt) and position shadow(S2/dt*p(s)) # previous S1 can only cast time shadow, previous S2 can cast both time & position shadow for key, dtype in zip( ["s1_time_shadow", "s2_time_shadow", "s2_position_shadow"], [s1_time_shadow_dtype, s2_time_shadow_dtype, s2_position_shadow_dtype], ): type_str, tp_desc, _ = key.split("_") dtype.append( ( ( f"previous large {type_str} casted largest {tp_desc} shadow [PE/ns]", f"shadow_{key}", ), np.float32, ) ) dtype.append( ( ( ( f"time difference to the previous large {type_str} peak casting largest" f" {tp_desc} shadow [ns]" ), f"dt_{key}", ), np.int64, ) ) # Only previous S2 peaks have (x,y) if "s2" in key: dtype.append( ( ( f"x of previous large s2 peak casting largest {tp_desc} shadow [cm]", f"x_{key}", ), np.float32, ) ) dtype.append( ( ( f"y of previous large s2 peak casting largest {tp_desc} shadow [cm]", f"y_{key}", ), np.float32, ) ) # Only time shadow gives the nearest large peak if "time" in key: dtype.append( ( ( f"time difference to the nearest previous large {type_str} [ns]", f"nearest_dt_{type_str}", ), np.int64, ) ) # Also record the PDF of HalfCauchy when calculating S2 position shadow s2_position_shadow_dtype.append( ( ("PDF describing correlation to the previous large s2", "pdf_s2_position_shadow"), np.float32, ) ) dtype = ( s1_time_shadow_dtype + s2_time_shadow_dtype + s2_position_shadow_dtype + nearest_dtype + strax.time_fields ) return dtype
@property def shadowdtype(self): dtype = [] dtype += [("shadow", np.float32), ("dt", np.int64)] dtype += [("x", np.float32), ("y", np.float32)] dtype += [("nearest_dt", np.int64)] return dtype
[docs] def compute(self, peaks): argsort = np.argsort(peaks["center_time"], kind="mergesort") _peaks = np.sort(peaks, order="center_time") result = np.zeros(len(peaks), self.dtype) _quick_assign(argsort, result, self.compute_shadow(peaks, _peaks)) return result
[docs] def compute_shadow(self, peaks, current_peak): # 1. Define time window for each peak, we will find previous peaks within these time windows roi_shadow = np.zeros(len(current_peak), dtype=strax.time_fields) roi_shadow["time"] = current_peak["center_time"] - self.shadow_time_window_backward roi_shadow["endtime"] = current_peak["center_time"] # 2. Calculate S2 position shadow, S2 time shadow, and S1 time shadow result = np.zeros(len(current_peak), self.dtype) for key in ["s2_position_shadow", "s2_time_shadow", "s1_time_shadow"]: is_position = "position" in key type_str = key.split("_")[0] stype = 2 if "s2" in key else 1 mask_pre = (peaks["type"] == stype) & (peaks["area"] > self.shadow_threshold[key]) split_peaks = strax.touching_windows(peaks[mask_pre], roi_shadow) array = np.zeros(len(current_peak), np.dtype(self.shadowdtype)) # Initialization array["x"] = np.nan array["y"] = np.nan array["dt"] = self.shadow_time_window_backward # The default value for shadow is set to be the lowest possible value if "time" in key: array["shadow"] = ( self.shadow_threshold[key] * array["dt"] ** self.shadow_deltatime_exponent ) else: array["shadow"] = 0 array["nearest_dt"] = self.shadow_time_window_backward # Calculating shadow, the Major of the plugin. # Only record the previous peak casting the largest shadow if len(current_peak): self.peaks_shadow( current_peak, peaks[mask_pre], split_peaks, self.shadow_deltatime_exponent, array, is_position, self.getsigma(self.shadow_sigma_and_baseline, current_peak["area"]), ) # Fill results names = ["shadow", "dt"] if "s2" in key: # Only previous S2 peaks have (x,y) names += ["x", "y"] if "time" in key: # Only time shadow gives the nearest large peak names += ["nearest_dt"] for name in names: if name == "nearest_dt": result[f"{name}_{type_str}"] = array[name] else: result[f"{name}_{key}"] = array[name] distance = np.sqrt( (result[f"x_s2_position_shadow"] - current_peak["x"]) ** 2 + (result[f"y_s2_position_shadow"] - current_peak["y"]) ** 2 ) # If distance is NaN, set largest distance distance = np.where(np.isnan(distance), 2 * straxen.tpc_r, distance) # HalfCauchy PDF when calculating S2 position shadow result["pdf_s2_position_shadow"] = halfcauchy.pdf( distance, scale=self.getsigma(self.shadow_sigma_and_baseline, current_peak["area"]) ) # 6. Set time and endtime for peaks result["time"] = current_peak["time"] result["endtime"] = strax.endtime(current_peak) return result
[docs] @staticmethod @np.errstate(invalid="ignore") def getsigma(sigma_and_baseline, s2): # The parameter of HalfCauchy, which is a function of S2 area return sigma_and_baseline[0] / np.sqrt(s2) + sigma_and_baseline[1]
[docs] @staticmethod @numba.njit def peaks_shadow(peaks, pre_peaks, touching_windows, exponent, result, pos_corr, sigmas=None): """For each peak in peaks, check if there is a shadow-casting peak and check if it casts the largest shadow.""" for p_i, (suspicious_peak, sigma) in enumerate(zip(peaks, sigmas)): # casting_peak is the previous large peak casting shadow # suspicious_peak is the suspicious peak which in shadow from casting_peak indices = touching_windows[p_i] for idx in range(indices[0], indices[1]): casting_peak = pre_peaks[idx] dt = suspicious_peak["center_time"] - casting_peak["center_time"] if dt <= 0: continue # First we record the time difference to the nearest previous peak result["nearest_dt"][p_i] = min(result["nearest_dt"][p_i], dt) # Calculate time shadow new_shadow = casting_peak["area"] * dt**exponent if pos_corr: # Calculate position shadow which is # time shadow with a HalfCauchy PDF multiplier distance = distance_in_xy(suspicious_peak, casting_peak) distance = np.where(np.isnan(distance), 2 * straxen.tpc_r, distance) new_shadow *= 2 / (np.pi * sigma * (1 + (distance / sigma) ** 2)) # Only the previous peak with largest shadow is recorded if new_shadow > result["shadow"][p_i]: result["shadow"][p_i] = new_shadow result["x"][p_i] = casting_peak["x"] result["y"][p_i] = casting_peak["y"] result["dt"][p_i] = suspicious_peak["center_time"] - casting_peak["center_time"]