import json
import strax
import numbers
import straxen
import inspect
from ast import literal_eval
from typing import Any, Dict, Optional, Mapping, Union
import pandas as pd
from urllib.parse import urlparse, parse_qs
from pydantic.validators import find_validators
from pydantic.config import get_config
from strax.config import OMITTED
from straxen.misc import filter_kwargs
export, __all__ = strax.exporter()
_CACHES: Dict[int, Any] = {}
[docs]@export
def clear_config_caches():
for cache in _CACHES.values():
cache.clear()
[docs]@export
def config_cache_size_mb():
return straxen.total_size(_CACHES) // 1e6
def parse_val(val: str):
"""Attempt to parse a string value as a python literal, falls back to returning just the
original string if cant be parsed."""
try:
val = literal_eval(val)
except ValueError:
pass
except SyntaxError:
pass
return val
[docs]@export
class URLConfig(strax.Config):
"""Dispatch on URL protocol.
unrecognized protocol returns identity inspired by dasks Dispatch and fsspec fs protocols.
"""
_LOOKUP: Dict[str, Any] = {}
_PREPROCESSORS = ()
SCHEME_SEP = "://"
QUERY_SEP = "?"
NAMESPACE_SEP = "."
PLUGIN_ATTR_PREFIX = "plugin."
def __init__(self, cache=0, **kwargs):
"""
:param cache: number of values to keep in cache, if set to True will cache all values
:param **kwargs: additional keyword arguments accepted by strax.Option
"""
self.final_type = OMITTED
super().__init__(**kwargs)
# Ensure backwards compatibility with Option validation
# type of the config value can be different from the fetched value.
if self.type is not OMITTED:
self.final_type = self.type
self.type = OMITTED # do not enforce type on the URL
if cache:
cache_len = 100 if cache is True else int(cache)
cache = straxen.CacheDict(cache_len=cache_len)
_CACHES[id(self)] = cache
@property
def cache(self):
return _CACHES.get(id(self), {})
[docs] @classmethod
def register(cls, protocol, func=None):
"""Register dispatch of `func` on urls starting with protocol name `protocol`"""
def wrapper(func):
if isinstance(protocol, tuple):
for t in protocol:
cls.register(t, func)
return func
if not isinstance(protocol, str):
raise ValueError("Protocol name must be a string.")
if protocol in cls._LOOKUP:
raise ValueError(f"Protocol with name {protocol} already registered.")
cls._LOOKUP[protocol] = func
return func
return wrapper(func) if func is not None else wrapper
[docs] @classmethod
def preprocessor(cls, func=None, precedence=0):
"""Register a new processor to modify the config values before they are used."""
def wrapper(func):
entry = (precedence, func)
if entry in cls._PREPROCESSORS:
raise ValueError(f"This processor is already registered.")
cls._PREPROCESSORS += (entry,)
return func
return wrapper(func) if func is not None else wrapper
[docs] @classmethod
def eval(
cls, protocol: str, arg: Optional[Union[str, tuple]] = None, kwargs: Optional[dict] = None
):
"""Evaluate a URL/AST by recusively dispatching protocols by name with argument arg and
keyword arguments kwargs and return the value.
If protocol does not exist, returnes arg
:param protocol: name of the protocol or a URL
:param arg: argument to pass to protocol, can be another (sub-protocol, arg, kwargs) tuple,
in which case sub-protocol will be evaluated and passed to protocol
:param kwargs: keyword arguments to be passed to the protocol
:return: (Any) The return value of the protocol on these arguments
"""
if protocol is not None and arg is None:
protocol, arg, kwargs = cls.url_to_ast(protocol)
if protocol is None:
return arg
if kwargs is None:
kwargs = {}
meth = cls._LOOKUP[protocol]
if isinstance(arg, tuple):
arg = cls.eval(*arg)
# Just to be on the safe side
kwargs = straxen.filter_kwargs(meth, kwargs)
return meth(arg, **kwargs)
[docs] @classmethod
def split_url_kwargs(cls, url):
"""Split a url into path and kwargs."""
path, _, _ = url.partition(cls.QUERY_SEP)
kwargs = {}
for k, v in parse_qs(urlparse(url).query).items():
# values of query arguments are evaluated as lists
# split logic depending on length
n = len(v)
if not n:
kwargs[k] = None
elif n == 1:
kwargs[k] = parse_val(v[0])
else:
kwargs[k] = list(map(parse_val, v))
return path, kwargs
[docs] @classmethod
def kwarg_from_url(cls, url: str, key: str):
path, kwargs = cls.split_url_kwargs(url)
return kwargs.get(key, None)
[docs] @classmethod
def lookup_value(cls, value, **namespace):
"""Optionally fetch an attribute from namespace if value is a string with cls.NAMESPACE_SEP
in it, the string is split and the first part is used to lookup an object in namespace and
the second part is used to lookup the value in the object.
If the value is not a string or the target object is not in the namesapce, the value is
returned as is.
"""
if isinstance(value, list):
return [cls.lookup_value(v, **namespace) for v in value]
if isinstance(value, str) and cls.NAMESPACE_SEP in value:
name, _, key = value.partition(cls.NAMESPACE_SEP)
if name in namespace:
obj = namespace[name]
if isinstance(obj, Mapping):
value = obj.get(key, value)
else:
value = getattr(obj, key, value)
return value
[docs] @classmethod
def deref_ast(cls, protocol, arg, kwargs, **namespace):
"""Dereference an AST by looking up values in namespace."""
if isinstance(arg, tuple):
arg = cls.deref_ast(*arg, **namespace)
else:
arg = cls.lookup_value(arg, **namespace)
kwargs = {k: cls.lookup_value(v, **namespace) for k, v in kwargs.items()}
return protocol, arg, kwargs
[docs] def validate(
self,
config,
run_id=None, # TODO: will soon be removed
run_defaults=None,
set_defaults=True,
):
"""This method is called by the context on plugin initialization at this stage, the run_id
and context config are already known but the config values are not yet set on the plugin.
Therefore its the perfect place to run any preprocessors on the config values to make any
needed changes before the configs are hashed.
"""
super().validate(config, run_id, run_defaults, set_defaults)
cfg = config[self.name]
sorted_preprocessors = reversed(sorted(self._PREPROCESSORS, key=lambda x: x[0]))
full_kwargs = dict(
name=self.name,
run_id=run_id,
run_defaults=run_defaults,
set_defaults=set_defaults,
)
for _, preprocessor in sorted_preprocessors:
kwargs = filter_kwargs(preprocessor, full_kwargs)
new_cfg = preprocessor(cfg, **kwargs)
cfg = new_cfg if new_cfg is not None else cfg
config[self.name] = cfg
if not isinstance(cfg, str) or self.SCHEME_SEP not in cfg:
# if the value is not a url config it is validated against
# its intended type (final_type)
self.validate_type(cfg)
[docs] def validate_type(self, value):
"""Validate the type of a value against its intended type."""
msg = (
f"Invalid type for option {self.name}. "
f"Expected a {self.final_type} instance, got {type(value)}"
)
if self.final_type is not OMITTED:
# Use pydantic to validate the type
# its validation is more flexible than isinstance
# it will coerce standard equivalent types
cfg = get_config(dict(arbitrary_types_allowed=True))
if isinstance(self.final_type, tuple):
validators = [v for t in self.final_type for v in find_validators(t, config=cfg)]
else:
validators = find_validators(self.final_type, config=cfg)
for validator in validators:
try:
validator(value)
break
except Exception:
pass
else:
raise TypeError(msg)
return value
[docs] def fetch(self, plugin):
"""Override the Config.fetch method this is called when the attribute is accessed from
withing the Plugin instance."""
# first fetch the user-set value
# from the config dictionary
url = super().fetch(plugin)
if not isinstance(url, str):
# if the value is not a string it is evaluated
# as a literal config and returned as is.
return self.validate_type(url)
if self.SCHEME_SEP not in url:
# no protocol in the url so its evaluated
# as string-literal config and returned as is
return self.validate_type(url)
# evaluate the url as AST
protocol, arg, kwargs = self.url_to_ast(url)
# allow run_id to be missing
run_id = getattr(plugin, "run_id", "000000")
# construct a deterministic hash key from AST
key = strax.deterministic_hash((plugin.config, run_id, protocol, arg, kwargs))
# fetch from cache if exists
value = self.cache.get(key, None)
# not in cache, lets fetch it
if value is None:
# resolve any referenced to plugin or config attributes
protocol, arg, kwargs = self.deref_ast(
protocol, arg, kwargs, config=plugin.config, plugin=plugin
)
value = self.eval(protocol, arg, kwargs)
value = self.validate_type(value)
self.cache[key] = value
return value
[docs] @classmethod
def ast_to_url(
cls,
protocol: Union[str, tuple],
arg: Optional[Union[str, tuple]] = None,
kwargs: Optional[dict] = None,
):
"""Convert a protocol abstract syntax tree to a valid URL."""
if isinstance(protocol, tuple):
protocol, arg, kwargs = protocol
if kwargs is None:
kwargs = {}
if protocol is None:
return arg
if isinstance(arg, (list, dict, numbers.Number)) and protocol != "json":
arg = (
"json",
json.dumps(arg),
)
if isinstance(arg, tuple):
arg = cls.ast_to_url(*arg)
if not isinstance(arg, str):
raise TypeError(f"Type {type(arg)} is not supported as an argument.")
arg, extra_kwargs = cls.split_url_kwargs(arg)
kwargs.update(extra_kwargs)
url = f"{protocol}{cls.SCHEME_SEP}{arg}"
url = cls.format_url_kwargs(url, **kwargs)
return url
[docs] @classmethod
def url_to_ast(cls, url, **kwargs):
"""Convert a URL to a protocol abstract syntax tree."""
if not isinstance(url, str):
raise TypeError(f"URL must be a string, got {type(url)}")
if cls.SCHEME_SEP not in url:
# no protocol in the url so its evaluated
# as string-literal config and returned as is
return None, url, {}
# separate the protocol name from the path
protocol, _, path = url.partition(cls.SCHEME_SEP)
# find the corresponding protocol method
meth = cls._LOOKUP.get(protocol, None)
if meth is None:
# unrecognized protocol
# evaluate as string-literal
return None, url, {}
arg, url_kwargs = cls.split_url_kwargs(path)
kwargs.update(url_kwargs)
if cls.SCHEME_SEP in arg:
# url contains a nested protocol
# first parsce sub-protocol
arg = cls.url_to_ast(arg, **kwargs)
# Filter unused kwargs for this method.
# This is done also at the eval level but
# probably better to be safe.
kwargs = straxen.filter_kwargs(meth, kwargs)
# Always sort kwargs for consistent ASTs
kwargs = dict(sorted(kwargs.items()))
return protocol, arg, kwargs
[docs] @classmethod
def are_equal(cls, first, second):
"""Return whether two URLs are equivalent (have equal ASTs)"""
return cls.url_to_ast(first) == cls.url_to_ast(second)
[docs] @classmethod
def protocol_descr(cls):
rows = []
for k, v in cls._LOOKUP.items():
descr = v.__doc__
if descr is not None:
descr = descr.split("\n")[0]
row = {
"name": f"{k}://",
"description": descr,
"signature": str(inspect.signature(v)),
"location": v.__module__,
}
rows.append(row)
return pd.DataFrame(rows)
[docs] @classmethod
def print_protocols(cls):
df = cls.protocol_descr()
if len(df):
print(df)
else:
print("No protocols registered.")
[docs] @classmethod
def preprocessor_descr(cls):
rows = []
for k, v in cls._PREPROCESSORS:
descr = v.__doc__
if descr is not None:
descr = descr.split("\n")[0]
row = {
"precedence": k,
"description": descr,
"signature": str(inspect.signature(v)),
"location": v.__module__,
}
rows.append(row)
return pd.DataFrame(rows)
[docs] @classmethod
def print_preprocessors(cls):
df = cls.preprocessor_descr()
if len(df):
print(df)
else:
print("No Preprocessors registered.")
[docs] @classmethod
def print_summary(cls):
print("=" * 30 + " Protocols " + "=" * 30)
cls.print_protocols()
print("=" * 30 + " Preprocessors " + "=" * 30)
cls.print_preprocessors()
[docs] @classmethod
def evaluate_dry(cls, url: str, **kwargs):
"""Utility function to quickly test and evaluate URL configs, without the initialization of
plugins (so no plugin attributes). plugin attributes can be passed as keyword arguments.
example::
from straxen import URLConfig
url_string='xedocs://electron_drift_velocity?run_id=027000&version=ONLINE'
URLConfig.evaluate_dry(url_string)
# or similarly
url_string='xedocs://electron_drift_velocity?run_id=plugin.run_id&version=ONLINE'
URLConfig.evaluate_dry(url_string, run_id='027000')
Please note that this has to be done outside of the plugin, so any
attributes of the plugin are not yet note to this dry evaluation
of the url-string.
:param url: URL to evaluate, see above for example.
:keyword: any additional kwargs are passed to self.dispatch (see example)
:return: evaluated value of the URL.
"""
url = cls.format_url_kwargs(url, **kwargs)
_, combined_kwargs = cls.split_url_kwargs(url)
for k, v in combined_kwargs.items():
if isinstance(v, str) and cls.PLUGIN_ATTR_PREFIX in v:
raise ValueError(
f"The URL parameter {k} depends on the plugin. "
"You must specify the value for this parameter "
"for this URL to be evaluated correctly. "
f"Try passing {k} as a keyword argument. "
f"e.g.: `URLConfig.evaluate_dry({url}, {k}=SOME_VALUE)`."
)
return cls.eval(url)