import os
import json
from warnings import warn
from typing import Dict
import numpy as np
import strax
from utilix import xent_collection
try:
import admix
from rucio.common.exception import DataIdentifierNotFound
HAVE_ADMIX = True
except (ImportError, AttributeError):
HAVE_ADMIX = False
export, __all__ = strax.exporter()
__all__.extend(["HAVE_ADMIX"])
@export
class TooMuchDataError(Exception):
    """Marker exception exported by this module.

    Adds nothing on top of :class:`Exception`; it only provides a distinct
    type that callers can raise or catch.

    """
@export
class RucioRemoteFrontend(strax.StorageFrontend):
    """Uses the rucio client for the data find.

    When admix/rucio could be imported (``HAVE_ADMIX``), the frontend owns a
    single :class:`RucioRemoteBackend`; otherwise it has no backends and every
    lookup ends in ``strax.DataNotAvailable``.

    """

    storage_type = strax.StorageType.REMOTE
    local_did_cache = None
    path = None

    def __init__(
        self,
        staging_dir="./strax_data",
        rses_only=tuple(),
        download_heavy=False,
        remove_heavy=False,
        tries=3,
        num_threads=1,
        stage=False,
        *args,
        **kwargs,
    ):
        """
        :param staging_dir: directory handed to RucioRemoteBackend as download location
        :param rses_only: tuple, limits RSE selection to these options if provided
        :param download_heavy: option to allow downloading of heavy data through RucioRemoteBackend
        :param remove_heavy: option to remove heavy data from the RucioRemoteBackend after reading
        :param tries: forwarded to RucioRemoteBackend, which passes it to admix.download
        :param num_threads: forwarded to RucioRemoteBackend, which passes it to admix.download
        :param stage: forwarded to RucioRemoteBackend, which passes it to admix.download
        :param args: Passed to strax.StorageFrontend
        :param kwargs: Passed to strax.StorageFrontend
        """
        super().__init__(*args, **kwargs)
        # This frontend never writes; _find refuses write requests as well.
        self.readonly = True
        self.collection = xent_collection()
        self.backends = []

        if HAVE_ADMIX:
            self.backends = [
                RucioRemoteBackend(
                    staging_dir=staging_dir,
                    rses_only=rses_only,
                    download_heavy=download_heavy,
                    remove_heavy=remove_heavy,
                    tries=tries,
                    num_threads=num_threads,
                    stage=stage,
                ),
            ]
        else:
            self.log.warning(
                "You passed use_remote=True to rucio fronted, "
                "but you don't have access to admix/rucio! Using local backend only."
            )

    def find_several(self, keys, **kwargs):
        """Report every key as not found, without querying rucio.

        :param keys: collection of keys to look up
        :param kwargs: accepted for interface compatibility, ignored
        :return: list of ``False`` values shaped like ``keys``

        """
        # for performance, dont do find_several with this storage frontend
        # we basically do the same query we would do in the RunDB plugin
        return np.zeros_like(keys, dtype=bool).tolist()

    def _find(self, key: strax.DataKey, write, allow_incomplete, fuzzy_for, fuzzy_for_options):
        """Locate ``key`` on a remote RSE.

        :param key: data key, converted to a rucio DID via key_to_rucio_did
        :param write: must be falsy, this frontend is read-only
        :param allow_incomplete: must be falsy
        :param fuzzy_for: unused here
        :param fuzzy_for_options: unused here
        :return: tuple ``("RucioRemoteBackend", did)`` when a backend finds an RSE
        :raises RuntimeError: if ``write`` or ``allow_incomplete`` is requested
        :raises strax.DataNotAvailable: if no backend reports an RSE for the DID

        """
        did = key_to_rucio_did(key)
        if allow_incomplete or write:
            # NB: this must be the dunder ``__class__``. ``self.__class`` would be
            # name-mangled inside the class body and fail with AttributeError
            # before this RuntimeError could be raised.
            raise RuntimeError(
                "Allow incomplete/writing is not allowed for "
                f"{self.__class__.__name__} since data might not be "
                "continuous"
            )
        try:
            for b in self.backends:
                # Only replicas in state "OK" are considered.
                rse = b._get_rse(did, state="OK")
                if rse:
                    return "RucioRemoteBackend", did
        except DataIdentifierNotFound:
            # The DID is unknown to rucio: treat it as "not available" below.
            # Without admix there are no backends, so this name is never
            # evaluated when the import failed.
            pass
        raise strax.DataNotAvailable

    def find(self, key: strax.DataKey, write=False, check_broken=False, **kwargs):
        """Call ``super().find`` with this frontend's defaults.

        :param key: data key to look for
        :param write: passed on positionally to ``super().find``
        :param check_broken: passed on positionally to ``super().find``; defaults to False here
        :param kwargs: passed on to ``super().find``
        :return: whatever ``super().find`` returns

        """
        # Overwrite defaults of super().find()
        return super().find(key, write, check_broken, **kwargs)
