Source code for straxen.storage.online_monitor_frontend

from strax import MongoFrontend, exporter
from straxen import uconfig

# `exporter()` hands back the `export` decorator applied to the public
# definitions in this module, together with the module's `__all__`
# (presumably `export` registers each decorated name in `__all__` -- confirm
# against strax).
export, __all__ = exporter()

# Collection name used by `OnlineMonitor` when the caller passes no `col_name`.
default_online_collection = "online_monitor"


@export
def get_mongo_uri(
    user_key="pymongo_user", pwd_key="pymongo_password", url_key="pymongo_url", header="RunDB"
):
    """Assemble a ``mongodb://`` connection URI from the user configuration.

    Each component is looked up with ``uconfig.get(header, <key>)``.

    :param user_key: option name holding the user name.
    :param pwd_key: option name holding the password.
    :param url_key: option name holding the host part of the URI.
    :param header: configuration section the three options are read from.
    :return: string of the form ``mongodb://<user>:<password>@<url>``.

    NOTE(review): the values are interpolated verbatim. If a credential can
    contain characters such as ``@``, ``:`` or ``/`` it presumably has to be
    stored percent-escaped in the configuration -- confirm.

    """
    # Look the options up in the fixed order user -> password -> url.
    username, password, host = (
        uconfig.get(header, option) for option in (user_key, pwd_key, url_key)
    )
    return f"mongodb://{username}:{password}@{host}"
@export
class OnlineMonitor(MongoFrontend):
    """MongoDB-backed storage frontend used to keep data temporarily in the database."""

    def __init__(
        self,
        uri=None,
        take_only=None,
        database=None,
        col_name=default_online_collection,
        readonly=True,
        *args,
        **kwargs,
    ):
        """Set up the frontend, filling in connection details from the configuration.

        :param uri: MongoDB connection URI. When ``None`` it is built with
            ``get_mongo_uri``: from the default options if ``readonly`` is
            true, from the ``rundb_admin`` section otherwise.
        :param take_only: data types this frontend accepts; mandatory.
        :param database: database name. When ``None`` it is read from the
            ``pymongo_database`` option of the ``RunDB`` section.
        :param col_name: name of the collection to use.
        :param readonly: stored on the instance as ``self.readonly``; also
            selects which credentials are used when ``uri`` is ``None``.
        :param args: forwarded positionally to ``MongoFrontend.__init__``.
        :param kwargs: forwarded as keywords to ``MongoFrontend.__init__``.
        :raises ValueError: if ``take_only`` is ``None``.

        """
        # Refuse to start without an explicit whitelist of data types.
        if take_only is None:
            raise ValueError(
                "Specify which data_types to accept! Otherwise the DataBase will be overloaded"
            )

        if uri is None:
            if readonly:
                uri = get_mongo_uri()
            else:
                # Writing was requested, so connect with the admin
                # credentials instead of the default ones.
                uri = get_mongo_uri(
                    header="rundb_admin",
                    user_key="mongo_rdb_username",
                    pwd_key="mongo_rdb_password",
                    url_key="mongo_rdb_url",
                )

        if database is None:
            database = uconfig.get("RunDB", "pymongo_database")

        super().__init__(
            *args,
            uri=uri,
            database=database,
            take_only=take_only,
            col_name=col_name,
            **kwargs,
        )
        # Set after the parent initialiser so this value is the one that sticks.
        self.readonly = readonly