Source code for straxen.plugins.peaks.peak_shadow

import itertools
import numpy as np
import numba
import strax
import straxen

from straxen.plugins.defaults import DEFAULT_POSREC_ALGO, FAR_XYPOS_S2_TYPE, WIDE_XYPOS_S2_TYPE
from .peak_proximity import half_cauchy_pdf
from .peak_ambience import distance_in_xy, _quick_assign

export, __all__ = strax.exporter()


[docs]@export class PeakShadow(strax.OverlapWindowPlugin): """This plugin can find and calculate the time & position shadow from previous peaks in time. It also gives the area and (x, y) of the previous peaks. References: * v0.1.5 reference: xenon:xenonnt:ac:prediction:shadow_ambience """ __version__ = "0.2.0" depends_on = ("peak_basics", "peak_positions") provides = "peak_shadow" save_when = strax.SaveWhen.EXPLICIT shadow_threshold = straxen.URLConfig( default={"s1_time_shadow": 1e3, "s2_time_shadow": 1e4, "s2_position_shadow": 1e4}, type=dict, track=True, help="Only take S1/S2s larger than this into account when calculating Shadow [PE]", ) shadow_sum = straxen.URLConfig( default={"s1_time_shadow": False, "s2_time_shadow": True, "s2_position_shadow": False}, type=dict, track=True, help="Whether the shadow should be summed up rather than taking the largest one", ) shadow_merge_replace = straxen.URLConfig( default={"s1_time_shadow": False, "s2_time_shadow": False, "s2_position_shadow": False}, type=dict, track=True, help="Whether replace the primary peak with the merged peak", ) shadow_merge_dt = straxen.URLConfig( default={"s1_time_shadow": -1, "s2_time_shadow": -1, "s2_position_shadow": -1}, type=dict, track=True, help="Merge primary peaks within this time window [ns]", ) shadow_merge_dr = straxen.URLConfig( default={"s1_time_shadow": -1, "s2_time_shadow": -1, "s2_position_shadow": -1}, type=dict, track=True, help="Merge primary peaks within this distance [cm]", ) shadow_merge_last_center = straxen.URLConfig( default={"s1_time_shadow": False, "s2_time_shadow": False, "s2_position_shadow": False}, type=dict, track=True, help="Merge primary peaks but keep the last center time", ) shadow_pdf_ordered = straxen.URLConfig( default={"s1_time_shadow": False, "s2_time_shadow": False, "s2_position_shadow": False}, type=dict, track=True, help="Whether position shadow should be ordered by the position correlation only", ) default_reconstruction_algorithm = straxen.URLConfig( default=DEFAULT_POSREC_ALGO, help="default reconstruction algorithm that provides (x,y)" ) shadow_time_window_backward = straxen.URLConfig( default=int(1e9), type=int, track=True, help="Search for peaks casting time & position shadow in this time window [ns]", ) shadow_uncertainty_weights = straxen.URLConfig( default=True, type=bool, track=True, help="Whether to use uncertainty weights when merging peaks", ) shadow_deltatime_exponent = straxen.URLConfig( default=-1.0, type=float, track=True, help="The exponent of delta t when calculating shadow" ) shadow_sigma_and_baseline = straxen.URLConfig( default=[15.220, 0.036], type=list, track=True, help=( "Fitted position correlation sigma[cm*PE^0.5] and baseline[cm] using in position shadow" ), )
[docs] def get_window_size(self): return (10 * self.shadow_time_window_backward, 0)
[docs] def infer_dtype(self): s1_time_shadow_dtype = [] s2_time_shadow_dtype = [] s2_position_shadow_dtype = [] nearest_dtype = [] # We have time shadow(S2/dt) and position shadow(S2/dt*p(s)) # previous S1 can only cast time shadow, previous S2 can cast both time & position shadow for key, dtype in zip( ["s1_time_shadow", "s2_time_shadow", "s2_position_shadow"], [s1_time_shadow_dtype, s2_time_shadow_dtype, s2_position_shadow_dtype], ): type_str, tp_desc, _ = key.split("_") dtype.append( ( ( f"Previous large {type_str} casted largest {tp_desc} shadow [PE/ns]", f"shadow_{key}", ), np.float32, ) ) dtype.append( ( ( ( f"Time difference to the previous large {type_str} peak casting largest" f" {tp_desc} shadow [ns]" ), f"dt_{key}", ), np.int64, ) ) # Only previous S2 peaks have (x,y) if "s2" in key: dtype.append( ( ( f"X of previous large s2 peak casting largest {tp_desc} shadow [cm]", f"x_{key}", ), np.float32, ) ) dtype.append( ( ( f"Y of previous large s2 peak casting largest {tp_desc} shadow [cm]", f"y_{key}", ), np.float32, ) ) # Only time shadow gives the nearest large peak if "time" in key: dtype.append( ( ( f"The nearest previous large {type_str} [ns]", f"nearest_{type_str}", ), np.float32, ) ) dtype.append( ( ( f"Time difference to the nearest previous large {type_str} [ns]", f"nearest_dt_{type_str}", ), np.int64, ) ) # Also record the PDF of HalfCauchy when calculating S2 position shadow s2_position_shadow_dtype.append( ( ("PDF describing correlation to the previous large s2", "pdf_s2_position_shadow"), np.float32, ) ) dtype = ( s1_time_shadow_dtype + s2_time_shadow_dtype + s2_position_shadow_dtype + nearest_dtype + strax.time_fields ) return dtype
[docs] def setup(self): super().setup() for key in ["s2_position_shadow", "s2_time_shadow", "s1_time_shadow"]: if ( self.shadow_sum[key] and not self.shadow_merge_replace[key] and self.shadow_merge_dt[key] > 0 ): raise ValueError( f"shadow_sum is set to True, but shadow_merge_replace is set to False for {key}" ) if ( self.shadow_uncertainty_weights and self.default_reconstruction_algorithm != DEFAULT_POSREC_ALGO ): raise ValueError( "Using pos-rec uncertainty as (x, y) merging weights, " f"but default_reconstruction_algorithm is not set to {DEFAULT_POSREC_ALGO}" )
[docs] @staticmethod def cluster_xy(peaks, indices, dr): """Find connectivity map of peaks in xy space.""" clusters = [] unvisited = set(indices) while unvisited: current = unvisited.pop() cluster = {current} to_check = {current} while to_check: checking = to_check.pop() dists = distance_in_xy(peaks[checking], peaks[list(unvisited)]) close = {i for i, d in zip(unvisited, dists) if d <= dr} to_check.update(close) cluster.update(close) unvisited -= close clusters.append(list(cluster)) clusters = [sorted(cluster) for cluster in clusters if len(cluster) > 1] return clusters
[docs] @staticmethod def merge_peaks(peaks, weights, dt, dr, replace=True, last=True): """Merge peaks in time and space.""" # Prepare the dtype for the merged peaks dtype = [] needed_fileds = "time endtime center_time area x y".split() for d, n in zip(peaks.dtype.descr, peaks.dtype.names): if n in needed_fileds: dtype.append(d) dtype = np.dtype(dtype) # Copy the fields to a new array _peaks = np.zeros(len(peaks), dtype=dtype) for d in needed_fileds: _peaks[d] = peaks[d] # Identify indices of peaks mergable in time if dt > 0: t_clusters = [[0]] for i in range(len(peaks) - 1): if np.abs(peaks["center_time"][i + 1] - peaks["center_time"][i]) > dt: t_clusters.append([i]) else: t_clusters[-1].append(i + 1) t_clusters = [cluster for cluster in t_clusters if len(cluster) > 1] else: t_clusters = [] # Identify indices of peaks mergable also in space if dr > 0: txy_clusters = [] for cluster in t_clusters: txy_clusters += PeakShadow.cluster_xy(peaks, cluster, dr) else: txy_clusters = t_clusters # Merge peaks merged_peaks = np.zeros(len(txy_clusters), dtype=dtype) for i, cluster in enumerate(txy_clusters): # Merge peaks merged_peaks[i]["time"] = peaks["time"][cluster].min() merged_peaks[i]["endtime"] = strax.endtime(peaks)[cluster].max() # center_time is by definition the average time of waveform if last: merged_peaks[i]["center_time"] = peaks["center_time"][cluster[-1]] else: merged_peaks[i]["center_time"] = np.average( peaks["center_time"][cluster], weights=peaks["area"][cluster] ) merged_peaks[i]["area"] = np.sum(peaks["area"][cluster]) for xy in ["x", "y"]: if np.any(np.isinf(weights[cluster])): merged_peaks[i][xy] = peaks[xy][cluster][np.isinf(weights[cluster])].mean() else: merged_peaks[i][xy] = np.average(peaks[xy][cluster], weights=weights[cluster]) # Replace peaks with merged peaks if replace is True if replace: for i, cluster in enumerate(txy_clusters): _peaks[cluster] = merged_peaks[i] _peaks = np.delete( _peaks, list(itertools.chain.from_iterable([cluster[1:] for cluster in txy_clusters])), ) else: _peaks = np.insert(_peaks, [cluster[0] for cluster in txy_clusters], merged_peaks) _peaks = strax.sort_by_time(_peaks) # Return merged peaks return _peaks
@property def shadowdtype(self): dtype = [] dtype += [("shadow", np.float32), ("dt", np.int64), ("pdf", np.float32)] dtype += [("x", np.float32), ("y", np.float32)] dtype += [("nearest", np.float32), ("nearest_dt", np.int64)] return dtype
[docs] def compute(self, peaks): argsort = strax.stable_argsort(peaks["center_time"]) _peaks = peaks[argsort].copy() result = np.zeros(len(peaks), self.dtype) _quick_assign(argsort, result, self.compute_shadow(peaks, _peaks)) return result
[docs] def compute_shadow(self, peaks, current_peak): # 1. Define time window for each peak, we will find previous peaks within these time windows roi = np.zeros(len(current_peak), dtype=strax.time_fields) roi["time"] = current_peak["center_time"] - self.shadow_time_window_backward roi["endtime"] = current_peak["center_time"] # 2. Calculate S2 position shadow, S2 time shadow, and S1 time shadow # weights of the peaklets when calculating the weighted mean deviation in (x, y) posrec_algo = self.default_reconstruction_algorithm if self.shadow_uncertainty_weights: weights = 1 / peaks[f"position_contour_area_{posrec_algo}"] else: weights = peaks["area"] * peaks["area_fraction_top"] result = np.zeros(len(current_peak), self.dtype) for key in ["s2_position_shadow", "s2_time_shadow", "s1_time_shadow"]: if "s2" in key: # in very rare cases, high energy S2 canbe classified as type 20 and 22 stype = [2, FAR_XYPOS_S2_TYPE, WIDE_XYPOS_S2_TYPE] else: # type 3 is also S1 xenon:xenonnt:lsanchez:som_sr1b_summary_note stype = [1, 3] mask_pre = np.isin(peaks["type"], stype) & (peaks["area"] > self.shadow_threshold[key]) _peaks = self.merge_peaks( peaks[mask_pre], weights[mask_pre], dt=self.shadow_merge_dt[key], dr=self.shadow_merge_dr[key], replace=self.shadow_merge_replace[key], last=self.shadow_merge_last_center[key], ) split_peaks = strax.touching_windows(_peaks, roi) array = np.zeros(len(current_peak), np.dtype(self.shadowdtype)) strax.set_nan_defaults(array) # Initialization array["dt"] = self.shadow_time_window_backward array["nearest_dt"] = self.shadow_time_window_backward # The default value for shadow is set to be the lowest possible value if "time" in key: array["shadow"] = ( self.shadow_threshold[key] * array["dt"] ** self.shadow_deltatime_exponent ) else: # Assign the position shadow to be zero array["shadow"] = 0 array["pdf"] = 0 # Calculating shadow, the Major of the plugin. # Only record the previous peak casting the largest shadow if is_sum is False is_position = "position" in key is_sum = self.shadow_sum[key] if is_position and is_sum: raise ValueError( "Position shadow cannot be summed up, please set its shadow_sum to False" ) if len(current_peak): self.peaks_shadow( current_peak, _peaks, split_peaks, self.shadow_deltatime_exponent, array, is_position=is_position, is_sum=is_sum, is_pdf=self.shadow_pdf_ordered[key], sigmas=self.getsigma(self.shadow_sigma_and_baseline, current_peak["area"]), ) # Fill results type_str = key.split("_")[0] names = ["shadow", "dt"] if "s2" in key: # Only previous S2 peaks have (x,y) names += ["x", "y"] if "time" in key: # Only time shadow gives the nearest large peak names += ["nearest", "nearest_dt"] for name in names: if "nearest" in name: result[f"{name}_{type_str}"] = array[name] else: result[f"{name}_{key}"] = array[name] if key == "s2_position_shadow": # HalfCauchy PDF when calculating S2 position shadow result["pdf_s2_position_shadow"] = array["pdf"] # 6. Set time and endtime for peaks result["time"] = current_peak["time"] result["endtime"] = strax.endtime(current_peak) return result
[docs] @staticmethod @np.errstate(invalid="ignore") def getsigma(sigma_and_baseline, s2): # The parameter of HalfCauchy, which is a function of S2 area return sigma_and_baseline[0] / np.sqrt(s2) + sigma_and_baseline[1]
[docs] @staticmethod @numba.njit def peaks_shadow( peaks, pre_peaks, touching_windows, exponent, result, is_position=False, is_sum=False, is_pdf=False, sigmas=None, ): """For each peak in peaks, check if there is a shadow-casting peak and check if it casts the largest shadow.""" for p_i, (suspicious_peak, sigma) in enumerate(zip(peaks, sigmas)): # casting_peak is the previous large peak casting shadow # suspicious_peak is the suspicious peak which in shadow from casting_peak indices = touching_windows[p_i] for idx in range(indices[0], indices[1]): casting_peak = pre_peaks[idx] dt = suspicious_peak["center_time"] - casting_peak["center_time"] if dt <= 0: continue # First we record the time difference to the nearest previous peak if dt < result["nearest_dt"][p_i]: result["nearest_dt"][p_i] = dt result["nearest"][p_i] = casting_peak["area"] # Calculate time shadow new_shadow = casting_peak["area"] * dt**exponent if is_position or is_pdf: # Calculate HalfCauchy PDF distance = distance_in_xy(suspicious_peak, casting_peak) # If distance is NaN, set largest distance distance = np.where(np.isnan(distance), 2 * straxen.tpc_r, distance) new_pdf = half_cauchy_pdf(distance, sigma).item() else: new_pdf = np.nan if is_position: new_shadow *= new_pdf # Only the previous peak with largest shadow is recorded found = is_pdf and new_pdf > result["pdf"][p_i] found |= ~is_pdf and new_shadow > result["shadow"][p_i] if found: result["shadow"][p_i] = new_shadow result["pdf"][p_i] = new_pdf result["x"][p_i] = casting_peak["x"] result["y"][p_i] = casting_peak["y"] result["dt"][p_i] = suspicious_peak["center_time"] - casting_peak["center_time"] if is_sum: sl = slice(indices[0], indices[1]) dt = suspicious_peak["center_time"] - pre_peaks["center_time"][sl] mask = dt > 0 if mask.sum() == 0: continue ft = dt[mask] ** exponent result["shadow"][p_i] = np.sum(pre_peaks["area"][sl][mask] * ft) result["pdf"][p_i] = np.nan result["x"][p_i] = np.nan result["y"][p_i] = np.nan result["dt"][p_i] = mask.sum() * np.sum(ft) ** (1 / exponent)