# Source code for straxen.contexts

import os
import warnings
from typing import Dict, Any, List, Optional
from immutabledict import immutabledict
import socket

import strax
import straxen

from straxen import HAVE_ADMIX

# Context options shared by the contexts defined in this module; ``xenonnt``
# merges them with the caller's keyword arguments before building the
# ``strax.Context``.
common_opts: Dict[str, Any] = {
    "register_all": [straxen.plugins],
    # Plugins that are registered explicitly, on top of ``register_all``.
    "register": [
        straxen.PulseProcessing,
        straxen.Peaklets,
        straxen.PeakletClassificationSOM,
        straxen.MergedS2s,
        straxen.PeaksSOM,
        straxen.PeakBasicsSOM,
        straxen.PeakProximity,
        straxen.Events,
        straxen.EventBasicsSOM,
        straxen.EventPositions,
        straxen.CorrectedAreas,
        straxen.EnergyEstimates,
        straxen.EventInfoDouble,
        straxen.DistinctChannels,
        straxen.PeakPositionsMLP,
        straxen.PeakPositionsCNF,
    ],
    "check_available": ("peak_basics", "event_basics"),
    "store_run_fields": (
        "name",
        "number",
        "start",
        "end",
        "livetime",
        "mode",
        "source",
    ),
    "use_per_run_defaults": False,
}


# Plugin configuration shared by the contexts defined in this module; it is
# passed as ``config=`` when ``xenonnt`` builds the ``strax.Context``.
common_config = {
    "n_tpc_pmts": straxen.n_tpc_pmts,
    "n_top_pmts": straxen.n_top_pmts,
    # PMT gain lookups, one URL-style config per detector (TPC, neutron veto,
    # muon veto).
    "gain_model": (
        "list-to-array://xedocs://pmt_area_to_pes"
        "?as_list=True&sort=pmt&detector=tpc"
        "&run_id=plugin.run_id&version=ONLINE&attr=value"
    ),
    "gain_model_nv": (
        "list-to-array://xedocs://pmt_area_to_pes"
        "?as_list=True&sort=pmt&detector=neutron_veto"
        "&run_id=plugin.run_id&version=ONLINE&attr=value"
    ),
    "gain_model_mv": (
        "list-to-array://xedocs://pmt_area_to_pes"
        "?as_list=True&sort=pmt&detector=muon_veto"
        "&run_id=plugin.run_id&version=ONLINE&attr=value"
    ),
    "channel_map": immutabledict(
        {
            # name: (minimum channel, maximum channel)
            # Channels must be listed in ascending order!
            "tpc": (0, 493),
            "he": (500, 752),  # high energy
            "aqmon": (790, 807),
            "aqmon_nv": (808, 815),  # nveto acquisition monitor
            "tpc_blank": (999, 999),
            "mv": (1000, 1083),
            "aux_mv": (1084, 1087),  # Aux mv channel 2 empty 1 pulser and 1 GPS
            "mv_blank": (1999, 1999),
            "nveto": (2000, 2119),
            "nveto_blank": (2999, 2999),
        }
    ),
}


def find_rucio_local_path(include_rucio_local, _rucio_local_path):
    """Check the hostname to determine which rucio local path to use.

    Note that access to /dali/lgrandi/rucio/ is possible only if you are on a
    dali compute node or login node.

    :param include_rucio_local: add the rucio local storage frontend. This is only needed if
        one wants to do a fuzzy search in the data the runs database is out of sync with rucio
    :param _rucio_local_path: str, path of local RSE of rucio. Only use for testing!
    :return: tuple ``(include_rucio_local, rucio_local_path)``. On a dali compute node both
        arguments are returned unchanged; on any other host the result is always
        ``(True, "/project/lgrandi/rucio/")`` and a notice is printed.

    """
    hostname = socket.gethostname()

    # If you are on a dali compute node, do nothing: keep the caller's choices.
    on_dali_compute_node = ("dali" in hostname) and ("login" not in hostname)
    if on_dali_compute_node:
        return include_rucio_local, _rucio_local_path

    # Assumed the only other option is 'midway' or login nodes,
    # where we have full access to dali and project space.
    # This doesn't make sense outside XENON but we don't care.
    rucio_local_path = "/project/lgrandi/rucio/"
    print(
        "You specified _auto_append_rucio_local=True and you are not on dali compute nodes, "
        f"so we will add the following rucio local path: {rucio_local_path}"
    )
    return True, rucio_local_path
