Source code for wfsim.utils

import numba
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d

import strax
export, __all__ = strax.exporter(export_self=True)


def init_spe_scaling_factor_distributions(file):
    # Extract the spe pdf from a csv file into a pandas dataframe
    spe_shapes = pd.read_csv(file)

    # Create a converter array from uniform random numbers to SPE gains (one interpolator per channel)
    # Scale the distributions so that they have an SPE mean of 1 and then calculate the cdf
    uniform_to_pe_arr = []
    for ch in spe_shapes.columns[1:]:  # skip the first element which is the 'charge' header
        if spe_shapes[ch].sum() > 0:
            mean_spe = (spe_shapes['charge'] * spe_shapes[ch]).sum() / spe_shapes[ch].sum()
            scaled_bins = spe_shapes['charge'] / mean_spe
            cdf = np.cumsum(spe_shapes[ch]) / np.sum(spe_shapes[ch])
        else:
            # if sum is 0, just make some dummy axes to pass to interpolator
            cdf = np.linspace(0, 1, 10)
            scaled_bins = np.zeros_like(cdf)

        uniform_to_pe_arr.append(interp1d(cdf, scaled_bins))

    if uniform_to_pe_arr != []:
        return uniform_to_pe_arr


[docs]@export @numba.jit(numba.int32(numba.int64[:], numba.int64, numba.int64, numba.int64[:, :]), nopython=True) def find_intervals_below_threshold(w, threshold, holdoff, result_buffer): """Fills result_buffer with l, r bounds of intervals in w < threshold. :param w: Waveform to do hitfinding in :param threshold: Threshold for including an interval :param result_buffer: numpy N*2 array of ints, will be filled by function. if more than N intervals are found, none past the first N will be processed. :returns : number of intervals processed Boundary indices are inclusive, i.e. the right boundary is the last index which was < threshold """ result_buffer_size = len(result_buffer) last_index_in_w = len(w) - 1 in_interval = False current_interval = 0 current_interval_start = -1 current_interval_end = -1 for i, x in enumerate(w): if x < threshold: if not in_interval: # Start of an interval in_interval = True current_interval_start = i current_interval_end = i if ((i == last_index_in_w and in_interval) or (x >= threshold and i >= current_interval_end + holdoff and in_interval)): # End of the current interval in_interval = False # Add bounds to result buffer result_buffer[current_interval, 0] = current_interval_start result_buffer[current_interval, 1] = current_interval_end current_interval += 1 if current_interval == result_buffer_size: result_buffer[current_interval, 1] = len(w) - 1 n_intervals = current_interval # No +1, as current_interval was incremented also when the last interval closed return n_intervals