Source code for straxen.plugins.merged_s2s.merged_s2s

from typing import Tuple, Dict, Union
import numpy as np
from tqdm import tqdm
from scipy.stats import norm, poisson
import numba
import strax
import straxen
from straxen.plugins.defaults import DEFAULT_POSREC_ALGO, FAR_XYPOS_S2_TYPE, WIDE_XYPOS_S2_TYPE
from straxen.plugins.peaklets.peaklets import drop_data_field


export, __all__ = strax.exporter()


[docs]@export class MergedS2s(strax.OverlapWindowPlugin): """Merge together peaklets if peak finding favours that they would form a single peak instead. Technically, the S2 merging algorithm merges S2 peaklets into S2 peaks. By introducing more information about the waveform and (x, y) distribution of potential groups of peaklets, the algorithm removes PI and DE population from S2 peaks. Note: Types FAR_XYPOS_S2_TYPE (20) and WIDE_XYPOS_S2_TYPE (22) are still S2s, but they do not participate in the event building. The algorithm merges S2 peaklets when they are close in (t, x, y). But if a group of peaklets is dense in time but sparse in (x, y), the following steps are conducted: 1. Merge these peaklets that are dense in (x, y). 2. Assign the peaklets that are dense in time but not dense in (x, y) by type 20, they are usually PI or DE. 3. If the sum of nearby type 20 is large compared to the merged peak, assign the merged peak as type 22, because it is usually PI. Reference: xenon:xenonnt:analysis:s2_merging_time_position xenon:xenonnt:analysis:sr2_peak_types """ __version__ = "2.0.1" depends_on: Tuple[str, ...] = ( "peaklets", f"peaklet_positions_{DEFAULT_POSREC_ALGO}", "peaklet_classification", "lone_hits", ) provides: Union[Tuple[str, ...], str] = ("merged_s2s", "enhanced_peaklet_classification") data_kind: Union[Dict[str, str], str] = dict( merged_s2s="merged_s2s", enhanced_peaklet_classification="peaklets" ) n_tpc_pmts = straxen.URLConfig(type=int, help="Number of TPC PMTs") n_top_pmts = straxen.URLConfig(type=int, help="Number of top TPC array PMTs") default_reconstruction_algorithm = straxen.URLConfig( default=DEFAULT_POSREC_ALGO, help="default reconstruction algorithm that provides (x,y)" ) s2_merge_max_duration = straxen.URLConfig( default=50_000, type=int, infer_type=False, help="Do not merge peaklets at all if the result would be a peak longer than this [ns]", ) s2_merge_gap_thresholds = straxen.URLConfig( default=( (1.84, 2.84e04), (2.18, 2.40e04), (2.51, 1.96e04), (2.84, 1.80e04), (3.18, 1.68e04), (3.51, 1.86e04), (3.84, 1.98e04), (4.18, 1.66e04), (4.51, 1.21e04), ), infer_type=False, help=( "Points to define maximum separation between peaklets to allow " "merging [ns] depending on log10 area of the merged peak\n" "where the gap size of the first point is the maximum gap to allow merging." "The format is ((log10(area), max_gap), (..., ...), (..., ...))" ), ) s2_merge_p_thresholds = straxen.URLConfig( default=( (4.18, 9.89e-04), (4.51, 7.15e-03), (4.84, 1.60e-01), ), infer_type=False, help=( "Points to define minimum p-value of merging proposal " "depending on log10 area of the merged peak\n" "The format is ((log10(area), p-value), (..., ...), (..., ...))" ), ) s2_merge_dr_thresholds = straxen.URLConfig( default=( (1.51, 1.40e01), (1.84, 1.83e01), (2.18, 1.97e01), (2.51, 1.24e01), (2.84, 5.75e00), ), type=tuple, help=( "Points to define maximum weighted mean deviation of " "the peaklets from the main cluster [cm]\n" "The format is ((log10(area_top), dr), (..., ...), (..., ...))" ), ) s2_merge_unmerged_thresholds = straxen.URLConfig( default=(1, 49.5, 0.01), type=tuple, help=( "Max (number, fraction of unmerged, total area of unmerged) " "of type 20 peaklets inside a peak. " "The number of type 20 peaklets should not be larger than the number threshold. " "The area of type 20 peaklets should not be larger than the both area thresholds. " "The fraction threshold is important when S2 is large, " "while the total area threshold is important when S2 is small." ), ) merge_lone_hits = straxen.URLConfig( default=True, type=bool, help="Merge lone hits into merged S2s", ) merge_s0 = straxen.URLConfig( default=True, type=bool, help="Merge S0s into merged S2s", ) gain_model = straxen.URLConfig( infer_type=False, help="PMT gain model. Specify as (str(model_config), str(version), nT-->boolean", ) rough_seg = straxen.URLConfig( default=30, type=(int, float), help="Rough single electron gain [PE/e]" ) sigma_seg = straxen.URLConfig( default=6.5, type=(int, float), help="Standard deviation of the single electron gain [PE/e]", ) rough_min_sigma = straxen.URLConfig( default=1e2, type=(int, float), help="Minimum sigma for the merged peaks" ) rough_max_sigma = straxen.URLConfig( default=2e4, type=(int, float), help="Maximum sigma for the merged peaks" ) rough_sigma_bins = straxen.URLConfig( default=10, type=int, help="Number of bins for sigma of merged peaks" ) rough_mu_bins = straxen.URLConfig( default=10, type=int, help="Number of bins for mu of merged peaks" ) poisson_max_mu = straxen.URLConfig( default=25, type=int, help="When to switch from Poisson to normal distribution" ) poisson_survival_ratio = straxen.URLConfig( default=1e-4, type=float, help=( "Survival ratio for Poisson distribution. The PMF smaller than this will be ignored." ), ) normal_max_sigma = straxen.URLConfig( default=7, type=(int, float), help="Maximum sigma for the normal distribution CDF panel", ) normal_n_bins = straxen.URLConfig( default=501, type=int, help="Number of bins for the normal distribution CDF panel", ) maxexp = straxen.URLConfig( default=10, type=(int, float), help="Maximum exponent for the posterior to keep numerical stability", ) use_bayesian_merging = straxen.URLConfig(default=True, type=bool, help="Use Bayesian merging") rm_sparse_xy = straxen.URLConfig( default=True, type=bool, help="Remove peaklets that are too far away in (x, y)" ) use_uncertainty_weights = straxen.URLConfig( default=True, type=bool, help="Use uncertainty from probabilistic posrec to derive weights" ) p_value_prioritized = straxen.URLConfig( default=False, type=bool, help="Whether to prioritize p-value over area when testing the proposal", ) merged_s2s_get_window_size_factor = straxen.URLConfig( default=5, type=int, track=False, help="Factor of the window size for the merged_s2s plugin" ) disable_progress_bar = straxen.URLConfig( default=True, type=bool, track=False, help="Whether to disable the progress bar" ) indicator_dtype = np.dtype( [(("Peaklet is merging input or peak is merged from peaklets", "merged"), bool)] ) copied_dtype = np.dtype( [ ("time", np.int64), ("endtime", np.int64), ("area", np.float32), ("median_time", np.float32), ("area_decile_from_midpoint", np.float32, (11,)), ] ) def _have_data(self, field): return field in self.deps["peaklets"].dtype_for("peaklets").names
[docs] def infer_dtype(self): peaklet_classification_dtype = self.deps["peaklet_classification"].dtype_for( "peaklet_classification" ) peaklets_dtype = self.deps["peaklets"].dtype_for("peaklets") # The merged dtype is argument position dependent! # It must be first classification then peaklet # Otherwise strax will raise an error # when checking for the returned dtype! merged_s2s_dtype = strax.merged_dtype( (peaklet_classification_dtype, peaklets_dtype, self.indicator_dtype) ) enhanced_peaklet_classification_dtype = strax.merged_dtype( (peaklet_classification_dtype, self.indicator_dtype) ) return dict( merged_s2s=merged_s2s_dtype, enhanced_peaklet_classification=enhanced_peaklet_classification_dtype, )
[docs] def setup(self): self.to_pe = self.gain_model self.gap_thresholds = np.array(self.s2_merge_gap_thresholds).T self.p_thresholds = np.array(self.s2_merge_p_thresholds).T self.dr_thresholds = np.array(self.s2_merge_dr_thresholds).T # Max gap and area should be set by the gap thresholds to avoid contradictions if np.argmax(self.gap_thresholds[1]) != 0: raise ValueError("The first point should be the maximum gap to allow merging") if self.p_thresholds[1].max() > 1 or self.p_thresholds[1].min() < 0: raise ValueError("P-value should be in the range of [0, 1]") self.max_gap = self.gap_thresholds[1, 0] self.max_duration = self.s2_merge_max_duration self.poisson_max_k = np.ceil( poisson.isf(q=self.poisson_survival_ratio, mu=self.poisson_max_mu) ).astype(int) self.factorial_panel = np.array( [np.prod(np.arange(1, k + 1, dtype=float)) for k in range(self.poisson_max_k + 1)] ) if np.any(self.factorial_panel < 0): raise ValueError("Factorial panel has negative values, this might because of overflow") x = np.linspace(-self.normal_max_sigma, self.normal_max_sigma, self.normal_n_bins) self.normal_panel = np.vstack([x, norm.cdf(x)]) self.sigma = np.linspace(self.rough_min_sigma, self.rough_max_sigma, self.rough_sigma_bins) self.sigma_panel = np.repeat(self.sigma[None, :], self.rough_mu_bins, axis=0).flatten() self.unmerged_thresholds = self.s2_merge_unmerged_thresholds
[docs] def get_window_size(self): return self.merged_s2s_get_window_size_factor * ( int(self.s2_merge_gap_thresholds[0][1]) + self.s2_merge_max_duration )
[docs] def no_merging(self, peaklets): is_s2 = peaklets["type"] == 2 return np.sum(is_s2) <= 1 or self.max_gap < 0
[docs] def compute(self, peaklets, lone_hits, start, end): # "merged" will be set as False automatically _peaklets = strax.merge_arrs( [peaklets, np.zeros(len(peaklets), dtype=self.indicator_dtype)], dtype=strax.merged_dtype((peaklets.dtype, self.indicator_dtype)), ) if self.use_uncertainty_weights: name = f"position_contour_{self.default_reconstruction_algorithm}" if name not in _peaklets.dtype.names: raise ValueError(f"{name} is not in the input peaklets dtype") # initialize enhanced_peaklet_classification # "merged" will be set as False automatically enhanced_peaklet_classification = np.zeros( len(_peaklets), dtype=self.dtype_for("enhanced_peaklet_classification") ) # copy fields, especially type for d in enhanced_peaklet_classification.dtype.names: enhanced_peaklet_classification[d] = _peaklets[d] if self.no_merging(_peaklets): empty_result = self.empty_result() empty_result["enhanced_peaklet_classification"] = enhanced_peaklet_classification return empty_result # make sure the peaklets are not overwritten _peaklets.flags.writeable = False is_s2 = _peaklets["type"] == 2 # peaklets might be overwritten in the merge method # so do not reuse the peaklets after this line merged_s2s, not_excluded, merged = self.merge(_peaklets, lone_hits, start, end) # mark the peaklets can be merged by time-density but not # by position-density as type FAR_XYPOS_S2_TYPE enhanced_peaklet_classification["type"][is_s2 & ~not_excluded] = FAR_XYPOS_S2_TYPE enhanced_peaklet_classification["merged"][merged] = True return dict( merged_s2s=merged_s2s, enhanced_peaklet_classification=enhanced_peaklet_classification )
[docs] def merge(self, _peaklets, lone_hits, start, end): """Merge into S2s if the peaklets are close enough in time and position.""" # this is an OverlapWindowPlugin, some peaklets will be reused in the next iteration # use _peaklets here to prevent peaklets from being overwritten if np.any(_peaklets["time"][1:] < strax.endtime(_peaklets)[:-1]): raise ValueError("Peaklets not disjoint, why?") # only keep S2 peaklets for merging is_s2 = _peaklets["type"] == 2 if not (self._have_data("data_top") and self._have_data("data_start")): peaklets = np.zeros( is_s2.sum(), dtype=strax.merged_dtype( ( strax.peak_dtype( n_channels=self.n_tpc_pmts, store_data_top=True, store_data_start=True ), self.indicator_dtype, ) ), ) strax.copy_to_buffer(_peaklets[is_s2], peaklets, "_add_data_top_or_start_field") else: peaklets = _peaklets[is_s2] # make sure the peaklets are not overwritten peaklets.flags.writeable = False max_buffer = int(self.max_duration // strax.gcd_of_array(peaklets["dt"])) # _not_excluded is the mask of the peaklets that are not excluded by (x, y) merging start_merge_at, end_merge_at, _not_excluded = self.get_merge_instructions( peaklets, start, end, self.max_gap, self.max_duration, self.sigma, self.rough_seg, self.sigma_seg, self.rough_mu_bins, self.poisson_max_mu, self.poisson_survival_ratio, self.normal_panel, self.factorial_panel, self.sigma_panel, self.maxexp, self.n_top_pmts, self.p_thresholds, self.dr_thresholds, self.default_reconstruction_algorithm, self.use_bayesian_merging, self.rm_sparse_xy, self.use_uncertainty_weights, self.p_value_prioritized, self.gap_thresholds, disable=self.disable_progress_bar, ) if "data_top" not in peaklets.dtype.names or "data_start" not in peaklets.dtype.names: raise ValueError("data_top or data_start is not in the peaklets dtype") _merged_s2 = np.zeros(len(peaklets), dtype=bool) for i in range(len(start_merge_at)): sl = slice(start_merge_at[i], end_merge_at[i]) _merged_s2[sl] = True # exclude the peaklets that are not merged by (x, y) _merged_s2[~_not_excluded] = False # have to redo the merging to prevent numerical instability is_s0 = _peaklets["type"] == 0 if self.merge_s0 and len(start_merge_at) and is_s0.sum() > 0: # build the time interval of merged_s2s, even though they are not merged yet endtime = strax.endtime(peaklets) merged_s2s_window = np.zeros(len(start_merge_at), dtype=strax.time_fields) for i in range(len(start_merge_at)): sl = slice(start_merge_at[i], end_merge_at[i]) merged_s2s_window["time"][i] = peaklets["time"][sl][_not_excluded[sl]][0] merged_s2s_window["endtime"][i] = endtime[sl][_not_excluded[sl]][-1] # the S0s that should be merged should fully be contained _merged_s0 = strax.fully_contained_in(_peaklets[is_s0], merged_s2s_window) != -1 merged_s0s = strax.split_by_containment(_peaklets[is_s0], merged_s2s_window) # offsets of indices increments = np.array([len(m) for m in merged_s0s], dtype=int) offsets = np.hstack([0, np.cumsum(increments)]) _start_merge_at = start_merge_at + offsets[:-1] _end_merge_at = end_merge_at + offsets[1:] if np.min(_end_merge_at - _start_merge_at) < 2: raise ValueError("You are merging nothing!") # prepare for peaklets including S0s __merged = np.hstack([_not_excluded, np.full(increments.sum(), True)]) _peaklets = np.hstack([peaklets, np.hstack(merged_s0s)]) argsort = strax.stable_argsort(_peaklets["time"]) merged_s2s = self.merge_peaklets( _peaklets[argsort], _start_merge_at, _end_merge_at, __merged[argsort], max_buffer, max_unmerged=self.unmerged_thresholds, ) else: _merged_s0 = np.zeros(is_s0.sum(), dtype=bool) merged_s2s = self.merge_peaklets( peaklets, start_merge_at, end_merge_at, _not_excluded, max_buffer, max_unmerged=self.unmerged_thresholds, ) if ( len(merged_s2s) and np.max((strax.endtime(merged_s2s) - merged_s2s["time"])) > self.max_duration ): raise ValueError("Merged S2 is too long") if self.merge_lone_hits: # Updated time and length of lone_hits and sort again: # this is an OverlapWindowPlugin, some lone_hits will be reused in the next iteration # so do not overwirte the lone_hits lh = np.copy(lone_hits) del lone_hits lh_time_shift = (lh["left"] - lh["left_integration"]) * lh["dt"] lh["time"] = lh["time"] - lh_time_shift lh["length"] = lh["right_integration"] - lh["left_integration"] lh = strax.sort_by_time(lh) _store_data_top = "data_top" in self.dtype_for("merged_s2s").names _store_data_start = "data_start" in self.dtype_for("merged_s2s").names strax.add_lone_hits( merged_s2s, lh, self.to_pe, n_top_channels=self.n_top_pmts, store_data_top=_store_data_top, store_data_start=_store_data_start, ) strax.compute_properties(merged_s2s, n_top_channels=self.n_top_pmts) # remove position fields merged_s2s = drop_data_field( merged_s2s, self.dtype_for("merged_s2s"), "_drop_data_field_merged_s2s" ) # make sure merged has same length as peaklets merged = np.zeros(len(is_s2), dtype=bool) merged[is_s2] = _merged_s2 merged[is_s0] = _merged_s0 not_excluded = np.ones(len(is_s2), dtype=bool) not_excluded[is_s2] = _not_excluded # of course all merged S2s are merged merged_s2s["merged"] = True return merged_s2s, not_excluded, merged
[docs] @staticmethod def get_left_right(peaklet): """Get the left and right boundaries of the peaklet.""" # The gap is defined as the 90% to 10% area decile distance of the adjacent peaks left = (peaklet["area_decile_from_midpoint"][1] + peaklet["median_time"]).astype(int) right = (peaklet["area_decile_from_midpoint"][9] + peaklet["median_time"]).astype(int) return left, right
[docs] @staticmethod def get_gap(y, x): """Get the gap between two peaklets.""" y_left, y_right = MergedS2s.get_left_right(y) x_left, x_right = MergedS2s.get_left_right(x) dt = x["time"] - y["time"] boundaries = np.sort([y_left, y_right, x_left + dt, x_right + dt]) this_gap = boundaries[2] - boundaries[1] return this_gap
[docs] @staticmethod @numba.njit(cache=True, nogil=True) def get_duration(y, x): """Get the duration of the merged peaklets.""" return max(y["endtime"], x["endtime"]) - min(y["time"], x["time"])
[docs] @staticmethod @numba.njit(cache=True, nogil=True) def decile_interp( x, y, panel=np.linspace(0, 1, 11), ): area = x["area"] + y["area"] xf = x["area"] / area yf = y["area"] / area xp = np.concatenate((panel * xf, panel * yf + xf)) yp = np.concatenate( ( (x["area_decile_from_midpoint"] + x["median_time"]), (y["time"] - x["time"]) + (y["area_decile_from_midpoint"] + y["median_time"]), ) ) decile = np.interp(panel, xp, yp) return decile[5], decile - decile[5]
[docs] @staticmethod def merge_two_peaks(x, y): """Merge two peaklets in order.""" z = np.array(0, dtype=x.dtype) z["time"] = x["time"] z["endtime"] = y["endtime"] z["area"] = x["area"] + y["area"] z["median_time"], z["area_decile_from_midpoint"] = MergedS2s.decile_interp(x, y) return z
[docs] @staticmethod def get_merge_instructions( _peaks, start, end, max_gap, max_duration, sigma, rough_seg, sigma_seg, rough_mu_bins, poisson_max_mu, poisson_survival_ratio, normal_panel, factorial_panel, sigma_panel, maxexp, n_top_pmts, p_thresholds, dr_thresholds, posrec_algo, bayesian=True, sparse_xy=True, uncertainty_weights=True, p_value_prioritized=False, gap_thresholds=None, diagnosing=False, disable=True, ): """Find the group of peaklets to merge. There are two ways to merge peaklets: 1. Bayesian merging: merge peaklets based on the p-value of the time-density merging 2. Normal merging: merge peaklets based on the gap between the peaklets """ # (x, y) positions of the peaklets positions = np.vstack([_peaks[f"x_{posrec_algo}"], _peaks[f"y_{posrec_algo}"]]).T if uncertainty_weights: contour_area = _peaks[f"position_contour_area_{posrec_algo}"] # weights of the peaklets when calculating the weighted mean deviation in (x, y) area = _peaks["area"] area_top = area * _peaks["area_fraction_top"] peaks = np.zeros(len(_peaks), dtype=MergedS2s.copied_dtype) for name in peaks.dtype.names: if name == "endtime": peaks[name] = strax.endtime(_peaks) else: peaks[name] = _peaks[name] peaks_copy = np.copy(peaks) peaks_copy.flags.writeable = False n_peaks = len(peaks) if n_peaks == 0: raise ValueError("No peaklets to merge") start_index = np.arange(n_peaks) # exclusive end index end_index = np.arange(n_peaks) + 1 # keep a copy of the original indices start_index_copy = np.copy(start_index) start_index_copy.flags.writeable = False end_index_copy = np.copy(end_index) end_index_copy.flags.writeable = False # mask to help keep track of the peaklets that have been examined unexamined = np.full(n_peaks, True) # mask to help keep track of the peaklets that should not be merged because # of too high standard deviation from the main cluster in (x, y) of the peaklets merged = np.full(n_peaks, True) # approximation of the integration boundaries core_bounds = (peaks["time"][1:] + peaks["endtime"][:-1]) // 2 # here the constraint on boundaries is also to make sure get_window_size covers the gaps left_bounds = np.maximum(np.hstack([start, core_bounds]), peaks["time"] - int(max_gap / 2)) right_bounds = np.minimum( np.hstack([core_bounds, end]), peaks["endtime"] + int(max_gap / 2) ) # keep a copy of the original boundaries left_bounds_copy = np.copy(left_bounds) left_bounds_copy.flags.writeable = False right_bounds_copy = np.copy(right_bounds) right_bounds_copy.flags.writeable = False if diagnosing: merged_area = [] p_values = [] dr_avgs = [] def gap_small_enough(y, x): log_area = np.log10(y["area"] + x["area"]) this_gap = MergedS2s.get_gap(y, x) gap_threshold = thresholds_interpolation(log_area, gap_thresholds) # gap of 90-10% area decile distance should not be larger than the threshold return this_gap < gap_threshold def can_merge_left(i, peaks): # whether peaklet i (from _peak) can be merged to its left unexamined peaklet flag = ( i - 1 >= 0 and unexamined[i - 1] and MergedS2s.get_gap(peaks[i], peaks[i - 1]) < max_gap and MergedS2s.get_duration(peaks[i], peaks[i - 1]) <= max_duration ) if not flag: return False return gap_small_enough(peaks[i], peaks[i - 1]) def can_merge_right(i, peaks): # whether peaklet i (from _peak) can be merged to its right unexamined peaklet flag = ( i < n_peaks and unexamined[i] and MergedS2s.get_gap(peaks[i - 1], peaks[i]) < max_gap and MergedS2s.get_duration(peaks[i - 1], peaks[i]) <= max_duration ) if not flag: return False return gap_small_enough(peaks[i - 1], peaks[i]) argsort = strax.stable_argsort(peaks["area"]) for i in tqdm(argsort[::-1], disable=disable): if not unexamined[i]: continue p = np.array([1.0, 1.0]) p_threshold = np.array([0.0, 0.0]) left = True dr_avg = 0.0 # in the while loops, the peaklets will be merged until the p-value # is smaller than the threshold or the peaklets can not be merged anymore # i will NOT be updated in the while loop while np.any(p >= p_threshold) and unexamined[i]: indices = [] # please mind here that the definition of gaps should # be consistent with in the merging algorithm # and the merged peak should not be longer than the max duration if can_merge_left(start_index[i], peaks): indices.append(start_index[i] - 1) else: indices.append(None) if can_merge_right(end_index[i], peaks): indices.append(end_index[i]) else: indices.append(None) p = [] p_threshold = [] _area = [] # get p-values for j in indices: if j is None: p.append(-1.0) p_threshold.append(1.0) _area.append(-1.0) continue # if area is non-positive, "merge" in time # but still skip it later in (x, y) because its weight is nan if not peaks["area"][j] > 0: p.append(1.0) p_threshold.append(-1.0) _area.append(peaks["area"][j]) continue log_area = np.log10(peaks["area"][i] + peaks["area"][j]) if bayesian: p_ = get_p_value( peaks[i], peaks[j], left_bounds[i], right_bounds[i], left_bounds[j], right_bounds[j], sigma, rough_seg, sigma_seg, rough_mu_bins, poisson_max_mu, poisson_survival_ratio, normal_panel, factorial_panel, sigma_panel, maxexp, ) p_threshold_ = thresholds_interpolation( log_area, p_thresholds, ) else: # this is kept for diagnosing and merged_s2s_he p_ = 1.0 p_threshold_ = 0.0 p.append(p_) p_threshold.append(p_threshold_) _area.append(peaks["area"][j]) p = np.array(p) p_threshold = np.array(p_threshold) _area = np.array(_area) examined = slice(start_index[i], end_index[i]) if diagnosing: merged_area.append(area[examined][merged[examined]].sum()) if np.all(p < p_threshold): # this will not allow merging of the already examined peaklets unexamined[examined] = False else: # whether test the proposal with larger area first if p_value_prioritized: left = p[0] > p[1] else: left = _area[0] > _area[1] # if test left first if left: if p[0] >= p_threshold[0]: left = True else: assert p[1] >= p_threshold[1] left = False else: if p[1] >= p_threshold[1]: left = False else: assert p[0] >= p_threshold[0] left = True # slice indicating the direction of merging if left: direction = [indices[0], indices[0] + 1] index = indices[0] else: direction = [indices[1] - 1, indices[1]] index = indices[1] # slice indicating the peaklets to be merged start_idx = start_index[direction[0]] end_idx = end_index[direction[1]] merging = slice(start_idx, end_idx) if sparse_xy: # calculate weighted averaged deviation of peaklets from the main cluster if uncertainty_weights: weights = 1 / contour_area[merging][merged[merging]] else: weights = area_top[merging][merged[merging]] dr_avg = weighted_averaged_dr( positions[merging, 0][merged[merging]], positions[merging, 1][merged[merging]], weights, ) # do we really merge the peaklets? dr_threshold_ = thresholds_interpolation( np.log10(area_top[merging][merged[merging]].sum()), dr_thresholds, ) merge = dr_avg < dr_threshold_ else: merge = True if merge: merged_peak = MergedS2s.merge_two_peaks( peaks[direction[0]], peaks[direction[1]] ) # check if the merged peak is too long if merged_peak["endtime"] - merged_peak["time"] > max_duration: raise ValueError("Merged S2 is too long") else: merged[index] = False merged_peak = peaks[i] # update merging peaks and boundaries # update ALL peaks in the slice, this is only temporarily needed # because we can not foresee in which order the peaks will be merged peaks[merging] = merged_peak start_index[merging] = start_index[merging.start] end_index[merging] = end_index[merging.stop - 1] left_bounds[merging] = left_bounds[merging.start] right_bounds[merging] = right_bounds[merging.stop - 1] if diagnosing: if left: p_values.append(p[0]) else: p_values.append(p[1]) if sparse_xy: dr_avgs.append(dr_avg) else: dr_avgs.append(0.0) if np.any(np.diff(start_index) < 0) or np.any(np.diff(end_index) < 0): raise ValueError("Indices are not sorted!") n_peaklets = end_index - start_index need_merging = n_peaklets > 1 start_index = np.unique(start_index[need_merging]) end_index = np.unique(end_index[need_merging]) if diagnosing: return start_index, end_index, merged, merged_area, p_values, dr_avgs return start_index, end_index, merged
[docs] @staticmethod def merge_peaklets( peaklets, start_merge_at, end_merge_at, merged, max_buffer=int(1e5), max_unmerged=None, return_all_peaks=False, ): if return_all_peaks: # execute earlier to prevent peaklets from being overwritten # mark the peaklets can be merged by time-density but not # by position-density as type FAR_XYPOS_S2_TYPE _far_xypos = np.copy(peaklets[~merged]) _far_xypos["type"] = FAR_XYPOS_S2_TYPE # mark the peaklets can be merged by time-density and position-density as type 2 merged_s2s = strax.merge_peaks( peaklets, start_merge_at, end_merge_at, merged=merged, max_buffer=max_buffer, ) # we do not do another round of classification for simplicity merged_s2s["type"] = 2 # if the number of type 20 peaklets inside a peak is larger than the threshold # or the area of type 20 peaklets inside a peak is larger than the threshold # mark the peaklets as type WIDE_XYPOS_S2_TYPE if max_unmerged is not None: n_de = [] area = [] area_de = [] for i in range(len(merged_s2s)): sl = slice(start_merge_at[i], end_merge_at[i]) n_de.append(np.sum(~merged[sl])) area.append(peaklets["area"][sl][merged[sl]].sum()) area_de.append(peaklets["area"][sl][~merged[sl]].sum()) n_de = np.array(n_de) area = np.array(area) area_de = np.array(area_de) # if the number of type 20 peaklets inside a peak is larger than the threshold mask = n_de > max_unmerged[0] # if the area of type 20 peaklets inside a peak is larger than the (both) threshold(s) mask |= (area_de > max_unmerged[1]) & (area_de > area * max_unmerged[2]) merged_s2s["type"] = np.where(mask, WIDE_XYPOS_S2_TYPE, merged_s2s["type"]) if return_all_peaks: merged_s2s = strax.sort_by_time(np.concatenate([_far_xypos, merged_s2s])) return merged_s2s
@numba.njit(cache=True, nogil=True) def get_p_value( y, x, ly, ry, lx, rx, sigma, rough_seg, sigma_seg, rough_mu_bins, poisson_max_mu, poisson_survival_ratio, normal_panel, factorial_panel, sigma_panel, maxexp, ): # better constraint the mu in boundaries of waveform # because the number of bins is not high enough comprising for computation efficiency mu = rough_mu(y, x, rough_mu_bins) t0 = y["time"] y_integrated = normal_cdf(((ry - t0) - mu[:, None]) / sigma, normal_panel) y_integrated -= normal_cdf(((ly - t0) - mu[:, None]) / sigma, normal_panel) x_integrated = normal_cdf(((rx - t0) - mu[:, None]) / sigma, normal_panel) x_integrated -= normal_cdf(((lx - t0) - mu[:, None]) / sigma, normal_panel) # keep numerical stability eps = 2.220446049250313e-16 # np.finfo(float).eps y_integrated += eps x_integrated += eps pmf = get_posterior( mu, sigma, y["area"], y["area_decile_from_midpoint"], y_integrated, y["time"], y["median_time"], x["time"], x["median_time"], rough_seg, maxexp, ).flatten() non_zero = pmf > 0 if not np.any(non_zero): return 0.0 # the mask non_zero here is also implicit flatten ps = p_values( sigma_panel[non_zero], y["area"], y_integrated.flatten()[non_zero], x["area"], x_integrated.flatten()[non_zero], rough_seg, sigma_seg, poisson_max_mu, poisson_survival_ratio, normal_panel, factorial_panel, ) p = np.sum(ps * pmf[non_zero]) if np.isnan(p): raise RuntimeError("p-value is NaN") return p @numba.njit(cache=True, nogil=True) def thresholds_interpolation(log_area, thresholds): """Return threshold for log_area of the merged S2 with linear interpolation given the points in thresholds. :param log_area: Log 10 area of the merged S2 :param thresholds: tuple (n, 2) of fix points for interpolation. """ if log_area < thresholds[0, 0]: return thresholds[1, 0] if log_area > thresholds[0, -1]: return thresholds[1, -1] return np.interp(log_area, thresholds[0], thresholds[1]) @numba.njit(cache=True, nogil=True) def rough_mu(y, x, rough_mu_bins): """Get rough bins for mu as the integration space.""" mu = np.linspace( min(y["time"], x["time"]) - y["time"], max(y["endtime"], x["endtime"]) - y["time"], rough_mu_bins, ) return mu @numba.njit(cache=True, nogil=True) def poisson_pmf(k, mu, factorial_panel): """Probability mass function of Poisson distribution, numba decorated.""" return mu**k / np.exp(mu) / factorial_panel[k] @numba.njit(cache=True, nogil=True) def normal_cdf(x, normal_panel): """Cumulative density function of standard normal distribution, numba decorated.""" return np.interp(x, normal_panel[0], normal_panel[1]) @numba.njit(cache=True, nogil=True) def get_posterior( mu, sigma, y_area, y_area_decile_from_midpoint, y_integrated, y_time, y_median_time, x_time, x_median_time, rough_seg, maxexp, ): """Get the posterior of (mu, sigma) based on the waveform y.""" # production of normal PDF y_t = y_area_decile_from_midpoint + y_median_time y_t = (y_t[:-1] + y_t[1:]) / 2 y_nse = y_area / rough_seg sigma2 = 2 * sigma**2 log_sigma = np.log(sigma) # dt is median between median time of two peaklets dt = np.abs((y_time - x_time) + (y_median_time - x_median_time)) # because the number of electrons are uniformly distributed in y_t log_pdf = -(((y_t - mu[:, None]) ** 2).sum(axis=1) * (y_nse / 10))[:, None] / sigma2 # complete normal distribution PDF log_sigma * y_nse # add log prior dt**2 / 4 / sigma2 + 2 * log_sigma # account for the bounded sampling of y np.log(y_integrated) * y_nse log_pdf -= np.log(y_integrated) * y_nse + log_sigma * (y_nse + 2) + dt**2 / 4 / sigma2 # to prevent numerical overflow log_pdf -= log_pdf.max() log_pdf += maxexp pdf = np.exp(log_pdf) pmf = pdf / pdf.sum() if np.any(np.isnan(pmf)): raise RuntimeError("pmf is NaN") return pmf @numba.njit(cache=True, nogil=True) def p_values( sigma, y_area, y_integrated, x_area, x_integrated, rough_seg, sigma_seg, poisson_max_mu, poisson_survival_ratio, normal_panel, factorial_panel, ): """The p-value if observe the x_area given the y_area, numba decorated.""" # distribution of allowed area expected_nse = x_integrated * y_area / y_integrated / rough_seg ps = [] for mu in expected_nse: # the number of electron follows Poisson distribution if mu > poisson_max_mu: ps.append(1 - normal_cdf((x_area / rough_seg - mu) / np.sqrt(mu), normal_panel)) else: cdf = poisson_pmf(0, mu, factorial_panel) k = 1 p = 0 while cdf < 1 - poisson_survival_ratio: pmf = poisson_pmf(k, mu, factorial_panel) cdf += pmf p += pmf * ( 1 - normal_cdf((x_area - k * rough_seg) / (k**0.5 * sigma_seg), normal_panel) ) k += 1 ps.append(p) ps = np.array(ps) return ps @numba.njit(cache=True, nogil=True) def weighted_averaged_dr(x, y, weights): """Weighted average deviation from weighted average (x, y)""" mask = weights > 0 mask &= ~np.isnan(x) mask &= ~np.isnan(x) # do not merge any S2 looks weird if not np.all(mask): return np.nan x_avg = np.average(x[mask], weights=weights[mask]) y_avg = np.average(y[mask], weights=weights[mask]) dr = np.sqrt((x - x_avg) ** 2 + (y - y_avg) ** 2) dr_avg = np.average(dr[mask], weights=weights[mask]) return dr_avg