import numba
import numpy as np
import pandas as pd
from copy import deepcopy
from scipy.interpolate import interp1d
import strax
export, __all__ = strax.exporter(export_self=False)
PULSE_MAX_DURATION = int(1e3)
N_SPLIT_LOOP = 5
[docs]
@export
@numba.jit(numba.int32(numba.int64[:], numba.int64, numba.int64, numba.int64[:, :]),
nopython=True)
def find_intervals_below_threshold(w, threshold, holdoff, result_buffer):
"""Fills result_buffer with l, r bounds of intervals in w < threshold.
:param w: Waveform to do hitfinding in
:param threshold: Threshold for including an interval
:param holdoff: Holdoff number of samples after the pulse return back down to threshold
:param result_buffer: numpy N*2 array of ints, will be filled by function.
if more than N intervals are found, none past the first N will be processed.
:returns : number of intervals processed
Boundary indices are inclusive, i.e. the right boundary is the last index which was < threshold
"""
result_buffer_size = len(result_buffer)
last_index_in_w = len(w) - 1
in_interval = False
current_interval = 0
current_interval_start = -1
current_interval_end = -1
for i, x in enumerate(w):
if x < threshold:
if not in_interval:
# Start of an interval
in_interval = True
current_interval_start = i
current_interval_end = i
if ((i == last_index_in_w and in_interval) or
(x >= threshold and i >= current_interval_end + holdoff and in_interval)):
# End of the current interval
in_interval = False
# Add bounds to result buffer
result_buffer[current_interval, 0] = current_interval_start
result_buffer[current_interval, 1] = current_interval_end
current_interval += 1
if current_interval == result_buffer_size:
result_buffer[current_interval, 1] = len(w) - 1
n_intervals = current_interval # No +1, as current_interval was incremented also when the last interval closed
return n_intervals
@numba.njit
def find_optical_t_range(firsts, lasts, timings, tmins, tmaxs, start=0):
"""
Helper function find the min and max of each optical entry
also substract tmin from the timings within each entry
"""
for ix in range(start, len(firsts)):
if firsts[ix] == lasts[ix]:
tmins[ix] = -1
tmaxs[ix] = -1
# No photons in this instruction
continue
tmin = timings[firsts[ix]]
tmax = timings[firsts[ix]]
for iy in range(firsts[ix], lasts[ix]):
if timings[iy] < tmin:
tmin = timings[iy]
if timings[iy] > tmax:
tmax = timings[iy]
tmins[ix] = tmin
tmaxs[ix] = tmax
timings[firsts[ix]: lasts[ix]] -= tmin
@numba.njit
def split_long_optical_pulse(firsts, lasts, timings, channels):
"""
Helper function to split photon timings of a single optical entry into
two entries if the is a gap longer than PULSE_MAX_DURATION ns.
"""
for ix in range(len(firsts)):
extra_long_time_index = []
for iy in range(firsts[ix], lasts[ix]):
if timings[iy] > PULSE_MAX_DURATION:
extra_long_time_index.append(iy)
if len(extra_long_time_index) == 0:
continue
for cnt, iy in enumerate(extra_long_time_index):
cnt += firsts[ix]
if iy > cnt:
tmp = timings[cnt]
timings[cnt] = timings[iy]
timings[iy] = tmp
tmp = channels[cnt]
channels[cnt] = channels[iy]
channels[iy] = tmp
yield ix, firsts[ix], cnt + 1
firsts[ix] = cnt + 1
[docs]
@export
def optical_adjustment(instructions, timings, channels):
"""
Helper function to process optical instructions so that for each entry
1) Move the instruction timing to the first photon in the entry and move photon timings
2) Split photon timing into maximum interval of PULSE_MAX_DURATION default 1us
The split photon are put into new instruction append at the end of the instructions
"""
tmins = np.zeros(len(instructions), np.int64)
tmaxs = np.zeros(len(instructions), np.int64)
start = 0
for i in range(N_SPLIT_LOOP):
find_optical_t_range(instructions['_first'],
instructions['_last'],
timings, tmins, tmaxs,
start=start)
instructions['time'][start:] += tmins[start:]
long_pulse = ((tmaxs - tmins) > PULSE_MAX_DURATION) & (np.arange(len(instructions)) >= start)
n_long_pulse = long_pulse.sum()
if n_long_pulse < 1:
break
extra_inst = []
for ix, first, last in split_long_optical_pulse(instructions['_first'][long_pulse],
instructions['_last'][long_pulse],
timings,
channels):
tmp = deepcopy(instructions[np.where(long_pulse)[0][ix]])
tmp['_first'] = first
tmp['_last'] = last
instructions[np.where(long_pulse)[0][ix]]['_first']=last
extra_inst.append(tmp)
instructions = np.append(instructions, extra_inst)
tmins = np.hstack([tmins, np.zeros(len(extra_inst), np.int64)])
tmaxs = np.hstack([tmaxs, np.zeros(len(extra_inst), np.int64)])
start = len(instructions)
# Instructions is now a copy so return is needed
return instructions