import numpy as np
import straxen
import strax
import numba
export, __all__ = strax.exporter()
@export
class VetoProximity(strax.OverlapWindowPlugin):
    """Find the closest next/previous veto start w.r.t. the event time or when a busy happens
    during an event.

    For every veto listed in ``veto_names`` three fields are provided per event:

    * ``veto_<name>_overlap``: summed duration of the veto intervals inside the event window,
    * ``time_to_previous_<name>``: time to the previous veto interval,
    * ``time_to_next_<name>``: time to the next veto interval.

    The event window is taken from the two ``event_basics`` fields named by the
    ``event_window_fields`` config.

    """

    __version__ = "2.2.0"
    # Strictly speaking, we could depend on 'events', but then you couldn't
    # change the event_window_fields to e.g. s1_time and s2_endtime.
    depends_on = ("event_basics", "veto_intervals")
    provides = "veto_proximity"
    data_kind = "events"

    event_window_fields = straxen.URLConfig(
        default=("time", "endtime"),
        help=(
            "Fields to determine where to look for overlaps for using "
            "this plugin in the events. The default uses start and endtime "
            "of an event, but this can also be the S1 or S2 start/endtime"
        ),
    )

    veto_proximity_window = straxen.URLConfig(
        default=int(300e9), help="Maximum separation between veto stop and start pulses [ns]"
    )

    time_no_aqmon_veto_found = straxen.URLConfig(
        default=int(3.6e12),
        track=True,
        type=int,
        help=(
            "If no next/previous veto is found, we will fill the fields "
            "time_to_previous_XX with this time. Set to a large number "
            "such that one will never cut events that are < YY ns."
        ),
    )

    # Each name is matched against veto_intervals["veto_type"] as f"{name}_veto"
    # and is used to build the names of the output fields.
    veto_names = ["busy", "busy_he", "hev", "straxen_deadtime"]

    def infer_dtype(self):
        """Build the output dtype.

        :return: list of field descriptions: ``strax.time_fields`` followed by three
            ``np.int64`` fields (overlap, time to previous, time to next) for every entry
            of ``veto_names``.

        """
        dtype = []
        dtype += strax.time_fields
        # Only used to make the field descriptions mention the configured window fields.
        start_field, stop_field = self.event_window_fields
        for name in self.veto_names:
            dtype += [
                (
                    (
                        f'Duration of event overlapping with "{name}"-veto [ns]',
                        f"veto_{name}_overlap",
                    ),
                    np.int64,
                ),
                (
                    (
                        (
                            f'Time (absolute value) to previous "{name}"-veto '
                            f'from "{start_field}" of event [ns]'
                        ),
                        f"time_to_previous_{name}",
                    ),
                    np.int64,
                ),
                (
                    (
                        (
                            f'Time (absolute value) to next "{name}"-veto '
                            f'from "{stop_field}" of event [ns]'
                        ),
                        f"time_to_next_{name}",
                    ),
                    np.int64,
                ),
            ]
        return dtype

    def get_window_size(self):
        """Return the overlap window size, i.e. the ``veto_proximity_window`` config [ns]."""
        return self.veto_proximity_window

    def set_result_for_veto(
        self,
        result_buffer: np.ndarray,
        event_window: np.ndarray,
        veto_intervals: np.ndarray,
        veto_name: str,
    ) -> None:
        """Fill the result buffer inplace. Goal is to find vetos with <veto_name> that are either
        during, before or after the current event_window.

        :param result_buffer: The buffer to fill inplace
        :param event_window: start/stop boundaries of the event to consider. Should be an array
            with ['time'] and ['endtime'] which can be based on event start/end times or S1/S2
            times
        :param veto_intervals: veto intervals datatype
        :param veto_name: The name of the veto to fill the result buffer for
        :return: Nothing, results are filled in place

        """
        # Set defaults to be some very long time
        result_buffer[f"time_to_previous_{veto_name}"] = self.time_no_aqmon_veto_found
        result_buffer[f"time_to_next_{veto_name}"] = self.time_no_aqmon_veto_found

        selected_intervals = veto_intervals[veto_intervals["veto_type"] == f"{veto_name}_veto"]
        if not len(selected_intervals):
            # No veto of this type: keep the defaults above and leave the
            # overlap field as the caller initialised it.
            return

        # Per event, the [start, stop) index range into selected_intervals
        # (that is how get_overlapping_window_time consumes it).
        vetos_during_event = strax.touching_windows(selected_intervals, event_window)

        # Figure out the vetos *during* an event
        res = self.get_overlapping_window_time(
            vetos_during_event, selected_intervals, event_window, result_buffer
        )
        result_buffer[f"veto_{veto_name}_overlap"] = res

        # Find the next and previous veto's
        times_to_prev, times_to_next = strax.abs_time_to_prev_next_interval(
            event_window, selected_intervals
        )
        # Only strictly positive times overwrite the default; non-positive values
        # presumably signal "no neighbouring interval found" -- TODO confirm
        # against the strax documentation.
        mask_prev = times_to_prev > 0
        result_buffer[f"time_to_previous_{veto_name}"][mask_prev] = times_to_prev[mask_prev]
        mask_next = times_to_next > 0
        result_buffer[f"time_to_next_{veto_name}"][mask_next] = times_to_next[mask_next]

    @staticmethod
    @numba.njit
    def get_overlapping_window_time(
        vetos_during_event, selected_intervals, event_window, result_buffer
    ):
        """Computes total time each event overlaps with the corresponding veto.

        :param vetos_during_event: per event, a pair of (start, stop) indices into
            ``selected_intervals``
        :param selected_intervals: veto intervals with 'time' and 'endtime' fields
        :param event_window: per event, the window with 'time' and 'endtime' fields
        :param result_buffer: not used by this function; kept so the call signature stays
            unchanged
        :return: ``np.int64`` array with one summed overlap duration per event (0 if the index
            range of an event is empty)

        """
        res = np.zeros(len(vetos_during_event), np.int64)
        for event_i, veto_window in enumerate(vetos_during_event):
            # A non-empty index range means at least one veto touches this event.
            if veto_window[1] - veto_window[0]:
                vetos_in_window = selected_intervals[veto_window[0] : veto_window[1]].copy()
                # Clip both veto edges to the event window so that only the
                # part of each veto inside the window contributes to the sum.
                starts = np.clip(
                    vetos_in_window["time"],
                    event_window[event_i]["time"],
                    event_window[event_i]["endtime"],
                )
                stops = np.clip(
                    vetos_in_window["endtime"],
                    event_window[event_i]["time"],
                    event_window[event_i]["endtime"],
                )
                res[event_i] = np.sum(stops - starts)
        return res

    def compute(self, events, veto_intervals):
        """Compute the veto overlap and proximity fields for every event.

        :param events: array with 'time', 'endtime' and the two fields named in
            ``event_window_fields``
        :param veto_intervals: veto intervals; filtered per veto on its 'veto_type' field
        :return: array of ``self.dtype`` with one entry per event

        """
        result = np.zeros(len(events), self.dtype)
        result["time"] = events["time"]
        result["endtime"] = events["endtime"]

        # Get containers for touching windows based on self.event_window_fields
        event_window = np.zeros(len(events), dtype=strax.time_fields)
        event_window["time"] = events[self.event_window_fields[0]]
        event_window["endtime"] = events[self.event_window_fields[1]]

        for veto_name in self.veto_names:
            self.set_result_for_veto(result, event_window, veto_intervals, veto_name)
        return result