import warnings
import bokeh
import bokeh.plotting as bklt
import numba
import numpy as np
import strax
import straxen
from straxen.analyses.holoviews_waveform_display import (
_hvdisp_plot_records_2d,
hook,
plot_record_polygons,
get_records_matrix_in_window,
) # noqa
# Default legend, unknow, S1 and S2
LEGENDS = ("Unknown", "S1", "S2")
straxen._BOKEH_CONFIGURED_NOTEBOOK = False
@straxen.mini_analysis(
requires=("event_basics", "peaks", "peak_basics", "peak_positions"), warn_beyond_sec=0.05
)
def event_display_interactive(
events,
peaks,
to_pe,
run_id,
context,
bottom_pmt_array=True,
only_main_peaks=False,
only_peak_detail_in_wf=False,
plot_all_pmts=False,
plot_record_matrix=False,
plot_records_threshold=10,
xenon1t=False,
colors=("gray", "blue", "green"),
yscale=("linear", "linear", "linear"),
log=True,
):
"""Interactive event display for XENONnT. Plots detailed main/alt S1/S2, bottom and top PMT hit
pattern as well as all other peaks in a given event.
:param bottom_pmt_array: If true plots bottom PMT array hit-pattern.
:param only_main_peaks: If true plots only main peaks into detail
plots as well as PMT arrays.
:param only_peak_detail_in_wf: Only plots main/alt S1/S2 into
waveform. Only plot main peaks if only_main_peaks is true.
:param plot_all_pmts: Bool if True, colors switched off PMTs instead
of showing them in gray, useful for graphs shown in talks.
:param plot_record_matrix: If true record matrix is plotted below.
waveform.
:param plot_records_threshold: Threshold at which zoom level to display
record matrix as polygons. Larger values may lead to longer
render times since more polygons are shown.
:param xenon1t: Flag to use event display with 1T data.
:param colors: Colors to be used for peaks. Order is as peak types,
0 = Unknown, 1 = S1, 2 = S2. Can be any colors accepted by bokeh.
:param yscale: Defines scale for main/alt S1 == 0, main/alt S2 == 1,
waveform plot == 2. Please note, that the log scale can lead to funny
glyph renders for small values.
:param log: If true color sclae is used for hitpattern plots.
example::
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:80% !important; }</style>"))
import bokeh.plotting as bklt
fig = st.event_display_interactive(
run_id,
time_range=(event['time'],
event['endtime'])
)
bklt.show(fig)
:raises:
Raises an error if the user queries a time range which contains
more than a single event.
:return: bokeh.plotting.figure instance.
"""
st = context
if len(yscale) != 3:
raise ValueError(f'"yscale" needs three entries, but you passed {len(yscale)}.')
if not hasattr(st, "_BOKEH_CONFIGURED_NOTEBOOK"):
st._BOKEH_CONFIGURED_NOTEBOOK = True
# Configure show to show notebook:
from bokeh.io import output_notebook
output_notebook()
if len(events) != 1:
raise ValueError(
"The time range you specified contains more or"
" less than a single event. The event display "
" only works with individual events for now."
)
if peaks.shape[0] == 0:
raise ValueError("Found an event without peaks this should not had have happened.")
# Select main/alt S1/S2s based on time and endtime in event:
m_other_peaks = np.ones(len(peaks), dtype=np.bool_) # To select non-event peaks
endtime = strax.endtime(peaks)
signal = {}
if only_main_peaks:
s1_keys = ["s1"]
s2_keys = ["s2"]
labels = {"s1": "S1", "s2": "S2"}
else:
s1_keys = ["s1", "alt_s1"]
s2_keys = ["s2", "alt_s2"]
labels = {"s1": "MS1", "alt_s1": "AS1", "s2": "MS2", "alt_s2": "AS2"}
for s_x in labels.keys():
# Loop over Main/Alt Sx and get store S1/S2 Main/Alt in signals,
# store information about other peaks as "m_other_peaks"
m = (peaks["time"] == events[f"{s_x}_time"]) & (endtime == events[f"{s_x}_endtime"])
signal[s_x] = peaks[m]
m_other_peaks &= ~m
# Detail plots for main/alt S1/S2:
fig_s1, fig_s2 = plot_detail_plot_s1_s2(
signal,
s1_keys,
s2_keys,
labels,
colors,
yscale[:2],
)
# PMT arrays:
if not only_main_peaks:
# Plot all keys into both arrays:
top_array_keys = s2_keys + s1_keys
bottom_array_keys = s1_keys + s2_keys
else:
top_array_keys = s2_keys
bottom_array_keys = s1_keys
fig_top, fig_bottom = plot_pmt_arrays_and_positions(
top_array_keys,
bottom_array_keys,
signal,
to_pe,
labels,
plot_all_pmts,
xenon1t=xenon1t,
log=log,
)
m_other_s2 = m_other_peaks & (peaks["type"] == 2)
if np.any(m_other_s2) and not only_main_peaks:
# Now we have to add the positions of all the other S2 to the top pmt array
# if not only main peaks.
fig_top, plot = plot_posS2s(
peaks[m_other_s2], label="OS2s", fig=fig_top, s2_type_style_id=2
)
plot.visible = False
# Main waveform plot:
if only_peak_detail_in_wf:
# If specified by the user only plot main/alt S1/S2
peaks = peaks[~m_other_peaks]
waveform = plot_event(peaks, signal, labels, events[0], colors, yscale[-1])
# Create tile:
title = _make_event_title(events[0], run_id)
# Put everything together:
if bottom_pmt_array:
upper_row = [fig_s1, fig_s2, fig_top, fig_bottom]
else:
upper_row = [fig_s1, fig_s2, fig_top]
upper_row = bokeh.layouts.Row(
children=upper_row,
sizing_mode="scale_width",
)
plots = bokeh.layouts.gridplot(
children=[upper_row, waveform],
sizing_mode="scale_width",
ncols=1,
merge_tools=True,
toolbar_location="above",
)
event_display = bokeh.layouts.Column(
children=[title, plots],
sizing_mode="scale_width",
max_width=1600,
)
# Add record matrix if asked:
if plot_record_matrix:
if st.is_stored(run_id, "records"):
# Check if records can be found and load:
r = st.get_array(
run_id, "records", time_range=(events[0]["time"], events[0]["endtime"])
)
elif st.is_stored(run_id, "raw_records"):
warnings.warn(
f"Cannot find records for {run_id}, making them from raw_records instead."
)
p = st.get_single_plugin(run_id, "records")
r = st.get_array(
run_id, "raw_records", time_range=(events[0]["time"], events[0]["endtime"])
)
r = p.compute(r, events[0]["time"], events[0]["endtime"])["records"]
else:
warnings.warn(
f"Can neither find records nor raw_records for run {run_id}, proceed without record"
" matrix."
)
plot_record_matrix = False
if plot_record_matrix:
straxen._BOKEH_X_RANGE = None
# First get hook to for x_range:
x_range_hook = lambda plot, element: hook(plot, x_range=straxen._BOKEH_X_RANGE, debug=False)
# Create datashader plot:
wf, record_points, time_stream = _hvdisp_plot_records_2d(
records=r,
to_pe=to_pe,
t_reference=peaks[0]["time"],
event_range=(waveform.x_range.start, waveform.x_range.end),
config=st.config,
hooks=[x_range_hook],
tools=[],
)
# Create record polygons:
polys = plot_record_polygons(record_points)
records_in_window = polys.apply(
get_records_matrix_in_window, streams=[time_stream], time_slice=plot_records_threshold
)
# Render plot to initialize x_range:
import holoviews as hv
import panel
_ = hv.render(wf)
# Set x-range of event plot:
bokeh_set_x_range(waveform, straxen._BOKEH_X_RANGE, debug=False)
event_display = panel.Column(
event_display,
wf * records_in_window,
sizing_mode="scale_width",
)
return event_display
[docs]def plot_detail_plot_s1_s2(signal, s1_keys, s2_keys, labels, colors, yscale=("linear", "linear")):
"""Function to plot the main/alt S1/S2 peak details.
:param signal: Dictionary containing the peak information.
:param s1_keys: S1 keys to be plotted e.g. with and without alt S1
:param s2_keys: Same but for S2
:param labels: Labels to be used for Peaks
:param colors: Colors to be used
:param yscale: Tuple with axis scale type.
:return: S1 and S2 bokeh figure.
"""
# First we create figure then we loop over figures and plots and
# add drawings:
fig_s1 = straxen.bokeh_utils.default_fig(
title="Main/Alt S1",
y_axis_type=yscale[0],
)
fig_s2 = straxen.bokeh_utils.default_fig(
title="Main/Alt S2",
y_axis_type=yscale[1],
)
for fig, peak_types in zip([fig_s1, fig_s2], (s1_keys, s2_keys)):
# Loop over fig and corresponding peak keys
for peak_type in peak_types:
if "s2" in peak_type:
# If S2 use µs as units
time_scalar = 1000 # ns
unit = "µs"
else:
time_scalar = 1 # ns
unit = "ns"
if signal[peak_type].shape[0]:
# If signal exists, plot:
fig, plot = plot_peak_detail(
signal[peak_type],
time_scalar=time_scalar,
label=labels[peak_type],
unit=unit,
fig=fig,
colors=colors,
)
if "alt" in peak_type:
# Not main S1/S2, so make peak invisible
plot.visible = False
return fig_s1, fig_s2
[docs]def plot_pmt_arrays_and_positions(
top_array_keys, bottom_array_keys, signal, to_pe, labels, plot_all_pmts, xenon1t=False, log=True
):
"""Function which plots the Top and Bottom PMT array.
:return: fig_top, fig_bottom
"""
# Same logic as for detailed Peaks, first make figures
# then loop over figures and data and populate figures with plots
fig_top = straxen.bokeh_utils.default_fig(title="Top array")
fig_bottom = straxen.bokeh_utils.default_fig(title="Bottom array")
for pmt_array_type, fig, peak_types in zip(
["top", "bottom"], [fig_top, fig_bottom], [top_array_keys, bottom_array_keys]
):
for ind, k in enumerate(peak_types):
# Loop over peaks enumerate them since we plot all Peaks
# Main/ALt S1/S2 into the PMT array, but only the first one
# Should be visible.
if not signal[k].shape[0]:
# alt S1/S2 does not exist so go to next.
continue
fig, plot, _ = plot_pmt_array(
signal[k][0],
pmt_array_type,
to_pe,
plot_all_pmts=plot_all_pmts,
label=labels[k],
xenon1t=xenon1t,
fig=fig,
log=log,
)
if ind:
# Not main S1 or S2
plot.visible = False
if pmt_array_type == "top" and "s2" in k:
# In case of the top PMT array we also have to plot the S2 positions:
fig, plot = plot_posS2s(
signal[k][0], label=labels[k], fig=fig, s2_type_style_id=ind
)
if ind:
# Not main S2
plot.visible = False
return fig_top, fig_bottom
[docs]def plot_event(peaks, signal, labels, event, colors, yscale="linear"):
"""Wrapper for plot peaks to highlight main/alt. S1/S2.
:param peaks: Peaks in event
:param signal: Dictionary containing main/alt. S1/S2
:param labels: dict with labels to be used
:param event: Event to set correctly x-ranges.
:param colors: Colors to be used for unknown, s1 and s2 signals.
:param yscale: string of yscale type.
:return: bokeh.plotting.figure instance
"""
waveform = plot_peaks(peaks, time_scalar=1000, colors=colors, yscale=yscale)
# Highlight main and alternate S1/S2:
start = peaks[0]["time"]
end = strax.endtime(peaks)[-1]
# Workaround did not manage to scale via pixels...
ymax = np.max((peaks["data"].T / peaks["dt"]).T)
ymax -= 0.1 * ymax
for s, p in signal.items():
if p.shape[0]:
pos = (p[0]["center_time"] - start) / 1000
main = bokeh.models.Span(
location=pos,
dimension="height",
line_alpha=0.6,
)
vline_label = bokeh.models.Label(
x=pos,
y=ymax,
angle=np.pi / 2,
text=labels[s],
)
if "alt" in s:
main.line_dash = "dotted"
else:
main.line_dash = "dashed"
waveform.add_layout(main)
waveform.add_layout(vline_label)
# Get some meaningful x-range limit to 10% left and right extending
# beyond first last peak, clip at event boundary.
length = (end - start) / 10**3
waveform.x_range.start = max(-0.1 * length, (event["time"] - start) / 10**3)
waveform.x_range.end = min(1.1 * length, (event["endtime"] - start) / 10**3)
return waveform
[docs]def plot_peak_detail(
peak,
time_scalar=1,
label="",
unit="ns",
colors=("gray", "blue", "green"),
fig=None,
):
"""Function which makes a detailed plot for the given peak. As in the main/alt S1/S2 plots of
the event display.
:param peak: Peak to be plotted.
:param time_scalar: Factor to rescale the time from ns to other scale. E.g. =1000 scales to µs.
:param label: Label to be used in the plot legend.
:param unit: Time unit of the plotted peak.
:param colors: Colors to be used for unknown, s1 and s2 peaks.
:param fig: Instance of bokeh.plotting.figure if None one will be created via
straxen.bokeh.utils.default_figure().
:return: Instance of bokeh.plotting.figure
"""
if not peak.shape:
peak = np.array([peak])
if peak.shape[0] != 1:
raise ValueError(
"Cannot plot the peak details for more than one "
"peak. Please make sure peaks has the shape (1,)!"
)
p_type = peak[0]["type"]
if not fig:
fig = straxen.bokeh_utils.default_fig(title=f"Main/Alt S{p_type}")
tt = straxen.bokeh_utils.peak_tool_tip(p_type)
tt = [v for k, v in tt.items() if k not in ["time_static", "center_time", "endtime"]]
fig.add_tools(bokeh.models.HoverTool(name=label, tooltips=tt))
source = straxen.bokeh_utils.get_peaks_source(
peak,
relative_start=peak[0]["time"],
time_scaler=time_scalar,
keep_amplitude_per_sample=False,
)
patches = fig.patches(
source=source,
legend_label=label,
fill_color=colors[p_type],
fill_alpha=0.2,
line_color=colors[p_type],
line_width=0.5,
name=label,
)
fig.xaxis.axis_label = f"Time [{unit}]"
fig.xaxis.axis_label_text_font_size = "12pt"
fig.yaxis.axis_label = "Amplitude [pe/ns]"
fig.yaxis.axis_label_text_font_size = "12pt"
fig.legend.location = "top_right"
fig.legend.click_policy = "hide"
if label:
fig.legend.visible = True
else:
fig.legend.visible = False
return fig, patches
[docs]def plot_peaks(peaks, time_scalar=1, fig=None, colors=("gray", "blue", "green"), yscale="linear"):
"""Function which plots a list/array of peaks relative to the first one.
:param peaks: Peaks to be plotted.
:param time_scalar: Factor to rescale the time from ns to other scale. E.g. =1000 scales to µs.
:param colors: Colors to be used for unknown, s1 and s2 signals
:param yscale: yscale type can be "linear" or "log"
:param fig: Instance of bokeh.plotting.figure if None one will be created via
straxen.bokeh.utils.default_figure().
:return: bokeh.plotting.figure instance.
"""
if not fig:
fig = straxen.bokeh_utils.default_fig(width=1600, height=400, y_axis_type=yscale)
for i in range(0, 3):
_ind = np.where(peaks["type"] == i)[0]
if not len(_ind):
continue
source = straxen.bokeh_utils.get_peaks_source(
peaks[_ind],
relative_start=peaks[0]["time"],
time_scaler=time_scalar,
keep_amplitude_per_sample=False,
)
fig.patches(
source=source,
fill_color=colors[i],
fill_alpha=0.2,
line_color=colors[i],
line_width=0.5,
legend_label=LEGENDS[i],
name=LEGENDS[i],
)
tt = straxen.bokeh_utils.peak_tool_tip(i)
tt = [v for k, v in tt.items() if k != "time_dynamic"]
fig.add_tools(bokeh.models.HoverTool(name=LEGENDS[i], tooltips=tt))
fig.add_tools(bokeh.models.WheelZoomTool(dimensions="width", name="wheel"))
fig.toolbar.active_scroll = [t for t in fig.tools if t.name == "wheel"][0]
fig.xaxis.axis_label = "Time [µs]"
fig.xaxis.axis_label_text_font_size = "12pt"
fig.yaxis.axis_label = "Amplitude [pe/ns]"
fig.yaxis.axis_label_text_font_size = "12pt"
fig.legend.location = "top_left"
fig.legend.click_policy = "hide"
return fig
[docs]def plot_pmt_array(
peak,
array_type,
to_pe,
plot_all_pmts=False,
log=False,
xenon1t=False,
fig=None,
label="",
):
"""Plots top or bottom PMT array for given peak.
:param peak: Peak for which the hit pattern should be plotted.
:param array_type: String which specifies if "top" or "bottom" PMT array should be plotted
:param to_pe: PMT gains.
:param log: If true use a log-scale for the color scale.
:param plot_all_pmts: If True colors all PMTs instead of showing swtiched off PMTs as gray dots.
:param xenon1t: If True plots 1T array.
:param fig: Instance of bokeh.plotting.figure if None one will be created via
straxen.bokeh.utils.default_figure().
:param label: Label of the peak which should be used for the plot legend
:return: Tuple containing a bokeh figure, glyph and transform instance.
"""
if peak.shape:
raise ValueError("Can plot PMT array only for a single peak at a time.")
tool_tip = [
("Plot", "$name"),
("Channel", "@pmt"),
("X-Position [cm]", "$x"),
("Y-Position [cm]", "$y"),
("area [pe]", "@area"),
]
array = ("top", "bottom")
if array_type not in array:
raise ValueError('"array_type" must be either top or bottom.')
if not fig:
fig = straxen.bokeh_utils.default_fig(title=f"{array_type} array")
# Creating TPC axis and title
fig = _plot_tpc(fig)
# Plotting PMTs:
pmts = straxen.pmt_positions(xenon1t)
if plot_all_pmts:
mask_pmts = np.zeros(len(pmts), dtype=np.bool_)
else:
mask_pmts = to_pe == 0
pmts_on = pmts[~mask_pmts]
pmts_on = pmts_on[pmts_on["array"] == array_type]
if np.any(mask_pmts):
pmts_off = pmts[mask_pmts]
pmts_off = pmts_off[pmts_off["array"] == array_type]
fig = _plot_off_pmts(pmts_off, fig)
area_per_channel = peak["area_per_channel"][pmts_on["i"]]
if log:
area_plot = np.log10(area_per_channel)
# Manually set infs to zero since cmap cannot handle it.
area_plot = np.where(area_plot == -np.inf, 0, area_plot)
else:
area_plot = area_per_channel
mapper = bokeh.transform.linear_cmap(
field_name="area_plot", palette="Viridis256", low=min(area_plot), high=max(area_plot)
)
source_on = bklt.ColumnDataSource(
data={
"x": pmts_on["x"],
"y": pmts_on["y"],
"area": area_per_channel,
"area_plot": area_plot,
"pmt": pmts_on["i"],
}
)
p = fig.scatter(
source=source_on,
radius=straxen.tpc_pmt_radius,
fill_color=mapper,
fill_alpha=1,
line_color="black",
legend_label=label,
name=label + "_pmt_array",
)
fig.add_tools(bokeh.models.HoverTool(name=label + "_pmt_array", tooltips=tool_tip))
fig.legend.location = "top_left"
fig.legend.click_policy = "hide"
fig.legend.orientation = "horizontal"
fig.legend.padding = 0
fig.toolbar_location = None
return fig, p, mapper
def _plot_tpc(fig=None):
"""Plots ring at TPC radius and sets xy limits + labels."""
if not fig:
fig = straxen.bokeh_utils.default_fig()
fig.circle(
x=0,
y=0,
radius=straxen.tpc_r,
fill_color="white",
line_color="black",
line_width=3,
fill_alpha=0,
)
fig.xaxis.axis_label = "x [cm]"
fig.xaxis.axis_label_text_font_size = "12pt"
fig.yaxis.axis_label = "y [cm]"
fig.yaxis.axis_label_text_font_size = "12pt"
fig.x_range.start = -80
fig.x_range.end = 80
fig.y_range.start = -80
fig.y_range.end = 80
return fig
def _plot_off_pmts(pmts, fig=None):
"""Plots PMTs which are switched off."""
if not fig:
fig = straxen.bokeh_utils.default_fig()
fig.circle(
x=pmts["x"],
y=pmts["y"],
fill_color="gray",
line_color="black",
radius=straxen.tpc_pmt_radius,
)
return fig
[docs]def plot_posS2s(peaks, label="", fig=None, s2_type_style_id=0):
"""Plots xy-positions of specified peaks.
:param peaks: Peaks for which the position should be plotted.
:param label: Legend label and plot name (name serves as idenitfier).
:param fig: bokeh.plotting.figure instance the plot should be plotted into. If None creates new
instance.
:param s2_type_style_id: 0 plots main S2 style, 1 for alt S2 and 2 for other S2s (e.g. single
electrons).
"""
if not peaks.shape:
peaks = np.array([peaks])
if not np.all(peaks["type"] == 2):
raise ValueError("All peaks must be S2!")
if not fig:
fig = straxen.bokeh_utils.default_fig()
source = straxen.bokeh_utils.get_peaks_source(peaks)
if s2_type_style_id == 0:
p = fig.cross(
source=source, name=label, legend_label=label, color="red", line_width=2, size=12
)
if s2_type_style_id == 1:
p = fig.cross(
source=source,
name=label,
legend_label=label,
color="orange",
angle=45 / 360 * 2 * np.pi,
line_width=2,
size=12,
)
if s2_type_style_id == 2:
p = fig.diamond_cross(source=source, name=label, legend_label=label, color="red", size=8)
tt = straxen.bokeh_utils.peak_tool_tip(2)
tt = [v for k, v in tt.items() if k not in ["time_dynamic", "amplitude"]]
fig.add_tools(
bokeh.models.HoverTool(
name=label, tooltips=[("Position x [cm]", "@x"), ("Position y [cm]", "@y")] + tt
)
)
return fig, p
def _make_event_title(event, run_id, width=1600):
"""Function which makes the title of the plot for the specified event.
Note:
To center the title I use a transparent box.
:param event: Event which we are plotting
:param run_id: run_id
:return: Title as bokeh.models.Div instance
"""
start = event["time"]
date = np.datetime_as_string(start.astype("<M8[ns]"), unit="s")
start_ns = start - (start // 10**9) * 10**9
end = strax.endtime(event)
end_ns = end - start + start_ns
event_number = event["event_number"]
text = (
f"<h2>Event {event_number} from run {run_id}<br>"
f"Recorded at {date[:10]} {date[10:]} UTC,"
f" {start_ns} ns - {end_ns} ns </h2>"
)
title = bokeh.models.Div(
text=text,
styles={
"text-align": "left",
},
sizing_mode="scale_width",
width=width,
# orientation='vertical',
width_policy="fit",
margin=(0, 0, -30, 50),
)
return title
[docs]def bokeh_set_x_range(plot, x_range, debug=False):
"""Function which adjust java script call back for x_range of a bokeh plot. Required to link
bokeh and holoviews x_range.
Note:
This is somewhat voodoo + some black magic,
but it works....
"""
from bokeh.models import CustomJS
code = """\
const start = cb_obj.start;
const end = cb_obj.end;
// Need to update the attributes at the same time.
x_range.setv({start, end});
"""
for attr in ["start", "end"]:
if debug:
# Prints x_range bar to check Id, as I said voodoo
print(x_range)
plot.x_range.js_on_change(attr, CustomJS(args=dict(x_range=x_range), code=code))
[docs]class DataSelectionHist:
"""Class for an interactive data selection plot."""
def __init__(self, name, size=600):
"""Class for an interactive data selection plot.
:param name: Name of the class object instance. Needed for dynamic return, e.g. ds =
DataSelectionHist("ds")
:param size: Edge size of the figure in pixel.
"""
raise NotImplementedError(
"This function does not work with"
" the latest bokeh version. If you are still using"
" this function please let us know in tech-support"
" by the 01.04.2024, else we reomve this function."
)
self.name = name
self.selection_index = None
self.size = size
from bokeh.io import output_notebook
output_notebook()
[docs] def histogram2d(
self,
items,
xdata,
ydata,
bins,
hist_range,
x_label="X-Data",
y_label="Y-Data",
log_color_scale=True,
cmap_steps=256,
clim=(None, None),
undeflow_color=None,
overflow_color=None,
weights=1,
):
"""2d Histogram which allows to select the plotted items dynamically.
Note:
You can select the data either via a box select or Lasso
select tool. The data can be returned by:
ds.get_back_selected_items()
Hold shift to select multiple regions.
Warnings:
Depending on the number of bins the Lasso selection can
become relatively slow. The number of bins should not be
larger than 100.
The box selection performance is better.
:param items: numpy.structured.array of items to be selected.
e.g. peaks or events.
:param xdata: numpy.array for xdata e.g. peaks['area']
:param ydata: same
:param bins: Integer specifying the number of bins. Currently
x and y axis must share the same binning.
:param hist_range: Tuple of x-range and y-range.
:param x_label: Label to be used for the x-axis
:param y_label: same but for y
:param log_color_scale: If true (default) use log colorscale
:param cmap_steps: Integer between 0 and 256 for stepped
colorbar.
:param clim: Tuple of color limits.
:param undeflow_color: If specified colors all bins below clim
with the corresponding color.
:param overflow_color: Same but per limit.
:param weights: If specified each bin entry is weighted by this
value. Can be either a scalar e.g. a time or an array of
weights which has the same length as the x/y data.
:return: bokeh figure instance.
"""
if isinstance(bins, tuple):
raise ValueError(
"Currently only squared bins are supported. Plase change bins into an integer."
)
x_pos, y_pos = self._make_bin_positions((bins, bins), hist_range)
weights = np.ones(len(xdata)) * weights
hist, hist_inds = self._hist2d_with_index(xdata, ydata, weights, self.xedges, self.yedges)
# Define times and ids for return:
self.items = items
self.hist_inds = hist_inds
colors = self._get_color(
hist,
cmap_steps,
log_color_scale=log_color_scale,
clim=clim,
undeflow_color=undeflow_color,
overflow_color=overflow_color,
)
# Create Figure and add LassoTool:
f = bokeh.plotting.figure(
title="DataSelection", width=self.size, height=self.size, tools="box_select,reset,save"
)
# Add hover tool, colorbar is too complictaed:
tool_tip = [("Bin Center x", "@x"), ("Bin Center y", "@y"), ("Entries", "@h")]
f.add_tools(
bokeh.models.LassoSelectTool(select_every_mousemove=False),
bokeh.models.HoverTool(tooltips=tool_tip),
)
s1 = bokeh.plotting.ColumnDataSource(
data=dict(x=x_pos, y=y_pos, h=hist.flatten(), color=colors)
)
f.square(source=s1, size=self.size / bins, color="color", nonselection_alpha=0.3)
f.x_range.start = self.xedges[0]
f.x_range.end = self.xedges[-1]
f.y_range.start = self.yedges[0]
f.y_range.end = self.yedges[-1]
f.xaxis.axis_label = x_label
f.yaxis.axis_label = y_label
self.selection_index = None
s1.selected.js_on_change(
"indices",
bokeh.models.CustomJS(
args=dict(s1=s1),
code=f"""
var inds = cb_obj.indices;
var kernel = IPython.notebook.kernel;
kernel.execute("{self.name}.selection_index = " + inds);
""",
),
)
return f
[docs] def get_back_selected_items(self):
if not self.selection_index:
raise ValueError(
"No data selection found. Have you selected any data? "
"If yes you most likely have not intialized the DataSelctor correctly. "
'You have to callit as: my_instance_name = DataSelectionHist("my_instance_name")'
)
m = np.isin(self.hist_inds, self.selection_index)
return self.items[m]
@staticmethod
@numba.njit
def _hist2d_with_index(xdata, ydata, weights, x_edges, y_edges):
n_x_bins = len(x_edges) - 1
n_y_bins = len(y_edges) - 1
res_hist_inds = np.zeros(len(xdata), dtype=np.int32)
res_hist = np.zeros((n_x_bins, n_y_bins), dtype=np.int64)
# Create bin ranges:
offset = 0
for ind, xv in enumerate(xdata):
yv = ydata[ind]
w = weights[ind]
hist_ind = 0
found = False
for ind_xb, low_xb in enumerate(x_edges[:-1]):
high_xb = x_edges[ind_xb + 1]
if not low_xb <= xv:
hist_ind += n_y_bins
continue
if not xv < high_xb:
hist_ind += n_y_bins
continue
# Checked both bins value is in bin, so check y:
for ind_yb, low_yb in enumerate(y_edges[:-1]):
high_yb = y_edges[ind_yb + 1]
if not low_yb <= yv:
hist_ind += 1
continue
if not yv < high_yb:
hist_ind += 1
continue
found = True
res_hist_inds[offset] = hist_ind
res_hist[ind_xb, ind_yb] += w
offset += 1
# Set to -1 if not in any
if not found:
res_hist_inds[offset] = -1
offset += 1
return res_hist, res_hist_inds
def _make_bin_positions(self, bins, bin_range):
"""Helper function to create center positions for "histogram" markers."""
edges = []
for b, br in zip(bins, bin_range):
# Create x and y edges
d_range = br[1] - br[0]
edges.append(np.arange(br[0], br[1] + d_range / b, d_range / b))
# Convert into marker positions:
xedges = edges[0]
yedges = edges[1]
self.xedges = xedges
self.yedges = yedges
x_pos = xedges[:-1] + np.diff(xedges) / 2
x_pos = np.repeat(x_pos, len(yedges) - 1)
y_pos = yedges[:-1] + np.diff(yedges) / 2
y_pos = np.array(list(y_pos) * (len(xedges) - 1))
return x_pos, y_pos
def _get_color(
self,
hist,
cmap_steps,
log_color_scale=False,
clim=(None, None),
undeflow_color=None,
overflow_color=None,
):
"""Helper function to create colorscale."""
hist = hist.flatten()
if clim[0] and undeflow_color:
# If underflow is specified get indicies for underflow bins
inds_underflow = np.argwhere(hist < clim[0]).flatten()
if clim[1] and overflow_color:
inds_overflow = np.argwhere(hist > clim[1]).flatten()
# Clip data according to clim
if np.any(clim):
hist = np.clip(hist, clim[0], clim[1])
self.clim = (np.min(hist), np.max(hist))
if log_color_scale:
color = np.log10(hist)
color /= np.max(color)
color *= cmap_steps - 1
else:
color = hist / np.max(hist)
color *= cmap_steps - 1
cmap = np.array(bokeh.palettes.viridis(cmap_steps))
cmap = cmap[np.round(color).astype(np.int8)]
if undeflow_color:
cmap[inds_underflow] = undeflow_color
if overflow_color:
cmap[inds_overflow] = overflow_color
return cmap