Source code for straxen.common

import os
import os.path as osp
import json
from re import match
import ast
import configparser
import gzip
import inspect
from typing import Union, Dict, Any
import dill
import pickle
import commentjson

import urllib.request
import numpy as np
import pandas as pd
import numba
import strax
import straxen

export, __all__ = strax.exporter()
__all__.extend(
    [
        "straxen_dir",
        "first_sr1_run",
        "tpc_r",
        "tpc_z",
        "aux_repo",
        "n_tpc_pmts",
        "n_top_pmts",
        "n_hard_aqmon_start",
        "ADC_TO_E",
        "n_nveto_pmts",
        "n_mveto_pmts",
        "tpc_pmt_radius",
        "cryostat_outer_radius",
        "perp_wire_angle",
        "perp_wire_x_rot_pos",
        "INFINITY_64BIT_SIGNED",
    ]
)

straxen_dir = os.path.dirname(
    os.path.abspath(inspect.getfile(inspect.currentframe()))  # type: ignore
)

aux_repo = "https://raw.githubusercontent.com/XENONnT/strax_auxiliary_files/"

tpc_r = 66.4  # [CM], Not really radius, but apothem: from MC paper draft 1.0
cryostat_outer_radius = 81.5  # [cm] radius of the outer cylinder wall.
tpc_z = 148.6515  # [CM], distance between the bottom of gate and top of cathode wires
n_tpc_pmts = 494
n_top_pmts = 253
n_hard_aqmon_start = 800

n_nveto_pmts = 120
n_mveto_pmts = 84

tpc_pmt_radius = 7.62 / 2  # cm

perp_wire_angle = np.deg2rad(30)
perp_wire_x_rot_pos = 13.06  # [cm]

# Convert from ADC * samples to electrons emitted by PMT
# see pax.dsputils.adc_to_pe for calculation. Saving this number in straxen as
# it's needed in analyses
ADC_TO_E = 17142.81741

# See https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:dsg:daq:sector_swap
LAST_MISCABLED_RUN = 8796
TSTART_FIRST_CORRECTLY_CABLED_RUN = 1596036001000000000

INFINITY_64BIT_SIGNED = 9223372036854775807


