Source code for straxen.scada

import warnings

import urllib
import time
from datetime import datetime
from datetime import timedelta
import pytz
import requests
import getpass
from configparser import NoOptionError
import pandas as pd
import numba
import numpy as np

import strax
import straxen

# ``export`` is used below as a decorator on the public definitions of this
# module; ``__all__`` is bound from the same strax.exporter() call.
export, __all__ = strax.exporter()


# Fancy tqdm style in notebooks; used for the progress bar shown while
# looping over the requested parameters in SCADAInterface.get_scada_values.
tqdm = strax.utils.tqdm


@export
class SCADAInterface:
    def __init__(self, context=None, use_progress_bar=True):
        """Interface to access the XENONnT slow control data via python.

        Reads the three SCADA endpoints from the "scada" section of the straxen user
        config, tries to load the PMT name map and finally requests a security token.

        :param context: Context you are using e.g. st. This is needed if you would like to query
            data via run_ids.
        :param use_progress_bar: Use a progress bar in the Scada interface
        :raises ValueError: If reading the endpoints from the config raises a ValueError.

        """
        # Stays False only while the object is being set up; it is switched to
        # True right before the first token request at the end of __init__.
        self.we_are_straxen = False
        self._token_expire_time = None
        self._token = None
        self.pmt_file_found = True

        # (attribute name on self, option name in the "scada" config section)
        endpoints = (
            ("SCLogin_url", "sclogin_url"),
            ("SCData_URL", "scdata_url"),
            ("SCLastValue_URL", "sclastvalue_url"),
        )
        try:
            for attribute, option in endpoints:
                setattr(self, attribute, straxen.uconfig.get("scada", option))
        except ValueError as e:
            # NOTE(review): only ValueError is translated here; confirm whether
            # missing options / a missing config should be covered as well.
            raise ValueError(
                "Cannot load SCADA information, from your xenon config. "
                "SCADAInterface cannot be used."
            ) from e

        try:
            # Map of PMT names to SCADA parameter names, needed by find_pmt_names.
            self.pmt_file = straxen.get_resource("PMTmap_SCADA.json", fmt="json")
        except FileNotFoundError:
            warnings.warn('Cannot find PMT map, "find_pmt_names" cannot be used.')
            self.pmt_file_found = False

        # Remembered so that get_scada_values can disable the tqdm progress bar.
        self._use_progress_bar = use_progress_bar
        self.context = context

        self.we_are_straxen = True
        self.get_new_token()
