Source code for straxen.plugins.events_nv.events_nv

import strax
import straxen
import numpy as np
import numba
import typing as ty
from immutabledict import immutabledict

export, __all__ = strax.exporter()


[docs]@export class nVETOEvents(strax.OverlapWindowPlugin): """Plugin which computes the boundaries of veto events.""" __version__ = "0.0.3" depends_on = "hitlets_nv" provides = "events_nv" data_kind = "events_nv" compressor = "zstd" events_seen = 0 event_left_extension_nv = straxen.URLConfig( default=0, track=True, type=int, help="Extends event window this many [ns] to the left." ) event_resolving_time_nv = straxen.URLConfig( default=200, track=True, type=int, help="Resolving time for window coincidence [ns]." ) event_min_hits_nv = straxen.URLConfig( default=3, track=True, type=int, help="Minimum number of fully confined hitlets to define an event.", ) channel_map = straxen.URLConfig( track=False, type=immutabledict, help="immutabledict mapping subdetector to (min, max) channel number", )
[docs] def infer_dtype(self): self.name_event_number = "event_number_nv" self.channel_range = self.channel_map["nveto"] self.n_channel = (self.channel_range[1] - self.channel_range[0]) + 1 return veto_event_dtype(self.name_event_number, self.n_channel)
[docs] def get_window_size(self): return self.event_left_extension_nv + self.event_resolving_time_nv + 1
[docs] def compute(self, hitlets_nv, start, end): events, hitlets_ids_in_event = find_veto_events( hitlets_nv, self.event_min_hits_nv, self.event_resolving_time_nv, self.event_left_extension_nv, event_number_key=self.name_event_number, n_channel=self.n_channel, ) if len(hitlets_ids_in_event): compute_nveto_event_properties( events, hitlets_nv, hitlets_ids_in_event, start_channel=self.channel_range[0] ) # Get eventids: n_events = len(events) events[self.name_event_number] = np.arange(n_events) + self.events_seen self.events_seen += n_events # Don't extend beyond the chunk boundaries # This will often happen for events near the invalid boundary of the # overlap processing (which should be thrown away) events["time"] = np.clip(events["time"], start, end) events["endtime"] = np.clip(events["endtime"], start, end) return events
def veto_event_dtype(name_event_number: str = "event_number_nv", n_pmts: int = 120) -> list: dtype = [] dtype += strax.time_fields # because mutable dtype += [ (("Veto event number in this dataset", name_event_number), np.int64), (("Total area of all hitlets in event [pe]", "area"), np.float32), (("Total number of hitlets in events", "n_hits"), np.int32), (("Total number of contributing channels", "n_contributing_pmt"), np.uint8), (("Area in event per channel [pe]", "area_per_channel"), np.float32, n_pmts), ( ( "Area weighted mean time of the event relative to the event start [ns]", "center_time", ), np.float32, ), (("Weighted variance of time [ns]", "center_time_spread"), np.float32), ] return dtype @numba.njit(cache=True, nogil=True) def compute_nveto_event_properties( events: np.ndarray, hitlets: np.ndarray, contained_hitlets_ids: np.ndarray, start_channel: int = 2000, ): """Computes properties of the neutron-veto events. Writes results directly to events. :param events: Events for which properties should be computed :param hitlets: hitlets which were used to build the events. :param contained_hitlets_ids: numpy array of the shape n x 2 which holds the indices of the hitlets contained in the corresponding event. :param start_channel: Integer specifying start channel, e.g. 2000 for nveto. """ for e, (s_i, e_i) in zip(events, contained_hitlets_ids): hitlet = hitlets[s_i:e_i] event_area = np.sum(hitlet["area"]) e["area"] = event_area e["n_hits"] = len(hitlet) e["n_contributing_pmt"] = len(np.unique(hitlet["channel"])) t = hitlet["time"] - hitlet[0]["time"] if event_area: e["center_time"] = np.sum(t * hitlet["area"]) / event_area if e["n_hits"] > 1 and e["center_time"]: w = hitlet["area"] / e["area"] # normalized weights # Definition of variance e["center_time_spread"] = np.sqrt( np.sum(w * np.power(t - e["center_time"], 2)) / np.sum(w) ) else: e["center_time_spread"] = np.inf # Compute per channel properties: for hit in hitlet: ch = hit["channel"] - start_channel e["area_per_channel"][ch] += hit["area"]
[docs]@export def find_veto_events( hitlets: np.ndarray, coincidence_level: int, resolving_time: int, left_extension: int, event_number_key: str = "event_number_nv", n_channel: int = 120, ) -> ty.Tuple[np.ndarray, np.ndarray]: """Function which find the veto events as a nfold concidence in a given resolving time window. All hitlets which touch the event window contribute. :param hitlets: Hitlets which shall be used for event creation. :param coincidence_level: int, coincidence level. :param resolving_time: int, resolving window for coincidence in ns. :param left_extension: int, left event extension in ns. :param event_number_key: str, field name for the event number :param n_channel: int, number of channels in detector. :return: events, hitelt_ids_per_event """ # Find intervals which satisfy requirement: event_intervals = straxen.plugins.nveto_recorder.find_coincidence( hitlets, coincidence_level, resolving_time, left_extension, ) # Find all hitlets which touch the coincidence windows: # (we cannot use fully_contained in here since some muon signals # may be larger than 300 ns) hitlets_ids_in_event = strax.touching_windows(hitlets, event_intervals) # For some rare cases long signals may touch two intervals, in that # case we merge the intervals in the subsequent function: hitlets_ids_in_event = _solve_ambiguity(hitlets_ids_in_event) # Now we can create the veto events: events = np.zeros( len(hitlets_ids_in_event), dtype=veto_event_dtype(event_number_key, n_channel) ) _make_event(hitlets, hitlets_ids_in_event, events) return events, hitlets_ids_in_event
@numba.njit(cache=True, nogil=False) def _solve_ambiguity(contained_hitlets_ids: np.ndarray) -> np.ndarray: """Function which solves the ambiguity if a single hitlets overlaps with two event intervals. This can happen for muon signals which have a long tail, since we define the coincidence window as a fixed window. Hence those tails can extend beyond the fixed window. """ res = np.zeros(contained_hitlets_ids.shape, dtype=contained_hitlets_ids.dtype) if not len(res): # Return empty result return res offset = 0 start_i, end_i = contained_hitlets_ids[0] for e_i, ids in enumerate(contained_hitlets_ids[1:]): if end_i > ids[0]: # Current and next interval overlap so just updated the end # index. end_i = ids[1] else: # They do not overlap store indices: res[offset] = [start_i, end_i] offset += 1 # Init next interval: start_i, end_i = ids # Last event: res[offset, :] = [start_i, end_i] offset += 1 return res[:offset] @numba.njit(cache=True, nogil=True) def _make_event(hitlets: np.ndarray, hitlet_ids: np.ndarray, res: np.ndarray): """Function which sets veto event time and endtime.""" for ei, ids in enumerate(hitlet_ids): hit = hitlets[ids[0] : ids[1]] res[ei]["time"] = hit[0]["time"] res[ei]["endtime"] = np.max(strax.endtime(hit))