Source code for straxen.storage.rucio_local

import glob
import hashlib
import json
import os
import re
import socket
import typing
import warnings

import strax
from bson import json_util
from .rucio_remote import key_to_rucio_did, parse_rucio_did

# ``strax.exporter()`` returns the ``export`` decorator applied to the public
# classes below, together with this module's ``__all__`` list; presumably each
# ``@export``-decorated name is appended to ``__all__`` (confirm in strax).
export, __all__ = strax.exporter()


@export
class RucioLocalFrontend(strax.StorageFrontend):
    """Storage that loads from rucio by assuming the rucio file naming
    convention without access to the rucio database.

    Normally, you don't need this StorageFrontend as it should return the
    same data as the RunDB frontend.
    """

    storage_type = strax.StorageType.LOCAL
    # Root directory of the rucio namespace on each known RSE.
    local_prefixes = {
        "UC_DALI_USERDISK": "/dali/lgrandi/rucio/",
        "SDSC_USERDISK": "/expanse/lustre/projects/chi135/shockley/rucio",
    }
    # Regexes searched (re.search) in this host's FQDN to decide which RSE is local.
    # NOTE(review): the dots are unescaped and match any character;
    # r"\.rcc\." may have been intended - confirm before tightening.
    local_rses = {"UC_DALI_USERDISK": r".rcc.", "SDSC_USERDISK": r".sdsc."}

    def __init__(self, path=None, *args, **kwargs):
        """Set up the frontend (read-only unless ``readonly`` is passed).

        :param path: root of the local rucio directory tree. If None, the
            prefix of the RSE matching this host is used; if no RSE matches,
            the frontend has no backends and every lookup raises
            ``strax.DataNotAvailable``.
        """
        kwargs.setdefault("readonly", True)
        super().__init__(*args, **kwargs)
        if path is None:
            local_rse = self.determine_rse()
            if local_rse is None:
                self.path = None
                self.backends = []
                return
            self.path = self.local_prefixes[local_rse]
        else:
            self.path = path
        self.backends = [RucioLocalBackend(self.path)]

    def determine_rse(self):
        """Return the RSE whose host regex matches this machine's FQDN.

        :return: a key of ``local_rses``, or None if no regex matches.
        :raises ValueError: if more than one RSE regex matches the hostname.
        """
        hostname = socket.getfqdn()
        local_rse = None
        for rse, host_regex in self.local_rses.items():
            if re.search(host_regex, hostname):
                if local_rse is not None:
                    raise ValueError(
                        f"The regex {host_regex} matches two RSEs {rse} and"
                        f" {local_rse}. I'm not sure what to do with that."
                    )
                local_rse = rse
        return local_rse

    def _find(self, key: strax.DataKey, write, allow_incomplete, fuzzy_for, fuzzy_for_options):
        """Locate ``key`` on the local RSE.

        :return: ``(backend_name, did)`` if the data is fully stored locally
            (exactly, or via a fuzzy lineage match when fuzzy options are given).
        :raises strax.DataNotAvailable: if there is no local path or no match.
        :raises RuntimeError: if writing or incomplete data is requested.
        """
        if self.path is None:
            raise strax.DataNotAvailable
        did = key_to_rucio_did(key)
        if allow_incomplete or write:
            # Was ``self.__class.__name__``: inside a class body that name is
            # mangled to ``_RucioLocalFrontend__class`` and raised AttributeError.
            raise RuntimeError(
                "Allow incomplete/writing is not allowed for "
                f"{self.__class__.__name__} since data might not be "
                "continuous"
            )
        if self.did_is_local(did):
            return self.backends[0].__class__.__name__, did

        if fuzzy_for or fuzzy_for_options:
            matches_to = self._match_fuzzy(key, fuzzy_for, fuzzy_for_options)
            if matches_to:
                return matches_to

        raise strax.DataNotAvailable

    def did_is_local(self, did):
        """Determine whether all files of a given DID are on the local RSE.

        :param did: Rucio DID string (``scope:name``).
        :return: True if the metadata can be read and every chunk file it
            lists exists on disk; False if the metadata is missing, corrupted
            or incomplete, or if any chunk file is absent.
        :raises strax.DataNotAvailable: if this frontend has no local path
            (no local RSE was found and no path was given).
        """
        if self.path is None:
            raise strax.DataNotAvailable
        try:
            md = self._get_backend("RucioLocalBackend").get_metadata(did)
        except (strax.DataNotAvailable, strax.DataCorrupted, KeyError):
            return False
        return self._all_chunk_stored(md, did)

    def _all_chunk_stored(self, md: dict, did: str) -> bool:
        """Check that every chunk listed in the metadata exists on disk.

        Chunks without a ``filename`` entry are skipped.
        """
        scope, _ = did.split(":")
        for chunk in md.get("chunks", []):
            if chunk.get("filename"):
                chunk_did = f"{scope}:{chunk['filename']}"
                if not os.path.exists(rucio_path(self.path, chunk_did)):
                    return False
        return True

    def _match_fuzzy(
        self,
        key: strax.DataKey,
        fuzzy_for: tuple,
        fuzzy_for_options: tuple,
    ) -> typing.Optional[tuple]:
        """Find locally stored data matching ``key`` up to the fuzzy options.

        :return: ``(backend_name, did)`` for the first fully stored match in
            glob order (which is arbitrary), or None if there is none or if
            this frontend has no local path.
        """
        if self.path is None:
            return None

        # Metadata files are named "<dtype>-<hash>-metadata.json". A bare
        # "<dtype>*" glob would also match data types sharing this prefix, and
        # the DID built below from key.data_type would then not point at the
        # file whose lineage was compared.
        pattern = os.path.join(
            self.path, f"xnt_{key.run_id}/*/*/{key.data_type}-*-metadata.json"
        )
        # Convert the lineage dict to its JSON form so it compares like the
        # lineage stored in the metadata files.
        desired_lineage = json.loads(json.dumps(key.lineage, sort_keys=True))
        for md in glob.glob(pattern):
            md_dict = read_md(md)
            if not self._matches(
                md_dict["lineage"],
                desired_lineage,
                fuzzy_for,
                fuzzy_for_options,
            ):
                continue
            fuzzy_lineage_hash = md_dict["lineage_hash"]
            did = f"xnt_{key.run_id}:{key.data_type}-{fuzzy_lineage_hash}"
            if self._all_chunk_stored(md_dict, did):
                # Only warn about data that is actually returned.
                warnings.warn(f"Was asked for {key} returning {md}", UserWarning)
                return self.backends[0].__class__.__name__, did
        return None
