Source code for straxen.storage.rucio_remote

import os
import json
from warnings import warn
from typing import Dict

import numpy as np
import strax
from utilix import xent_collection

try:
    import admix
    from rucio.common.exception import DataIdentifierNotFound

    HAVE_ADMIX = True
except (ImportError, AttributeError):
    HAVE_ADMIX = False

# ``export`` is the decorator applied to the public definitions below;
# ``__all__`` is the list of public names that comes with it.
export, __all__ = strax.exporter()

# HAVE_ADMIX is a plain module-level flag, so it is added to __all__ by hand.
__all__.append("HAVE_ADMIX")


@export
class TooMuchDataError(Exception):
    """Exception type exported by this module; adds nothing to ``Exception``."""
@export
class RucioRemoteFrontend(strax.StorageFrontend):
    """Uses the rucio client for the data find."""

    storage_type = strax.StorageType.REMOTE
    local_did_cache = None
    path = None

    def __init__(self, download_heavy=False, staging_dir="./strax_data", *args, **kwargs):
        """
        :param download_heavy: option to allow downloading of heavy data through
            RucioRemoteBackend
        :param staging_dir: directory handed to RucioRemoteBackend as its
            staging_dir
        :param args: Passed to strax.StorageFrontend
        :param kwargs: Passed to strax.StorageFrontend
        """
        super().__init__(*args, **kwargs)
        self.readonly = True
        self.collection = xent_collection()
        self.backends = []

        if HAVE_ADMIX:
            self.backends = [
                RucioRemoteBackend(staging_dir, download_heavy=download_heavy),
            ]
        else:
            self.log.warning(
                "You passed use_remote=True to rucio fronted, "
                "but you don't have access to admix/rucio! Using local backed only."
            )

    def find_several(self, keys, **kwargs):
        """Report every key as not found, without querying rucio.

        :param keys: the keys that were asked for
        :param kwargs: accepted and ignored
        :return: list of ``False`` values shaped like ``keys``
        """
        # for performance, dont do find_several with this storage frontend
        # we basically do the same query we would do in the RunDB plugin
        return np.zeros_like(keys, dtype=bool).tolist()

    def _find(self, key: strax.DataKey, write, allow_incomplete, fuzzy_for, fuzzy_for_options):
        """Look up ``key`` in rucio.

        :param key: the strax.DataKey to look for
        :param write: must be falsy, this frontend does not write
        :param allow_incomplete: must be falsy
        :param fuzzy_for: not used here
        :param fuzzy_for_options: not used here
        :return: tuple ``("RucioRemoteBackend", did)`` when rucio lists at
            least one rule in state "OK" for the DID
        :raises RuntimeError: if ``allow_incomplete`` or ``write`` is truthy
        :raises strax.DataNotAvailable: if no such rule exists, the DID is
            unknown to rucio, or admix/rucio could not be imported
        """
        did = key_to_rucio_did(key)
        if allow_incomplete or write:
            # NOTE: ``self.__class`` would be name-mangled inside a class body
            # and raise AttributeError, so the class is taken via type(self).
            raise RuntimeError(
                "Allow incomplete/writing is not allowed for "
                f"{type(self).__name__} since data might not be "
                "continuous"
            )

        if not HAVE_ADMIX:
            # Without admix the names used below do not exist; report the data
            # as unavailable instead of failing with a NameError.
            raise strax.DataNotAvailable

        try:
            rules = admix.rucio.list_rules(did, state="OK")
            if len(rules):
                return "RucioRemoteBackend", did
        except DataIdentifierNotFound:
            pass

        raise strax.DataNotAvailable

    def find(self, key: strax.DataKey, write=False, check_broken=False, **kwargs):
        """Call the parent ``find`` with this frontend's own defaults.

        :param key: the strax.DataKey to look for
        :param write: forwarded to the parent ``find``
        :param check_broken: forwarded to the parent ``find``
        :param kwargs: forwarded to the parent ``find``
        :return: whatever the parent ``find`` returns
        """
        # Overwrite defaults of super().find()
        return super().find(key, write, check_broken, **kwargs)
