Source code for straxen.analyses.holoviews_waveform_display

"""Dynamic holoviews-based waveform display Note imports are inside function, to keep 'import
straxen' free of holoviews."""

import numpy as np
import pandas as pd

import straxen

straxen._BOKEH_X_RANGE = None


@straxen.mini_analysis(requires=["hitlets_nv", "events_nv", "event_positions_nv"])
def plot_nveto_event_display(context, run_id, hitlets_nv, events_nv):
    nvd = straxen.nVETOEventDisplay(events_nv, hitlets_nv, run_id)
    return nvd.plot_event_display()


[docs]def seconds_from(t, t_reference, unit_conversion=int(1e9)): """Helper function which concerts times into relative times in specified unit. :param t: Time :param t_reference: Reference time e.g. start of an event or first peak in event. :param unit_conversion: Conversion factor for time units e.g. 10**3 for micro seconds. """ return (t - t_reference) / unit_conversion
# Custom wheel zoom tool that only zooms in one dimension
[docs]def x_zoom_wheel(): import bokeh.models return bokeh.models.WheelZoomTool(dimensions="width")
@straxen.mini_analysis(requires=["records"], hv_bokeh=True) def hvdisp_plot_pmt_pattern(*, config, records, to_pe, array="bottom"): """Plot a PMT array, with colors showing the intensity of light observed in the time range. :param array: 'top' or 'bottom', array to show. """ import holoviews as hv pmts = straxen.pmt_positions(xenon1t=config["n_tpc_pmts"] < 300) areas = np.bincount( records["channel"], weights=records["area"] * to_pe[records["channel"]], minlength=len(pmts) ) # Which PMTs should we include? m = pmts["array"] == array pmts = pmts[m].copy() pmts["area"] = areas[m] f = 1.08 pmts = hv.Dataset( pmts, kdims=[ hv.Dimension("x", unit="cm", range=(-straxen.tpc_r * f, straxen.tpc_r * f)), hv.Dimension("y", unit="cm", range=(-straxen.tpc_r * f, straxen.tpc_r * f)), hv.Dimension("i", range=(0, config["n_tpc_pmts"]), label="PMT number"), hv.Dimension("area", label="Area", unit="PE"), ], ) pmts = pmts.to(hv.Points, vdims=["area", "i"], group="PMTPattern", label=array.capitalize()) hv.opts.apply_groups( pmts, plot=dict(color_index=2, tools=["hover"], show_grid=False), style=dict(size=17, cmap="plasma"), ) return pmts def _records_to_points(*, records, to_pe, t_reference, config, unit_conversion=int(1e9)): """Return (holoviews.Points, time_stream) corresponding to records.""" import holoviews as hv areas_r = records["area"] * to_pe[records["channel"]] # Create dataframe with record metadata df = pd.DataFrame( dict( area=areas_r, time=seconds_from( records["time"] + records["dt"] * records["length"] // 2, t_reference, unit_conversion=unit_conversion, ), length=records["length"], dt=records["dt"], channel=records["channel"], ) ) rec_points = hv.Points( df, kdims=[ hv.Dimension("time", label="Time [µs]"), hv.Dimension("channel", label="PMT Channel", range=(0, config["n_tpc_pmts"])), ], vdims=[hv.Dimension("area", label="Area [pe]")], ) time_stream = hv.streams.RangeX(source=rec_points) return rec_points, time_stream @straxen.mini_analysis(requires=["records"], hv_bokeh=True) def hvdisp_plot_records_2d( records, to_pe, config, t_reference, time_stream=None, tools=(x_zoom_wheel(), "xpan"), default_tools=("save", "pan", "box_zoom", "save", "reset"), plot_library="bokeh", hooks=(), ): """Plot records in a dynamic 2D histogram of (time, pmt) :param width: Plot width in pixels :param time_stream: holoviews rangex stream to use. If provided, we assume records is already converted to points (which hopefully is what the stream is derived from) :param tools: Tools to be used in the interactive plot. Only works with bokeh as plot library. :param plot_library: Default bokeh, library to be used for the plotting. :param width: With of the record matrix in pixel. :param hooks: Hooks to adjust plot settings. :return: datashader object, records holoview points, RangeX time stream of records. """ shader, records, time_stream = _hvdisp_plot_records_2d( records, to_pe, config, t_reference, time_stream=time_stream, default_tools=default_tools, tools=tools, hooks=hooks, plot_library=plot_library, ) shader = shader.opts(title="Time vs. Channel") return shader def _hvdisp_plot_records_2d( records, to_pe, config, t_reference, event_range=(None, None), time_stream=None, default_tools=(), tools=(), hooks=(), plot_library="bokeh", unit_conversion=10**3, ): import holoviews as hv import holoviews.operation.datashader hv.extension(plot_library) if time_stream is None: # Records are still a dataframe, convert it to points records, time_stream = _records_to_points( records=records, to_pe=to_pe, t_reference=t_reference, unit_conversion=unit_conversion, config=config, ) # Whether to show the toolbar or not: if tools: toolbar = "right" else: toolbar = None # Creating the plot: shader = hv.operation.datashader.dynspread( hv.operation.datashader.datashade( records, dynamic=True, x_range=event_range, y_range=(0, config["n_tpc_pmts"]), streams=[time_stream], ), threshold=0.1, ) hv.opts.apply_groups( shader, plot=dict( aspect=4, responsive="width", hooks=list(hooks), toolbar=toolbar, tools=list(tools), default_tools=list(default_tools), fontsize={"labels": 12}, show_grid=True, ), ) return shader, records, time_stream
[docs]def plot_record_polygons( record_points, center_time=True, scaling=10**-3, ): """Plots record hv.Points as polygons for record matrix. :param record_points: Holoviews Points generated with _records_to_points. :param center_time: If true treats specified times as center times. :param scaling: Scale times scale by this factor. E.g. if times should be in µs use 10**-3. :return: hv.Polygons """ import holoviews as hv df = record_points.data.copy() if not center_time: df.loc[:, "time"] = df.loc[:, "time"] + (df.loc[:, "length"] / 2 * df.loc[:, "dt"]) data = _make_polys(df, scaling=scaling) polys = hv.Polygons(data, kdims=["time", "channel"], vdims=list(data[0].keys())[1:]).opts( color="area", aspect=4, responsive="width", line_width=0, cmap="viridis", ) return polys
def _make_polys(df, scaling=1): """Function which converts the hitlet/record points into polygons for the hitlet/record matrix.""" res = [] for row_i in range(len(df)): data_dict = {} # Kdims are given by time and channel: t, ch = df.loc[row_i, ["time", "channel"]] w = df.loc[row_i, "length"] * df.loc[row_i, "dt"] data_dict[("time", "channel")] = _rectangle(t, ch, width=w * scaling) # Add other parameters to polygons for vdmis: keys = [key for key in df.columns.values if key not in ["time", "channel", "dt"]] for key in keys: data_dict[key] = df.loc[row_i, key] res.append(data_dict) return res def _rectangle(time=0, channel=0, width=1.1, height=1): """Generates polygon box coordinates for record matrix. :param time: Center position of the record in time. :param channel: Center position of PMT channel. E.g channel 0 => 0.5 :param width: Length of the record in µs. param height: Height of the record in "channel"-units. X,Y have to be the center of the polygon. Width and height are the full width and height in data coordinates. """ width = width / 2 height = height / 2 return np.array( [ (time - width, channel - height), (time + width, channel - height), (time + width, channel + height), (time - width, channel + height), ] )
[docs]def get_records_matrix_in_window(polys, x_range, time_slice=10): """Helper function which returns polygons for rendering when x_range is below the specified value. This function has to be applied to polygons e.g.: poly.apply(get_records_matrix_in_window, streams=[time_stream]) :param polys: Holoviews Polygons :param x_range: x_range of the RangeX object. :param time_slice: Size of the time slice in [µs] when records should be drawn. """ if x_range is None: # Needed since x_range is by default not defined when plotting # the first time. return polys.iloc[:0] if (x_range[1] - x_range[0]) < time_slice: # If x_range smaller than specified minimum return polys -> # render polys. inds = _in_window(polys.data, x_range) return polys.iloc[inds] return polys.iloc[:0]
def _in_window(polys, x_range): """Function which checks if a polygon is partially in x_range. :param polys: List of ordered dictionaries containing Polygon data. :param x_range: Range which should be tested. """ res = [] for ind, p in enumerate(polys): # Loop over polys, if partially in window keep index. if np.any((x_range[0] <= p["time"]) & (p["time"] < x_range[1])): res.append(ind) return res @straxen.mini_analysis(requires=["peaks", "peak_basics"], hv_bokeh=True) def hvdisp_plot_peak_waveforms( t_reference, time_range, peaks, width=600, show_largest=None, time_dim=None ): """Plot the sum waveforms of peaks. Holoviews time dimension; will create new one if not provided. :param width: Plot width in pixels :param show_largest: Maximum number of peaks to show :param time_dim: """ import holoviews as hv if show_largest is not None and len(peaks) > show_largest: show_i = np.argsort(peaks["area"])[-show_largest::] peaks = peaks[show_i] curves = [] for p in peaks: # label = {1: 's1', 2: 's2'}.get( # p['type'], 'unknown') color = {1: "b", 2: "g"}.get(p["type"], "k") # It's better to plot amplitude /time than per bin, since # sampling times are now variable y = p["data"][: p["length"]] / p["dt"] t_edges = np.arange(p["length"] + 1, dtype=np.int64) t_edges = t_edges * p["dt"] + p["time"] t_edges = seconds_from(t_edges, t_reference) # Make a 'step' plot. Unlike matplotlib's steps-mid, # this also analyses the final edges correctly t_ = np.zeros(2 * len(y)) y_ = np.zeros(2 * len(y)) t_[0::2] = t_edges[:-1] t_[1::2] = t_edges[1:] y_[0::2] = y y_[1::2] = y if time_dim is None: time_dim = hv.Dimension("time", label="Time", unit="sec") curves.append( hv.Curve( dict(time=t_, amplitude=y_), kdims=time_dim, vdims=hv.Dimension("amplitude", label="Amplitude", unit="PE/ns"), group="PeakSumWaveform", ) ) hv.opts.apply_groups( curves[-1], style=dict(color=color), ) overlay = hv.Overlay(items=curves) hv.opts.apply_groups( overlay, plot=dict(width=width), ) return overlay def _range_plot(f, full_time_range, t_reference, **kwargs): # The **bla is needed to disable some holoviews check # on the arguments... def wrapped(x_range, **kwargzz): if len(kwargzz): raise RuntimeError(f"Passed superfluous kwargs {kwargzz}") if x_range is None: x_range = seconds_from(np.asarray(full_time_range), t_reference) # Deal with strange time ranges -- not sure how these arise? x_range = np.nan_to_num(x_range) if x_range[1] == x_range[0]: x_range[1] += 1 return f( time_range=(t_reference + int(x_range[0] * 1e9), t_reference + int(x_range[1] * 1e9)), t_reference=t_reference, **kwargs, ) return wrapped @straxen.mini_analysis(requires=["records", "peaks", "peak_basics"], hv_bokeh=True) def waveform_display( context, run_id, to_pe, time_range, t_reference, records, peaks, config, width=600, show_largest=None, ): """Plot a waveform overview display". :param width: Plot width in pixels. """ import holoviews as hv records_points, time_stream = _records_to_points( records=records, to_pe=to_pe, t_reference=t_reference, config=config ) time_v_channel = context.hvdisp_plot_records_2d( run_id=run_id, to_pe=to_pe, records=records_points, time_stream=time_stream, time_range=time_range, t_reference=t_reference, # We don't need to cut these further, records we get are already cut # to the plot's maximum range by the mini_analysis logic # and datashader does the online cutting / rebinning / zooming. # This is fortunate, since we omitted 'endtime' from records_points! time_selection="skip", ) array_plot = { array: hv.DynamicMap( _range_plot( context.hvdisp_plot_pmt_pattern, run_id=run_id, to_pe=to_pe, records=records, full_time_range=time_range, t_reference=t_reference, time_selection="touching", array=array, ), streams=[time_stream], ) for array in ("top", "bottom") } peak_wvs = hv.DynamicMap( _range_plot( context.hvdisp_plot_peak_waveforms, run_id=run_id, width=width, full_time_range=time_range, t_reference=t_reference, time_selection="touching", time_dim=records_points.kdims[0], peaks=peaks, show_largest=show_largest, ), streams=[time_stream], ) layout = time_v_channel + peak_wvs + array_plot["top"] + array_plot["bottom"] return layout.cols(2)
[docs]def hook(plot, x_range, debug=False): """Hook to set the same RangeX stream for event display and records matrix, voodoo.... Note: Works only in the following order: 1. Create holoviews 2. hv.render plot 3. set bokeh x_range as holoviews x_range Does not work first with bokeh and then with holoviews. Why? I have no clue.... """ if debug: print("Hook: ", x_range) if not x_range: _hook_get_xrange(plot, debug) else: _hook_set_xrange(plot, x_range, debug)
def _hook_get_xrange(plot, debug): straxen._BOKEH_X_RANGE = plot.handles["x_range"] if debug: print("Get", straxen._BOKEH_X_RANGE) def _hook_set_xrange(plot, x_range, debug): if debug: print("Set", x_range) plot.state.x_range = x_range