import numba
import numpy as np
import strax
import straxen
export, __all__ = strax.exporter()
[docs]@export
class LEDAfterpulseProcessing(strax.Plugin):
"""Plugin for processing LED afterpulses.
Detect LED pulses and afterpulses (APs) in raw_records waveforms. Compute the AP datatype.
"""
__version__ = "0.6.0"
depends_on = "raw_records"
data_kind = "afterpulses"
provides = "afterpulses"
compressor = "zstd"
parallel = "process"
rechunk_on_save = True
gain_model = straxen.URLConfig(
infer_type=False,
help="PMT gain model. Specify as (model_type, model_config)",
)
n_tpc_pmts = straxen.URLConfig(
type=int,
help="Number of PMTs in TPC",
)
LED_hit_left_boundary = straxen.URLConfig(
default=50,
infer_type=False,
help="Left boundary after which the first hit marks the position of the LED window",
)
LED_hit_right_boundary = straxen.URLConfig(
default=110,
infer_type=False,
help="Right boundary after which the LED hit will no longer be searched",
)
LED_window_width = straxen.URLConfig(
default=20,
infer_type=False,
help="Width of the window in which hits are merged into the LED hit after the first hit",
)
baseline_samples = straxen.URLConfig(
default=40,
infer_type=False,
help="Number of samples to use at start of WF to determine the baseline",
)
hit_min_amplitude = straxen.URLConfig(
track=True,
infer_type=False,
default=(
"list-to-array://"
"xedocs://hit_thresholds"
"?as_list=True"
"&sort=pmt"
"&attr=value"
"&detector=tpc"
"&run_id=plugin.run_id"
"&version=ONLINE"
),
help="Minimum hit amplitude in ADC counts above baseline. "
"Specify as a tuple of length n_tpc_pmts, or a number,"
'or a string like "legacy-thresholds://pmt_commissioning_initial" which means calling'
"hitfinder_thresholds.py"
'or url string like "xedocs://hit_thresholds_tpc?version=ONLINE" which means'
"calling xedocs.",
)
hit_min_height_over_noise = straxen.URLConfig(
default=4,
infer_type=False,
help=(
"Minimum hit amplitude in numbers of baseline_rms above baseline."
"Actual threshold used is max(hit_min_amplitude, hit_min_"
"height_over_noise * baseline_rms)."
),
)
save_outside_hits = straxen.URLConfig(
default=(3, 20),
infer_type=False,
help="Save (left, right) samples besides hits; cut the rest",
)
[docs] def infer_dtype(self):
dtype = dtype_afterpulses()
return dtype
[docs] def setup(self):
self.to_pe = self.gain_model
self.hit_thresholds = self.hit_min_amplitude
self.hit_left_extension, self.hit_right_extension = self.save_outside_hits
[docs] def compute(self, raw_records):
# Convert everything to the records data type -- adds extra fields.
records = strax.raw_to_records(raw_records)
del raw_records
# calculate baseline and baseline rms
strax.baseline(records, baseline_samples=self.baseline_samples, flip=True)
# find all hits
hits = strax.find_hits(
records,
min_amplitude=self.hit_thresholds,
min_height_over_noise=self.hit_min_height_over_noise,
)
# sort hits by record_i and time, then find LED hit and afterpulse
# hits within the same record
hits_ap = find_ap(
hits,
records,
LED_hit_left_boundary=self.LED_hit_left_boundary,
LED_hit_right_boundary=self.LED_hit_right_boundary,
LED_window_width=self.LED_window_width,
hit_left_extension=self.hit_left_extension,
hit_right_extension=self.hit_right_extension,
)
hits_ap["area_pe"] = hits_ap["area"] * self.to_pe[hits_ap["channel"]]
hits_ap["height_pe"] = hits_ap["height"] * self.to_pe[hits_ap["channel"]]
return hits_ap
[docs]@export
def find_ap(
hits,
records,
LED_hit_left_boundary,
LED_hit_right_boundary,
LED_window_width,
hit_left_extension,
hit_right_extension,
):
"""Find afterpulses (APs) in the given hits data within specified LED hit boundaries and
extensions.
Parameters
----------
hits :
Array containing hit data.
records :
Array containing record data.
LED_hit_left_boundary :
Left boundary of the LED hit window.
LED_hit_right_boundary :
Right boundary of the LED hit window.
LED_window_width :
Extension to the right of the first hit found in the LED hit window
within which hits are merged into the LED hit.
hit_left_extension :
Extension to the left of the hit window.
hit_right_extension :
Extension to the right of the hit window.
Returns
-------
Array containing afterpulse data.
Notes
-----
- Hits to the left of the LED_hit_left_boundary are ignored.
- If no hit is found between LED_hit_left_boundary and LED_hit_right_boundary
the record is skipped.
- The merged LED hits are also saved and can be selected for by having
t_delay = 0 by definition.
"""
buffer = np.zeros(len(hits), dtype=dtype_afterpulses())
if not len(hits):
return buffer
# sort hits first by record_i, then by time
hits_sorted = strax.stable_sort(hits, order=("record_i", "time"))
res = _find_ap(
hits_sorted,
records,
LED_hit_left_boundary,
LED_hit_right_boundary,
LED_window_width,
hit_left_extension,
hit_right_extension,
buffer=buffer,
)
return res
@numba.jit(nopython=True, nogil=True, cache=True)
def _find_ap(
hits,
records,
LED_hit_left_boundary,
LED_hit_right_boundary,
LED_window_width,
hit_left_extension,
hit_right_extension,
buffer=None,
):
# hits need to be sorted by record_i, then time!
offset = 0
is_LED = False
t_LED = None # The position of the total LED pulse.
t_LED_window = None # The position that defines the LED integration window.
prev_record_i = hits[0]["record_i"]
record_data = records[prev_record_i]["data"]
record_len = records[prev_record_i]["length"]
baseline_fpart = records[prev_record_i]["baseline"] % 1
for h_i, h in enumerate(hits):
if h["record_i"] > prev_record_i:
# start of a new record
is_LED = False
# only increment buffer if the old one is not empty! this happens
# when no (LED) hit is found in the previous record
if not buffer[offset]["time"] == 0:
offset += 1
prev_record_i = h["record_i"]
record_data = records[prev_record_i]["data"]
baseline_fpart = records[prev_record_i]["baseline"] % 1
res = buffer[offset]
if h["left"] < LED_hit_left_boundary:
# if hit is before LED hit window: discard
continue
if (h["left"] < LED_hit_right_boundary) and not is_LED:
# this is the first hit in the LED hit window
fill_hitpars(
res,
h,
hit_left_extension,
hit_right_extension,
record_data,
record_len,
baseline_fpart,
)
t_LED_window = res[
"sample_10pc_area"
] # Set it to the first hit in the LED hit boundary.
t_LED = res["sample_10pc_area"]
is_LED = True
continue
if not is_LED:
# No hit found in the LED hit window: skip to next record
continue
if h["left"] < (t_LED_window + LED_window_width):
# This hit is still inside the LED window: extend the LED hit
fill_hitpars(
res,
h,
hit_left_extension,
hit_right_extension,
record_data,
record_len,
baseline_fpart,
extend=True,
)
# set the LED time in the current WF
t_LED = res["sample_10pc_area"]
continue
# Here begins a new hit after the LED window
# if a hit is completely inside the previous hit's right_extension,
# then skip it (because it's already included in the previous hit)
if h["right"] <= res["right_integration"]:
continue
# if a hit only partly overlaps with the previous hit's right_
# extension, merge them (extend previous hit by this one)
if h["left"] <= res["right_integration"]:
fill_hitpars(
res,
h,
hit_left_extension,
hit_right_extension,
record_data,
record_len,
baseline_fpart,
extend=True,
)
res["tdelay"] = res["sample_10pc_area"] - t_LED
continue
# an actual new hit increases the buffer index
offset += 1
res = buffer[offset]
fill_hitpars(
res,
h,
hit_left_extension,
hit_right_extension,
record_data,
record_len,
baseline_fpart,
)
res["tdelay"] = res["sample_10pc_area"] - t_LED
return buffer[:offset]
[docs]@export
@numba.jit(nopython=True, nogil=True, cache=True)
def get_sample_area_quantile(data, quantile, baseline_fpart):
"""Return the index of the first sample in the hit where the integrated area of the hit to that
index is above the specified quantile of the total area.
Parameters:
- data :
Array containing the baselined waveform data of the hit.
- quantile :
The quantile (0 to 1) representing the threshold for the area.
- baseline_fpart :
Fractional part of the baseline (baseline % 1) of the record
Return:
- int: The index of the first sample where the area exceeds the quantile of the total area.
If no such sample is found, returns 0.
Notes:
- If no quantile is found where the area exceeds the threshold, it returns 0. This is
usually caused by real events in the baseline window, which can result in a negative
area.
"""
area = 0
area_tot = data.sum() + len(data) * baseline_fpart
for d_i, d in enumerate(data):
area += d + baseline_fpart
if area > (quantile * area_tot):
return d_i
if d_i == len(data) - 1:
# if no quantile was found, something went wrong
# (negative area due to wrong baseline, caused by real events that
# by coincidence fall in the first samples of the trigger window)
# print('no quantile found: set to 0')
return 0
# What happened here?!
return 0
@numba.jit(nopython=True, nogil=True, cache=True)
def fill_hitpars(
result,
hit,
hit_left_extension,
hit_right_extension,
record_data,
record_len,
baseline_fpart,
extend=False,
):
if not extend: # fill first time only
result["time"] = hit["time"] - hit_left_extension * hit["dt"]
result["dt"] = hit["dt"]
result["channel"] = hit["channel"]
result["left"] = hit["left"]
result["record_i"] = hit["record_i"]
result["threshold"] = hit["threshold"]
result["left_integration"] = hit["left"] - hit_left_extension
result["height"] = hit["height"]
# fill always (if hits are merged, only these will be updated)
result["right"] = hit["right"]
result["right_integration"] = hit["right"] + hit_right_extension
if result["right_integration"] > record_len:
result["right_integration"] = record_len # cap right_integration at end of record
result["length"] = result["right_integration"] - result["left_integration"]
hit_data = record_data[result["left_integration"] : result["right_integration"]]
result["area"] = hit_data.sum() + result["length"] * baseline_fpart
result["sample_10pc_area"] = result["left_integration"] + get_sample_area_quantile(
hit_data, 0.1, baseline_fpart
)
result["sample_50pc_area"] = result["left_integration"] + get_sample_area_quantile(
hit_data, 0.5, baseline_fpart
)
if len(hit_data):
result["max"] = result["left_integration"] + hit_data.argmax()
if extend: # only when merging hits
result["height"] = max(result["height"], hit["height"])
[docs]@export
def dtype_afterpulses():
"""The afterpulse datatype.
Return:
- The afterpulse datatype
"""
dtype_ap = strax.interval_dtype + [
(("Integral in ADC x samples", "area"), np.int32),
(("Pulse area in PE", "area_pe"), np.float32),
(("Sample index in which hit starts", "left"), np.int16),
(
(
"Sample index in which hit area succeeds 10% of total area",
"sample_10pc_area",
),
np.int16,
),
(
(
"Sample index in which hit area succeeds 50% of total area",
"sample_50pc_area",
),
np.int16,
),
(("Sample index of hit maximum", "max"), np.int16),
(
(
"Index of first sample in record just beyond hit (exclusive bound)",
"right",
),
np.int16,
),
(("Height of hit in ADC counts", "height"), np.int32),
(("Height of hit in PE", "height_pe"), np.float32),
(("Delay of hit w.r.t. LED hit in same WF, in samples", "tdelay"), np.int16),
(
(
"Internal (temporary) index of fragment in which hit was found",
"record_i",
),
np.int32,
),
(
("Index of sample in record where integration starts", "left_integration"),
np.int16,
),
(
("Index of first sample beyond integration region", "right_integration"),
np.int16,
),
(("ADC threshold applied in order to find hits", "threshold"), np.float32),
]
return dtype_ap