[docs]@export def rotate_perp_wires(x_obs: np.ndarray, y_obs: np.ndarray, angle_extra: Union[float, int] = 0): """Returns x and y in the rotated plane where the perpendicular wires area vertically aligned (parallel to the y-axis). Accepts addition to the rotation angle with `angle_extra` [deg] :param x_obs: array of x coordinates :param y_obs: array of y coordinates :param angle_extra: extra rotation in [deg] :return: x_rotated, y_rotated """ if len(x_obs) != len(y_obs): raise ValueError("x and y are not of the same length") angle_extra_rad = np.deg2rad(angle_extra) x_rot = ( np.cos(perp_wire_angle + angle_extra_rad) * x_obs - np.sin(perp_wire_angle + angle_extra_rad) * y_obs ) y_rot = ( np.sin(perp_wire_angle + angle_extra_rad) * x_obs + np.cos(perp_wire_angle + angle_extra_rad) * y_obs ) return x_rot, y_rot
[docs]@export def pmt_positions(xenon1t=False): """Return pandas dataframe with PMT positions columns: array (top/bottom), i (PMT number), x, y """ if xenon1t: # Get PMT positions from the XENON1T config without PAX config = configparser.ConfigParser() config.read_string( resource_from_url( "https://raw.githubusercontent.com/XENON1T/pax/master/pax/config/XENON1T.ini" ) ) pmt_config = ast.literal_eval(config["DEFAULT"]["pmts"]) return pd.DataFrame( [ dict( x=q["position"]["x"], y=q["position"]["y"], i=q["pmt_position"], array=q.get("array", "other"), ) for q in pmt_config[:248] ] ) else: return resource_from_url( aux_repo + "874de2ffe41147719263183b89d26c9ee562c334/pmt_positions_xenonnt.csv", fmt="csv", )
# In-memory resource cache _resource_cache: Dict[str, Any] = dict() # Formats for which the original file is text, not binary _text_formats = ["txt", "csv", "json"]
[docs]@export def open_resource(file_name: str, fmt="text"): """Open file. :param file_name: str, file to open :param fmt: format of the file :return: opened file """ cached_name = _cache_name(file_name, fmt) if cached_name in _resource_cache: # Retrieve from in-memory cache return _resource_cache[cached_name] # File resource if fmt in ["npy", "npy_pickle"]: result = np.load(file_name, allow_pickle=fmt == "npy_pickle") if isinstance(result, np.lib.npyio.NpzFile): # Slurp the arrays in the file, so the result can be copied, # then close the file so its descriptors does not leak. result_slurped = {k: v[:] for k, v in result.items()} result.close() result = result_slurped elif fmt == "pkl": with open(file_name, "rb") as f: result = pickle.load(f) # nosec elif fmt == "pkl.gz": with gzip.open(file_name, "rb") as f: result = pickle.load(f) # nosec elif fmt == "dill": with open(file_name, "rb") as f: result = dill.load(f) # nosec elif fmt == "dill.gz": with gzip.open(file_name, "rb") as f: result = dill.load(f) # nosec elif fmt == "json.gz": with gzip.open(file_name, "rb") as f: result = json.load(f) elif fmt == "json": with open(file_name, mode="r") as f: result = commentjson.load(f) elif fmt == "binary": with open(file_name, mode="rb") as f: result = f.read() elif fmt in ["text", "txt"]: with open(file_name, mode="r") as f: result = f.read() elif fmt == "csv": result = pd.read_csv(file_name) else: raise ValueError(f"Unsupported format {fmt}!") # Store in in-memory cache _resource_cache[cached_name] = result return result
[docs]@export def get_resource(x: str, fmt="text"): """ Get the resource from an online source to be opened here. We will sequentially try the following: 1. Load if from memory if we asked for it before; 2. load it from a file if the path exists; 3. (preferred option) Load it from our database 4. Load the file from some URL (e.g. raw github content) :param x: str, either it is : A.) a path to the file; B.) the identifier of the file as it's stored under in the database; C.) A URL to the file (e.g. raw github content). :param fmt: str, format of the resource x :return: the opened resource file x opened according to the specified format """ # 1. load from memory cached_name = _cache_name(x, fmt) if cached_name in _resource_cache: return _resource_cache[cached_name] # 2. load from file elif os.path.exists(x): return open_resource(x, fmt=fmt) # 3. load from database elif straxen.uconfig is not None: downloader = straxen.MongoDownloader() if x in downloader.list_files(): path = downloader.download_single(x) return open_resource(path, fmt=fmt) # 4. load from URL if "://" in x: return resource_from_url(x, fmt=fmt) raise FileNotFoundError( f"Cannot open {x} because it is either not stored or we cannot download it from anywhere." )
def _cache_name(name: str, fmt: str) -> str: """Return a name under which to store the requested name with the given format in the _cache.""" return f"{fmt}::{name}" # Legacy loader for public URL files def resource_from_url(html: str, fmt="text"): """Return contents of file or URL html. :param html: str, html to the file you are requesting e.g. raw github content :param fmt: str, format to parse contents into. Do NOT mutate the result you get. Make a copy if you're not sure. If you mutate resources it will corrupt the cache, cause terrible bugs in unrelated code, tears unnumbered ye shall shed, not even the echo of your lamentations shall pass over the mountains, etc. :return: The file opened as specified per it's format """ if "://" not in html: raise ValueError("Can only open urls!") # Web resource; look first in on-disk cache # to prevent repeated downloads. cache_fn = strax.utils.deterministic_hash(html) cache_folders = [ "./resource_cache", "/tmp/straxen_resource_cache", "/dali/lgrandi/strax/resource_cache", ] for cache_folder in cache_folders: try: os.makedirs(cache_folder, exist_ok=True) except (PermissionError, OSError): continue cf = osp.join(cache_folder, cache_fn) if osp.exists(cf): result = open_resource(cf, fmt=fmt) break else: print(f"Did not find {cache_fn} in cache, downloading {html}") # disable bandit result = urllib.request.urlopen(html).read() is_binary = fmt not in _text_formats if not is_binary: result = result.decode() # Store in as many caches as possible m = "wb" if is_binary else "w" available_cf = None for cache_folder in cache_folders: if not osp.exists(cache_folder): continue if not os.access(cache_folder, os.W_OK): continue cf = osp.join(cache_folder, cache_fn) with open(cf, mode=m) as f: f.write(result) available_cf = cf if available_cf is None: raise RuntimeError( f"Could not store {html} in on-disk cache," "none of the cache directories are writeable??" ) # Retrieve result from file-cache # (so we only need one format-parsing logic) result = open_resource(available_cf, fmt=fmt) return result
[docs]@export def get_livetime_sec(context, run_id, things=None): """Get the livetime of a run in seconds. If it is not in the run metadata, estimate it from the data-level metadata of the data things. """ try: md = context.run_metadata(run_id, projection=("start", "end", "livetime")) except strax.RunMetadataNotAvailable: if things is None: raise return (strax.endtime(things[-1]) - things[0]["time"]) / 1e9 else: if "livetime" in md: return md["livetime"] else: return (md["end"] - md["start"]).total_seconds()
[docs]@export def pre_apply_function(data, run_id, target, function_name="pre_apply_function"): """Prior to returning the data (from one chunk) see if any function(s) need to be applied. :param data: one chunk of data for the requested target(s) :param run_id: Single run-id of of the chunk of data :param target: one or more targets :param function_name: the name of the function to be applied. The function_name.py should be stored in the database. :return: Data where the function is applied. """ if function_name not in _resource_cache: # only load the function once and put it in the resource cache function_file = f"{function_name}.py" function_file = straxen.test_utils._overwrite_testing_function_file(function_file) function = get_resource(function_file, fmt="txt") # pylint: disable=exec-used exec(function) # Cache the function to reduce reloading & eval operations _resource_cache[function_name] = locals().get(function_name) data = _resource_cache[function_name](data, run_id, strax.to_str_tuple(target)) return data
[docs]@export def check_loading_allowed( data, run_id, target, max_in_disallowed=1, disallowed=("event_positions", "corrected_areas", "energy_estimates"), ): """Check that the loading of the specified targets is not disallowed. :param data: chunk of data :param run_id: run_id of the run :param target: list of targets requested by the user :param max_in_disallowed: the max number of targets that are in the disallowed list :param disallowed: list of targets that are not allowed to be loaded simultaneously by the user :return: data :raise: RuntimeError if more than max_in_disallowed targets are requested """ n_targets_in_disallowed = sum([t in disallowed for t in strax.to_str_tuple(target)]) if n_targets_in_disallowed > max_in_disallowed: raise RuntimeError(f'Don\'t load {disallowed} separately, use "event_info" instead') return data
[docs]@export def remap_channels( data, verbose=True, safe_copy=False, _tqdm=False, ): """There were some errors in the channel mapping of old data as described in https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:dsg:daq:sector_swap using this function, we can convert old data to reflect the right channel map while loading the data. We convert both the field 'channel' as well as anything that is an array of the same length of the number of channels. :param data: numpy array of pandas dataframe :param verbose: print messages while converting data :param safe_copy: if True make a copy of the data prior to performing manipulations. Will prevent overwrites of the internal references but does require more memory. :param _tqdm: bool (try to) add a tqdm wrapper to show the progress :return: Correctly mapped data """ # This map shows which channels were recabled. We now have to do the same in software # for old runs. remap = get_resource( aux_repo + "/ecb6da7bd4deb98cd0a4e83b3da81c1e67505b16/remapped_channels_since_20200729_17.20UTC.csv", fmt="csv", ) def convert_channel(_data, replace=("channel", "max_pmt")): """Given an array, replace the 'channel' entry if we had to remap it according to the map of channels to be remapped. :param _data: data whereof the replace entries should be changed according to the remapping of channels :param replace: entries (keys/numpy-dtypes) that should be changed in the data :return: remapped data where each of the replace entries has been replaced """ data_keys = get_dtypes(_data) # loop over the things to replace for _rep in replace: if _rep not in data_keys: # Apparently this data doesn't have the entry we want to replace continue if _rep == "channel" and _dat["channel"].ndim != 1: # Only convert channel if they are flat and not nested. continue # Make a buffer we can overwrite and replace with an remapped array buff = np.array(_data[_rep]) buff = _swap_values_in_array( np.array(_data[_rep]), buff, np.array(remap["PMT_new"].values), np.array(remap["PMT_old"].values), ) _data[_rep] = buff if verbose: print(f"convert_channel::\tchanged {_rep}") # Not needed for np.array as the internal memory already reflects it but it is # needed for pd.DataFrames. return _data def remap_single_entry(_data, _array_entry): """Remap the data of a array field (_entry) in the data. For example, remap saturated_channel (which is of length n_pmts) where the entries of the PMT_old will be replaced by the entries of PMT_new and vise versa. :param _data: reshuffle the _data according to for _entry according to the map of channels to be remapped :param _array_entry: key or dtype of the data. NB: should be array whereof the length equals the number of PMTs! :return: correctly mapped data """ _k = get_dtypes(_data) if _array_entry not in _k: raise ValueError( f"remap_single_entry::\tcannot remap {_array_entry} in data with fields {_k}." ) buff = np.array(_data[_array_entry]) for _, _row in remap.iterrows(): pmt_new, pmt_old = _row["PMT_new"], _row["PMT_old"] buff[:, pmt_new] = _data[_array_entry][:, pmt_old] _data[_array_entry] = buff # Not needed for np.array but for pd.DataFrames return _data def convert_channel_like(channel_data, n_chs=n_tpc_pmts): """Look for entries in the data of n_chs length. If found, assume it should be remapped according to the map. :param channel_data: data to be converted according to the map of channels to be remapped. This data is checked for any entries (dtype names) that have a length equal to the n_chs and if so, is remapped accordingly :param n_chs: the number of channels :return: correctly mapped data """ if not len(channel_data): return channel_data # Create a buffer to overright buffer = channel_data.copy() for k in strax.utils.tqdm(get_dtypes(channel_data), disable=not _tqdm): if np.iterable(channel_data[k][0]) and len(channel_data[k][0]) == n_chs: if verbose: print(f"convert_channel_like::\tupdate {k}") buffer = remap_single_entry(buffer, k) return buffer # Take the last two samples as otherwise the pd.DataFrame gives an unexpected output. # I would have preferred st.estimate_run_start(f'00{last_miscabled_run}')) but st is # not per se initialized. if np.any(data["time"][-2:] > TSTART_FIRST_CORRECTLY_CABLED_RUN): raise ValueError(f"Do not remap the data after run 00{LAST_MISCABLED_RUN}") if safe_copy: # Make sure we make a new entry as otherwise some internal buffer of numpy arrays # may yield puzzling results as internal buffers may also reflect the change. _dat = data.copy() del data else: # Just continue with data _dat = data # Do the conversion(s) _dat = convert_channel(_dat) if not isinstance(_dat, pd.DataFrame): # pd.DataFrames are flat arrays and thus cannot have channel_like arrays in them _dat = convert_channel_like(_dat) return _dat
[docs]@export def remap_old(data, targets, run_id, works_on_target=""): """If the data is of before the time sectors were re-cabled, apply a software remap otherwise just return the data is it is. :param data: numpy array of data with at least the field time. It is assumed the data is sorted by time :param targets: targets in the st.get_array to get :param run_id: required positional argument of apply_function_to_data in strax :param works_on_target: regex match string to match any of the targets. By default set to '' such that any target in the targets would be remapped (which is what we want as channels are present in most data types). If one only wants records (no raw- records) and peaks* use e.g. works_on_target = 'records|peaks'. """ if np.any(data["time"][:2] >= TSTART_FIRST_CORRECTLY_CABLED_RUN): # We leave the 'new' data be pass elif not np.any([match(works_on_target, t) for t in strax.to_str_tuple(targets)]): # None of the targets are such that we want to remap pass elif len(data): # select the old data and do the remapping for this mask = data["time"] < TSTART_FIRST_CORRECTLY_CABLED_RUN data = data.copy() data[mask] = remap_channels(data[mask]) return data
[docs]@export def get_dtypes(_data): """Return keys/dtype names of pd.DataFrame or numpy array. :param _data: data to get the keys/dtype names :return: keys/dtype names """ if isinstance(_data, np.ndarray): _k = _data.dtype.names elif isinstance(_data, pd.DataFrame): _k = _data.keys() return _k
@numba.jit(nopython=True, nogil=True, cache=True) def _swap_values_in_array(data_arr, buffer, items, replacements): """Fill buffer for item[k] -> replacements[k] :param data_arr: numpy array of data :param buffer: copy of data_arr where the replacements of items will be saved :param items: array of len x containing values that are in data_arr and need to be replaced with the corresponding item in replacements :param replacements: array of len x containing the values that should replace the corresponding item in items :return: the buffer reflecting the changes """ for i, val in enumerate(data_arr): for k, it in enumerate(items): if val == it: buffer[i] = replacements[k] break return buffer ## # Old XENON1T Stuff ## first_sr1_run = 170118_1327
[docs]@export def pax_file(x): """Return URL to file hosted in the pax repository master branch.""" return "https://raw.githubusercontent.com/XENON1T/pax/master/pax/data/" + x