Source code for straxen.plugins.raw_records.daqreader

import os
import glob
import warnings
from typing import Tuple
from collections import Counter
from immutabledict import immutabledict

import numpy as np
import numba
import strax

export, __all__ = strax.exporter()
__all__.extend(["ARTIFICIAL_DEADTIME_CHANNEL"])

# Just below the TPC acquisition monitor, see
# https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:dsg:daq:channel_groups
ARTIFICIAL_DEADTIME_CHANNEL = 799


class ArtificialDeadtimeInserted(UserWarning):
    pass


@export
@strax.takes_config(
    # All these must have track=False, so the raw_records hash never changes!
    # DAQ settings -- should match settings given to redax
    strax.Option(
        "record_length", default=110, track=False, type=int,
        help="Number of samples per raw_record",
    ),
    strax.Option(
        "max_digitizer_sampling_time",
        default=10,
        track=False,
        type=int,
        help="Highest interval time of the digitizer sampling time(s) used.",
    ),
    strax.Option(
        "run_start_time",
        type=float,
        track=False,
        default=0,
        help="Time of the start of the run (s since unix epoch)",
    ),
    strax.Option(
        "daq_chunk_duration",
        track=False,
        default=int(5e9),
        type=int,
        help="Duration of regular chunks in ns",
    ),
    strax.Option(
        "daq_overlap_chunk_duration",
        track=False,
        default=int(5e8),
        type=int,
        help="Duration of intermediate/overlap chunks in ns",
    ),
    strax.Option(
        "daq_compressor",
        default="lz4",
        track=False,
        help="Algorithm used for (de)compressing the live data",
    ),
    strax.Option(
        "readout_threads",
        type=dict,
        track=False,
        help=(
            "Dictionary of the readout threads where the keys "
            "specify the reader and the values the number of threads"
        ),
    ),
    strax.Option("daq_input_dir", type=str, track=False, help="Directory where readers put data"),
    # DAQReader settings
    strax.Option(
        "safe_break_in_pulses",
        default=1000,
        track=False,
        infer_type=False,
        help=(
            "Time (ns) between pulses indicating a safe break "
            "in the datastream -- gaps of this size cannot be "
            "interior to peaklets."
        ),
    ),
    strax.Option(
        "channel_map",
        track=False,
        type=immutabledict,
        infer_type=False,
        help="immutabledict mapping subdetector to (min, max) channel number.",
    ),
)
class DAQReader(strax.Plugin):
    """Read the XENONnT DAQ live_data from redax and split it into the appropriate
    raw_record data types based on the channel map.

    Does nothing whatsoever to the live_data; not even baselining.

    Provides:
        - raw_records: (TPC) raw_records.
        - raw_records_he: raw_records for the high-energy boards digitizing the
          top PMT array at lower amplification.
        - raw_records_nv: neutron-veto raw_records; only stored temporarily, as
          the software coincidence trigger has not been applied yet.
        - raw_records_mv: muon-veto raw_records.
        - raw_records_aqmon: raw_records for the acquisition monitor
          (_nv for the neutron veto).
    """

    provides: Tuple[str, ...] = (
        "raw_records",
        "raw_records_he",  # high energy
        "raw_records_aqmon",
        "raw_records_nv",  # nveto raw_records (will not be stored long term)
        "raw_records_aqmon_nv",
        "raw_records_aux_mv",
        "raw_records_mv",  # mveto has to be last due to lineage
    )

    data_kind = immutabledict(zip(provides, provides))
    depends_on: Tuple = tuple()
    parallel = "process"
    chunk_target_size_mb = 50
    rechunk_on_save = immutabledict(
        raw_records=False,
        raw_records_he=False,
        raw_records_aqmon=True,
        raw_records_nv=False,
        raw_records_aqmon_nv=True,
        raw_records_aux_mv=True,
        raw_records_mv=False,
    )
    compressor = "lz4"
    __version__ = "0.0.0"
    input_timeout = 300

    def infer_dtype(self):
        return {
            d: strax.raw_record_dtype(samples_per_record=self.config["record_length"])
            for d in self.provides
        }

    def setup(self):
        self.t0 = int(self.config["run_start_time"]) * int(1e9)
        self.dt_max = self.config["max_digitizer_sampling_time"]
        self.n_readout_threads = sum(self.config["readout_threads"].values())
        if self.config["safe_break_in_pulses"] > min(
            self.config["daq_chunk_duration"], self.config["daq_overlap_chunk_duration"]
        ):
            raise ValueError(
                "Chunk durations must be larger than the minimum safe break"
                " duration (preferably a lot larger!)"
            )

    def _path(self, chunk_i):
        return self.config["daq_input_dir"] + f"/{chunk_i:06d}"

    def _chunk_paths(self, chunk_i):
        """Return paths to the previous, current and next chunk.

        If any of them does not exist, or is not yet populated with data
        from all readers, its path is replaced by False.
        """
        p = self._path(chunk_i)
        result = []
        for q in [p + "_pre", p, p + "_post"]:
            if os.path.exists(q):
                n_files = self._count_files_per_chunk(q)
                if n_files >= self.n_readout_threads:
                    result.append(q)
                else:
                    print(
                        f"Found incomplete folder {q}: "
                        f"contains {n_files} files but expected "
                        f"{self.n_readout_threads}. "
                        "Waiting for more data."
                    )
                    if self.source_finished():
                        # For low rates, different threads might end in a
                        # different chunk at the end of a run;
                        # still keep the results in this case.
                        print("Run finished correctly nonetheless: saving the results")
                        result.append(q)
                    else:
                        result.append(False)
            else:
                result.append(False)
        return tuple(result)

    @staticmethod
    def _partial_chunk_to_thread_name(partial_chunk):
        """Convert the name of a part of the chunk to the thread_name that wrote it."""
        return "_".join(partial_chunk.split("_")[:-1])

    def _count_files_per_chunk(self, path_chunk_i):
        """Check that the files in the chunk have names consistent with the
        readout threads."""
        counted_files = Counter(
            [self._partial_chunk_to_thread_name(p) for p in os.listdir(path_chunk_i)]
        )
        for thread, n_counts in counted_files.items():
            if thread not in self.config["readout_threads"]:
                raise ValueError(f"Bad data for {path_chunk_i}. Got {thread}")
            if n_counts > self.config["readout_threads"][thread]:
                raise ValueError(
                    f"{thread} wrote {n_counts}, "
                    f"expected {self.config['readout_threads'][thread]}"
                )
        return sum(counted_files.values())

    def source_finished(self):
        end_dir = self.config["daq_input_dir"] + "/THE_END"
        if not os.path.exists(end_dir):
            return False
        else:
            return self._count_files_per_chunk(end_dir) >= self.n_readout_threads

    def is_ready(self, chunk_i):
        ended = self.source_finished()
        pre, current, post = self._chunk_paths(chunk_i)
        next_ahead = os.path.exists(self._path(chunk_i + 1))
        if current and (
            pre and post or chunk_i == 0 and post or ended and (pre and not next_ahead)
        ):
            return True
        return False

    def _load_chunk(self, path, start, end, kind="central"):
        records = [
            strax.load_file(
                fn,
                compressor=self.config["daq_compressor"],
                dtype=self.dtype_for("raw_records"),
            )
            for fn in sorted(glob.glob(f"{path}/*"))
        ]
        records = np.concatenate(records)
        records = strax.sort_by_time(records)

        first_start, last_start, last_end = None, None, None
        if len(records):
            first_start, last_start = records[0]["time"], records[-1]["time"]
            # Records are sorted by (start)time and are of variable length.
            # Their end-times can differ. In the most pessimistic case we have
            # to look back one record length for each channel.
            tot_channels = np.sum(
                [np.diff(x) + 1 for x in self.config["channel_map"].values()]
            )
            look_n_samples = self.config["record_length"] * tot_channels
            last_end = strax.endtime(records[-look_n_samples:]).max()
            if first_start < start or last_start >= end:
                raise ValueError(
                    f"Bad data from DAQ: chunk {path} should contain data "
                    f"that starts in [{start}, {end}), but we see start times "
                    f"ranging from {first_start} to {last_start}."
                )

        if kind == "central":
            result = records
            break_time = None
        else:
            # Find a time at which we can safely partition the data.
            min_gap = self.config["safe_break_in_pulses"]
            if not len(records) or last_end + min_gap < end:
                # There is enough room at the end of the data
                break_time = end - min_gap
                result = records if kind == "post" else records[:0]
            else:
                # Let's hope there is some quiet time in the middle
                try:
                    result, break_time = strax.from_break(
                        records,
                        safe_break=min_gap,
                        # Records from the last chunk can extend as far as:
                        not_before=(start + self.config["record_length"] * self.dt_max),
                        left=kind == "post",
                        tolerant=False,
                    )
                except strax.NoBreakFound:
                    # We still have to break somewhere, but this can involve
                    # throwing away data. Let's do it at the end of the chunk.
                    # Maybe find a better time, e.g. a longish-but-not-quite
                    # satisfactory gap.
                    break_time = end - min_gap

                    # Mark the region where data /might/ be removed with
                    # artificial deadtime.
                    dead_time_start = break_time - self.config["record_length"] * self.dt_max
                    warnings.warn(
                        f"Data in {path} is so dense that no {min_gap} "
                        "ns break exists: data loss inevitable. "
                        "Inserting artificial deadtime between "
                        f"{dead_time_start} and {end}.",
                        ArtificialDeadtimeInserted,
                    )

                    if kind == "pre":
                        # Give the artificial deadtime past the break
                        result = self._artificial_dead_time(
                            start=break_time, end=end, dt=self.dt_max
                        )
                    else:
                        # Remove data that would stick out
                        result = records[strax.endtime(records) <= break_time]
                        # Add the artificial deadtime until the break
                        result = strax.sort_by_time(
                            np.concatenate(
                                [
                                    result,
                                    self._artificial_dead_time(
                                        start=dead_time_start, end=break_time, dt=self.dt_max
                                    ),
                                ]
                            )
                        )

        return result, break_time

    def _artificial_dead_time(self, start, end, dt):
        return strax.dict_to_rec(
            dict(
                time=[start],
                length=[(end - start) // dt],
                dt=[dt],
                channel=[ARTIFICIAL_DEADTIME_CHANNEL],
            ),
            self.dtype_for("raw_records"),
        )

    def compute(self, chunk_i):
        dt_central = self.config["daq_chunk_duration"]
        dt_overlap = self.config["daq_overlap_chunk_duration"]
        t_start = chunk_i * (dt_central + dt_overlap)
        t_end = t_start + dt_central

        pre, current, post = self._chunk_paths(chunk_i)
        r_pre, r_post = None, None
        break_pre, break_post = t_start, t_end

        if pre:
            if chunk_i == 0:
                warnings.warn(
                    f"DAQ is being sloppy: there should be no pre dir {pre} "
                    "for chunk 0. We're ignoring it.",
                    UserWarning,
                )
            else:
                r_pre, break_pre = self._load_chunk(
                    path=pre, start=t_start - dt_overlap, end=t_start, kind="pre"
                )
        r_main, _ = self._load_chunk(path=current, start=t_start, end=t_end, kind="central")
        if post:
            r_post, break_post = self._load_chunk(
                path=post, start=t_end, end=t_end + dt_overlap, kind="post"
            )

        # Concatenate the result.
        records = np.concatenate([x for x in (r_pre, r_main, r_post) if x is not None])

        # Split records by channel
        result_arrays = split_channel_ranges(
            records, np.asarray(list(self.config["channel_map"].values()))
        )
        del records

        # Convert to strax chunks
        result = dict()
        for i, subd in enumerate(self.config["channel_map"]):
            if len(result_arrays[i]):
                # dt may differ per subdetector
                dt = result_arrays[i]["dt"][0]
                # Convert time to time in ns since unix epoch.
                # Ensure the offset is a whole digitizer sample.
                result_arrays[i]["time"] += dt * (self.t0 // dt)

            # Ignore data from the 'blank' channels, corresponding to
            # channels that have nothing connected
            if subd.endswith("blank"):
                continue

            result_name = "raw_records"
            if subd.startswith("nveto"):
                result_name += "_nv"
            elif subd != "tpc":
                result_name += "_" + subd
            result[result_name] = self.chunk(
                start=self.t0 + break_pre,
                end=self.t0 + break_post,
                data=result_arrays[i],
                data_type=result_name,
            )

        print(f"Read chunk {chunk_i:06d} from DAQ")
        for r in result.values():
            # Print data rate / data type if any
            if r._mbs() > 0:
                print(f"\t{r}")
        return result
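
For orientation, here is a minimal sketch of how this plugin might be registered and configured by hand. straxen's own contexts normally take care of this; every option value below (path, thread map, channel range, run name) is a hypothetical placeholder, not a real run setting.

# Illustrative sketch, not part of this module: register DAQReader in a bare
# strax context. All option values are hypothetical placeholders; a real
# channel_map covers every subdetector listed in `provides`.
import strax
from immutabledict import immutabledict

st = strax.Context(
    storage=strax.DataDirectory("./strax_data"),
    register=[DAQReader],
)
st.set_config(
    dict(
        daq_input_dir="/path/to/live_data/some_run",  # hypothetical
        readout_threads={"reader0_reader_0": 2},      # hypothetical
        run_start_time=0.0,
        channel_map=immutabledict(tpc=(0, 493)),      # hypothetical, TPC only
    )
)
# Raw records could then be loaded with, e.g.:
# rr = st.get_array("some_run", "raw_records")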
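
The chunk bookkeeping done by _chunk_paths and compute can be summarized with a small sketch, assuming the default chunk durations:

# Illustrative sketch, not part of this module: with the default
# daq_chunk_duration (5e9 ns) and daq_overlap_chunk_duration (5e8 ns),
# _chunk_paths(i) looks for the folders <daq_input_dir>/{i:06d}_pre,
# .../{i:06d} and .../{i:06d}_post, and compute() maps the central folder
# of chunk i to [t_start, t_end) with t_start = i * (5e9 + 5e8) and
# t_end = t_start + 5e9, in ns since the run start.
dt_central, dt_overlap = int(5e9), int(5e8)
for chunk_i in range(3):
    t_start = chunk_i * (dt_central + dt_overlap)
    print(f"{chunk_i:06d}: [{t_start}, {t_start + dt_central}) ns")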
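
When no safe break can be found, _load_chunk inserts a pseudo-record in the reserved channel via _artificial_dead_time. A minimal sketch of such a record, with made-up times, looks like this:

# Illustrative sketch, not part of this module: build a single artificial
# deadtime pseudo-record, mirroring _artificial_dead_time. Times are made up.
import strax

start, end, dt = 1_000, 5_000, 10  # hypothetical values in ns
deadtime = strax.dict_to_rec(
    dict(
        time=[start],
        length=[(end - start) // dt],
        dt=[dt],
        channel=[ARTIFICIAL_DEADTIME_CHANNEL],
    ),
    strax.raw_record_dtype(samples_per_record=110),
)
# Downstream code can recognize such records by
# deadtime["channel"] == ARTIFICIAL_DEADTIME_CHANNEL.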

@export
@numba.njit(nogil=True, cache=True)
def split_channel_ranges(records, channel_ranges):
    """Return a numba.typed.List of record arrays in channel_ranges.

    ~2.5x as fast as a naive implementation with np.in1d.
    """
    n_subdetectors = len(channel_ranges)
    which_detector = np.zeros(len(records), dtype=np.int8)
    n_in_detector = np.zeros(n_subdetectors, dtype=np.int64)

    # First loop to count number of records per detector
    for r_i, r in enumerate(records):
        for d_i in range(n_subdetectors):
            left, right = channel_ranges[d_i]
            if r["channel"] > right:
                continue
            elif r["channel"] >= left:
                which_detector[r_i] = d_i
                n_in_detector[d_i] += 1
                break
        else:
            # channel_ranges should be sorted ascending.
            print(r["time"], r["channel"], channel_ranges)
            raise ValueError("Bad data from DAQ: data in unknown channel!")

    # Allocate memory
    results = numba.typed.List()
    for d_i in range(n_subdetectors):
        results.append(np.empty(n_in_detector[d_i], dtype=records.dtype))

    # Second loop to fill results.
    # This is slightly faster than using which_detector == d_i masks,
    # since it only needs one loop over the data.
    n_placed = np.zeros(n_subdetectors, dtype=np.int64)
    for r_i, r in enumerate(records):
        d_i = which_detector[r_i]
        results[d_i][n_placed[d_i]] = r
        n_placed[d_i] += 1
    return results
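
A small, self-contained usage sketch of split_channel_ranges follows, using synthetic records and a hypothetical two-subdetector channel map; it mirrors how compute() calls the function with np.asarray(list(channel_map.values())).

# Illustrative sketch, not part of this module: split three synthetic raw
# records over two hypothetical subdetectors spanning channels 0-1 and 2-3.
import numpy as np
import strax

rr = np.zeros(3, dtype=strax.raw_record_dtype(samples_per_record=110))
rr["time"] = [0, 10, 20]
rr["channel"] = [0, 2, 3]
parts = split_channel_ranges(rr, np.asarray([[0, 1], [2, 3]]))
# parts[0] holds the record in channel 0; parts[1] the records in channels 2-3.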