@export
class RucioLocalBackend(strax.FileSytemBackend):
    """Read data from a local rucio RSE directly from the filesystem.

    File locations follow :func:`rucio_path`, i.e. the rucio
    ``<root>/<scope>/<md5[0:2]>/<md5[2:4]>/<filename>`` layout.
    """

    def __init__(self, rucio_dir, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Root of the rucio directory tree on this RSE.
        self.rucio_dir = rucio_dir

    def _get_metadata(self, did: str, **kwargs):
        """Load and return the metadata JSON that belongs to ``did``.

        :raises strax.DataNotAvailable: if the folder that should hold the
            metadata file does not exist.
        :raises strax.DataCorrupted: if that folder exists but the metadata
            file itself does not.
        """
        scope, _name = did.split(":")
        _number, dtype, hsh = parse_rucio_did(did)
        md_path = rucio_path(self.rucio_dir, f"{scope}:{dtype}-{hsh}-metadata.json")

        if not os.path.exists(os.path.dirname(md_path)):
            raise strax.DataNotAvailable(f"No folder for metadata at {md_path}")
        if not os.path.exists(md_path):
            raise strax.DataCorrupted(f"Folder exists but no metadata at {md_path}")

        with open(md_path, mode="r") as md_file:
            return json.load(md_file)

    def _read_chunk(self, did, chunk_info, dtype, compressor):
        """Load one chunk; it lives in the same scope as ``did``."""
        scope, _name = did.split(":")
        chunk_path = rucio_path(self.rucio_dir, f"{scope}:{chunk_info['filename']}")
        return strax.load_file(chunk_path, dtype=dtype, compressor=compressor)

    def _saver(self, **kwargs):
        """Writing is unsupported; data must be uploaded through admix."""
        raise NotImplementedError(
            "Cannot save directly into rucio (yet), upload with admix instead"
        )
def rucio_path(root_dir, did):
    """Return the on-disk path of ``did`` under ``root_dir``, following rucio's layout.

    The file sits at ``<root_dir>/<scope>/<h[0:2]>/<h[2:4]>/<filename>``,
    where ``h`` is the hex MD5 digest of the full ``scope:filename`` DID.
    See the __hash method here:
    https://github.com/rucio/rucio/blob/1.20.15/lib/rucio/rse/protocols/protocol.py

    :raises ValueError: if ``did`` does not contain exactly one ``:``.
    """
    scope, filename = did.split(":")
    # MD5 is only used for directory bucketing, not security; disable bandit.
    digest = hashlib.md5(did.encode("utf-8")).hexdigest()  # nosec
    return os.path.join(root_dir, scope, digest[:2], digest[2:4], filename)


def read_md(path: str) -> dict:
    """Read a metadata JSON file, decoding BSON extended-JSON values via ``json_util``."""
    with open(path, mode="r") as md_file:
        return json.load(md_file, object_hook=json_util.object_hook)