Source code for straxen.plugins.events.event_positions

import numpy as np
import straxen
from straxen.plugins.defaults import DEFAULT_POSREC_ALGO

import strax

export, __all__ = strax.exporter()


[docs]@export class EventPositions(strax.Plugin): """Computes the observed and corrected position for the main S1/S2 pairs in an event. For XENONnT data, it returns the FDC corrected positions of the default_reconstruction_algorithm. In case the fdc_map is given as a file (not through CMT), then the coordinate system should be given as (x, y, z), not (x, y, drift_time). """ depends_on = "event_basics" __version__ = "0.3.0" default_reconstruction_algorithm = straxen.URLConfig( default=DEFAULT_POSREC_ALGO, help="default reconstruction algorithm that provides (x,y)" ) electron_drift_velocity = straxen.URLConfig( default="cmt://electron_drift_velocity?version=ONLINE&run_id=plugin.run_id", cache=True, help="Vertical electron drift velocity in cm/ns (1e4 m/ms)", ) electron_drift_time_gate = straxen.URLConfig( default="cmt://electron_drift_time_gate?version=ONLINE&run_id=plugin.run_id", help="Electron drift time from the gate in ns", cache=True, ) fdc_map = straxen.URLConfig( infer_type=False, help="3D field distortion correction map path", default="legacy-fdc://xenon1t_sr0_sr1?run_id=plugin.run_id", ) z_bias_map = straxen.URLConfig( infer_type=False, help="Map of Z bias due to non uniform drift velocity/field", default="legacy-z_bias://0", )
[docs] def infer_dtype(self): dtype = [] for j in "x y r".split(): comment = f"Main interaction {j}-position, field-distortion corrected (cm)" dtype += [(j, np.float32, comment)] for s_i in [1, 2]: comment = ( f"Alternative S{s_i} interaction (rel. main S{3 - s_i}) {j}-position," " field-distortion corrected (cm)" ) field = f"alt_s{s_i}_{j}_fdc" dtype += [(field, np.float32, comment)] for j in ["z"]: comment = "Interaction z-position, corrected to non-uniform drift velocity (cm)" dtype += [(j, np.float32, comment)] comment = ( "Interaction z-position, corrected to non-uniform drift velocity, duplicated (cm)" ) dtype += [(j + "_dv_corr", np.float32, comment)] for s_i in [1, 2]: comment = ( f"Alternative S{s_i} z-position (rel. main S{3 - s_i}), corrected to" " non-uniform drift velocity (cm)" ) field = f"alt_s{s_i}_z" dtype += [(field, np.float32, comment)] # values for corrected Z position comment = ( f"Alternative S{s_i} z-position (rel. main S{3 - s_i}), corrected to" " non-uniform drift velocity, duplicated (cm)" ) field = f"alt_s{s_i}_z_dv_corr" dtype += [(field, np.float32, comment)] naive_pos = [] fdc_pos = [] for j in "r z".split(): naive_pos += [ ( f"{j}_naive", np.float32, f"Main interaction {j}-position with observed position (cm)", ) ] fdc_pos += [ ( f"{j}_field_distortion_correction", np.float32, f"Correction added to {j}_naive for field distortion (cm)", ) ] for s_i in [1, 2]: naive_pos += [ ( f"alt_s{s_i}_{j}_naive", np.float32, ( f"Alternative S{s_i} interaction (rel. main S{3 - s_i}) {j}-position" " with observed position (cm)" ), ) ] fdc_pos += [ ( f"alt_s{s_i}_{j}_field_distortion_correction", np.float32, f"Correction added to alt_s{s_i}_{j}_naive for field distortion (cm)", ) ] dtype += naive_pos + fdc_pos for s_i in [1, 2]: dtype += [ ( f"alt_s{s_i}_theta", np.float32, f"Alternative S{s_i} (rel. main S{3 - s_i}) " "interaction angular position (radians)", ) ] dtype += [("theta", np.float32, f"Main interaction angular position (radians)")] return dtype + strax.time_fields
[docs] def setup(self): self.coordinate_scales = [1.0, 1.0, -self.electron_drift_velocity] self.map = self.fdc_map
[docs] def compute(self, events): result = {"time": events["time"], "endtime": strax.endtime(events)} # s_i == 0 indicates the main event, # while s_i != 0 means alternative S1 or S2 is used based on s_i value # while the other peak is the main one # (e.g., s_i == 1 means that the event is defined using altS1 and main S2) for s_i in [0, 1, 2]: # alt_sx_interaction_drift_time is calculated between main Sy and alternative Sx drift_time = ( events["drift_time"] if not s_i else events[f"alt_s{s_i}_interaction_drift_time"] ) z_obs = -self.electron_drift_velocity * drift_time xy_pos = "s2_" if s_i != 2 else "alt_s2_" orig_pos = np.vstack([events[f"{xy_pos}x"], events[f"{xy_pos}y"], z_obs]).T r_obs = np.linalg.norm(orig_pos[:, :2], axis=1) z_obs = z_obs + self.electron_drift_velocity * self.electron_drift_time_gate # apply z bias correction z_dv_delta = self.z_bias_map(np.array([r_obs, z_obs]).T, map_name="z_bias_map") corr_pos = np.vstack([events[f"{xy_pos}x"], events[f"{xy_pos}y"], z_obs - z_dv_delta]).T # apply r bias correction delta_r = self.map(corr_pos) with np.errstate(invalid="ignore", divide="ignore"): r_cor = r_obs + delta_r scale = np.divide(r_cor, r_obs, out=np.zeros_like(r_cor), where=r_obs != 0) pre_field = "" if s_i == 0 else f"alt_s{s_i}_" post_field = "" if s_i == 0 else "_fdc" result.update( { f"{pre_field}x{post_field}": orig_pos[:, 0] * scale, f"{pre_field}y{post_field}": orig_pos[:, 1] * scale, f"{pre_field}r{post_field}": r_cor, f"{pre_field}r_naive": r_obs, f"{pre_field}r_field_distortion_correction": delta_r, f"{pre_field}theta": np.arctan2(orig_pos[:, 1], orig_pos[:, 0]), f"{pre_field}z_naive": z_obs, # using z_dv_corr (z_obs - z_dv_delta) in agreement with the dtype description # the FDC for z (z_cor) is found to be not reliable (see #527) f"{pre_field}z": z_obs - z_dv_delta, f"{pre_field}z_dv_corr": z_obs - z_dv_delta, } ) return result