Superruns

Basic concept of a superrun:

A superrun is made up of many regular runs and helps us therefore to organize data in logic units and to load it faster. In the following notebook we will give some brief examples how superruns work and can be used to make analysts lives easier.

Let’s get started how we can define superruns. The example I demonstrate here is based on some dummy Record and Peak plugins. But it works in the same way for regular data.

[1]:
import strax
import straxen
/home/dwenz/mymodules/straxen/straxen/rucio.py:29: UserWarning: No installation of rucio-clients found. Can't use rucio remote backend.
  warnings.warn("No installation of rucio-clients found. Can't use rucio remote backend.")

Define context and create some dummy data:

In the subsequent cells I create a dummy context and write some dummy-data. You can either read through it if you are interested or skip until Define a superrun. For the working examples on superruns you only need to know:

  • Superruns can be created with any of our regular online and offline contexts.

  • In the two cells below I define 3 runs and records for the run_ids 0, 1, 2.

  • The constituents of a superrun are called subruns which we call runs.

[3]:
from strax.testutils import Records, Peaks

superrun_name='_superrun_test'
st = strax.Context(storage=[strax.DataDirectory('./strax_data',
                                                provide_run_metadata=True,
                                                readonly=False,
                                                deep_scan=True)],
                   register=[Records, Peaks],
                   config={'bonus_area': 42}
                                     )
st.set_context_config({ 'use_per_run_defaults': False})

[4]:
import datetime
import pytz

import numpy as np

import json
from bson import json_util

def _write_run_doc(context, run_id, time, endtime):
    """Function which writes a dummy run document.
    """
    run_doc = {'name': run_id, 'start': time, 'end': endtime}
    with open(context.storage[0]._run_meta_path(str(run_id)), 'w') as fp:
        json.dump(run_doc, fp,sort_keys=True, indent=4, default=json_util.default)


offset_between_subruns = 10

now = datetime.datetime.now()
now.replace(tzinfo=pytz.utc)
subrun_ids = [str(r) for r in range(3)]

for run_id in subrun_ids:
    rr = st.get_array(run_id, 'records')
    time = np.min(rr['time'])
    endtime = np.max(strax.endtime(rr))

    _write_run_doc(st,
                    run_id,
                    now + datetime.timedelta(0, int(time)),
                    now + datetime.timedelta(0, int(endtime)),
                    )

    st.set_config({'secret_time_offset': endtime + offset_between_subruns}) # untracked option
    assert st.is_stored(run_id, 'records')


Could not estimate run start and end time from run metadata: assuming it is 0 and inf
Could not estimate run start and end time from run metadata: assuming it is 0 and inf
Could not estimate run start and end time from run metadata: assuming it is 0 and inf
Source finished!
Source finished!
Source finished!

If we print now the lineage and hash for the three runs you will see it is equivalent to our regular data.

[5]:
print(st.key_for('2', 'records'))
st.key_for('2', 'records').lineage
2-records-j3nd2fjbiq
[5]:
{'records': ('Records', '0.0.0', {'crash': False, 'dummy_tracked_option': 42})}

Metadata of our subruns:

To understand a bit better how our dummy data looks like we can have a look into the metadata for a single run. Each subrun is made of 10 chunks each containing 10 waveforms in 10 different channels.

