import logging
from numba import njit
import numpy as np
from scipy.interpolate import interp1d
from tqdm import tqdm
from .load_resource import load_config, DummyMap
from strax import exporter
from . import units
from .utils import find_intervals_below_threshold
export, __all__ = exporter()
__all__.append('PULSE_TYPE_NAMES')
log = logging.getLogger('SimulationCore')
log.setLevel('DEBUG')
PULSE_TYPE_NAMES = ('RESERVED', 's1', 's2', 'unknown', 'pi_el', 'pmt_ap', 'pe_el')
[docs]@export
class NestId():
"""Nest ids for refering to different scintilation models, only ER is actually validated"""
NR = [0]
ALPHA = [6]
ER = [7, 8, 11, 12]
LED = [20]
_ALL = NR + ALPHA + ER + LED
[docs]@export
class Pulse(object):
"""Pulse building class"""
def __init__(self, config):
self.config = config
self.config.update(getattr(self.config, self.__class__.__name__, {}))
self.resource = load_config(config)
self.init_pmt_current_templates()
self.init_spe_scaling_factor_distributions()
self.config['turned_off_pmts'] = np.arange(len(config['gains']))[np.array(config['gains']) == 0]
self.clear_pulse_cache()
def __call__(self):
"""
PMTs' response to incident photons
Use _photon_timings, _photon_channels to build pulses
"""
if ('_photon_timings' not in self.__dict__) or \
('_photon_channels' not in self.__dict__):
raise NotImplementedError
# The pulse cache should be immediately transfered after call this function
self.clear_pulse_cache()
# Correct for PMT Transition Time Spread (skip for pmt afterpulses)
# note that PMT datasheet provides FWHM TTS, so sigma = TTS/(2*sqrt(2*log(2)))=TTS/2.35482
if '_photon_gains' not in self.__dict__:
self._photon_timings += np.random.normal(self.config['pmt_transit_time_mean'],
self.config['pmt_transit_time_spread'] / 2.35482,
len(self._photon_timings)).astype(np.int64)
dt = self.config.get('sample_duration', 10) # Getting dt from the lib just once
self._n_double_pe = self._n_double_pe_bot = 0 # For truth aft output
counts_start = 0 # Secondary loop index for assigning channel
for channel, counts in zip(*np.unique(self._photon_channels, return_counts=True)):
# Use 'counts' amount of photon for this channel
_channel_photon_timings = self._photon_timings[counts_start:counts_start+counts]
counts_start += counts
if channel in self.config['turned_off_pmts']: continue
# If gain of each photon is not specifically assigned
# Sample from spe scaling factor distribution and to individual gain
# In contrast to pmt afterpulse that should have gain determined before this step
if '_photon_gains' not in self.__dict__:
if self.config['detector'] == 'XENON1T':
_channel_photon_gains = self.config['gains'][channel] \
* self.uniform_to_pe_arr(np.random.random(len(_channel_photon_timings)), channel)
else:
_channel_photon_gains = self.config['gains'][channel] \
* self.uniform_to_pe_arr(np.random.random(len(_channel_photon_timings)))
# Add some double photoelectron emission by adding another sampled gain
n_double_pe = np.random.binomial(len(_channel_photon_timings),
p=self.config['p_double_pe_emision'])
self._n_double_pe += n_double_pe
if channel in self.config['channels_bottom']:
self._n_double_pe_bot += n_double_pe
#_dpe_index = np.random.randint(len(_channel_photon_timings),
# size=n_double_pe)
if self.config['detector'] == 'XENON1T':
_channel_photon_gains[:n_double_pe] += self.config['gains'][channel] \
* self.uniform_to_pe_arr(np.random.random(n_double_pe), channel)
else:
_channel_photon_gains[:n_double_pe] += self.config['gains'][channel] \
* self.uniform_to_pe_arr(np.random.random(n_double_pe))
else:
_channel_photon_gains = np.array(self._photon_gains[self._photon_channels == channel])
# Build a simulated waveform, length depends on min and max of photon timings
min_timing, max_timing = np.min(
_channel_photon_timings), np.max(_channel_photon_timings)
pulse_left = (int(min_timing // dt)
- int(self.config['samples_to_store_before'])
- self.config.get('samples_before_pulse_center', 2))
pulse_right = (int(max_timing // dt)
+ int(self.config['samples_to_store_after'])
+ self.config.get('samples_after_pulse_center', 20))
pulse_current = np.zeros(pulse_right - pulse_left + 1)
Pulse.add_current(_channel_photon_timings.astype(int),
_channel_photon_gains,
pulse_left,
dt,
self._pmt_current_templates,
pulse_current)
# For single event, data of pulse level is small enough to store in dataframe
self._pulses.append(dict(
photons = len(_channel_photon_timings),
channel = channel,
left = pulse_left,
right = pulse_right,
duration = pulse_right - pulse_left + 1,
current = pulse_current,))
[docs] def init_pmt_current_templates(self):
"""
Create spe templates, for 10ns sample duration and 1ns rounding we have:
_pmt_current_templates[i] : photon timing fall between [10*m+i, 10*m+i+1)
(i, m are integers)
"""
# Interpolate on cdf ensures that each spe pulse would sum up to 1 pe*sample duration^-1
pe_pulse_function = interp1d(
self.config.get('pe_pulse_ts'),
np.cumsum(self.config.get('pe_pulse_ys')),
bounds_error=False, fill_value=(0, 1))
# Samples are always multiples of sample_duration
sample_duration = self.config.get('sample_duration', 10)
samples_before = self.config.get('samples_before_pulse_center', 2)
samples_after = self.config.get('samples_after_pulse_center', 20)
pmt_pulse_time_rounding = self.config.get('pmt_pulse_time_rounding', 1.0)
# Let's fix this, so everything can be turned into int
assert pmt_pulse_time_rounding == 1
samples = np.linspace(-samples_before * sample_duration,
+ samples_after * sample_duration,
1 + samples_before + samples_after)
self._template_length = np.int(len(samples) - 1)
templates = []
for r in np.arange(0, sample_duration, pmt_pulse_time_rounding):
pmt_current = np.diff(pe_pulse_function(samples - r)) / sample_duration # pe / 10 ns
# Normalize here to counter tiny rounding error from interpolation
pmt_current *= (1 / sample_duration) / np.sum(pmt_current) # pe / 10 ns
templates.append(pmt_current)
self._pmt_current_templates = np.array(templates)
log.debug('Create spe waveform templates with %s ns resolution' % pmt_pulse_time_rounding)
[docs] def init_spe_scaling_factor_distributions(self):
# Extract the spe pdf from a csv file into a pandas dataframe
spe_shapes = self.resource.photon_area_distribution
# Create a converter array from uniform random numbers to SPE gains (one interpolator per channel)
# Scale the distributions so that they have an SPE mean of 1 and then calculate the cdf
uniform_to_pe_arr = []
for ch in spe_shapes.columns[1:]: # skip the first element which is the 'charge' header
if spe_shapes[ch].sum() > 0:
# mean_spe = (spe_shapes['charge'].values * spe_shapes[ch]).sum() / spe_shapes[ch].sum()
scaled_bins = spe_shapes['charge'].values # / mean_spe
cdf = np.cumsum(spe_shapes[ch]) / np.sum(spe_shapes[ch])
else:
# if sum is 0, just make some dummy axes to pass to interpolator
cdf = np.linspace(0, 1, 10)
scaled_bins = np.zeros_like(cdf)
grid_cdf = np.linspace(0, 1, 2001)
grid_scale = interp1d(cdf, scaled_bins,
bounds_error=False,
fill_value=(scaled_bins[0], scaled_bins[-1]))(grid_cdf)
uniform_to_pe_arr.append(grid_scale)
if len(uniform_to_pe_arr):
self.__uniform_to_pe_arr = np.stack(uniform_to_pe_arr)
log.debug('Initialize spe scaling factor distributions')
[docs] def clear_pulse_cache(self):
self._pulses = []
[docs] @staticmethod
@njit
def add_current(photon_timings,
photon_gains,
pulse_left,
dt,
pmt_current_templates,
pulse_current):
# """
# Simulate single channel waveform given the photon timings
# photon_timing - dim-1 integer array of photon timings in unit of ns
# photon_gain - dim-1 float array of ph. 2 el. gain individual photons
# pulse_left - left of the pulse in unit of 10 ns
# dt - mostly it is 10 ns
# pmt_current_templates - list of spe templates of different reminders
# pulse_current - waveform
# """
if not len(photon_timings):
return
template_length = len(pmt_current_templates[0])
i_photons = np.argsort(photon_timings)
# Convert photon_timings to int outside this function
# photon_timings = photon_timings // 1
gain_total = 0
tmp_photon_timing = photon_timings[i_photons[0]]
for i in i_photons:
if photon_timings[i] > tmp_photon_timing:
start = int(tmp_photon_timing // dt) - pulse_left
reminder = int(tmp_photon_timing % dt)
pulse_current[start:start + template_length] += \
pmt_current_templates[reminder] * gain_total
gain_total = photon_gains[i]
tmp_photon_timing = photon_timings[i]
else:
gain_total += photon_gains[i]
else:
start = int(tmp_photon_timing // dt) - pulse_left
reminder = int(tmp_photon_timing % dt)
pulse_current[start:start + template_length] += \
pmt_current_templates[reminder] * gain_total
[docs] @staticmethod
def singlet_triplet_delays(size, singlet_ratio, config, phase):
"""
Given the amount of the eximer, return time between excimer decay
and their time of generation.
size - amount of eximer
self.phase - 'liquid' or 'gas'
singlet_ratio - fraction of excimers that become singlets
(NOT the ratio of singlets/triplets!)
"""
if phase == 'liquid':
t1, t3 = (config['singlet_lifetime_liquid'],
config['triplet_lifetime_liquid'])
elif phase == 'gas':
t1, t3 = (config['singlet_lifetime_gas'],
config['triplet_lifetime_gas'])
delay = np.random.choice([t1, t3], size, replace=True,
p=[singlet_ratio, 1 - singlet_ratio])
return np.random.exponential(1, size) * delay
[docs]@export
class S1(Pulse):
"""
Given temperal inputs as well as number of photons
Random generate photon timing and channel distribution.
"""
def __init__(self, config):
super().__init__(config)
self.phase = 'liquid' # To distinguish singlet/triplet time delay.
def __call__(self, instruction):
"""Main s1 simulation function. Called by RawData for s1 simulation.
Generates first number of photons in the s1, then timings and channels.
These arrays are fed to Pulse to generate the data.
param instructions: Array with dtype wfsim.instruction_dtype """
if len(instruction.shape) < 1:
# shape of recarr is a bit strange
instruction = np.array([instruction])
_, _, t, x, y, z, n_photons, recoil_type, *rest = [
np.array(v).reshape(-1) for v in zip(*instruction)]
positions = np.array([x, y, z]).T # For map interpolation
n_photons = self.get_n_photons(n_photons=n_photons,
positions=positions,
s1_light_yield_map=self.resource.s1_light_yield_map,
config=self.config)
self._photon_timings = self.photon_timings(t=t,
n_photons=n_photons,
recoil_type=recoil_type,
config=self.config,
phase=self.phase)
# The new way iterpolation is written always require a list
self._photon_channels = self.photon_channels(positions=positions,
n_photons=n_photons,
config=self.config,
s1_pattern_map=self.resource.s1_pattern_map)
super().__call__()
[docs] @staticmethod
def get_n_photons(n_photons,positions, s1_light_yield_map, config):
"""Calculates number of detected photons based on number of photons in total and the postions
:param n_photons: 1d array of ints with number of photons:
:param postions: 2d array with xyz positions of interactions
:param s1_light_yield map: interpolator instance of s1 light yield map
:param config: dict wfsim config
return array with number photons"""
if config['detector']=='XENONnT':
ly = np.squeeze(s1_light_yield_map(positions),
axis=-1)/(1+config['p_double_pe_emision'])
elif config['detector']=='XENON1T':
ly = s1_light_yield_map(positions)
ly *= config['s1_detection_efficiency']
n_photons = np.random.binomial(n=n_photons, p=ly)
return n_photons
[docs] @staticmethod
def photon_channels(positions, n_photons, config, s1_pattern_map):
"""Calculate photon arrival channels
:params positions: 2d array with xy positions of interactions
:params n_photons: 1d array of ints with number of photons to simulate
:params config: dict wfsim config
:params s1_pattern_map: interpolator instance of the s1 pattern map
returns nested array with photon channels
"""
channels = np.arange(config['n_tpc_pmts']) # +1 for the channel map
p_per_channel = s1_pattern_map(positions)
p_per_channel[:, np.in1d(channels, config['turned_off_pmts'])] = 0
_photon_channels = np.array([]).astype(int)
for ppc, n in zip(p_per_channel, n_photons):
_photon_channels = np.append(_photon_channels,
np.random.choice(
channels,
size=n,
p=ppc / np.sum(ppc),
replace=True))
return _photon_channels
[docs] @staticmethod
def photon_timings(t, n_photons, recoil_type, config, phase):
"""Calculate distribution of photon arrival timnigs
:param t: 1d array of ints
:param n_photons: 1d array of ints
:param recoil_type: 1d array of ints
:param config: dict wfsim config
:param phase: str "liquid"
returns photon timing array"""
_photon_timings = np.repeat(t, n_photons)
if len(_photon_timings) == 0:
return _photon_timings.astype(int)
if (config['s1_model_type'] == 'simple' and
np.isin(recoil_type, NestId._ALL).all()):
# Simple S1 model enabled: use it for ER and NR.
_photon_timings += np.random.exponential(config['s1_decay_time'], len(_photon_timings)).astype(int)
_photon_timings += np.random.normal(0, config['s1_decay_spread'], len(_photon_timings)).astype(int)
return _photon_timings
counts_start = 0
for i, counts in enumerate(n_photons):
for k in vars(NestId):
if k.startswith('_'): continue
if recoil_type[i] in getattr(NestId, k):
str_recoil_type = k
try:
_photon_timings[counts_start: counts_start + counts] += \
getattr(S1, str_recoil_type.lower())(
size=counts,
config=config,
phase=phase).astype(int)
except AttributeError:
raise AttributeError(f"Recoil type must be ER, NR, alpha or LED, not {recoil_type}. Check nest ids")
counts_start += counts
return _photon_timings
[docs] @staticmethod
def alpha(size, config, phase):
""" Calculate S1 photon timings for an alpha decay. Neglible recombination time, not validated
:param size: 1d array of ints, number of photons
:param config: dict wfsim config
:parma phase: str "liquid"
return 1d array of photon timings"""
return Pulse.singlet_triplet_delays(size, config['s1_ER_alpha_singlet_fraction'], config, phase)
[docs] @staticmethod
def led(size, config, **kwargs):
""" distribute photons uniformly within the LED pulse length, not validated
:param size: 1d array of ints, number of photons
:param config: dict wfsim config
:parma phase: str "liquid
return 1d array of photon timings"""
return np.random.uniform(0, config['led_pulse_length'], size)
[docs] @staticmethod
def er(size, config, phase):
"""Complex ER model, not validated
:param size: 1d array of ints, number of photons
:param config: dict wfsim config
:parma phase: str "liquid"
return 1d array of photon timings
"""
# How many of these are primary excimers? Others arise through recombination.
# This config is not set for the nT fax config todo
config.setdefault('liquid_density', 1.872452802978054e+30)
density = config['liquid_density'] / (units.g / units.cm ** 3)
excfrac = 0.4 - 0.11131 * density - 0.0026651 * density ** 2 # primary / secondary excimers
excfrac = 1 / (1 + excfrac) # primary / all excimers
# primary / all excimers that produce a photon:
excfrac /= 1 - (1 - excfrac) * (1 - config['s1_ER_recombination_fraction'])
config['s1_ER_primary_excimer_fraction'] = excfrac
log.debug('Inferred s1_ER_primary_excimer_fraction %s' % excfrac)
# Recombination time from NEST 2014
# 3.5 seems fishy, they fit an exponential to data, but in the code they use a non-exponential distribution...
efield = (config['drift_field'] / (units.V / units.cm))
reco_time = 3.5 / \
0.18 * (1 / 20 + 0.41) * np.exp(-0.009 * efield)
config['s1_ER_recombination_time'] = reco_time
log.debug('Inferred s1_ER_recombination_time %s' % reco_time)
timings = np.random.choice([0, reco_time], size, replace=True,
p=[excfrac, 1 - excfrac])
primary = timings == 0
size_primary = len(timings[primary])
timings[primary] += Pulse.singlet_triplet_delays(
size_primary, config['s1_ER_primary_singlet_fraction'], config, phase)
# Correct for the recombination time
# For the non-exponential distribution: see Kubota 1979, solve eqn 2 for n/n0.
# Alternatively, see Nest V098 source code G4S1Light.cc line 948
timings[~primary] *= 1 / (-1 + 1 / np.random.uniform(0, 1, size - size_primary))
# Update max recombine time in the nT fax config
config['maximum_recombination_time'] = 1000
timings[~primary] = np.clip(timings[~primary], 0, config['maximum_recombination_time'])
timings[~primary] += Pulse.singlet_triplet_delays(
size - size_primary, config['s1_ER_secondary_singlet_fraction'], config, phase)
return timings
[docs] @staticmethod
def nr(size, config, phase):
"""NR model model, not validated
:param size: 1d array of ints, number of photons
:param config: dict wfsim config
:parma phase: str "liquid"
return 1d array of photon timings
"""
return Pulse.singlet_triplet_delays(size, config['s1_NR_singlet_fraction'], config, phase)
[docs]@export
class S2(Pulse):
"""
Given temperal inputs as well as number of electrons
Random generate photon timing and channel distribution.
"""
def __init__(self, config):
super().__init__(config)
self.phase = 'gas' # To distinguish singlet/triplet time delay.
self.luminescence_switch_threshold = 100 # When to use simplified model (NOT IN USE)
def __call__(self, instruction):
if len(instruction.shape) < 1:
# shape of recarr is a bit strange
instruction = np.array([instruction])
_, _, t, x, y, z, n_electron, recoil_type, *rest = [
np.array(v).reshape(-1) for v in zip(*instruction)]
# Reverse engineerring FDC
if self.config['field_distortion_on']:
z_obs, positions = self.inverse_field_distortion(x, y, z, resource=self.resource)
else:
z_obs, positions = z, np.array([x, y]).T
sc_gain = self.get_s2_light_yield(positions=positions,
config=self.config,
resource=self.resource)
n_electron = self.get_electron_yield(n_electron=n_electron,
z_obs=z_obs,
config=self.config)
# Second generate photon timing and channel
self._photon_timings, self._instruction = self.photon_timings(t, n_electron, z_obs, positions, sc_gain,
config=self.config,
resource=self.resource,
phase=self.phase)
self._photon_channels, self._photon_timings = self.photon_channels(n_electron=n_electron,
z_obs=z_obs,
positions=positions,
_photon_timings=self._photon_timings,
_instruction=self._instruction,
config=self.config,
resource=self.resource)
super().__call__()
[docs] @staticmethod
def get_s2_light_yield(positions,config,resource):
"""Calculate s2 light yield...
:param positions: 2d array of positions (floats)
:param config: dict with wfsim config
:param resource: instance of the resource class
returns array of floats (mean expectation)
"""
if config['detector']=='XENONnT':
sc_gain = np.squeeze(resource.s2_light_yield_map(positions), axis=-1) \
* config['s2_secondary_sc_gain']
elif config['detector']=='XENON1T':
sc_gain = resource.s2_light_yield_map(positions) \
* config['s2_secondary_sc_gain']
return sc_gain
[docs] @staticmethod
def get_electron_yield(n_electron,z_obs,config):
"""Drift electrons up to the gas interface and absorb them
:param n_electron: 1d array with ints as number of electrons
:param z_obs: 1d array of floats with the observed z positions
:param config: dict with wfsim config
returns 1d array ints with number of electrons
"""
# Average drift time of the electrons
drift_time_mean = - z_obs / \
config['drift_velocity_liquid'] + config['drift_time_gate']
# Absorb electrons during the drift
electron_lifetime_correction = np.exp(- 1 * drift_time_mean /
config['electron_lifetime_liquid'])
cy = config['electron_extraction_yield'] * electron_lifetime_correction
#why are there cy greater than 1? We should check this
cy = np.clip(cy, a_min = 0, a_max = 1)
n_electron = np.random.binomial(n=n_electron, p=cy)
return n_electron
[docs] @staticmethod
def inverse_field_distortion(x, y, z, resource):
"""For 1T the pattern map is a data driven one so we need to reverse engineer field distortion into the simulated positions
:param x: 1d array of float
:param y: 1d array of float
:param z: 1d array of float
:param resource: instance of resource class
returns z: 1d array, postions 2d array
"""
positions = np.array([x, y, z]).T
for i_iter in range(6): # 6 iterations seems to work
dr = resource.fdc_3d(positions)
if i_iter > 0: dr = 0.5 * dr + 0.5 * dr_pre # Average between iter
dr_pre = dr
r_obs = np.sqrt(x**2 + y**2) - dr
x_obs = x * r_obs / (r_obs + dr)
y_obs = y * r_obs / (r_obs + dr)
z_obs = - np.sqrt(z**2 + dr**2)
positions = np.array([x_obs, y_obs, z_obs]).T
positions = np.array([x_obs, y_obs]).T
return z_obs, positions
@staticmethod
@njit
def _luminescence_timings_simple(n, dG, E0, r, dr, rr, alpha, uE, p, n_electron, shape):
emission_time = np.zeros(shape)
"""
Luminescence time distribution computation, calculates emission timings of photons from the excited electrons
return 1d nested array with ints
"""
ci = 0
for i in range(n):
ne = n_electron[i]
dt = dr / (alpha * E0[i] * rr)
dy = E0[i] * rr / uE - 0.8 * p # arXiv:physics/0702142
j = np.argmax(r <= dG[i])
t = np.cumsum(dt[j:])
y = np.cumsum(dy[j:])
probabilities = np.random.rand(ne, shape[1])
emission_time[ci:ci+ne, :] = np.interp(probabilities, y / y[-1], t)
ci += ne
return emission_time
[docs] @staticmethod
def luminescence_timings_simple(xy, n_electron, shape, config, resource):
"""
Luminescence time distribution computation according to simple s2 model (many many many single electrons)
:param xy: 1d array with positions
:param n_electron: 1d array with ints for number f electrons
:param shape: tuple with nelectron,nphotons
:param config: dict wfsim config
:param resource: instance of wfsim resource
returns _luminescence_timings_simple
"""
assert len(n_electron) == len(xy), 'Input number of n_electron should have same length as positions'
assert np.sum(n_electron) == shape[0], 'Total number of electron does not agree with shape[0]'
number_density_gas = config['pressure'] / \
(units.boltzmannConstant * config['temperature'])
alpha = config['gas_drift_velocity_slope'] / number_density_gas
uE = units.kV / units.cm
pressure = config['pressure'] / units.bar
if config.get('enable_gas_gap_warping', True):
dG = resource.gas_gap_length(xy)
else:
dG = np.ones(len(xy)) * config['elr_gas_gap_length']
rA = config['anode_field_domination_distance']
rW = config['anode_wire_radius']
dL = config['gate_to_anode_distance'] - dG
VG = config['anode_voltage'] / (1 + dL / dG / config['lxe_dielectric_constant'])
E0 = VG / ((dG - rA) / rA + np.log(rA / rW)) # V / cm
dr = 0.0001 # cm
r = np.arange(np.max(dG), rW, -dr)
rr = np.clip(1 / r, 1 / rA, 1 / rW)
return S2._luminescence_timings_simple(len(xy), dG, E0,
r, dr, rr, alpha, uE, pressure, n_electron, shape)
@staticmethod
@njit
def _luminescence_timings_garfield(distance, x_grid, n_grid, i_grid, shape, index):
for ix in range(shape[0]):
pitch_index = np.argmin(np.abs(distance[ix] - x_grid))
for iy in range(shape[1]):
index[ix, iy] = i_grid[pitch_index] + np.random.randint(n_grid[pitch_index])
[docs] @staticmethod
def luminescence_timings_garfield(xy, shape, resource, config):
"""
Luminescence time distribution computation according to garfield scintillation maps
:param xy: 1d array with positions
:param shape: tuple with nelectron,nphotons
:param config: dict wfsim config
:param resource: instance of wfsim resource
returns 1d (nested?) array with ints for photon timings
"""
assert 's2_luminescence' in resource.__dict__, 's2_luminescence model not found'
assert shape[0] == len(xy), 'Output shape should have same length as positions'
x_grid, n_grid = np.unique(resource.s2_luminescence['x'], return_counts=True)
i_grid = (n_grid.sum() - np.cumsum(n_grid[::-1]))[::-1]
tilt = getattr(config, 'anode_xaxis_angle', np.pi / 4)
pitch = getattr(config, 'anode_pitch', 0.5)
rotation_mat = np.array(((np.cos(tilt), -np.sin(tilt)), (np.sin(tilt), np.cos(tilt))))
jagged = lambda relative_y: (relative_y + pitch / 2) % pitch - pitch / 2
distance = jagged(np.matmul(xy, rotation_mat)[:, 1]) # shortest distance from any wire
index = np.zeros(shape).astype(int)
S2._luminescence_timings_garfield(distance, x_grid, n_grid, i_grid, shape, index)
return resource.s2_luminescence['t'][index]
[docs] @staticmethod
@njit
def electron_timings(t, n_electron, z, sc_gain, timings, gains,
drift_velocity_liquid,
drift_time_gate,
diffusion_constant_longitudinal,
electron_trapping_time):
"""Calculate arrival times of the electrons. Data is written to the timings and gains arrays
:param t: 1d array of ints
:param n_electron:1 d array of ints
:param z: 1d array of floats
:param sc_gain: secondairy scintallation gain
:param timings: empty array with length sum(n_electron)
:param gains: empty array with length sum(n_electron)
:param drift_velocity_liquid, drift_time_gate, diffusion_constant_longitudinal, electron_trapping_time: configuration values
"""
assert len(timings) == np.sum(n_electron)
assert len(gains) == np.sum(n_electron)
assert len(sc_gain) == len(t)
i_electron = 0
for i in np.arange(len(t)):
# Diffusion model from Sorensen 2011
drift_time_mean = - z[i] / \
drift_velocity_liquid + drift_time_gate
_drift_time_mean = max(drift_time_mean, 0)
drift_time_stdev = np.sqrt(2 * diffusion_constant_longitudinal * _drift_time_mean)
drift_time_stdev /= drift_velocity_liquid
# Calculate electron arrival times in the ELR region
for _ in np.arange(n_electron[i]):
_timing = t[i] + \
np.random.exponential(electron_trapping_time)
_timing += np.random.normal(drift_time_mean, drift_time_stdev)
timings[i_electron] = _timing
# add manual fluctuation to sc gain
gains[i_electron] = sc_gain[i]
i_electron += 1
[docs] @staticmethod
def photon_timings( t, n_electron, z, xy, sc_gain, config, resource, phase):
"""Generates photon timings for S2s. Returns a list of photon timings and instructions repeated for original electron
:param t: 1d float array arrival time of the electrons
:param n_electron: 1d float array number of electrons to simulate
:param z: float array. Z positions of s2
:param xy: 1d float array, xy positions of s2
:param sc_gain: float, secondairy s2 gain
:param config: dict of the wfsim config
:param resource: instance of the resource class
:param phase: string, "gas" """
# First generate electron timinga
_electron_timings = np.zeros(np.sum(n_electron))
_electron_gains = np.zeros(np.sum(n_electron))
_config = [config[k] for k in
['drift_velocity_liquid',
'drift_time_gate',
'diffusion_constant_longitudinal',
'electron_trapping_time']]
S2.electron_timings(t, n_electron, z, sc_gain,
_electron_timings, _electron_gains, *_config)
if len(_electron_timings) < 1:
_photon_timings = []
return _photon_timings, []
# For vectorized calculation, artificially top #photon per electron at +4 sigma
nele = len(_electron_timings)
npho = np.ceil(np.max(_electron_gains) +
4 * np.sqrt(np.max(_electron_gains))).astype(int)
if config['s2_luminescence_model'] == 'simple':
_photon_timings = S2.luminescence_timings_simple(xy, n_electron, (nele, npho),
config=config,
resource=resource)
elif config['s2_luminescence_model'] == 'garfield':
_photon_timings = S2.luminescence_timings_garfield(
np.repeat(xy, n_electron, axis=0),
(nele, npho),
config=config,
resource=resource)
_photon_timings += np.repeat(_electron_timings, npho).reshape((nele, npho))
# Crop number of photons by random number generated with poisson
probability = np.tile(np.arange(npho), nele).reshape((nele, npho))
threshold = np.repeat(np.random.poisson(_electron_gains), npho).reshape((nele, npho))
_photon_timings = _photon_timings[probability < threshold]
# Special index for match photon to original electron poistion
_instruction = np.repeat(
np.repeat(np.arange(len(t)), n_electron), npho).reshape((nele, npho))
_instruction = _instruction[probability < threshold]
_photon_timings += Pulse.singlet_triplet_delays(
len(_photon_timings), config['singlet_fraction_gas'],config, phase)
_photon_timings += np.random.normal(0,config['s2_time_spread'],len(_photon_timings))
# The timings generated is NOT randomly ordered, must do shuffle
# Shuffle within each given n_electron[i]
# We can do this by first finding out cumulative sum of the photons
cumulate_npho = np.pad(np.cumsum(threshold[:, 0]), [1, 0])[np.cumsum(n_electron)]
for i in range(len(cumulate_npho)):
if i == 0:
s = slice(0, cumulate_npho[i])
else:
s = slice(cumulate_npho[i-1], cumulate_npho[i])
np.random.shuffle(_photon_timings[s])
return _photon_timings, _instruction
[docs] @staticmethod
def s2_pattern_map_diffuse(n_electron, z, xy, config, resource):
"""Returns an array of pattern of shape [n interaction, n PMTs]
pattern of each interaction is an average of n_electron patterns evaluated at
diffused position near xy. The diffused positions sample from 2d symmetric gaussian
with spread scale with sqrt of drift time.
:param n_electron: a 1d int array
:param z: a 1d float array
:param xy: a 2d float array of shape [n interaction, 2]
"""
drift_time_gate = config['drift_time_gate']
drift_velocity_liquid = config['drift_velocity_liquid']
diffusion_constant_transverse = getattr(config, 'diffusion_constant_transverse', 0)
assert all(z < 0), 'All S2 in liquid should have z < 0'
drift_time_mean = - z / drift_velocity_liquid + drift_time_gate # Add gate time for consistancy?
hdiff_stdev = np.sqrt(2 * diffusion_constant_transverse * drift_time_mean)
hdiff = np.random.normal(0, 1, (np.sum(n_electron), 2)) * np.repeat(hdiff_stdev, n_electron, axis=0).reshape((-1, 1))
# Should we also output this xy position in truth?
xy_multi = np.repeat(xy, n_electron, axis=0) + hdiff # One entry xy per electron
# Remove points outside tpc, and the pattern will be the average inside tpc
# Should be done natually with the s2 pattern map, however, there's some bug there so we apply this hard cut
mask = np.sum(xy_multi ** 2, axis=1) <= config['tpc_radius'] ** 2
if isinstance(resource.s2_pattern_map, DummyMap):
output_dim = resource.s2_pattern_map.shape[-1]
else:
output_dim = resource.s2_pattern_map.data['map'].shape[-1]
pattern = np.zeros((len(n_electron), output_dim))
n0 = 0
# Average over electrons for each s2
for ix, ne in enumerate(n_electron):
s = slice(n0, n0+ne)
pattern[ix, :] = np.average(resource.s2_pattern_map(xy_multi[s][mask[s]]), axis=0)
n0 += ne
return pattern
[docs] @staticmethod
def photon_channels(n_electron, z_obs, positions, _photon_timings, _instruction, config, resource):
"""Set the _photon_channels property list of length same as _photon_timings
:param n_electron: a 1d int array
:param z_obs: a 1d float array
:param positions: a 2d float array of shape [n interaction, 2] for the xy coordinate
:param _photon_timings: 1d int array of photon timings,
:param _instructions: array of instructions with dtype wfsim.instructions_dtype
:param config: dict wfsim config
:param resource: instance of resource class
"""
if len(_photon_timings) == 0:
_photon_channels = []
return _photon_timings, _photon_channels
aft = config['s2_mean_area_fraction_top']
aft_random = config.get('randomize_fraction_of_s2_top_array_photons', 0)
channels = np.arange(config['n_tpc_pmts']).astype(int)
top_index = np.arange(config['n_top_pmts'])
bottom_index = np.array(config['channels_bottom'])
if config.get('diffusion_constant_transverse', 0) > 0:
pattern = S2.s2_pattern_map_diffuse(n_electron, z_obs, positions, config, resource) # [position, pmt]
else:
pattern = resource.s2_pattern_map(positions) # [position, pmt]
if pattern.shape[1] - 1 not in bottom_index:
pattern = np.pad(pattern, [[0, 0], [0, len(bottom_index)]],
'constant', constant_values=1)
sum_pat = np.sum(pattern, axis=1).reshape(-1, 1)
pattern = np.divide(pattern, sum_pat, out=np.zeros_like(pattern), where=sum_pat!=0)
assert pattern.shape[0] == len(positions)
assert pattern.shape[1] == len(channels)
_buffer_photon_channels = []
# Randomly assign to channel given probability of each channel
for unique_i, count in zip(*np.unique(_instruction, return_counts=True)):
pat = pattern[unique_i] # [pmt]
if aft > 0: # Redistribute pattern with user specified aft
_aft = aft * (1 + np.random.normal(0, aft_random))
_aft = np.clip(_aft, 0, 1)
pat[top_index] = pat[top_index] / pat[top_index].sum() * _aft
pat[bottom_index] = pat[bottom_index] / pat[bottom_index].sum() * (1 - _aft)
if np.isnan(pat).sum() > 0: # Pattern map return zeros
_photon_channels = np.array([-1] * count)
else:
_photon_channels = np.random.choice(
channels,
size=count,
p=pat,
replace=True)
_buffer_photon_channels.append(_photon_channels)
_photon_channels = np.concatenate(_buffer_photon_channels)
# Remove photon with channel -1
mask = _photon_channels != -1
_photon_channels = _photon_channels[mask]
_photon_timings = _photon_timings[mask]
sorted_index = np.argsort(_photon_channels)
return _photon_channels[sorted_index], _photon_timings[sorted_index]
[docs]@export
class PhotoIonization_Electron(S2):
"""
Produce electron after pulse simulation, using already built cdfs
The cdfs follow distribution parameters extracted from data.
"""
def __init__(self, config):
super().__init__(config)
self._photon_timings = []
[docs] def generate_instruction(self, signal_pulse, signal_pulse_instruction):
if len(signal_pulse._photon_timings) == 0: return []
return self.electron_afterpulse(signal_pulse, signal_pulse_instruction)
[docs] def electron_afterpulse(self, signal_pulse, signal_pulse_instruction):
"""
For electron afterpulses we assume a uniform x, y
"""
delaytime_pmf_hist = self.resource.uniform_to_ele_ap
# To save calculation we first find out how many photon will give rise ap
n_electron = np.random.poisson(delaytime_pmf_hist.n
* len(signal_pulse._photon_timings)
* self.config['photoionization_modifier'])
ap_delay = delaytime_pmf_hist.get_random(n_electron).clip(
self.config['drift_time_gate'] + 1, None)
# Randomly select original photon as time zeros
t_zeros = signal_pulse._photon_timings[np.random.randint(
low=0, high=len(signal_pulse._photon_timings),
size=n_electron)]
instruction = np.repeat(signal_pulse_instruction[0], n_electron)
instruction['type'] = 4 # pi_el
instruction['time'] = t_zeros + self.config['drift_time_gate']
instruction['x'], instruction['y'] = self._rand_position(n_electron)
instruction['z'] = - ap_delay * self.config['drift_velocity_liquid']
instruction['amp'] = 1
return instruction
def _rand_position(self, n):
Rupper = self.config['tpc_radius']
r = np.sqrt(np.random.uniform(0, Rupper*Rupper, n))
angle = np.random.uniform(-np.pi, np.pi, n)
return r * np.cos(angle), r * np.sin(angle)
[docs]@export
class PhotoElectric_Electron(S2):
"""
Produce electron after S2 pulse simulation, using a gaussian distribution
"""
def __init__(self, config):
super().__init__(config)
self._photon_timings = []
[docs] def generate_instruction(self, signal_pulse, signal_pulse_instruction):
if len(signal_pulse._photon_timings) == 0: return []
return self.electron_afterpulse(signal_pulse, signal_pulse_instruction)
[docs] def electron_afterpulse(self, signal_pulse, signal_pulse_instruction):
n_electron = np.random.poisson(self.config['photoelectric_p']
* len(signal_pulse._photon_timings)
* self.config['photoelectric_modifier'])
ap_delay = np.clip(
np.random.normal(self.config['photoelectric_t_center'] + self.config['drift_time_gate'],
self.config['photoelectric_t_spread'],
n_electron), 0, None)
# Randomly select original photon as time zeros
t_zeros = signal_pulse._photon_timings[np.random.randint(
low=0, high=len(signal_pulse._photon_timings),
size=n_electron)]
instruction = np.repeat(signal_pulse_instruction[0], n_electron)
instruction['type'] = 6 # pe_el
instruction['time'] = t_zeros + self.config['drift_time_gate']
instruction['x'], instruction['y'] = self._rand_position(n_electron)
instruction['z'] = - ap_delay * self.config['drift_velocity_liquid']
instruction['amp'] = 1
return instruction
def _rand_position(self, n):
Rupper = 46
r = np.sqrt(np.random.uniform(0, Rupper*Rupper, n))
angle = np.random.uniform(-np.pi, np.pi, n)
return r * np.cos(angle), r * np.sin(angle)
[docs]@export
class PMT_Afterpulse(Pulse):
"""
Produce pmt after pulse simulation, using already built cdfs
The cdfs follow distribution parameters extracted from data.
"""
def __init__(self, config):
super().__init__(config)
def __call__(self, signal_pulse):
if len(signal_pulse._photon_timings) == 0:
self.clear_pulse_cache()
return
self._photon_timings = []
self._photon_channels = []
self._photon_amplitude = []
self.photon_afterpulse(signal_pulse)
super().__call__()
[docs] def photon_afterpulse(self, signal_pulse):
"""
For pmt afterpulses, gain and dpe generation is a bit different from standard photons
"""
self.element_list = self.resource.uniform_to_pmt_ap.keys()
for element in self.element_list:
delaytime_cdf = self.resource.uniform_to_pmt_ap[element]['delaytime_cdf']
amplitude_cdf = self.resource.uniform_to_pmt_ap[element]['amplitude_cdf']
# Assign each photon FRIST random uniform number rU0 from (0, 1] for timing
rU0 = 1 - np.random.rand(len(signal_pulse._photon_timings))
# Select those photons with U <= max of cdf of specific channel
cdf_max = delaytime_cdf[signal_pulse._photon_channels, -1]
sel_photon_id = np.where(rU0 <= cdf_max * self.config['pmt_ap_modifier'])[0]
if len(sel_photon_id) == 0: continue
sel_photon_channel = signal_pulse._photon_channels[sel_photon_id]
# Assign selected photon SECOND random uniform number rU1 from (0, 1] for amplitude
rU1 = 1 - np.random.rand(len(sel_photon_id))
# The map is made so that the indices are delay time in unit of ns
if 'Uniform' in element:
ap_delay = np.random.uniform(delaytime_cdf[sel_photon_channel, 0],
delaytime_cdf[sel_photon_channel, 1])
ap_amplitude = np.ones_like(ap_delay)
else:
ap_delay = (np.argmin(
np.abs(
delaytime_cdf[sel_photon_channel]
- rU0[sel_photon_id][:, None]), axis=-1)
- self.config['pmt_ap_t_modifier'])
ap_amplitude = np.argmin(
np.abs(
amplitude_cdf[sel_photon_channel]
- rU1[:, None]), axis=-1)/100.
self._photon_timings += (signal_pulse._photon_timings[sel_photon_id] + ap_delay).tolist()
self._photon_channels += signal_pulse._photon_channels[sel_photon_id].tolist()
self._photon_amplitude += np.atleast_1d(ap_amplitude).tolist()
self._photon_timings = np.array(self._photon_timings)
self._photon_channels = np.array(self._photon_channels).astype(int)
self._photon_amplitude = np.array(self._photon_amplitude)
self._photon_gain = np.array(self.config['gains'])[self._photon_channels] \
* self._photon_amplitude
[docs]@export
class RawData(object):
def __init__(self, config):
self.config = config
self.pulses = dict(
s1=S1(config),
s2=S2(config),
pi_el=PhotoIonization_Electron(config),
pe_el=PhotoElectric_Electron(config),
pmt_ap=PMT_Afterpulse(config),
)
self.resource = load_config(self.config)
def __call__(self, instructions, truth_buffer=None, **kwargs):
if truth_buffer is None:
truth_buffer = []
# Pre-load some constents from config
v = self.config['drift_velocity_liquid']
rext = self.config['right_raw_extension']
# Data cache
self._pulses_cache = []
self._raw_data_cache = []
# Iteration conditions
self.source_finished = False
self.last_pulse_end_time = - np.inf
self.instruction_event_number = np.min(instructions['event_number'])
# Primary instructions must be sorted by signal time
# int(type) by design S1-esque being odd, S2-esque being even
# thus type%2-1 is 0:S1-esque; -1:S2-esque
# Make a list of clusters of instructions, with gap smaller then rext
inst_time = instructions['time'] + instructions['z'] / v * (instructions['type'] % 2 - 1)
inst_queue = np.argsort(inst_time)
inst_queue = np.split(inst_queue, np.where(np.diff(inst_time[inst_queue]) > rext)[0]+1)
# Instruction buffer
instb = np.zeros(10000, dtype=instructions.dtype) # size ~ 1% of size of primary
instb_filled = np.zeros_like(instb, dtype=bool) # Mask of where buffer is filled
# ik those are illegible, messy logic. lmk if you have a better way
pbar = tqdm(total=len(inst_queue), desc='Simulating Raw Records')
while not self.source_finished:
# A) Add a new instruction into buffer
try:
ixs = inst_queue.pop(0) # The index from original instruction list
self.source_finished = len(inst_queue) == 0
assert len(np.where(~instb_filled)[0]) > len(ixs), "Run out of instruction buffer"
ib = np.where(~instb_filled)[0][:len(ixs)] # The index of first empty slot in buffer
instb[ib] = instructions[ixs]
instb_filled[ib] = True
pbar.update(1)
except: pass
# B) Cluster instructions again with gap size <= rext
instb_indx = np.where(instb_filled)[0]
instb_type = instb[instb_indx]['type']
instb_time = instb[instb_indx]['time'] + instb[instb_indx]['z'] \
/ v * (instb_type % 2 - 1)
instb_queue = np.argsort(instb_time, kind='stable')
instb_queue = np.split(instb_queue,
np.where(np.diff(instb_time[instb_queue]) > rext)[0]+1)
# C) Push pulse cache out first if nothing comes right after them
if np.min(instb_time) - self.last_pulse_end_time > rext and not np.isinf(self.last_pulse_end_time):
self.digitize_pulse_cache()
yield from self.ZLE()
# D) Run all clusters before the current source
stop_at_this_group = False
for ibqs in instb_queue:
for ptype in [1, 2, 4, 6]: # S1 S2 PI Gate
mask = instb_type[ibqs] == ptype
if np.sum(mask) == 0: continue # No such instruction type
instb_run = instb_indx[ibqs[mask]] # Take hold of todo list
if self.symtype(ptype) in ['s1', 's2']:
stop_at_this_group = True # Stop group iteration
_instb_run = np.array_split(instb_run, len(instb_run))
else: _instb_run = [instb_run] # Small trick to make truth small
# Run pulse simulation for real
for instb_run in _instb_run:
for instb_secondary in self.sim_data(instb[instb_run]):
ib = np.where(~instb_filled)[0][:len(instb_secondary)]
instb[ib] = instb_secondary
instb_filled[ib] = True
if len(truth_buffer): # Extract truth info
self.get_truth(instb[instb_run], truth_buffer)
instb_filled[instb_run] = False # Free buffer AFTER copyting into truth buffer
if stop_at_this_group:
break
self.digitize_pulse_cache() # from pulse cache to raw data
yield from self.ZLE()
self.source_finished = len(inst_queue) == 0 and np.sum(instb_filled) == 0
pbar.close()
[docs] @staticmethod
def symtype(ptype):
return PULSE_TYPE_NAMES[ptype]
[docs] def sim_primary(self, primary_pulse, instruction, **kwargs):
self.pulses[primary_pulse](instruction)
[docs] def sim_data(self, instruction, **kwargs):
"""Simulate a pulse according to instruction, and yield any additional instructions
for secondary electron afterpulses.
"""
# Simulate the primary pulse
primary_pulse = self.symtype(instruction['type'][0])
self.sim_primary(primary_pulse, instruction, **kwargs)
# Add PMT afterpulses, if requested
do_pmt_ap = self.config.get('enable_pmt_afterpulses', True)
if do_pmt_ap:
self.pulses['pmt_ap'](self.pulses[primary_pulse])
# Append pulses we just simulated to our cache
for pt in [primary_pulse, 'pmt_ap']:
if pt == 'pmt_ap' and not do_pmt_ap:
continue
_pulses = getattr(self.pulses[pt], '_pulses')
if len(_pulses) > 0:
self._pulses_cache += _pulses
self.last_pulse_end_time = max(
self.last_pulse_end_time,
np.max([p['right'] for p in _pulses]) * 10)
# Make new instructions for electron afterpulses, if requested
if primary_pulse in ['s1', 's2']:
if self.config.get('enable_electron_afterpulses', True):
yield self.pulses['pi_el'].generate_instruction(
self.pulses[primary_pulse], instruction)
if primary_pulse in ['s2']: # Only add gate ap to s2
yield self.pulses['pe_el'].generate_instruction(
self.pulses[primary_pulse], instruction)
self.instruction_event_number = instruction['event_number'][0]
[docs] def digitize_pulse_cache(self):
"""
Superimpose pulses (wfsim definition) into WFs w/ dynamic range truncation
"""
if len(self._pulses_cache) == 0:
self._raw_data = []
else:
self.current_2_adc = self.config['pmt_circuit_load_resistor'] \
* self.config['external_amplification'] \
/ (self.config['digitizer_voltage_range'] / 2 ** (self.config['digitizer_bits']))
self.left = np.min([p['left'] for p in self._pulses_cache]) - self.config['trigger_window']
self.right = np.max([p['right'] for p in self._pulses_cache]) + self.config['trigger_window']
assert self.right - self.left < 1000000, "Pulse cache too long"
if self.left % 2 != 0: self.left -= 1 # Seems like a digizier effect
self._raw_data = np.zeros((801,
self.right - self.left + 1), dtype=('<i8'))
# Use this mask to by pass non-activated channels
# Set to true when working with real noise
self._channel_mask = np.zeros(801, dtype=[('mask', '?'), ('left', 'i8'), ('right', 'i8')])
self._channel_mask['left'] = int(2**63-1)
for _pulse in self._pulses_cache:
ch = _pulse['channel']
self._channel_mask['mask'][ch] = True
self._channel_mask['left'][ch] = min(_pulse['left'], self._channel_mask['left'][ch])
self._channel_mask['right'][ch] = max(_pulse['right'], self._channel_mask['right'][ch])
adc_wave = - np.trunc(_pulse['current'] * self.current_2_adc).astype(int)
_slice = slice(_pulse['left'] - self.left, _pulse['right'] - self.left + 1)
self._raw_data[ch, _slice] += adc_wave
if self.config['detector'] == 'XENONnT':
adc_wave_he = adc_wave * int(self.config['high_energy_deamplification_factor'])
if ch < self.config['n_top_pmts']:
ch_he = np.arange(self.config['channel_map']['he'][0],self.config['channel_map']['he'][1]+1)[ch]
self._raw_data[ch_he, _slice] += adc_wave_he
self._channel_mask[ch_he] = True
self._channel_mask['left'][ch_he] = self._channel_mask['left'][ch]
self._channel_mask['right'][ch_he] = self._channel_mask['right'][ch]
elif ch <= self.config['channels_bottom'][-1]:
self.sum_signal(adc_wave_he,
_pulse['left'] - self.left,
_pulse['right'] - self.left + 1,
self._raw_data[self.config['channel_map']['sum_signal']])
self._pulses_cache = []
self._channel_mask['left'] -= self.left + self.config['trigger_window']
self._channel_mask['right'] -= self.left - self.config['trigger_window']
# Adding noise, baseline and digitizer saturation
if self.config.get('enable_noise', True):
self.add_noise(data=self._raw_data,
channel_mask=self._channel_mask,
noise_data=self.resource.noise_data,
noise_data_length=len(self.resource.noise_data))
self.add_baseline(self._raw_data, self._channel_mask,
self.config['digitizer_reference_baseline'],)
self.digitizer_saturation(self._raw_data, self._channel_mask)
[docs] def ZLE(self):
"""
Modified software zero lengh encoding, coverting WFs into pulses (XENON definition)
"""
# Ask for memory allocation just once
if 'zle_intervals_buffer' not in self.__dict__:
self.zle_intervals_buffer = -1 * np.ones((50000, 2), dtype=np.int64)
for ix, data in enumerate(self._raw_data):
if not self._channel_mask['mask'][ix]:
continue
channel_left, channel_right = self._channel_mask['left'][ix], self._channel_mask['right'][ix]
data = data[channel_left:channel_right+1]
# For simulated data taking reference baseline as baseline
# Operating directly on digitized downward waveform
if str(ix) in self.config.get('special_thresholds', {}):
threshold = self.config['digitizer_reference_baseline'] \
- self.config['special_thresholds'][str(ix)] - 1
else:
threshold = self.config['digitizer_reference_baseline'] - self.config['zle_threshold'] - 1
n_itvs_found = find_intervals_below_threshold(
data,
threshold=threshold,
holdoff=self.config['trigger_window'] + self.config['trigger_window'] + 1,
result_buffer=self.zle_intervals_buffer,)
itvs_to_encode = self.zle_intervals_buffer[:n_itvs_found]
itvs_to_encode[:, 0] -= self.config['trigger_window']
itvs_to_encode[:, 1] += self.config['trigger_window']
itvs_to_encode = np.clip(itvs_to_encode, 0, len(data) - 1)
# Land trigger window on even numbers
itvs_to_encode[:, 0] = np.ceil(itvs_to_encode[:, 0] / 2.0) * 2
itvs_to_encode[:, 1] = np.floor(itvs_to_encode[:, 1] / 2.0) * 2
for itv in itvs_to_encode:
yield ix, self.left + channel_left + itv[0], self.left + channel_left + itv[1], data[itv[0]:itv[1]+1]
[docs] def get_truth(self, instruction, truth_buffer):
"""Write truth in the first empty row of truth_buffer
:param instruction: Array of instructions that were simulated as a
single cluster, and should thus get one line in the truth info.
:param truth_buffer: Truth buffer to write in.
"""
ix = np.argmin(truth_buffer['fill'])
tb = truth_buffer[ix]
peak_type = self.symtype(instruction['type'][0])
pulse = self.pulses[peak_type]
for quantum in 'photon', 'electron':
times = getattr(pulse, f'_{quantum}_timings', [])
if len(times):
tb[f'n_{quantum}'] = len(times)
tb[f't_mean_{quantum}'] = np.mean(times)
tb[f't_first_{quantum}'] = np.min(times)
tb[f't_last_{quantum}'] = np.max(times)
tb[f't_sigma_{quantum}'] = np.std(times)
else:
# Peak does not have photons / electrons
# zero-photon afterpulses can be removed from truth info
if peak_type not in ['s1', 's2'] and quantum == 'photon':
return
tb[f'n_{quantum}'] = 0
tb[f't_mean_{quantum}'] = np.nan
tb[f't_first_{quantum}'] = np.nan
tb[f't_last_{quantum}'] = np.nan
tb[f't_sigma_{quantum}'] = np.nan
#Endtime is the end of the last pulse
tb['endtime'] = np.mean(instruction['time']) if np.isnan(tb['t_last_photon']) else tb['t_last_photon'] + \
(self.config['samples_before_pulse_center']+self.config['samples_after_pulse_center']+1)*self.config['sample_duration']
channels = getattr(pulse, '_photon_channels', [])
if self.config.get('exclude_dpe_in_truth', False):
n_dpe = n_dpe_bot = 0
else:
n_dpe = getattr(pulse, '_n_double_pe', 0)
n_dpe_bot = getattr(pulse, '_n_double_pe_bot', 0)
tb['n_photon'] += n_dpe
tb['n_photon'] -= np.sum(np.isin(channels, getattr(pulse, 'turned_off_pmts', [])))
# this turned_off guy, check how this works with a config['turned_off_guys']
channels_bottom = list(
set(self.config['channels_bottom']).difference(getattr(pulse, 'turned_off_pmts', [])))
tb['n_photon_bottom'] = (
np.sum(np.isin(channels, channels_bottom))
+ n_dpe_bot)
# Summarize the instruction cluster in one row of the truth file
for field in instruction.dtype.names:
value = instruction[field]
if len(instruction) > 1 and field in 'txyz':
tb[field] = np.mean(value)
elif len(instruction) > 1 and field == 'amp':
tb[field] = np.sum(value)
else:
# Cannot summarize intelligently: just take the first value
tb[field] = value[0]
# Signal this row is now filled, so it won't be overwritten
tb['fill'] = True
[docs] @staticmethod
@njit
def sum_signal(adc_wave, left, right, sum_template):
sum_template[left:right] += adc_wave
return sum_template
[docs] @staticmethod
@njit
def add_noise(data, channel_mask, noise_data, noise_data_length):
"""
Get chunk(s) of noise sample from real noise data
"""
for ch in range(data.shape[0]):
if not channel_mask['mask'][ch]:
continue
left, right = channel_mask['left'][ch], channel_mask['right'][ch]
id_t = np.random.randint(low=0, high=noise_data_length-right+left)
for ix in range(left, right+1):
if id_t+ix >= noise_data_length or ix >= len(data[ch]):
# Don't create value-errors
continue
data[ch, ix] += noise_data[id_t+ix]
[docs] @staticmethod
@njit
def add_baseline(data, channel_mask, baseline):
for ch in range(data.shape[0]):
if not channel_mask['mask'][ch]:
continue
left, right = channel_mask['left'][ch], channel_mask['right'][ch]
for ix in range(left, right+1):
data[ch, ix] += baseline
[docs] @staticmethod
@njit
def digitizer_saturation(data, channel_mask):
for ch in range(data.shape[0]):
if not channel_mask['mask'][ch]:
continue
left, right = channel_mask['left'][ch], channel_mask['right'][ch]
for ix in range(left, right+1):
if data[ch, ix] < 0:
data[ch, ix] = 0