def xenonnt(
    output_folder: str = "./strax_data",
    we_are_the_daq: bool = False,
    minimum_run_number: int = 7157,
    maximum_run_number: Optional[int] = None,
    # Frontends
    include_rucio_remote: bool = False,
    include_online_monitor: bool = False,
    include_rucio_local: bool = False,
    # Frontend options
    download_heavy: bool = False,
    remove_heavy: bool = False,
    _auto_append_rucio_local: bool = True,
    _rucio_path: str = "/dali/lgrandi/rucio/",
    _rucio_local_path: Optional[str] = None,
    # NOTE: the two list defaults below are shared between calls; they are only
    # iterated over in this function and must never be mutated.
    _raw_paths: List[str] = ["/dali/lgrandi/xenonnt/raw"],
    _processed_paths: List[str] = [
        "/project/lgrandi/xenonnt/processed",
        "/project/lgrandi/xenonnt/processed_sr2_offline_round_1",
        "/project/lgrandi/xenonnt/processed_sr2_offline_round_2",
        "/project/lgrandi/xenonnt/processed_sr2_offline_round_3",
        "/project/lgrandi/xenonnt/processed_sr2_offline_round_4",
        "/project2/lgrandi/xenonnt/processed",
        "/dali/lgrandi/xenonnt/processed",
        "/dali/lgrandi/xenonnt/processed_sr2_offline_round_1",
        "/dali/lgrandi/xenonnt/processed_sr2_offline_round_2",
        "/dali/lgrandi/xenonnt/processed_sr2_offline_round_3",
        "/dali/lgrandi/xenonnt/processed_sr2_offline_round_4",
    ],
    # Testing options
    _database_init: bool = True,
    # NOTE(review): annotated as dict, but documented (and used via
    # strax.to_str_tuple) as str/tuple -- confirm and fix the annotation.
    _forbid_creation_of: Optional[dict] = None,
    **kwargs,
):
    """XENONnT online processing and analysis.

    :param output_folder: str, Path of the strax.DataDirectory where new data can be stored
    :param we_are_the_daq: bool, if we have admin access to upload data
    :param minimum_run_number: int, lowest number to consider
    :param maximum_run_number: Highest number to consider. When None (the default) consider all
        runs that are higher than the minimum_run_number.
    :param include_rucio_remote: add the rucio remote frontend to the context
    :param include_online_monitor: add the online monitor storage frontend
    :param include_rucio_local: add the rucio local storage frontend. This is only needed if one
        wants to do a fuzzy search in the data the runs database is out of sync with rucio
    :param download_heavy: bool, whether or not to allow downloads of heavy data (raw_records*,
        less the aqmon)
    :param remove_heavy: bool, whether or not to remove the heavy data after reading
    :param _auto_append_rucio_local: bool, whether or not to automatically append the rucio
        local path
    :param _rucio_path: str, path of rucio
    :param _rucio_local_path: str, path of local RSE of rucio. Only use for testing!
    :param _raw_paths: list[str], common path of the raw-data
    :param _processed_paths: list[str]. common paths of output data
    :param _database_init: bool, start the database (for testing)
    :param _forbid_creation_of: str/tuple, of datatypes to prevent from being written
        (raw_records* is always forbidden).
    :param kwargs: dict, context options
    :return: strax.Context

    """
    if "xedocs_version" in kwargs:
        warnings.warn(
            "Please use xenonnt_* instead of xenonnt if you want to specify xedocs_version"
        )

    # Caller-supplied options take precedence over the module-wide defaults.
    context_options = {**straxen.contexts.common_opts, **kwargs}

    st = strax.Context(config=straxen.contexts.common_config, **context_options)
    st.register(
        [
            straxen.DAQReader,
            straxen.LEDCalibration,
            straxen.LEDAfterpulseProcessing,
            straxen.nVeto_reflectivity,
        ]
    )

    if _auto_append_rucio_local:
        # May override both values depending on the host we are running on.
        include_rucio_local, _rucio_local_path = find_rucio_local_path(
            include_rucio_local, _rucio_local_path
        )

    # The runs database frontend is skipped entirely when _database_init is False.
    if _database_init:
        st.storage = [
            straxen.RunDB(
                readonly=not we_are_the_daq,
                minimum_run_number=minimum_run_number,
                maximum_run_number=maximum_run_number,
                runid_field="number",
                new_data_path=output_folder,
                rucio_path=_rucio_path,
            )
        ]
    else:
        st.storage = []

    # NOTE(review): the nesting of this branch was reconstructed from a source
    # whose indentation was lost -- confirm that the forbid_creation_of
    # handling belongs inside the non-DAQ branch.
    if not we_are_the_daq:
        # Raw-data folders only serve the data types provided by the DAQReader.
        for _raw_path in _raw_paths:
            st.storage += [
                strax.DataDirectory(_raw_path, readonly=True, take_only=straxen.DAQReader.provides)
            ]
        for _processed_path in _processed_paths:
            st.storage += [strax.DataDirectory(_processed_path, readonly=True)]

        if output_folder:
            st.storage += [
                strax.DataDirectory(
                    output_folder,
                    provide_run_metadata=True,
                )
            ]

        st.context_config["forbid_creation_of"] = straxen.daqreader.DAQReader.provides
        if _forbid_creation_of is not None:
            st.context_config["forbid_creation_of"] += strax.to_str_tuple(_forbid_creation_of)

    # Add the rucio frontend if we are able to
    if include_rucio_remote and HAVE_ADMIX:
        rucio_frontend = straxen.RucioRemoteFrontend(
            staging_dir=os.path.join(output_folder, "rucio"),
            download_heavy=download_heavy,
            remove_heavy=remove_heavy,
        )
        st.storage += [rucio_frontend]

    if include_rucio_local:
        rucio_local_frontend = straxen.RucioLocalFrontend(path=_rucio_local_path)
        st.storage += [rucio_local_frontend]

    # Only the online monitor backend for the DAQ
    if _database_init and (include_online_monitor or we_are_the_daq):
        st.storage += [
            straxen.OnlineMonitor(
                readonly=not we_are_the_daq,
                take_only=(
                    "veto_intervals",
                    "online_peak_monitor",
                    "event_basics",
                    "online_monitor_nv",
                    "online_monitor_mv",
                    "individual_peak_monitor",
                ),
            )
        ]

    # Remap the data if it is before channel swap (because of wrongly cabled
    # signal cable connectors) These are runs older than run 8797. Runs
    # newer than 8796 are not affected. See:
    # https://github.com/XENONnT/straxen/pull/166 and
    # https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:dsg:daq:sector_swap
    st.set_context_config(
        {
            "apply_data_function": (
                straxen.remap_old,
                straxen.check_loading_allowed,
            )
        }
    )

    return st
# Make ``xedocs_version`` a recognised context option, unless it has been
# registered already.
if "xedocs_version" not in strax.Context.takes_config:
    strax.Context = strax.takes_config(
        strax.Option(
            name="xedocs_version",
            default=None,
            type=str,
            help="The version of the xedocs database to use",
        ),
    )(strax.Context)


@strax.Context.add_method
def apply_xedocs_configs(context: strax.Context, db="straxen_db", **kwargs) -> None:
    """Load the matching context configs from xedocs and apply them to ``context``.

    :param context: the strax.Context to configure
    :param db: either the name of a database factory in ``xedocs.databases`` (called with
        whichever of ``kwargs`` it accepts) or an already constructed database object
    :param kwargs: forwarded to the database factory and, restricted to the fields of the
        ``context_configs`` schema, used as the query for the documents to apply. A ``version``
        entry is required because it is stored as the ``xedocs_version`` context option.
    :raises KeyError: if no context config matches the query

    """
    # Imported lazily so that xedocs is only required when this method is used.
    import xedocs

    if isinstance(db, str):
        func = getattr(xedocs.databases, db)
        db_kwargs = straxen.filter_kwargs(func, kwargs)
        db = func(**db_kwargs)

    # Only query on keys that are actual fields of the context_configs schema.
    schema_fields = db.context_configs.schema.__fields__
    filter_kwargs = {k: v for k, v in kwargs.items() if k in schema_fields}

    docs = db.context_configs.find_docs(**filter_kwargs)
    global_config = {doc.config_name: doc.value for doc in docs}

    if not global_config:
        raise KeyError(
            f"Could not find any context configs matching {filter_kwargs} in the xedocs database"
        )

    context.set_config(global_config)
    # NOTE(review): raises a bare KeyError('version') if the caller did not pass
    # ``version`` -- and only after set_config has already been applied.
    context.set_context_config({"xedocs_version": filter_kwargs["version"]})
def xenonnt_online(xedocs_version="global_ONLINE", _from_cutax=False, **kwargs):
    """XENONnT context.

    Builds the ``xenonnt`` context and applies the xedocs context configs of the requested
    version to it.

    :param xedocs_version: str, version of the xedocs context configs to apply
    :param _from_cutax: bool, set when called from cutax; silences the warning issued when a
        version other than "global_ONLINE" is requested
    :param kwargs: passed both to ``xenonnt`` and to ``apply_xedocs_configs``
    :return: strax.Context

    """
    requests_other_version = xedocs_version != "global_ONLINE"
    if requests_other_version and not _from_cutax:
        warnings.warn("Don't load a context directly from straxen, use cutax instead!")

    st = straxen.contexts.xenonnt(**kwargs)
    st.apply_xedocs_configs(version=xedocs_version, **kwargs)
    return st
def xenonnt_led(**kwargs):
    """XENONnT context for LED calibration data.

    Starts from ``xenonnt_online``, then rebuilds the context with the same config, storage
    and context options and registers the LED calibration and neutron-veto plugins on it.

    :param kwargs: passed on to ``xenonnt_online``
    :return: strax.Context

    """
    st = xenonnt_online(**kwargs)
    st.set_context_config(
        {
            "check_available": ("raw_records", "led_calibration"),
            # Every option of the module-wide common_config is listed as free.
            "free_options": list(common_config),
        }
    )

    # Return a new context that carries over config, storage and context options.
    # NOTE(review): presumably replace=True drops the plugins registered so far,
    # so that only the ones registered below remain -- confirm against strax.
    st = st.new_context(replace=True, config=st.config, storage=st.storage, **st.context_config)
    st.register(
        [
            straxen.DAQReader,
            straxen.LEDCalibration,
            straxen.nVETORecorder,
            straxen.nVETOPulseProcessing,
            straxen.nVETOHitlets,
            straxen.nVetoExtTimings,
        ]
    )
    st.set_config({"coincidence_level_recorder_nv": 1})
    return st