[6]:
st.get_meta('2', 'records')
[6]:
{'chunk_target_size_mb': 200,
 'chunks': [{'chunk_i': 0,
   'end': 41,
   'filename': 'records-j3nd2fjbiq-000000',
   'filesize': 313,
   'first_endtime': 41,
   'first_time': 40,
   'last_endtime': 41,
   'last_time': 40,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 40,
   'subruns': None},
  {'chunk_i': 1,
   'end': 42,
   'filename': 'records-j3nd2fjbiq-000001',
   'filesize': 313,
   'first_endtime': 42,
   'first_time': 41,
   'last_endtime': 42,
   'last_time': 41,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 41,
   'subruns': None},
  {'chunk_i': 2,
   'end': 43,
   'filename': 'records-j3nd2fjbiq-000002',
   'filesize': 313,
   'first_endtime': 43,
   'first_time': 42,
   'last_endtime': 43,
   'last_time': 42,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 42,
   'subruns': None},
  {'chunk_i': 3,
   'end': 44,
   'filename': 'records-j3nd2fjbiq-000003',
   'filesize': 313,
   'first_endtime': 44,
   'first_time': 43,
   'last_endtime': 44,
   'last_time': 43,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 43,
   'subruns': None},
  {'chunk_i': 4,
   'end': 45,
   'filename': 'records-j3nd2fjbiq-000004',
   'filesize': 313,
   'first_endtime': 45,
   'first_time': 44,
   'last_endtime': 45,
   'last_time': 44,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 44,
   'subruns': None},
  {'chunk_i': 5,
   'end': 46,
   'filename': 'records-j3nd2fjbiq-000005',
   'filesize': 313,
   'first_endtime': 46,
   'first_time': 45,
   'last_endtime': 46,
   'last_time': 45,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 45,
   'subruns': None},
  {'chunk_i': 6,
   'end': 47,
   'filename': 'records-j3nd2fjbiq-000006',
   'filesize': 313,
   'first_endtime': 47,
   'first_time': 46,
   'last_endtime': 47,
   'last_time': 46,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 46,
   'subruns': None},
  {'chunk_i': 7,
   'end': 48,
   'filename': 'records-j3nd2fjbiq-000007',
   'filesize': 313,
   'first_endtime': 48,
   'first_time': 47,
   'last_endtime': 48,
   'last_time': 47,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 47,
   'subruns': None},
  {'chunk_i': 8,
   'end': 49,
   'filename': 'records-j3nd2fjbiq-000008',
   'filesize': 313,
   'first_endtime': 49,
   'first_time': 48,
   'last_endtime': 49,
   'last_time': 48,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 48,
   'subruns': None},
  {'chunk_i': 9,
   'end': 50,
   'filename': 'records-j3nd2fjbiq-000009',
   'filesize': 313,
   'first_endtime': 50,
   'first_time': 49,
   'last_endtime': 50,
   'last_time': 49,
   'n': 10,
   'nbytes': 2570,
   'run_id': '2',
   'start': 49,
   'subruns': None}],
 'compressor': 'blosc',
 'data_kind': 'records',
 'data_type': 'records',
 'dtype': "[(('Start time since unix epoch [ns]', 'time'), '<i8'), (('Length of the interval in samples', 'length'), '<i4'), (('Width of one sample [ns]', 'dt'), '<i2'), (('Channel/PMT number', 'channel'), '<i2'), (('Length of pulse to which the record belongs (without zero-padding)', 'pulse_length'), '<i4'), (('Fragment number in the pulse', 'record_i'), '<i2'), (('Integral in ADC counts x samples', 'area'), '<i4'), (('Level of data reduction applied (strax.ReductionLevel enum)', 'reduction_level'), '|u1'), (('Baseline in ADC counts. data = int(baseline) - data_orig', 'baseline'), '<f4'), (('Baseline RMS in ADC counts. data = baseline - data_orig', 'baseline_rms'), '<f4'), (('Multiply data by 2**(this number). Baseline is unaffected.', 'amplitude_bit_shift'), '<i2'), (('Waveform data in raw counts above integer part of baseline', 'data'), '<i2', (110,))]",
 'end': 50,
 'lineage': {'records': ['Records',
   '0.0.0',
   {'crash': False, 'dummy_tracked_option': 42}]},
 'lineage_hash': 'j3nd2fjbiq',
 'run_id': '2',
 'start': 40,
 'strax_version': '0.16.0',
 'writing_ended': 1626283809.9985752,
 'writing_started': 1626283809.9701405}

Define a superrun:

Defining a superrun is quite simple one has to call:

[7]:
st.define_run(superrun_name, subrun_ids)
print('superrun_name: ', superrun_name, '\n'
      'subrun_ids: ', subrun_ids
     )
superrun_name:  _superrun_test
subrun_ids:  ['0', '1', '2']

where the first argument is a string specifying the name of the superrun e.g. _Kr83m_20200816. Please note that superrun names must start with an underscore.

The second argument is a list of run_ids of subruns the superrun should be made of. Please note that the definition of a superrun does not need any specification of a data_kind like peaks or event_info because it is a “run”.

By default, it is only allowed to store new runs under the usere’s specified strax_data directory. In this example it is simply ./strax_data and the run_meta data can be looked at via:

[8]:
st.run_metadata('_superrun_test')
[8]:
{'sub_run_spec': {'0': 'all', '1': 'all', '2': 'all'},
 'start': datetime.datetime(2021, 7, 14, 12, 30, 7, 830000, tzinfo=<bson.tz_util.FixedOffset object at 0x7f6a66bb6070>),
 'end': datetime.datetime(2021, 7, 14, 12, 30, 57, 830000, tzinfo=<bson.tz_util.FixedOffset object at 0x7f6a66bb6070>),
 'livetime': 30000000000.0,
 'name': '_superrun_test'}