@export
class RucioRemoteBackend(strax.FileSytemBackend):
    """Get data from remote Rucio RSE.

    Metadata and chunk files are downloaded with admix into ``staging_dir``
    and read from there.

    """

    # datatypes we don't want to download since they're too heavy
    heavy_types = ["raw_records", "raw_records_nv", "raw_records_he"]

    # for caching RSE locations
    # (class-level dict: entries are shared by all instances, keyed by dataset DID)
    dset_cache: Dict[str, str] = {}

    def __init__(
        self,
        staging_dir,
        rses_only=tuple(),
        download_heavy=False,
        remove_heavy=False,
        tries=3,
        num_threads=1,
        stage=False,
        **kwargs,
    ):
        """
        :param staging_dir: Path (a string) where to save data. Must be
            a writable location.
        :param rses_only: tuple, limits RSE selection to these options if provided
        :param download_heavy: Whether or not to allow downloads of the
            heaviest data (raw_records*, less aqmon and MV)
        :param remove_heavy: Whether or not to remove the heaviest data after reading
        :param tries: passed to admix.download as ``tries``
        :param num_threads: passed to admix.download as ``num_threads``
        :param stage: passed to admix.download as ``stage``
        :param kwargs: Passed to the parent backend class
        :raises PermissionError: if ``staging_dir`` exists but is not writable,
            or if it does not exist and cannot be created
        """
        mess = (
            f"You told the rucio backend to download data to {staging_dir}, "
            "but that path is not writable by your user"
        )
        if os.path.exists(staging_dir):
            if not os.access(staging_dir, os.W_OK):
                raise PermissionError(mess)
        else:
            try:
                os.makedirs(staging_dir)
            except OSError as err:
                # Chain the original error so the real cause stays in the traceback.
                raise PermissionError(mess) from err

        super().__init__(**kwargs)
        self.staging_dir = staging_dir
        self.rses_only = strax.to_str_tuple(rses_only)
        self.download_heavy = download_heavy
        self.remove_heavy = remove_heavy
        self.tries = tries
        self.num_threads = num_threads
        self.stage = stage

    def _get_rse(self, dset_did, **filters):
        """Determine the appropriate Rucio Storage Element (RSE) for a dataset.

        Uses self.rses_only to filter available RSEs if set.

        :param dset_did: (str) the dataset identifier
        :param filters: passed on to admix.rucio.get_rses
        :return: the RSE chosen by admix.downloader.determine_rse

        """
        rses = admix.rucio.get_rses(dset_did, **filters)
        if self.rses_only:
            rses = list(set(rses) & set(self.rses_only))
        return admix.downloader.determine_rse(rses)

    def _get_cached_rse(self, dset_did):
        """Return the RSE for ``dset_did``, looking it up only on a cache miss."""
        if dset_did not in self.dset_cache:
            self.dset_cache[dset_did] = self._get_rse(dset_did)
        return self.dset_cache[dset_did]

    def _get_metadata(self, dset_did, **kwargs):
        """Download the metadata file of a dataset and return its parsed JSON.

        :param dset_did: (str) the dataset identifier
        :param kwargs: accepted for interface compatibility, ignored
        :return: the decoded JSON content of the metadata file
        :raises ValueError: if the download does not yield exactly one file
        :raises FileNotFoundError: if the downloaded path does not exist

        """
        rse = self._get_cached_rse(dset_did)

        metadata_did = strax.RUN_METADATA_PATTERN % dset_did
        warn(f"Downloading {metadata_did} from {rse}")
        downloaded = admix.download(
            metadata_did,
            location=self.staging_dir,
            tries=self.tries,
            num_threads=self.num_threads,
            rse=rse,
            stage=self.stage,
        )
        if len(downloaded) != 1:
            raise ValueError(f"{metadata_did} should be a single file. We found {len(downloaded)}.")
        metadata_path = downloaded[0]

        # check again
        if not os.path.exists(metadata_path):
            raise FileNotFoundError(f"No metadata found at {metadata_path}")

        with open(metadata_path, mode="r") as f:
            return json.load(f)

    def _read_chunk(self, dset_did, chunk_info, dtype, compressor):
        """Load one chunk, downloading it into the staging directory if needed.

        :param dset_did: (str) the dataset identifier
        :param chunk_info: mapping with at least a ``"filename"`` entry
        :param dtype: passed to strax.load_file
        :param compressor: passed to strax.load_file
        :return: the result of strax.load_file for the chunk file
        :raises strax.DataNotAvailable: if the chunk is heavy data that is not
            present locally and ``download_heavy`` is off
        :raises ValueError: if the download does not yield exactly one file
        :raises FileNotFoundError: if the chunk file is missing after the download

        """
        base_dir = os.path.join(self.staging_dir, did_to_dirname(dset_did))
        chunk_file = chunk_info["filename"]
        chunk_path = os.path.abspath(os.path.join(base_dir, chunk_file))
        _, datatype, _ = parse_rucio_did(dset_did)
        is_heavy = datatype in self.heavy_types

        if not os.path.exists(chunk_path):
            if is_heavy and not self.download_heavy:
                error_msg = (
                    "For space reasons we don't want to have everyone "
                    "downloading raw data. If you know what you're "
                    "doing, pass download_heavy=True to the Rucio "
                    "frontend. If not, check your context and/or ask "
                    "someone if this raw data is needed locally."
                )
                warn(error_msg)
                raise strax.DataNotAvailable

            # parse_rucio_did above already required exactly one ":" in the DID.
            scope, _ = dset_did.split(":")
            rse = self._get_cached_rse(dset_did)

            # Chunk files live in the same scope as their dataset.
            chunk_did = f"{scope}:{chunk_file}"
            warn(f"Downloading {chunk_did} from {rse}")
            downloaded = admix.download(
                chunk_did,
                location=self.staging_dir,
                tries=self.tries,
                num_threads=self.num_threads,
                rse=rse,
                stage=self.stage,
            )
            if len(downloaded) != 1:
                raise ValueError(
                    f"{chunk_did} should be a single file. We found {len(downloaded)}."
                )
            # Sanity check: the file must land exactly where we will read it.
            assert chunk_path == downloaded[0]

        # check again
        if not os.path.exists(chunk_path):
            raise FileNotFoundError(f"No chunk file found at {chunk_path}")

        data = strax.load_file(chunk_path, dtype=dtype, compressor=compressor)
        if self.remove_heavy and is_heavy:
            warn(
                f"Removing {chunk_path} after reading since it's heavy data. "
                "This is a one-time operation."
            )
            os.remove(chunk_path)
        return data

    def _saver(self, dirname, metadata, **kwargs):
        """Saving is unsupported by this backend.

        :raises NotImplementedError: always

        """
        raise NotImplementedError(
            "Cannot save directly into rucio (yet), upload with admix instead"
        )
