import numba
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
import strax
export, __all__ = strax.exporter(export_self=True)
def init_spe_scaling_factor_distributions(file):
# Extract the spe pdf from a csv file into a pandas dataframe
spe_shapes = pd.read_csv(file)
# Create a converter array from uniform random numbers to SPE gains (one interpolator per channel)
# Scale the distributions so that they have an SPE mean of 1 and then calculate the cdf
uniform_to_pe_arr = []
for ch in spe_shapes.columns[1:]: # skip the first element which is the 'charge' header
if spe_shapes[ch].sum() > 0:
mean_spe = (spe_shapes['charge'] * spe_shapes[ch]).sum() / spe_shapes[ch].sum()
scaled_bins = spe_shapes['charge'] / mean_spe
cdf = np.cumsum(spe_shapes[ch]) / np.sum(spe_shapes[ch])
else:
# if sum is 0, just make some dummy axes to pass to interpolator
cdf = np.linspace(0, 1, 10)
scaled_bins = np.zeros_like(cdf)
uniform_to_pe_arr.append(interp1d(cdf, scaled_bins))
if uniform_to_pe_arr != []:
return uniform_to_pe_arr
[docs]@export
@numba.jit(numba.int32(numba.int64[:], numba.int64, numba.int64, numba.int64[:, :]),
nopython=True)
def find_intervals_below_threshold(w, threshold, holdoff, result_buffer):
"""Fills result_buffer with l, r bounds of intervals in w < threshold.
:param w: Waveform to do hitfinding in
:param threshold: Threshold for including an interval
:param result_buffer: numpy N*2 array of ints, will be filled by function.
if more than N intervals are found, none past the first N will be processed.
:returns : number of intervals processed
Boundary indices are inclusive, i.e. the right boundary is the last index which was < threshold
"""
result_buffer_size = len(result_buffer)
last_index_in_w = len(w) - 1
in_interval = False
current_interval = 0
current_interval_start = -1
current_interval_end = -1
for i, x in enumerate(w):
if x < threshold:
if not in_interval:
# Start of an interval
in_interval = True
current_interval_start = i
current_interval_end = i
if ((i == last_index_in_w and in_interval) or
(x >= threshold and i >= current_interval_end + holdoff and in_interval)):
# End of the current interval
in_interval = False
# Add bounds to result buffer
result_buffer[current_interval, 0] = current_interval_start
result_buffer[current_interval, 1] = current_interval_end
current_interval += 1
if current_interval == result_buffer_size:
result_buffer[current_interval, 1] = len(w) - 1
n_intervals = current_interval # No +1, as current_interval was incremented also when the last interval closed
return n_intervals