Source code for straxen.plugins.events.corrected_areas

from typing import Tuple

import numpy as np
import strax
import straxen
from straxen.common import rotate_perp_wires
from straxen.plugins.defaults import DEFAULT_POSREC_ALGO

export, __all__ = strax.exporter()


[docs]@export class CorrectedAreas(strax.Plugin): """Plugin which applies light collection efficiency maps and electron life time to the data. Computes the cS1/cS2 for the main/alternative S1/S2 as well as the corrected life time. Note: Please be aware that for both, the main and alternative S1, the area is corrected according to the xy-position of the main S2. There are now 3 components of cS2s: cs2_top, cS2_bottom and cs2. cs2_top and cs2_bottom are corrected by the corresponding maps, and cs2 is the sum of the two. """ __version__ = "0.5.3" depends_on: Tuple[str, ...] = ("event_basics", "event_positions") # Descriptor configs elife = straxen.URLConfig( default="cmt://elife?version=ONLINE&run_id=plugin.run_id", help="electron lifetime in [ns]" ) default_reconstruction_algorithm = straxen.URLConfig( default=DEFAULT_POSREC_ALGO, help="default reconstruction algorithm that provides (x,y)" ) s1_xyz_map = straxen.URLConfig( default=( "itp_map://resource://cmt://format://" "s1_xyz_map_{algo}?version=ONLINE&run_id=plugin.run_id" "&fmt=json&algo=plugin.default_reconstruction_algorithm" ), cache=True, ) s2_xy_map = straxen.URLConfig( default=( "itp_map://resource://cmt://format://" "s2_xy_map_{algo}?version=ONLINE&run_id=plugin.run_id" "&fmt=json&algo=plugin.default_reconstruction_algorithm" ), cache=True, ) # average SE gain for a given time period. default to the value of this run in ONLINE model # thus, by default, there will be no time-dependent correction according to se gain avg_se_gain = straxen.URLConfig( default="cmt://avg_se_gain?version=ONLINE&run_id=plugin.run_id", help=( "Nominal single electron (SE) gain in PE / electron extracted. " "Data will be corrected to this value" ), ) # se gain for this run, allowing for using CMT. default to online se_gain = straxen.URLConfig( default="cmt://se_gain?version=ONLINE&run_id=plugin.run_id", help="Actual SE gain for a given run (allows for time dependence)", ) # relative extraction efficiency which can change with time and modeled by CMT. rel_extraction_eff = straxen.URLConfig( default="cmt://rel_extraction_eff?version=ONLINE&run_id=plugin.run_id", help="Relative extraction efficiency for this run (allows for time dependence)", ) # relative light yield # defaults to no correction rel_light_yield = straxen.URLConfig( default="cmt://relative_light_yield?version=ONLINE&run_id=plugin.run_id", help="Relative light yield (allows for time dependence)", ) # Single electron gain partition # AB and CD partitons distiguished based on # linear and circular regions # SR0 values set as default # https://xe1t-wiki.lngs.infn.it/doku.php?id=jlong:sr0_2_region_se_correction # https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:noahhood:corrections:se_gain_ee_final single_electron_gain_partition = straxen.URLConfig( default={"linear": 28, "circular": 60}, help=( "Two distinct patterns of evolution of single electron corrections between AB and CD. " "Distinguish thanks to linear and circular regions" ), ) # cS2 AFT correction due to photon ionization # https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:zihao:sr1_s2aft_photonionization_correction cs2_bottom_top_ratio_correction = straxen.URLConfig( default=1, help="Scaling factor for cS2 AFT correction due to photon ionization" )
[docs] def infer_dtype(self): dtype = [] dtype += strax.time_fields for peak_type, peak_name in zip(["", "alt_"], ["main", "alternate"]): # Only apply dtype += [ (f"{peak_type}cs1", np.float32, f"Corrected area of {peak_name} S1 [PE]"), ( f"{peak_type}cs1_wo_timecorr", np.float32, f"Corrected area of {peak_name} S1 (before LY correction) [PE]", ), ] names = ["_wo_timecorr", "_wo_picorr", "_wo_elifecorr", ""] descriptions = ["S2 xy", "SEG/EE", "photon ionization", "elife"] for i, name in enumerate(names): if i == len(names) - 1: description = "" elif i == 0: # special treatment for wo_timecorr, apply elife correction description = " (before " + " + ".join(descriptions[i + 1 : -1]) description += ( ", after " + " + ".join(descriptions[: i + 1] + descriptions[-1:]) + ")" ) else: description = " (before " + " + ".join(descriptions[i + 1 :]) description += ", after " + " + ".join(descriptions[: i + 1]) + ")" dtype += [ ( f"{peak_type}cs2{name}", np.float32, f"Corrected area of {peak_name} S2{description} [PE]", ), ( f"{peak_type}cs2_area_fraction_top{name}", np.float32, ( "Fraction of area seen by the top PMT array for corrected " f"{peak_name} S2{description}" ), ), ] return dtype
[docs] def ab_region(self, x, y): new_x, new_y = rotate_perp_wires(x, y) cond = new_x < self.single_electron_gain_partition["linear"] cond &= new_x > -self.single_electron_gain_partition["linear"] cond &= new_x**2 + new_y**2 < self.single_electron_gain_partition["circular"] ** 2 return cond
[docs] def cd_region(self, x, y): return ~self.ab_region(x, y)
[docs] def s2_map_names(self): # S2 top and bottom are corrected separately, and cS2 total is the sum of the two # figure out the map name if len(self.s2_xy_map.map_names) > 1: s2_top_map_name = "map_top" s2_bottom_map_name = "map_bottom" else: s2_top_map_name = "map" s2_bottom_map_name = "map" return s2_top_map_name, s2_bottom_map_name
[docs] def seg_ee_correction_preparation(self): """Get single electron gain and extraction efficiency options.""" self.regions = {"ab": self.ab_region, "cd": self.cd_region} # setup SEG and EE corrections # if they are dicts, we just leave them as is # if they are not, we assume they are floats and # create a dict with the same correction in each region if isinstance(self.se_gain, dict): seg = self.se_gain else: seg = {key: self.se_gain for key in self.regions} if isinstance(self.avg_se_gain, dict): avg_seg = self.avg_se_gain else: avg_seg = {key: self.avg_se_gain for key in self.regions} if isinstance(self.rel_extraction_eff, dict): ee = self.rel_extraction_eff else: ee = {key: self.rel_extraction_eff for key in self.regions} return seg, avg_seg, ee
[docs] def compute(self, events): result = np.zeros(len(events), self.dtype) result["time"] = events["time"] result["endtime"] = events["endtime"] # S1 corrections depend on the actual corrected event position. # We use this also for the alternate S1; for e.g. Kr this is # fine as the S1 correction varies slowly. event_positions = np.vstack([events["x"], events["y"], events["z"]]).T for peak_type in ["", "alt_"]: result[f"{peak_type}cs1_wo_timecorr"] = events[f"{peak_type}s1_area"] / self.s1_xyz_map( event_positions ) result[f"{peak_type}cs1"] = result[f"{peak_type}cs1_wo_timecorr"] / self.rel_light_yield # S2 corrections s2_top_map_name, s2_bottom_map_name = self.s2_map_names() seg, avg_seg, ee = self.seg_ee_correction_preparation() # now can start doing corrections for peak_type in ["", "alt_"]: # S2(x,y) corrections use the observed S2 positions s2_positions = np.vstack([events[f"{peak_type}s2_x"], events[f"{peak_type}s2_y"]]).T # corrected S2 with S2(x,y) map only, i.e. no elife correction # this is for S2-only events which don't have drift time info s2_xy_top = self.s2_xy_map(s2_positions, map_name=s2_top_map_name) cs2_top_xycorr = ( events[f"{peak_type}s2_area"] * events[f"{peak_type}s2_area_fraction_top"] / s2_xy_top ) s2_xy_bottom = self.s2_xy_map(s2_positions, map_name=s2_bottom_map_name) cs2_bottom_xycorr = ( events[f"{peak_type}s2_area"] * (1 - events[f"{peak_type}s2_area_fraction_top"]) / s2_xy_bottom ) # collect electron lifetime correction # for electron lifetime corrections to the S2s, # use drift time computed using the main S1. el_string = peak_type + "s2_interaction_" if peak_type == "alt_" else peak_type elife_correction = np.exp(events[f"{el_string}drift_time"] / self.elife) # collect SEG and EE corrections seg_ee_corr = np.zeros(len(events)) for partition, func in self.regions.items(): # partitioned SEG and EE partition_mask = func(events[f"{peak_type}s2_x"], events[f"{peak_type}s2_y"]) # correct for SEG and EE seg_ee_corr[partition_mask] = seg[partition] / avg_seg[partition] * ee[partition] # apply S2 xy correction cs2_xycorr = cs2_top_xycorr + cs2_bottom_xycorr result[f"{peak_type}cs2_wo_timecorr"] = cs2_xycorr * elife_correction result[f"{peak_type}cs2_area_fraction_top_wo_timecorr"] = cs2_top_xycorr / cs2_xycorr # apply SEG and EE correction cs2_top_wo_picorr = cs2_top_xycorr / seg_ee_corr cs2_bottom_wo_picorr = cs2_bottom_xycorr / seg_ee_corr cs2_wo_picorr = cs2_top_wo_picorr + cs2_bottom_wo_picorr result[f"{peak_type}cs2_wo_picorr"] = cs2_wo_picorr result[f"{peak_type}cs2_area_fraction_top_wo_picorr"] = ( cs2_top_wo_picorr / result[f"{peak_type}cs2_wo_picorr"] ) # apply photon ionization intensity and cS2 AFT correction (see #1247) # cS2 bottom should be corrected by photon ionization, but not cS2 top cs2_top_wo_elifecorr = cs2_top_wo_picorr cs2_bottom_wo_elifecorr = cs2_bottom_wo_picorr * self.cs2_bottom_top_ratio_correction cs2_wo_elifecorr = cs2_top_wo_elifecorr + cs2_bottom_wo_elifecorr # scale top and bottom to ensure total cS2 is conserved, since the time # dependence of it has been already corrected by SEG correction cs2_top_wo_elifecorr *= cs2_wo_picorr / cs2_wo_elifecorr cs2_bottom_wo_elifecorr *= cs2_wo_picorr / cs2_wo_elifecorr cs2_wo_elifecorr = cs2_wo_picorr result[f"{peak_type}cs2_wo_elifecorr"] = cs2_wo_elifecorr result[f"{peak_type}cs2_area_fraction_top_wo_elifecorr"] = ( cs2_top_wo_elifecorr / result[f"{peak_type}cs2_wo_elifecorr"] ) # apply electron lifetime correction result[f"{peak_type}cs2"] = result[f"{peak_type}cs2_wo_elifecorr"] * elife_correction result[f"{peak_type}cs2_area_fraction_top"] = result[ f"{peak_type}cs2_area_fraction_top_wo_elifecorr" ] return result