The superrun-metadata contains a list of all subruns making up the superrun, the start and end time (in milliseconds) of the corresponding collections of runs and its naive livetime in nanoseconds without any corrections for deadtime.

Please note that in the presented example the time difference between start and end time is 50 s while the live time is only about 30 s. This comes from the fact that I defined the time between two runs to be 10 s. It should be always kept in mind for superruns that livetime is not the same as the end - start of the superrun.

The superun will appear in the run selection as any other run:

[9]:
st.select_runs()
Checking data availability: 0it [00:00, ?it/s]
[9]:
name number mode tags
0 0 0.0
1 1 1.0
2 2 2.0
3 _superrun_test NaN

Loading data with superruns:

Loading superruns can be done in two different ways. Lets try first the already implemented approach and compare the data with loading the individual runs separately:

[10]:
sub_runs = st.get_array(subrun_ids, 'records')  # Loading all subruns individually like we are used to
superrun = st.get_array(superrun_name, 'records')  # Loading the superrun
assert np.all(sub_runs['time'] == superrun['time'])  # Comparing if the data is the same

To increase the loading speed it can be allowed to skip the lineage check of the individual subruns:

[11]:
sub_runs = st.get_array(subrun_ids, 'records')
superrun = st.get_array(superrun_name, 'records', _check_lineage_per_run_id=False)
assert np.all(sub_runs['time'] == superrun['time'])
/home/dwenz/mymodules/strax/strax/context.py:217: UserWarning: Unknown config option _check_lineage_per_run_id; will do nothing.
  warnings.warn(f"Unknown config option {k}; will do nothing.")
/home/dwenz/mymodules/strax/strax/context.py:223: UserWarning: Invalid context option _check_lineage_per_run_id; will do nothing.
  warnings.warn(f"Invalid context option {k}; will do nothing.")

So how does this magic work? Under the hood a superrun first checks if the data of the different subruns has been created before. If not it will make the data for you. After that the data of the individual runs is loaded.

The loading speed can be further increased if we rechunk and write the data of our superrun as “new” data to disk. This can be done easily for light weight data_types like peaks and above. Further, this allows us to combine multiple data_types if the same data_kind, like for example event_info and cuts.

Writing a “new” superrun:

To write a new superrun one has to set the corresponding context setting to true:

[12]:
st.set_context_config({'write_superruns': True})
[13]:
st.is_stored(superrun_name, 'records')
[13]:
False
[14]:
st.make(superrun_name, 'records')
st.is_stored(superrun_name, 'records')
[14]:
True

Lets see if the data is the same:

[15]:
sub_runs = st.get_array(subrun_ids, 'records')
superrun = st.get_array(superrun_name, 'records', _check_lineage_per_run_id=False)
assert np.all(sub_runs['time'] == superrun['time'])

And the data will now shown as available in select runs:

[16]:
st.select_runs(available=('records', ))
[16]:
name number mode tags records_available
0 0 0.0 True
1 1 1.0 True
2 2 2.0 True
3 _superrun_test NaN True

If a some data does not exist for a super run we can simply created it via the superrun_id. This will not only create the data of the rechunked superrun but also the data of the subrungs if not already stored:

[17]:
st.is_stored(subrun_ids[0], 'peaks')
[17]:
False
[18]:
st.make(superrun_name, 'peaks')
st.is_stored(subrun_ids[0], 'peaks')
[18]:
True
[19]:
peaks = st.get_array(superrun_name, 'peaks')

Some developer information:

In case of a stored and rechunked superruns every chunk has also now some additional information about the individual subruns it is made of:

[20]:
for chunk in st.get_iter(superrun_name, 'records'):
    chunk
chunk.subruns, chunk.run_id
[20]:
({'0': {'end': 10, 'start': 0},
  '1': {'end': 30, 'start': 20},
  '2': {'end': 50, 'start': 40}},
 '_superrun_test')

The same goes for the meta data:

[21]:
st.get_meta(superrun_name, 'records')['chunks']
[21]:
[{'chunk_i': 0,
  'end': 50,
  'filename': 'records-j3nd2fjbiq-000000',
  'filesize': 2343,
  'first_endtime': 1,
  'first_time': 0,
  'last_endtime': 50,
  'last_time': 49,
  'n': 300,
  'nbytes': 77100,
  'run_id': '_superrun_test',
  'start': 0,
  'subruns': {'0': {'end': 10, 'start': 0},
   '1': {'end': 30, 'start': 20},
   '2': {'end': 50, 'start': 40}}}]
[ ]: