import typing
import numpy as np
import strax
import straxen
from straxen.plugins.aqmon_hits.aqmon_hits import AqmonChannels
export, __all__ = strax.exporter()
# ### Veto hardware ###:
# V1495 busy veto module:
# Generates a 25 ns NIM pulse whenever a veto begins and a 25 ns NIM signal when it ends.
# A new start signal can occur only after the previous busy instance ended.
# 1ms (1e6 ns) - minimum busy veto length, or until the board clears its memory
# DDC10 High Energy Veto:
# 10ms (1e7 ns) - fixed HE veto length in XENON1T DDC10,
# in XENONnT it will be calibrated based on the length of large S2 SE tails
# The start/stop signals for the HEV are generated by the V1495 board
[docs]@export
class VetoIntervals(strax.OverlapWindowPlugin):
"""Find pairs of veto start and veto stop signals and the veto.
duration between them:
- busy_* <= V1495 busy veto for tpc channels
- busy_he_* <= V1495 busy veto for high energy tpc channels
- hev_* <= DDC10 hardware high energy veto
- straxen_deadtime <= special case of deadtime introduced by the
DAQReader-plugin
"""
__version__ = "1.1.1"
depends_on = "aqmon_hits"
provides = "veto_intervals"
data_kind = "veto_intervals"
# This option is just showing where the OverlapWindowPlugin fails.
# We need to buffer the entire run in order not to run into chunking
# issues. A better solution would be using
# github.com/AxFoundation/strax/pull/654
max_veto_window = straxen.URLConfig(
default=int(7.2e12),
track=True,
type=int,
help=(
"Maximum separation between veto stop and start pulses [ns]. "
"Set to be >> than the max duration of the run to be able to "
"fully store one run into buffer since aqmon-hits are not "
"sorted by endtime"
),
)
[docs] def infer_dtype(self):
dtype = [
(("veto interval [ns]", "veto_interval"), np.int64),
(("veto signal type", "veto_type"), np.str_("U30")),
]
dtype += strax.time_fields
return dtype
[docs] def setup(self):
self.veto_names = ["busy_", "busy_he_", "hev_"]
self.channel_map = {aq_ch.name.lower(): int(aq_ch) for aq_ch in AqmonChannels}
[docs] def get_window_size(self):
# Give a very wide window
return self.max_veto_window
[docs] def compute(self, aqmon_hits, start, end):
# Allocate a nice big buffer and throw away the part we don't need later
result = np.zeros(len(aqmon_hits) * len(self.veto_names), self.dtype)
vetos_seen = 0
for veto_name in self.veto_names:
veto_hits_start = channel_select(aqmon_hits, self.channel_map[veto_name + "start"])
veto_hits_stop = channel_select(aqmon_hits, self.channel_map[veto_name + "stop"])
veto_hits_start, veto_hits_stop = self.handle_starts_and_stops_outside_of_run(
veto_hits_start=veto_hits_start,
veto_hits_stop=veto_hits_stop,
chunk_start=start,
chunk_end=end,
veto_name=veto_name,
)
n_vetos = len(veto_hits_start)
result["time"][vetos_seen : vetos_seen + n_vetos] = veto_hits_start["time"]
result["endtime"][vetos_seen : vetos_seen + n_vetos] = veto_hits_stop["time"]
result["veto_type"][vetos_seen : vetos_seen + n_vetos] = veto_name + "veto"
vetos_seen += n_vetos
# Straxen deadtime is special, it's a start and stop with no data
# but already an interval so easily used here
artificial_deadtime = aqmon_hits[
(aqmon_hits["channel"] == AqmonChannels.ARTIFICIAL_DEADTIME)
]
n_artificial = len(artificial_deadtime)
if n_artificial:
result[vetos_seen:n_artificial]["time"] = artificial_deadtime["time"]
result[vetos_seen:n_artificial]["endtime"] = strax.endtime(artificial_deadtime)
result[vetos_seen:n_artificial]["veto_type"] = "straxen_deadtime_veto"
vetos_seen += n_artificial
result = result[:vetos_seen]
result["veto_interval"] = result["endtime"] - result["time"]
sort = np.argsort(result["time"])
result = result[sort]
return result
[docs] def handle_starts_and_stops_outside_of_run(
self,
veto_hits_start: np.ndarray,
veto_hits_stop: np.ndarray,
chunk_start: int,
chunk_end: int,
veto_name: str,
) -> typing.Tuple[np.ndarray, np.ndarray]:
"""We might be missing one start or one stop at the end of the run, set it to the chunk
endtime if this is the case."""
# Just for traceback info that we declare this here
extra_start = []
extra_stop = []
missing_a_final_stop = (
len(veto_hits_start)
and len(veto_hits_stop)
and veto_hits_start[-1]["time"] > veto_hits_stop["time"][-1]
)
missing_a_final_stop = missing_a_final_stop or (
len(veto_hits_start) and not len(veto_hits_stop)
)
if missing_a_final_stop:
# There is one *start* of the //end// of the run -> the
# **stop** is missing (because it's outside of the run),
# let's add one **stop** at the //end// of this chunk
extra_stop = self.fake_hit(chunk_end)
veto_hits_stop = np.concatenate([veto_hits_stop, extra_stop])
if len(veto_hits_stop) - len(veto_hits_start) == 1:
# There is one *stop* of the //beginning// of the run
# -> the **start** is missing (because it's from before
# starting the run), # let's add one **start** at the
# //beginning// of this chunk
extra_start = self.fake_hit(chunk_start)
veto_hits_start = np.concatenate([extra_start, veto_hits_start])
something_is_wrong = len(veto_hits_start) != len(veto_hits_stop)
message = (
f"Got inconsistent number of {veto_name} starts "
f"{len(veto_hits_start)}) / stops ({len(veto_hits_stop)})."
)
if len(extra_start):
message += " Despite the fact that we inserted one extra start at the beginning of the run." # noqa
elif len(extra_stop):
message += (
" Despite the fact that we inserted one extra stop at the end of the run." # noqa
)
if something_is_wrong:
raise ValueError(message)
if np.any(veto_hits_start["time"] > veto_hits_stop["time"]):
raise ValueError("Found veto's starting before the previous stopped")
return veto_hits_start, veto_hits_stop
[docs] @staticmethod
def fake_hit(start, dt=1, length=1):
hit = np.zeros(1, strax.hit_dtype)
hit["time"] = start
hit["dt"] = dt
hit["length"] = length
return hit
# Don't use @numba since numba doesn't like masking arrays, use numpy
def channel_select(rr, ch):
"""Return data from start/stop veto channel in the acquisition monitor (AM)"""
return rr[rr["channel"] == ch]