Source code for straxen.legacy.plugins_1t.pax_interface

import os
import glob
from typing import Tuple

import numpy as np
import strax

export, __all__ = strax.exporter()


def records_needed(pulse_length, samples_per_record):
    """Return records needed to store pulse_length samples."""
    return np.ceil(pulse_length / samples_per_record).astype(np.int64)


[docs]@export def pax_to_records( input_filename, samples_per_record=strax.DEFAULT_RECORD_LENGTH, events_per_chunk=10 ): """Return pulse records array from pax zip input_filename Convert pax .zip files to flat records format This only works if you have pax installed in your strax environment, which is somewhat tricky.""" # Monkeypatch matplotlib so pax is importable # See https://github.com/XENON1T/pax/pull/734 import matplotlib matplotlib._cntr = None from pax import core # Pax is not a dependency mypax = core.Processor( "XENON1T", config_dict=dict( pax=dict( look_for_config_in_runs_db=False, plugin_group_names=["input"], encoder_plugin=None, input_name=input_filename, ), # Fast startup: skip loading big maps WaveformSimulator=dict( s1_light_yield_map="placeholder_map.json", s2_light_yield_map="placeholder_map.json", s1_patterns_file=None, s2_patterns_file=None, ), ), ) print(f"Starting conversion, {events_per_chunk} evt/chunk") results = [] def finish_results(): nonlocal results records = np.concatenate(results) # In strax data, records are always stored # sorted, baselined and integrated records = strax.sort_by_time(records) print("Returning %d records" % len(records)) results = [] return records for event in mypax.get_events(): event = mypax.process_event(event) if not len(event.pulses): # Triggerless pax data contains many empty events # at the end. With the fixed events per chunk setting # this can lead to empty files, which confuses strax. continue pulse_lengths = np.array([p.length for p in event.pulses]) n_records_tot = records_needed(pulse_lengths, samples_per_record).sum() records = np.zeros(n_records_tot, dtype=strax.raw_record_dtype(samples_per_record)) output_record_index = 0 # Record offset in data for p in event.pulses: n_records = records_needed(p.length, samples_per_record) for rec_i in range(n_records): r = records[output_record_index] r["time"] = event.start_time + p.left * 10 + rec_i * samples_per_record * 10 r["channel"] = p.channel r["pulse_length"] = p.length r["record_i"] = rec_i r["dt"] = 10 # How much are we storing in this record? if rec_i != n_records - 1: # There's more chunks coming, so we store a full chunk n_store = samples_per_record assert p.length > samples_per_record * (rec_i + 1) else: # Just enough to store the rest of the data # Note it's not p.length % samples_per_record!!! # (that would be zero if we have to store a full record) n_store = p.length - samples_per_record * rec_i assert 0 <= n_store <= samples_per_record r["length"] = n_store offset = rec_i * samples_per_record r["data"][:n_store] = p.raw_data[offset : offset + n_store] output_record_index += 1 results.append(records) if len(results) >= events_per_chunk: yield finish_results() mypax.shutdown() if len(results): y = finish_results() if len(y): yield y
[docs]@export @strax.takes_config( strax.Option( "pax_raw_dir", default="/data/xenon/raw", track=False, infer_type=False, help="Directory with raw pax datasets", ), strax.Option( "stop_after_zips", default=0, track=False, infer_type=False, help="Convert only this many zip files. 0 = all.", ), strax.Option( "events_per_chunk", default=50, track=False, infer_type=False, help="Number of events to yield per chunk", ), strax.Option( "samples_per_record", default=strax.DEFAULT_RECORD_LENGTH, track=False, infer_type=False, help="Number of samples per record", ), ) class RecordsFromPax(strax.Plugin): provides = "raw_records" data_kind = "raw_records" compressor = "zstd" depends_on: Tuple = tuple() parallel = False rechunk_on_save = False
[docs] def infer_dtype(self): return strax.raw_record_dtype(self.config["samples_per_record"])
[docs] def iter(self, *args, **kwargs): if not os.path.exists(self.config["pax_raw_dir"]): raise FileNotFoundError(self.config["pax_raw_dir"]) input_dir = os.path.join(self.config["pax_raw_dir"], self.run_id) pax_files = sorted(glob.glob(input_dir + "/XENON*.zip")) pax_sizes = np.array([os.path.getsize(x) for x in pax_files]) print(f"Found {len(pax_files)} files, {pax_sizes.sum() / 1e9:.2f} GB") last_endtime = 0 for file_i, in_fn in enumerate(pax_files): if self.config["stop_after_zips"] and file_i >= self.config["stop_after_zips"]: break for records in pax_to_records( in_fn, samples_per_record=self.config["samples_per_record"], events_per_chunk=self.config["events_per_chunk"], ): if not len(records): continue if last_endtime == 0: last_endtime = records[0]["time"] new_endtime = strax.endtime(records).max() yield self.chunk(start=last_endtime, end=new_endtime, data=records) last_endtime = new_endtime