@export
class RucioSaver(strax.Saver):
    """Placeholder saver for rucio (TODO: save data when run as the production user).

    Not implemented yet: constructing an instance always fails.

    """

    def __init__(self, *args, **kwargs):
        # Arguments are accepted but never used; there is nothing to set up yet.
        raise NotImplementedError()
@export
def parse_rucio_did(did: str) -> tuple:
    """Split a Rucio DID of the form ``<prefix>_<number>:<dtype>-<hash>``.

    :param did: the DID string; must contain exactly one ``:`` and its name
        part exactly one ``-``
    :return: tuple ``(number, dtype, hash)`` with ``number`` converted to int

    """
    scope_part, name_part = did.split(":")
    # The run number is the second "_"-separated field of the scope.
    scope_fields = scope_part.split("_")
    run_number = int(scope_fields[1])
    data_type, lineage_hash = name_part.split("-")
    return run_number, data_type, lineage_hash
def did_to_dirname(did: str):
    """Turn a Rucio dataset DID into a strax-style directory name.

    Every ``:`` becomes ``-`` and every occurrence of ``xnt_`` is dropped.

    :param did: dataset DID; must contain exactly one ``-``
    :return: the directory name as a string
    :raises RuntimeError: if the number of ``-`` in ``did`` is not one, which
        is taken as a sign that something other than a dataset DID was passed

    """
    # make sure it's a DATASET did, not e.g. a FILE
    hyphen_count = did.count("-")
    if hyphen_count != 1:
        raise RuntimeError(
            f"The DID {did} does not seem to be a dataset DID. "
            "Is it possible you passed a file DID?"
        )

    with_hyphens = did.replace(":", "-")
    return with_hyphens.replace("xnt_", "")
@export
def key_to_rucio_did(key: strax.DataKey) -> str:
    """Build the rucio DID string for a strax data key.

    :param key: object providing ``run_id``, ``data_type`` and ``lineage_hash``
    :return: string of the form ``xnt_<run_id>:<data_type>-<lineage_hash>``

    """
    scope = f"xnt_{key.run_id}"
    name = f"{key.data_type}-{key.lineage_hash}"
    return f"{scope}:{name}"