Source code for straxen.plugins.afterpulses.afterpulse_processing

import numba
import numpy as np
import strax
import straxen

export, __all__ = strax.exporter()


@export
class LEDAfterpulseProcessing(strax.Plugin):
    __version__ = "0.5.1"
    depends_on = "raw_records"
    data_kind = "afterpulses"
    provides = "afterpulses"
    compressor = "zstd"
    parallel = "process"
    rechunk_on_save = True

    gain_model = straxen.URLConfig(
        infer_type=False,
        help="PMT gain model. Specify as (model_type, model_config)",
    )

    n_tpc_pmts = straxen.URLConfig(
        type=int,
        help="Number of PMTs in TPC",
    )

    LED_window_left = straxen.URLConfig(
        default=50,
        infer_type=False,
        help="Left boundary of sample range for LED pulse integration",
    )

    LED_window_right = straxen.URLConfig(
        default=100,
        infer_type=False,
        help="Right boundary of sample range for LED pulse integration",
    )

    baseline_samples = straxen.URLConfig(
        default=40,
        infer_type=False,
        help="Number of samples to use at the start of the WF to determine the baseline",
    )

    hit_min_amplitude = straxen.URLConfig(
        track=True,
        infer_type=False,
        default="cmt://hit_thresholds_tpc?version=ONLINE&run_id=plugin.run_id",
        help=(
            "Minimum hit amplitude in ADC counts above baseline. "
            "Specify as a tuple of length n_tpc_pmts, as a single number, "
            'as a string like "legacy-thresholds://pmt_commissioning_initial" '
            "(which means calling hitfinder_thresholds.py), "
            'or as a URL string like "cmt://hit_thresholds_tpc?version=ONLINE" '
            "(which means calling CMT)."
        ),
    )

    hit_min_height_over_noise = straxen.URLConfig(
        default=4,
        infer_type=False,
        help=(
            "Minimum hit amplitude in numbers of baseline_rms above baseline. "
            "The actual threshold used is max(hit_min_amplitude, "
            "hit_min_height_over_noise * baseline_rms)."
        ),
    )

    save_outside_hits = straxen.URLConfig(
        default=(3, 20),
        infer_type=False,
        help="Save (left, right) samples besides hits; cut the rest",
    )

    def infer_dtype(self):
        dtype = dtype_afterpulses()
        return dtype

    def setup(self):
        self.to_pe = self.gain_model
        self.hit_thresholds = self.hit_min_amplitude
        self.hit_left_extension, self.hit_right_extension = self.save_outside_hits

    def compute(self, raw_records):
        # Convert everything to the records data type -- adds extra fields.
        records = strax.raw_to_records(raw_records)
        del raw_records

        # calculate baseline and baseline rms
        strax.baseline(records, baseline_samples=self.baseline_samples, flip=True)

        # find all hits
        hits = strax.find_hits(
            records,
            min_amplitude=self.hit_thresholds,
            min_height_over_noise=self.hit_min_height_over_noise,
        )

        # sort hits by record_i and time, then find the LED hit and afterpulse
        # hits within the same record
        hits_ap = find_ap(
            hits,
            records,
            LED_window_left=self.LED_window_left,
            LED_window_right=self.LED_window_right,
            hit_left_extension=self.hit_left_extension,
            hit_right_extension=self.hit_right_extension,
        )

        hits_ap["area_pe"] = hits_ap["area"] * self.to_pe[hits_ap["channel"]]
        hits_ap["height_pe"] = hits_ap["height"] * self.to_pe[hits_ap["channel"]]

        return hits_ap
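

# Usage sketch (not part of the original plugin): one way this plugin could be
# registered and run through a straxen context. The context constructor and the
# run_id are assumptions for illustration; use whichever context provides
# raw_records for your LED calibration data.
def _example_get_afterpulses(run_id="led_calibration_run"):  # hypothetical run_id
    st = straxen.contexts.xenonnt_online()  # assumed context
    st.register(LEDAfterpulseProcessing)
    # returns a structured array with the fields defined in dtype_afterpulses()
    return st.get_array(run_id, "afterpulses")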


@export
def find_ap(
    hits, records, LED_window_left, LED_window_right, hit_left_extension, hit_right_extension
):
    buffer = np.zeros(len(hits), dtype=dtype_afterpulses())

    if not len(hits):
        return buffer

    # sort hits first by record_i, then by time
    hits_sorted = np.sort(hits, order=("record_i", "time"))

    res = _find_ap(
        hits_sorted,
        records,
        LED_window_left,
        LED_window_right,
        hit_left_extension,
        hit_right_extension,
        buffer=buffer,
    )
    return res


@numba.jit(nopython=True, nogil=True, cache=True)
def _find_ap(
    hits,
    records,
    LED_window_left,
    LED_window_right,
    hit_left_extension,
    hit_right_extension,
    buffer=None,
):
    # hits need to be sorted by record_i, then time!
    offset = 0

    is_LED = False
    t_LED = None

    prev_record_i = hits[0]["record_i"]
    record_data = records[prev_record_i]["data"]
    record_len = records[prev_record_i]["length"]
    baseline_fpart = records[prev_record_i]["baseline"] % 1

    for h_i, h in enumerate(hits):
        if h["record_i"] > prev_record_i:
            # start of a new record
            is_LED = False

            # only increment buffer if the old one is not empty! this happens
            # when no (LED) hit is found in the previous record
            if not buffer[offset]["time"] == 0:
                offset += 1

            prev_record_i = h["record_i"]
            record_data = records[prev_record_i]["data"]
            record_len = records[prev_record_i]["length"]
            baseline_fpart = records[prev_record_i]["baseline"] % 1

        res = buffer[offset]

        if h["left"] < LED_window_left:
            # hit is before the LED window: discard
            continue

        if h["left"] < LED_window_right:
            # hit is in the LED window
            if not is_LED:
                # this is the first hit in the LED window
                fill_hitpars(
                    res,
                    h,
                    hit_left_extension,
                    hit_right_extension,
                    record_data,
                    record_len,
                    baseline_fpart,
                )
                # set the LED time in the current WF
                t_LED = res["sample_10pc_area"]
                is_LED = True
                continue

            # more hits in the LED window: extend the first one
            # (merging all hits in the LED window)
            fill_hitpars(
                res,
                h,
                hit_left_extension,
                hit_right_extension,
                record_data,
                record_len,
                baseline_fpart,
                extend=True,
            )
            t_LED = res["sample_10pc_area"]
            continue

        # here begins a new hit after the LED window
        if (h["left"] >= LED_window_right) and not is_LED:
            # no LED hit found: ignore and go to the next hit (until a new record begins)
            continue

        # if a hit is completely inside the previous hit's right_extension,
        # skip it (it is already included in the previous hit)
        if h["right"] <= res["right_integration"]:
            continue

        # if a hit only partly overlaps with the previous hit's
        # right_extension, merge them (extend the previous hit by this one)
        if h["left"] <= res["right_integration"]:
            fill_hitpars(
                res,
                h,
                hit_left_extension,
                hit_right_extension,
                record_data,
                record_len,
                baseline_fpart,
                extend=True,
            )
            res["tdelay"] = res["sample_10pc_area"] - t_LED
            continue

        # an actual new hit increases the buffer index
        offset += 1
        res = buffer[offset]
        fill_hitpars(
            res, h, hit_left_extension, hit_right_extension, record_data, record_len, baseline_fpart
        )
        res["tdelay"] = res["sample_10pc_area"] - t_LED

    return buffer[:offset]


@export
@numba.jit(nopython=True, nogil=True, cache=True)
def get_sample_area_quantile(data, quantile, baseline_fpart):
    """Return the first sample index in the hit at which the integrated area
    exceeds the given quantile of the total area."""

    area = 0
    area_tot = data.sum() + len(data) * baseline_fpart

    for d_i, d in enumerate(data):
        area += d + baseline_fpart
        if area > (quantile * area_tot):
            return d_i
        if d_i == len(data) - 1:
            # no quantile was found, so something went wrong (e.g. a negative
            # total area due to a wrong baseline, caused by real events that
            # by coincidence fall in the first samples of the trigger window):
            # fall back to sample 0
            return 0

    return 0  # only reached for empty input data
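

# Illustrative sketch (not in the original module): get_sample_area_quantile on
# a toy pulse. The waveform values and the zero fractional baseline are
# assumptions chosen only to show the 10% / 50% area quantile indices used above.
def _example_area_quantiles():
    toy_pulse = np.array([0, 1, 5, 10, 4, 1, 0], dtype=np.float64)  # hypothetical hit waveform
    # first sample where the running area exceeds 10% of the total (index 2 here)
    i10 = get_sample_area_quantile(toy_pulse, 0.1, 0.0)
    # first sample where the running area exceeds 50% of the total (index 3 here)
    i50 = get_sample_area_quantile(toy_pulse, 0.5, 0.0)
    return i10, i50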


@numba.jit(nopython=True, nogil=True, cache=True)
def fill_hitpars(
    result,
    hit,
    hit_left_extension,
    hit_right_extension,
    record_data,
    record_len,
    baseline_fpart,
    extend=False,
):
    if not extend:  # fill first time only
        result["time"] = hit["time"] - hit_left_extension * hit["dt"]
        result["dt"] = hit["dt"]
        result["channel"] = hit["channel"]
        result["left"] = hit["left"]
        result["record_i"] = hit["record_i"]
        result["threshold"] = hit["threshold"]
        result["left_integration"] = hit["left"] - hit_left_extension
        result["height"] = hit["height"]

    # fill always (if hits are merged, only these will be updated)
    result["right"] = hit["right"]
    result["right_integration"] = hit["right"] + hit_right_extension
    if result["right_integration"] > record_len:
        # cap right_integration at the end of the record
        result["right_integration"] = record_len
    result["length"] = result["right_integration"] - result["left_integration"]

    hit_data = record_data[result["left_integration"] : result["right_integration"]]
    result["area"] = hit_data.sum() + result["length"] * baseline_fpart
    result["sample_10pc_area"] = result["left_integration"] + get_sample_area_quantile(
        hit_data, 0.1, baseline_fpart
    )
    result["sample_50pc_area"] = result["left_integration"] + get_sample_area_quantile(
        hit_data, 0.5, baseline_fpart
    )
    if len(hit_data):
        result["max"] = result["left_integration"] + hit_data.argmax()

    if extend:  # only when merging hits
        result["height"] = max(result["height"], hit["height"])


@export
def dtype_afterpulses():
    # data type for afterpulse data
    dtype_ap = [
        (("Channel/PMT number", "channel"), "<i2"),
        (("Time resolution in ns", "dt"), "<i2"),
        (("Start time of the interval (ns since unix epoch)", "time"), "<i8"),
        (("Length of the interval in samples", "length"), "<i4"),
        (("Integral in ADC x samples", "area"), "<i4"),
        (("Pulse area in PE", "area_pe"), "<f4"),
        (("Sample index in which hit starts", "left"), "<i2"),
        (("Sample index in which hit area exceeds 10% of total area", "sample_10pc_area"), "<i2"),
        (("Sample index in which hit area exceeds 50% of total area", "sample_50pc_area"), "<i2"),
        (("Sample index of hit maximum", "max"), "<i2"),
        (("Index of first sample in record just beyond hit (exclusive bound)", "right"), "<i2"),
        (("Height of hit in ADC counts", "height"), "<i4"),
        (("Height of hit in PE", "height_pe"), "<f4"),
        (("Delay of hit w.r.t. LED hit in same WF, in samples", "tdelay"), "<i2"),
        (("Internal (temporary) index of fragment in which hit was found", "record_i"), "<i4"),
        (("Index of sample in record where integration starts", "left_integration"), np.int16),
        (("Index of first sample beyond integration region", "right_integration"), np.int16),
        (("ADC threshold applied in order to find hits", "threshold"), np.float32),
    ]
    return dtype_ap
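

# Minimal sketch (not in the original module): the dtype above defines a numpy
# structured array; find_ap allocates its output buffer this way and fills one
# entry per (merged) hit.
def _example_afterpulse_buffer(n_hits=10):
    buffer = np.zeros(n_hits, dtype=dtype_afterpulses())
    # fields are addressed by name, e.g. buffer["tdelay"] or buffer["area_pe"]
    return buffer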