[docs] def get_scada_values( self, parameters, start=None, end=None, run_id=None, query_type_lab=True, time_selection_kwargs=None, fill_gaps=None, filling_kwargs=None, down_sampling=False, every_nth_value=1, ): """Function which returns XENONnT slow control values for a given set of parameters and time range. The time range can be either defined by a start and end time or via the run_id, target and context. :param parameters: dictionary containing the names of the requested scada-parameters. The keys are used as identifier of the parameters in the returned pandas.DataFrame. :param start: int representing the start time of the interval in ns unix time. :param end: same as start but as end. :param run_id: Id of the run. Can also be specified as a list or tuple of run ids. In this case we will return the time range lasting between the start of the first and endtime of the second run. :param query_type_lab: Mode on how to query data from the historians. Can be either False to get raw data or True (default) to get data which was interpolated by historian. Useful if large time ranges have to be queried. :param time_selection_kwargs: Keyword arguments taken by st.to_absolute_time_range(). Default: {"full_range": True} :param fill_gaps: Decides how to fill gaps in which no data was recorded. Only needed for query_type_lab=False. Can be either None, "interpolation" or "forwardfill".None keeps the gaps (default), "interpolation" uses pandas.interpolate and "forwardfill" pandas.ffill. See https://pandas.pydata.org/docs/ for more information. You can change the filling options of the methods with the filling_kwargs. :param filling_kwargs: Kwargs applied to pandas .ffill() or .interpolate(). Only needed for query_type_lab=False. :param down_sampling: Boolean which indicates whether to donw_sample result or to apply average. The averaging is deactivated in case of interpolated data. Only needed for query_type_lab=False. 
:param every_nth_value: Defines over how many values we compute the average or the nth sample in case we down sample the data. In case query_type_lab=True every nth second is returned. :return: pandas.DataFrame containing the data of the specified parameters. """ if not filling_kwargs: filling_kwargs = {} if not isinstance(parameters, dict): mes = 'The argument "parameters" has to be specified as a dict.' raise ValueError(mes) start, end, now = self._get_and_check_start_end(run_id, start, end, time_selection_kwargs) _fill_gaps = [None, "None", "interpolation", "forwardfill"] if fill_gaps not in _fill_gaps: raise ValueError( f'Wrong argument for "fill_gaps", must be either {_fill_gaps}.' f' You specified "{fill_gaps}"' ) if not self._token: # User has not asked for a token yet: self._get_token() # Check if token will expire soon, if so renew the token before we query # the parameters: hrs, mins = self._token_expires_in() if hrs == 0 and mins < 30: print("Your token will expire in less than 30 min please get first a new one:") self._get_token() # Now loop over specified parameters and get the values for those. for ind, (k, p) in tqdm( enumerate(parameters.items()), total=len(parameters), desc="Load parameters", disable=not self._use_progress_bar, ): try: temp_df = self._query_single_parameter( start, end, k, p, every_nth_value=every_nth_value, fill_gaps=fill_gaps, filling_kwargs=filling_kwargs, down_sampling=down_sampling, query_type_lab=query_type_lab, ) if ind: m = np.all(df.loc[:, "time"] == temp_df.loc[:, "time"]) # noqa if ind and not m: raise ValueError( "This is odd somehow the time stamps for the query of" f" {p} does not match the previous timestamps." ) except ValueError as e: warnings.warn( f'Was not able to load parameters for "{k}". The reason was: "{e}".' f"Continue without {k}." 
) temp_df = pd.DataFrame(columns=(k,)) if ind: df = pd.concat((df, temp_df[k]), axis=1) # noqa else: df = temp_df # Adding timezone information and rename index: df.set_index("time", inplace=True) df = df.tz_localize(tz="UTC") df.index.rename("time UTC", inplace=True) if (end // 10**9) > now.astype(np.int64): df.loc[now:, :] = np.nan return df
def _get_and_check_start_end(self, run_id, start, end, time_selection_kwargs): """Helper function which clusters all time related checks and reduces complexity of get_scada_values.""" if not time_selection_kwargs: time_selection_kwargs = {"full_range": True} if run_id is not None and self.context is not None: # User specified a valid context and run_id, so get the start # and end time for our query: if isinstance(run_id, (list, tuple)): run_id = np.sort(run_id) # Do not trust the user's start, _ = self.context.to_absolute_time_range(run_id[0], **time_selection_kwargs) _, end = self.context.to_absolute_time_range(run_id[-1], **time_selection_kwargs) else: start, end = self.context.to_absolute_time_range(run_id, **time_selection_kwargs) elif run_id: mes = ( "You are trying to query slow control data via run_ids" " but you have not specified the context you are " "working with. Please set the context either via " ".context = YOURCONTEXT, or when initializing the " "interface." ) raise ValueError(mes) if not np.all((start, end)): # User has not specified any valid start and end time mes = ( "You have to specify either a run_id and context." " E.g. call get_scada_values(parameters, run_id=run)" " or you have to specify a valid start and end time " "in utc unix time ns." ) raise ValueError(mes) if end < start: raise ValueError("You specified an endtime which is smaller than the start time.") if (np.log10(start) < 18) or (np.log10(end) < 18): raise ValueError( "Expected the time to be in ns unix time (number with 19 digits or more)." " Have you specified the time maybe in seconds or micro-seconds?" ) now = np.datetime64("now") if (end // 10**9) > now.astype(np.int64): mes = ( "You are asking for an endtime which is in the future," " I may be written by a physicist, but I am neither self-" "aware nor can I predict the future like they can. You " f"asked for the endtime: {end // 10**9} but current utc " f"time is {now.astype(np.int64)}. 
I will return for the values for the " "corresponding times as nans instead." ) warnings.warn(mes) # Chop start/end time if precision is higher then seconds level. start = (start // 10**9) * 10**9 end = (end // 10**9) * 10**9 return int(start), int(end), now def _query_single_parameter( self, start, end, parameter_key, parameter_name, fill_gaps, filling_kwargs, down_sampling, query_type_lab=False, every_nth_value=1, ): """Function to query the values of a single parameter from SCData. :param start: Start time in ns unix time :param end: End time in ns unix time :param parameter_key: Key to identify queried parameter in the DataFrame :param parameter_name: Parameter name in Scada/historian database :param fill_gaps: Decides how to fill gaps in which no data was recorded. Only needed for query_type_lab=False. Can be either None, `'interpolation'` or `'forwardfill'`. None keeps the gaps (default), `'interpolation'` uses `pandas.interpolate`, and `'forwardfill'` uses `pandas.ffill`. See https://pandas.pydata.org/docs/ for more information. You can change the filling options of the methods with the `filling_kwargs`. :param filling_kwargs: Keyword arguments forwarded to pandas.ffill or pandas.interpolate. :param every_nth_value: Defines over how many values we compute the average or the nthed sample in case we down sample the data. :return: DataFrame with a time and parameter_key column. """ if every_nth_value < 1: mes = ( "SCADA takes only values every second. Cannot ask for a" " higher sampling rate than one value per second. However" f" you asked for one value every {every_nth_value} seconds." ) raise ValueError(mes) if not isinstance(every_nth_value, int): raise ValueError('"value_every_seconds" must be an int!') # First we have to create an array where we can fill values: if query_type_lab: # In the lab case we get interpolated data without nans so the df can be set # accordingly. 
seconds = np.arange(start, end + 1, 10**9 * every_nth_value) else: seconds = np.arange(start, end + 1, 10**9) # +1 to make sure endtime is included df = pd.DataFrame() df.loc[:, "time"] = seconds df["time"] = df["time"].astype("<M8[ns]") df.set_index("time", inplace=True) # Init parameter query: query = {"name": parameter_name} if not query_type_lab: # Check if first value is in requested range: # This is only needed in case of raw data since here it can # happen that the user queries a range without any data. temp_df = self._query( query, self.SCLastValue_URL, end=(start // 10**9) + 1 ) # +1 since it is end before exclusive # Store value as first value in our df df.loc[df.index.values[0], parameter_key] = temp_df["value"][0] offset = 1 else: offset = 0 one_year_in_ns = int(24 * 3600 * 360 * 10**9) starts = np.arange(start + offset, end, one_year_in_ns) if len(starts): ends = starts + one_year_in_ns ends = np.clip(ends, a_max=end, a_min=0) else: ends = np.array([end]) starts = np.array([start]) for start_query, end_query in zip(starts, ends): self._query_data_per_year( parameter_key, query, start_query, end_query, query_type_lab, every_nth_value, df, ) # Let user decided whether to ffill, interpolate or keep gaps: if fill_gaps == "interpolation": df.interpolate(**filling_kwargs, inplace=True) if fill_gaps == "forwardfill": # Now fill values in between like Scada would do: df.ffill(**filling_kwargs, inplace=True) # Step 4. Down-sample data if asked for: df.reset_index(inplace=True) if every_nth_value > 1 and not query_type_lab: # If the user asks for down sampling do so, but only for # raw_data, lab query type is already interpolated and down sampled # by the historian. 
if down_sampling: df = df[::every_nth_value] else: nt, nv = _average_scada( df["time"].astype(np.int64).values, df[parameter_key].values, every_nth_value ) df = pd.DataFrame() df["time"] = nt.astype("<M8[ns]") df[parameter_key] = nv return df def _query_data_per_year( self, parameter_name, query, start, end, query_type_lab, seconds_interval, result_dataframe, ): """The SCADA API cannot handle query ranges lasting longer than one year. So in case the user specifies a longer time range we have to chunk the time requests in steps of years. Updates the resulting dataframe in place. """ ntries = 0 # This corresponds to a bit more than one year assuming 1 value per second: max_tries = 1000 while ntries < max_tries: # Although we step the query already in years we also have to # do the query in a whole loop as we can only query 35000 # data points at any given time, however it is not possible # to know the sampling rate of the queried parameter apriori. temp_df = self._query( query, self.SCData_URL, start=(start // 10**9), end=(end // 10**9), query_type_lab=query_type_lab, seconds_interval=seconds_interval, raise_error_message=False, # No valid value in query range... ) # +1 since it is end before exclusive if temp_df.empty: # In case WebInterface does not return any data, e.g. if query range too small break times = (temp_df["timestampseconds"].values * 10**9).astype("<M8[ns]") result_dataframe.loc[times, parameter_name] = temp_df.loc[:, "value"].values endtime = temp_df["timestampseconds"].values[-1].astype(np.int64) * 10**9 start = endtime # Next query should start at the last time seen. ntries += 1 if not (len(temp_df) == 35000 and endtime != end // 10**9): # Max query are 35000 values, if end is reached the # length of the dataframe is either smaller or the last # time value is equivalent to queried range. 
break return result_dataframe def _query( self, query, api, start=None, end=None, query_type_lab=False, seconds_interval=None, raise_error_message=True, ): """Helper to reduce code. Asks for data and returns result. Raises error if api returns error. """ if start: query["StartDateUnix"] = start if end: query["EndDateUnix"] = end if query_type_lab: query["QueryType"] = "lab" query["interval"] = seconds_interval else: query["QueryType"] = "rawbytime" query.pop("interval", None) # Interval only works with lab # Configure query url query_url = urllib.parse.urlencode(query) self._query_url = api + query_url # Security check if url is a real url and not something like file:// if not self._query_url.lower().startswith("https"): raise ValueError( f"The query URL should start with https! Current URL: {self._query_url}" ) response = requests.get(self._query_url, headers={"Authorization": self._token}) if response.status_code == 401: # Invalid token so we have to get a new one, # this should actually never happen, but you never know... print("Your token is invalid. It may have expired please get a new one:") # If the user puts in the wrong credentials the query will fail. self._get_token() response = requests.get(self._query_url, headers={"Authorization": self._token}) if response.status_code != 200: # Check if we get any status code different from 200 == ok # If yes raise the corresponding status: response.raise_for_status() # Read database response and check if query was valid: values = response.json() temp_df = pd.DataFrame(columns=("timestampseconds", "value")) if isinstance(values, dict) and raise_error_message: # Not valid, why: query_status = values["status"] query_message = values["message"] raise ValueError( "SCADAapi has not returned values for the " f"parameter \"{query['name']}\". It returned the " f'status "{query_status}" with the message "{query_message}".' ) if isinstance(values, list): # Valid so return dataframe temp_df = pd.DataFrame(values) return temp_df
[docs] def find_scada_parameter(self): raise NotImplementedError("Feature not implemented yet.")
[docs] def find_pmt_names(self, pmts=None, hv=True, current=False): """Function which returns a list of PMT parameter names to be called in SCADAInterface.get_scada_values. The names refer to the high voltage of the PMTs, not their current. Thanks to Hagar and Giovanni who provided the file. :param pmts: Optional parameter to specify which PMT parameters should be returned. Can be either a list or array of channels or just a single one. :param hv: Bool if true names of high voltage channels are returned. :param current: Bool if true names for the current channels are returned. :return: dictionary containing short names as keys and scada parameter names as values. """ if not self.pmt_file_found: raise ValueError( "json file containing the PMT information was not found. " '"find_pmt_names" cannot be used.' ) if not (hv or current): raise ValueError('Either one "hv" or "current" must be true.') if isinstance(pmts, np.ndarray): # convert to a simple list since otherwise we get ambiguous errors pmts = list(pmts) if not hasattr(pmts, "__iter__"): # If single PMT convert it to itterable pmts = [pmts] # Getting parameter names for all PMTs: # The file contains the names for the HV channels if pmts: pmts_v = {k: v for k, v in self.pmt_file.items() if int(k[3:]) in pmts} else: pmts_v = self.pmt_file res = {} # Now get all relevant names: for key, value in pmts_v.items(): if hv: res[key + "_HV"] = value if current: res[key + "_I"] = value[:-4] + "IMON" return res
[docs] def get_new_token(self): """Function to renew the token of the current session.""" self._get_token()
def _get_token(self): """Function which asks for user credentials to receive a personalized security token. The token is required to query any data from the slow control historians. """ if not self.we_are_straxen: username, password = self._ask_for_credentials() else: try: username = straxen.uconfig.get("scada", "straxen_username") password = straxen.uconfig.get("scada", "straxen_password") except (AttributeError, NoOptionError): # If section does not exist Fall back to user credentials username, password = self._ask_for_credentials() login_query = { "username": username, "password": password, } res = requests.post(self.SCLogin_url, data=login_query) res = res.json() if "token" not in res.keys(): raise ValueError( "Cannot get security token from Slow Control web API. " f"API returned the following reason: {res['Message']}" ) self._token = res["token"] toke_start_time = datetime.now(tz=pytz.timezone("utc")) hours_added = timedelta(hours=3) self._token_expire_time = toke_start_time + hours_added print( "Received token, the token is valid for 3 hrs.\n", f'from {toke_start_time.strftime("%d.%m. %H:%M:%S")} UTC\n', f'till {self._token_expire_time.strftime("%d.%m. %H:%M:%S")} UTC\n' "We will automatically refresh the token for you :). " "Have a nice day and a fruitful analysis!", ) @staticmethod def _ask_for_credentials(): print("Please, enter your Xe1TViewer/SCADA credentials:") time.sleep(1) username = getpass.getpass("Xenon Username: ") password = getpass.getpass("Xenon Password: ") return username, password
[docs] def token_expires_in(self): """Function which displays how long until the current token expires.""" if self._token_expire_time: print( "The current token expires at" f" {self._token_expire_time.strftime('%d.%m. %H:%M:%S')} UTC" ) hrs, mins = self._token_expires_in() print(f"Which is in {hrs} h and {mins} min.") else: raise ValueError( 'You do not have any valid token yet. Please call "get_new_token" first".' )
def _token_expires_in(self): """Computes hrs and minutes until token expires.""" now = datetime.now(tz=pytz.timezone("utc")) dt = (self._token_expire_time - now).seconds # time delta in seconds hrs = dt // 3600 mins = dt % 3600 // 60 return hrs, mins
@export
def convert_time_zone(df, tz):
    """Convert the time zone of the index of a pd.DataFrame into another time zone.

    :param df: pandas.DataFrame containing the Data. Index must be a datetime object with time
        zone information.
    :param tz: str representing the timezone the index should be converted to. See the notes for
        more information.
    :return: pandas.DataFrame with converted time index. The index is renamed to "time <tz>".

    Notes:
        1.) The input pandas.DataFrame must be indexed via datetime
            objects which are timezone aware.
        2.) You can find a complete list of available timezones via:
            ```
            import pytz
            pytz.all_timezones
            ```
            You can also specify 'strax' as timezone which will convert the
            time index into a 'strax time' equivalent: the index is converted to UTC
            and then cast to integers.

    """
    to_strax = tz == "strax"
    converted = df.tz_convert(tz="UTC" if to_strax else tz)
    if to_strax:
        converted.index = converted.index.astype(np.int64)
    converted.index.rename(f"time {tz}", inplace=True)
    return converted
@numba.njit
def _average_scada(times, values, nvalues):
    """Function which down samples scada values by averaging.

    The input is cut into consecutive, non-overlapping windows of ``nvalues`` samples. An
    incomplete window at the end is dropped.

    :param times: Unix times of the data points
    :param values: Corresponding sensor value
    :param nvalues: Number of samples we average over
    :return: Tuple of the new time values (int64, mean time of each window) and the averaged
        values (float32).

    """
    # Floor division already discards a trailing incomplete window. Subtracting
    # one more window dropped complete data and gave a negative size for inputs
    # shorter than nvalues.
    n_samples = len(times) // nvalues

    res = np.zeros(n_samples, dtype=np.float32)
    new_times = np.zeros(n_samples, dtype=np.int64)
    for ind in range(n_samples):
        lo = ind * nvalues
        hi = lo + nvalues
        res[ind] = np.mean(values[lo:hi])
        new_times[ind] = np.mean(times[lo:hi])

    return new_times, res