import numpy as np
import numba
import strax
import straxen
from .peak_ambience import _quick_assign
from ..events import Events
# `export` is applied as a decorator to the plugin class defined in this module.
# NOTE(review): presumably strax.exporter() returns a decorator that appends the
# decorated object's name to the returned `__all__` list — confirm against strax.
export, __all__ = strax.exporter()
@export
class PeakNearestTriggering(Events):
    """Time difference and properties of the nearest triggering peaks in the left and right
    direction of peaks.

    For every peak the nearest triggering peak before ("left") and after ("right") it is
    searched, using ``center_time`` as the anchor, within
    ``+- shadow_time_window_backward``. If no such peak exists in a direction, the
    corresponding ``*_dtime`` is set to ``shadow_time_window_backward`` and the remaining
    fields of that direction keep the defaults written by ``strax.set_nan_defaults``.

    NOTE(review): ``trigger_min_area`` and ``_is_triggering`` are not defined in this class;
    they are presumably inherited from ``Events`` -- confirm.

    """

    __version__ = "0.0.0"
    depends_on = ("peak_basics", "peak_proximity")
    provides = "peak_nearest_triggering"
    data_kind = "peaks"
    save_when = strax.SaveWhen.EXPLICIT

    shadow_time_window_backward = straxen.URLConfig(
        default=int(1e9),
        type=int,
        track=True,
        help="Search for peaks casting time & position shadow in this time window [ns]",
    )

    only_trigger_min_area = straxen.URLConfig(
        default=False,
        type=bool,
        track=True,
        help="Whether only require the triggering peak to have area larger than trigger_min_area",
    )

    def infer_dtype(self):
        """Build the output dtype.

        For each direction ("left", then "right") one field per entry of the table below is
        created, named ``{direction}_{name}``, followed by ``strax.time_fields``.

        """
        common_descr = "of the nearest triggering peak on the"
        # (description label, field name suffix, unit suffix of the title, numpy type)
        field_table = (
            ("time difference", "dtime", " [ns]", np.int64),
            ("time", "time", " [ns]", np.int64),
            ("endtime", "endtime", " [ns]", np.int64),
            ("center_time", "center_time", " [ns]", np.int64),
            ("type", "type", "", np.int8),
            ("proximity_score", "proximity_score", "", np.float32),
            ("n_competing_left", "n_competing_left", "", np.int32),
            ("n_competing", "n_competing", "", np.int32),
            ("area", "area", " [PE]", np.float32),
        )
        dtype = []
        for direction in ["left", "right"]:
            for label, name, unit, np_type in field_table:
                title = f"{label} {common_descr} {direction}{unit}"
                dtype.append(((title, f"{direction}_{name}"), np_type))
        dtype += strax.time_fields
        return dtype

    def get_window_size(self):
        """Return the overlap window size: ten times ``shadow_time_window_backward``."""
        # This method is required by the OverlapWindowPlugin class
        return 10 * self.shadow_time_window_backward

    def compute(self, peaks):
        """Compute the nearest-triggering-peak information for every peak in ``peaks``.

        The peaks are processed in ``center_time`` order and the rows are then written into
        ``result`` through ``_quick_assign`` using the sorting permutation.

        NOTE(review): ``_quick_assign`` is imported from ``peak_ambience``; presumably it
        scatters the sorted rows back so that ``result`` follows the input order -- confirm.

        """
        argsort = strax.stable_argsort(peaks["center_time"])
        _peaks = peaks[argsort].copy()
        result = np.zeros(len(peaks), self.dtype)
        _quick_assign(argsort, result, self.compute_triggering(peaks, _peaks))
        return result

    def compute_triggering(self, peaks, current_peak):
        """Find, for each entry of ``current_peak``, the nearest triggering peak on each side.

        :param peaks: pool of peaks from which the triggering peaks are selected.
        :param current_peak: peaks for which the neighbours are searched.
            NOTE(review): ``compute`` passes them sorted by ``center_time``; confirm whether
            ``strax.touching_windows`` relies on that ordering.
        :return: array of ``self.dtype`` with one row per entry of ``current_peak``.

        """
        # sort peaks by center_time,
        # because later we will use center_time to find the nearest peak
        _peaks = strax.stable_sort(peaks, order="center_time")
        # only looking at triggering peaks
        if self.only_trigger_min_area:
            _is_triggering = _peaks["area"] > self.trigger_min_area
        else:
            _is_triggering = self._is_triggering(_peaks)
        _peaks = _peaks[_is_triggering]

        # init result
        result = np.zeros(len(current_peak), self.dtype)
        strax.set_nan_defaults(result)

        # use center_time as the anchor of things
        things = np.zeros(len(_peaks), dtype=strax.time_fields)
        things["time"] = _peaks["center_time"]
        things["endtime"] = _peaks["center_time"]
        # also use center_time as the anchor of containers
        containers = np.zeros(len(current_peak), dtype=strax.time_fields)
        containers["time"] = current_peak["center_time"] - self.shadow_time_window_backward
        containers["endtime"] = current_peak["center_time"] + self.shadow_time_window_backward

        # find indices of the nearest peaks in left and right direction
        split_peaks = strax.touching_windows(things, containers)
        left_indices, right_indices = self.nearest_indices(
            current_peak["center_time"],
            _peaks["center_time"],
            split_peaks,
        )

        copied_fields = (
            "time",
            "endtime",
            "center_time",
            "type",
            "proximity_score",
            "n_competing_left",
            "n_competing",
            "area",
        )
        # assign fields
        # Only rows that actually have a neighbour are gathered from _peaks. Indexing
        # _peaks with the -1 sentinel (as an unconditional np.where would) raises
        # IndexError when no triggering peak exists at all, i.e. when _peaks is empty.
        # sign = +1: current - neighbour (left); sign = -1: neighbour - current (right).
        for direction, indices, sign in (("left", left_indices, 1), ("right", right_indices, -1)):
            found = indices != -1
            matched = indices[found]
            dtime = result[f"{direction}_dtime"]
            # default when there is no neighbour in this direction
            dtime[:] = self.shadow_time_window_backward
            dtime[found] = sign * (
                current_peak["center_time"][found] - _peaks["center_time"][matched]
            )
            for field in copied_fields:
                # rows without a neighbour keep the value set by strax.set_nan_defaults
                result[f"{direction}_{field}"][found] = _peaks[field][matched]

        result["time"] = current_peak["time"]
        result["endtime"] = strax.endtime(current_peak)

        for direction in ["left", "right"]:
            assert np.all(result[f"{direction}_dtime"] > 0), f"{direction}_dtime should be positive"
        return result

    @staticmethod
    @numba.njit
    def nearest_indices(reference_time, nearest_time, touching_windows):
        """Find the nearest indices in the left and right direction.

        :param reference_time: times for which the neighbours are searched.
        :param nearest_time: candidate times; ``compute_triggering`` passes them sorted in
            ascending order, which the initial distance bounds below rely on.
        :param touching_windows: per reference time, the ``[start, stop)`` index range into
            ``nearest_time`` that should be scanned.
        :return: ``(left_indices, right_indices)``; each holds, per reference time, the index
            into ``nearest_time`` of the closest strictly earlier / strictly later candidate,
            or -1 if there is none inside the window.

        """
        left_indices = np.full(len(reference_time), -1, dtype=np.int64)
        right_indices = np.full(len(reference_time), -1, dtype=np.int64)
        for r_i, r in enumerate(reference_time):
            indices = touching_windows[r_i]
            # empty window: no candidates at all
            if indices[0] == indices[1]:
                continue
            center_time = nearest_time[indices[0] : indices[1]]
            # start from the distances to the outermost candidates of the window
            left_dtime = r - center_time[0]
            right_dtime = center_time[-1] - r
            # make sure that the dtime is positive:
            # candidates at exactly the reference time are skipped by the strict comparisons
            for p_i, t in enumerate(center_time):
                if t < r and r - t <= left_dtime:
                    left_dtime = r - t
                    left_indices[r_i] = indices[0] + p_i
                elif t > r and t - r <= right_dtime:
                    right_dtime = t - r
                    right_indices[r_i] = indices[0] + p_i
        return left_indices, right_indices