@export
class RucioRemoteBackend(strax.FileSytemBackend):
    """Get data from remote Rucio RSE."""

    # datatypes we don't want to download since they're too heavy
    heavy_types = ["raw_records", "raw_records_nv", "raw_records_he"]

    # for caching RSE locations; defined on the class, so the mapping is
    # shared by every instance
    dset_cache: Dict[str, str] = {}

    def __init__(self, staging_dir, download_heavy=False, **kwargs):
        """
        :param staging_dir: Path (a string) where to save data. Must be a writable location.
        :param download_heavy: Whether or not to allow downloads of the heaviest data
            (raw_records*, less aqmon and MV)
        :param kwargs: Passed to strax.FileSystemBackend
        :raises PermissionError: if staging_dir exists but is not writable, or
            does not exist and cannot be created
        """
        mess = (
            f"You told the rucio backend to download data to {staging_dir}, "
            "but that path is not writable by your user"
        )
        if os.path.exists(staging_dir):
            if not os.access(staging_dir, os.W_OK):
                raise PermissionError(mess)
        else:
            try:
                os.makedirs(staging_dir)
            except OSError as err:
                # keep the underlying OS error attached for debugging
                raise PermissionError(mess) from err

        super().__init__(**kwargs)
        self.staging_dir = staging_dir
        self.download_heavy = download_heavy

    def _get_rse(self, dset_did):
        """Return the RSE to download ``dset_did`` from.

        The answer is taken from ``dset_cache`` when present; otherwise it is
        determined through admix and stored in the cache.

        :param dset_did: rucio DID of the dataset
        :return: the RSE chosen by ``admix.downloader.determine_rse``
        """
        if dset_did not in self.dset_cache:
            rses = admix.rucio.get_rses(dset_did)
            self.dset_cache[dset_did] = admix.downloader.determine_rse(rses)
        return self.dset_cache[dset_did]

    def _get_metadata(self, dset_did, **kwargs):
        """Download the metadata json of a dataset and return its parsed content.

        :param dset_did: rucio DID of the dataset
        :param kwargs: accepted and ignored
        :return: the decoded json content of the metadata file
        :raises ValueError: if the download does not yield exactly one file
        :raises FileNotFoundError: if the downloaded path does not exist
        """
        rse = self._get_rse(dset_did)

        metadata_did = f"{dset_did}-metadata.json"
        downloaded = admix.download(metadata_did, rse=rse, location=self.staging_dir)
        if len(downloaded) != 1:
            raise ValueError(
                f"{metadata_did} should be a single file. We found {len(downloaded)}."
            )
        metadata_path = downloaded[0]
        # check again
        if not os.path.exists(metadata_path):
            raise FileNotFoundError(f"No metadata found at {metadata_path}")

        with open(metadata_path, mode="r") as f:
            return json.load(f)

    def _read_chunk(self, dset_did, chunk_info, dtype, compressor):
        """Load one chunk, downloading it into the staging dir if it is missing.

        :param dset_did: rucio DID of the dataset the chunk belongs to
        :param chunk_info: mapping with at least the key "filename"
        :param dtype: forwarded to strax.load_file
        :param compressor: forwarded to strax.load_file
        :return: the result of strax.load_file for the chunk file
        :raises strax.DataNotAvailable: if the data type is in heavy_types and
            download_heavy is falsy
        :raises ValueError: if the download does not yield exactly one file
        :raises FileNotFoundError: if the chunk file is still missing afterwards
        """
        base_dir = os.path.join(self.staging_dir, did_to_dirname(dset_did))
        chunk_file = chunk_info["filename"]
        chunk_path = os.path.abspath(os.path.join(base_dir, chunk_file))
        if not os.path.exists(chunk_path):
            _, datatype, _ = parse_rucio_did(dset_did)
            if datatype in self.heavy_types and not self.download_heavy:
                error_msg = (
                    "For space reasons we don't want to have everyone "
                    "downloading raw data. If you know what you're "
                    "doing, pass download_heavy=True to the Rucio "
                    "frontend. If not, check your context and/or ask "
                    "someone if this raw data is needed locally."
                )
                warn(error_msg)
                raise strax.DataNotAvailable
            # the chunk file lives in the same scope as its dataset
            scope, _ = dset_did.split(":")
            chunk_did = f"{scope}:{chunk_file}"
            rse = self._get_rse(dset_did)

            downloaded = admix.download(chunk_did, rse=rse, location=self.staging_dir)
            if len(downloaded) != 1:
                raise ValueError(
                    f"{chunk_did} should be a single file. We found {len(downloaded)}."
                )
            # sanity check: the file must land where it is about to be read from
            assert chunk_path == downloaded[0]

        # check again
        if not os.path.exists(chunk_path):
            raise FileNotFoundError(f"No chunk file found at {chunk_path}")

        return strax.load_file(chunk_path, dtype=dtype, compressor=compressor)

    def _saver(self, dirname, metadata, **kwargs):
        """Saving is not supported by this backend.

        :raises NotImplementedError: always
        """
        raise NotImplementedError(
            "Cannot save directly into rucio (yet), upload with admix instead"
        )
@export
class RucioSaver(strax.Saver):
    """TODO Saves data to rucio if you are the production user."""

    def __init__(self, *args, **kwargs):
        # Placeholder only: constructing this class always fails.
        raise NotImplementedError()
@export
def parse_rucio_did(did: str) -> tuple:
    """Split a Rucio DID into run number, data type and hash.

    The DID is read as ``<prefix>_<number>:<dtype>-<hash>`` and returned as
    the tuple ``(number:int, dtype:str, hash:str)``.
    """
    scope_part, name_part = did.split(":")
    # the run number is the second "_"-separated field of the scope
    run_number = int(scope_part.split("_")[1])
    data_type, lineage = name_part.split("-")
    return run_number, data_type, lineage
def did_to_dirname(did: str):
    """Turn a Rucio dataset DID into a dirname like used by strax.FileSystemBackend.

    The ":" becomes "-" and every "xnt_" is removed.

    :raises RuntimeError: if the DID does not contain exactly one "-"
    """
    # a dataset DID has exactly one "-"; anything else is e.g. a FILE DID
    if did.count("-") != 1:
        raise RuntimeError(
            f"The DID {did} does not seem to be a dataset DID. "
            "Is it possible you passed a file DID?"
        )
    with_dashes = did.replace(":", "-")
    return with_dashes.replace("xnt_", "")
@export
def key_to_rucio_did(key: strax.DataKey) -> str:
    """Build the rucio DID string for a strax.DataKey.

    The result is ``xnt_<run_id>:<data_type>-<lineage_hash>``.
    """
    scope = f"xnt_{key.run_id}"
    name = f"{key.data_type}-{key.lineage_hash}"
    return f"{scope}:{name}"
@export
class RucioFrontend(RucioRemoteFrontend):
    """Deprecated name for RucioRemoteFrontend; warns on construction."""

    def __init__(self, *args, **kwargs):
        """Emit a DeprecationWarning, then initialise as RucioRemoteFrontend.

        :param args: Passed to RucioRemoteFrontend
        :param kwargs: Passed to RucioRemoteFrontend
        """
        # stacklevel=2 attributes the warning to the code constructing the
        # frontend rather than to this line.
        warn(
            "RucioFrontend is deprecated, use RucioRemoteFrontend instead",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)