Source code for straxen.plugins.events_nv.event_positions_nv

import numpy as np
import straxen
import numba
import typing as ty

import strax

export, __all__ = strax.exporter()


[docs]@export class nVETOEventPositions(strax.Plugin): """Plugin which computes the interaction position in the nveto as an azimuthal angle.""" __version__ = "0.1.1" depends_on = ("events_nv", "hitlets_nv") data_kind = "events_nv" provides = "event_positions_nv" compressor = "zstd" position_max_time_nv = straxen.URLConfig( default=20, infer_type=False, help="Time [ns] within an event use to compute the azimuthal angle of the event.", ) nveto_pmt_position_map = straxen.URLConfig( help="nVeto PMT position mapfile", default="resource://nveto_pmt_position.csv?fmt=csv", infer_type=False, )
[docs] def infer_dtype(self): return veto_event_positions_dtype()
[docs] def setup(self): npmt_pos = self.nveto_pmt_position_map # Use records instead of a dataframe. self.pmt_properties = npmt_pos.to_records(index=False)
[docs] def compute(self, events_nv, hitlets_nv): event_angles = np.zeros(len(events_nv), dtype=self.dtype) # Split hitlets by containment, works since we updated event start/end in # compute_event_properties. hits_in_events = strax.split_by_containment(hitlets_nv, events_nv) # Compute hitlets within the first x ns of event: hits_in_events, n_prompt = first_hitlets(hits_in_events, self.position_max_time_nv) event_angles["n_prompt_hitlets"] = n_prompt # Compute azimuthal angle and xyz positions: angle = get_average_angle(hits_in_events, self.pmt_properties) event_angles["angle"] = angle compute_positions(event_angles, hits_in_events, self.pmt_properties) strax.copy_to_buffer(events_nv, event_angles, f"_copy_events_nv") return event_angles
def veto_event_positions_dtype() -> list: dtype = [] dtype += strax.time_fields dtype += [ ( ( 'Number of prompt hitlets within the first "position_max_time_nv" ns of the event.', "n_prompt_hitlets", ), np.int16, ), ( ("Azimuthal angle, where the neutron capture was detected in [0, 2 pi).", "angle"), np.float32, ), (("Area weighted mean of position in x [mm]", "pos_x"), np.float32), (("Area weighted mean of position in y [mm]", "pos_y"), np.float32), (("Area weighted mean of position in z [mm]", "pos_z"), np.float32), (("Weighted variance of position in x [mm]", "pos_x_spread"), np.float32), (("Weighted variance of position in y [mm]", "pos_y_spread"), np.float32), (("Weighted variance of position in z [mm]", "pos_z_spread"), np.float32), ] return dtype @numba.njit(cache=True, nogil=True) def compute_positions( event_angles: np.ndarray, contained_hitlets: numba.typed.typedlist.List, pmt_pos: np.ndarray, start_channel: int = 2000, ): """Function which computes some artificial event position for a given neutron/muon-veto event. The position is computed based on a simple area weighted mean. Please note that the event position can be reconstructed in unphysical regions like being within the TPC. :param event_angles: Result array of the veto_event_position dtype. The result is updated inplace. :param contained_hitlets: Hitlets contained in each event. :param pmt_pos: Position of the veto PMTs :param start_channel: Starting channel of the detector. """ for e_angles, hitlets in zip(event_angles, contained_hitlets): prompt_event_area = np.sum(hitlets["area"]) if prompt_event_area: ch = hitlets["channel"] - start_channel pos_x = pmt_pos["x"][ch] pos_y = pmt_pos["y"][ch] pos_z = pmt_pos["z"][ch] e_angles["pos_x"] = np.sum(pos_x * hitlets["area"]) / prompt_event_area e_angles["pos_y"] = np.sum(pos_y * hitlets["area"]) / prompt_event_area e_angles["pos_z"] = np.sum(pos_z * hitlets["area"]) / prompt_event_area w = hitlets["area"] / prompt_event_area # normalized weights if len(hitlets) and np.sum(w) > 0: e_angles["pos_x_spread"] = np.sqrt( np.sum(w * (pos_x - e_angles["pos_x"]) ** 2) / np.sum(w) ) e_angles["pos_y_spread"] = np.sqrt( np.sum(w * (pos_y - e_angles["pos_y"]) ** 2) / np.sum(w) ) e_angles["pos_z_spread"] = np.sqrt( np.sum(w * (pos_z - e_angles["pos_z"]) ** 2) / np.sum(w) ) @numba.njit(cache=True, nogil=True) def get_average_angle( hitlets_in_event: numba.typed.typedlist.List, pmt_properties: np.ndarray, start_channel: int = 2000, ) -> np.ndarray: """Computes azimuthal angle as an area weighted mean over all hitlets. :param hitlets_in_event: numba.typed.List containing the hitlets per event. :param pmt_properties: numpy.sturctured.array containing the PMT positions in the fields "x" and "y". :param start_channel: First channel e.g. 2000 for nevto. :return: np.array holding the azimuthal angles. """ res = np.zeros(len(hitlets_in_event), np.float32) for ind, hitlets in enumerate(hitlets_in_event): if np.sum(hitlets["area"]): x = pmt_properties["x"][hitlets["channel"] - start_channel] y = pmt_properties["y"][hitlets["channel"] - start_channel] weighted_mean_x = np.sum(x * hitlets["area"]) / np.sum(hitlets["area"]) weighted_mean_y = np.sum(y * hitlets["area"]) / np.sum(hitlets["area"]) res[ind] = _circ_angle(weighted_mean_x, weighted_mean_y) else: res[ind] = np.nan return res @numba.njit(cache=True, nogil=True) def circ_angle(x_values: np.ndarray, y_values: np.ndarray) -> np.ndarray: """Loops over a set of x and y values and computes azimuthal angle. :param x_values: x-coordinates :param y_values: y-coordinates :return: angles """ res = np.zeros(len(x_values), dtype=np.float32) for ind, (x, y) in enumerate(zip(x_values, y_values)): res[ind] = _circ_angle(x, y) return res @numba.njit(cache=True, nogil=True) def _circ_angle(x: float, y: float) -> float: if x > 0 and y >= 0: # 1st quadrant angle = np.abs(np.arctan(y / x)) return angle elif x <= 0 and y > 0: # 2nd quadrant angle = np.abs(np.arctan(x / y)) return np.pi / 2 + angle elif x < 0 and y <= 0: # 3rd quadrant angle = np.abs(np.arctan(y / x)) return np.pi + angle elif y < 0 and x >= 0: # 4th quadrant angle = np.abs(np.arctan(x / y)) return 3 / 2 * np.pi + angle elif x == 0 and y == 0: return np.NaN else: print(x, y) raise ValueError("It should be impossible to arrive here, but somehow we managed.") @numba.njit(cache=True, nogil=True) def first_hitlets( hitlets_per_event: np.ndarray, max_time: int ) -> ty.Tuple[numba.typed.List, np.ndarray]: """Returns hitlets within the first "max_time" ns of an event. :param hitlets_per_event: numba.typed.List of hitlets per event. :param max_time: int max allowed time difference to leading hitlet in ns. """ res_hitlets_in_event = numba.typed.List() res_n_prompt = np.zeros(len(hitlets_per_event), np.int16) for ind, hitlets in enumerate(hitlets_per_event): m = (hitlets["time"] - hitlets[0]["time"]) < max_time h = hitlets[m] res_hitlets_in_event.append(h) res_n_prompt[ind] = len(h) return res_hitlets_in_event, res_n_prompt