from copy import deepcopy
import logging
import numpy as np
import os.path as osp
import strax
from strax import exporter
import straxen
NT_AUX_INSTALLED = False
STRAX_AUX_INSTALLED = False
try:
import ntauxfiles
NT_AUX_INSTALLED = True
except (ModuleNotFoundError, ImportError):
pass
try:
import straxauxfiles
STRAX_AUX_INSTALLED = True
except (ModuleNotFoundError, ImportError):
pass
export, __all__ = exporter()
logging.basicConfig(handlers=[logging.StreamHandler()])
log = logging.getLogger('wfsim.resource')
log.setLevel('WARNING')
_cached_configs = dict()
[docs]
@export
def load_config(config):
"""Create a Resource instance from the configuration
Uses a cache to avoid re-creating instances from the same config
"""
h = strax.deterministic_hash(Resource.config_to_file(config))
if h in _cached_configs:
return _cached_configs[h]
result = Resource(config)
_cached_configs[h] = result
log.debug(f'Caching config file set {h}')
return result
[docs]
@export
class Resource:
"""
Get the configs needed for running WFSim. Configs can be obtained in
two ways:
1. Get it directly from the mongo database. This only needs the
name of the file.
2. Load it with straxen get_resource, this can either:
Download from a public repository
Read from local cache
Download from a private repository if credentials
are properly setup
"""
[docs]
@staticmethod
def config_to_file(config):
"""
Find and complete all file paths
returns dictionary mapping item name to path
"""
if config is None:
config = dict()
files = {}
if config['detector'] == 'XENON1T':
files.update({
'photon_area_distribution': 'XENON1T_spe_distributions.csv',
's1_lce_correction_map': 'XENON1T_s1_xyz_ly_kr83m_SR1_pax-680_fdc-3d_v0.json',
's1_pattern_map': 'XENON1T_s1_xyz_patterns_interp_corrected_MCv2.1.0.json.gz',
's2_correction_map': 'XENON1T_s2_xy_ly_SR1_v2.2.json',
's2_pattern_map': 'XENON1T_s2_xy_patterns_top_corrected_MCv2.1.0.json.gz',
'photon_ap_cdfs': 'x1t_pmt_afterpulse_config.pkl.gz',
'fdc_3d': 'XENON1T_FDC_SR1_data_driven_time_dependent_3d_correction_tf_nn_part1_v1.json.gz',
'ele_ap_pdfs': 'x1t_se_afterpulse_delaytime.pkl.gz',
'noise_file': 'x1t_noise_170203_0850_00_small.npz'
})
elif config['detector'] == 'XENONnT':
files.update({
'photon_area_distribution': 'XENONnT_spe_distributions_20210305.csv',
's1_pattern_map': 'XENONnT_s1_xyz_patterns_LCE_corrected_qes_MCva43fa9b_wires.pkl',
's1_lce_correction_map': 'XENONnT_s1_xyz_LCE_corrected_qes_MCva43fa9b_wires.json.gz',
's2_pattern_map': 'XENONnT_s2_xy_patterns_LCE_corrected_qes_MCva43fa9b_wires.pkl',
's2_correction_map': 'XENONnT_s2_xy_map_v4_210503_mlp_3_in_1_iterated.json',
'se_gain_map': 'XENONnT_se_xy_map_v1_mlp.json',
'photon_ap_cdfs': 'XENONnT_pmt_afterpulse_config_012605.json.gz',
's2_luminescence': 'XENONnT_GARFIELD_B1d5n_C30n_G1n_A6d5p_T1d5n_PMTs1d5n_FSR0d95n.npz',
"s2_luminescence_gg": "garfield_timing_map_gas_gap_sr0.npy",
'gas_gap_map': 'gas_gap_warping_map_January_2021.pkl',
'garfield_gas_gap_map': 'garfield_gas_gap_map_sr0.json',
'noise_file': 'x1t_noise_170203_0850_00_small.npz',
'fdc_3d': 'XnT_3D_FDC_xyt_dummy_all_zeros_v0.1.json.gz',
'field_dependencies_map': '',
"diffusion_longitudinal_map": '',
's1_time_spline': 'XENONnT_s1_proponly_va43fa9b_wires_20200625.json.gz',
's2_time_spline': '',
})
elif config['detector'] == 'XENONnT_neutron_veto':
files.update({
'photon_area_distribution': 'XENONnT_spe_distributions_nveto_013071.csv',
'nv_pmt_qe': 'nveto_pmt_qe.json',
'noise_file': 'xnt_noise_nveto_014901.npz'
})
else:
raise ValueError(f"Unsupported detector {config['detector']}")
# Allowing user to replace default with specified files
for k in set(config).intersection(files):
files[k] = config[k]
commit = 'master' # Replace this by a commit hash if you feel solid and responsible
if config.get('url_base', False):
files['url_base'] = config['url_base']
elif config['detector'] == "XENON1T":
files['url_base'] = f'https://raw.githubusercontent.com/XENONnT/strax_auxiliary_files/{commit}/sim_files'
else:
files['url_base'] = f'https://raw.githubusercontent.com/XENONnT/private_nt_aux_files/{commit}/sim_files'
return files
[docs]
@staticmethod
def get_file_path(base, fname):
"""Find the full path to the resource file
Try 4 methods in the following order
1. The base is not url, return base + name
2. If ntauxfiles (straxauxfiles) is installed, return will be package dir + name
pip install won't work, try python setup.py in the packages
3. Download the latest version using straxen mongo downloader from database,
return the cached file path + md5
4. Download using straxen get_resource from the url (github raw)
simply return base + name
Be careful with the forth options, straxen creates
cache files that might not be updated with the latest github commit.
"""
if not fname:
log.warning(f"A file has value False, assuming this is intentional.")
return
if fname.startswith('/'):
log.warning(f"Using local file {fname} for a resource. "
f"Do not set this as a default or TravisCI tests will break")
return fname
if base.startswith('/'):
log.warning(f"Using local folder {base} for all resources. "
f"Do not set this as a default or TravisCI tests will break")
return osp.join(base, fname)
if NT_AUX_INSTALLED:
# You might want to use this, for example if you are a developer
if fname in ntauxfiles.list_private_files():
log.warning(f"Using the private repo to load {fname} locally")
fpath = ntauxfiles.get_abspath(fname)
log.info(f"Loading {fname} is successfully from {fpath}")
return fpath
if STRAX_AUX_INSTALLED:
if fname in straxauxfiles.list_aux_files():
log.warning(f"Using the public repo to load {fname} locally")
fpath = straxauxfiles.get_abspath(fname)
log.info(f"Loading {fname} is successfully from {fpath}")
return fpath
try:
# https://straxen.readthedocs.io/en/latest/config_storage.html
# downloading-xenonnt-files-from-the-database # noqa
# we need to add the straxen.MongoDownloader() in this
# try: except NameError: logic because the NameError
# gets raised if we don't have access to utilix.
downloader = straxen.MongoDownloader()
# FileNotFoundError, ValueErrors can be raised if we
# cannot load the requested config
fpath = downloader.download_single(fname)
log.warning(f"Loading {fname} from mongo downloader to {fpath}")
return fname # Keep the name and let get_resource do its thing
except (FileNotFoundError, ValueError, NameError, AttributeError):
log.info(f"Mongo downloader not possible or does not have {fname}")
# We cannot download the file from the database. We need to
# try to get a placeholder file from a URL.
furl = osp.join(base, fname)
log.warning(f'{fname} did not download, trying download from {base}')
return furl
def __init__(self, config=None):
files = self.config_to_file(config)
log.debug('Getting\n' + '\n'.join([f'{k}: {v}' for k, v in files.items()]))
for k, v in files.items():
if isinstance(v, list):
# It's a dummy map call, do nothing
continue
if k == 'url_base':
continue
if v == '':
log.warning(f'{k} has no path so this config file is set to None')
files[k] = None
continue
log.debug(f'Obtaining {k} from {v}')
files[k] = self.get_file_path(files['url_base'], v)
if config.get('detector', 'XENONnT') == 'XENON1T':
self.s1_pattern_map = make_map(files['s1_pattern_map'], fmt='json.gz')
self.s1_lce_correction_map = make_map(files['s1_lce_correction_map'], fmt='json')
self.s2_correction_map = make_map(files['s2_correction_map'], fmt='json')
self.s2_pattern_map = make_map(files['s2_pattern_map'], fmt='json.gz')
self.fdc_3d = make_map(files['fdc_3d'], fmt='json.gz')
# Gas gap warping map
if config.get('enable_gas_gap_warping', False):
self.gas_gap_length = make_map(["constant dummy", 0.25, [254, ]])
# Photon After Pulses
if config.get('enable_pmt_afterpulses', False):
self.uniform_to_pmt_ap = straxen.get_resource(files['photon_ap_cdfs'], fmt='pkl.gz')
# Electron After Pulses
if config.get('enable_electron_afterpulses', False):
self.uniform_to_ele_ap = straxen.get_resource(files['ele_ap_pdfs'], fmt='pkl.gz')
elif config.get('detector', 'XENONnT') == 'XENONnT':
pmt_mask = np.array(config['gains']) > 0 # Converted from to pe (from cmt by default)
self.s1_pattern_map = make_patternmap(files['s1_pattern_map'], fmt='pkl', pmt_mask=pmt_mask)
self.s2_pattern_map = make_patternmap(files['s2_pattern_map'], fmt='pkl', pmt_mask=pmt_mask)
self.se_gain_map = make_map(files['se_gain_map'])
# self.s2_correction_map = make_map(files['s2_correction_map'], fmt = 'json')
# if there is a (data driven!) map, load it. If not make it from the pattern map
if files['s1_lce_correction_map']:
self.s1_lce_correction_map = make_map(files['s1_lce_correction_map'])
else:
lymap = deepcopy(self.s1_pattern_map)
# AT: this scaling with mast is redundant to `make_patternmap`, but keep it in for now
lymap.data['map'] = np.sum(lymap.data['map'][:][:][:], axis=3, keepdims=True, where=pmt_mask)
lymap.__init__(lymap.data)
self.s1_lce_correction_map = lymap
# making S2 aft scaling (if provided)
if 's2_mean_area_fraction_top' in config.keys():
avg_s2aft_=config['s2_mean_area_fraction_top']
if avg_s2aft_>=0.0:
if isinstance(files['s2_pattern_map'], list):
log.warning(f'Scaling of S2 AFT with dummy map, this will have no effect!')
else:
s2map=deepcopy(self.s2_pattern_map)
s2map_topeff_=s2map.data['map'][...,0:config['n_top_pmts']].sum(axis=2)
s2map_toteff_=s2map.data['map'].sum(axis=2)
orig_aft_=np.mean((s2map_topeff_/s2map_toteff_)[s2map_toteff_>0.0])
# getting scales for top/bottom separately to preserve total efficiency
scale_top_=avg_s2aft_/orig_aft_
scale_bot_=(1 - avg_s2aft_)/(1 - orig_aft_)
s2map.data['map'][:,:,0:config['n_top_pmts']]*=scale_top_
s2map.data['map'][:,:,config['n_top_pmts']:config['n_tpc_pmts']]*=scale_bot_
self.s2_pattern_map.__init__(s2map.data)
# if there is a (data driven!) map, load it. If not make it from the pattern map
if files['s2_correction_map']:
self.s2_correction_map = make_map(files['s2_correction_map'], fmt = 'json')
else:
s2cmap = deepcopy(self.s2_pattern_map)
# Lower the LCE by removing contribution from dead PMTs
# AT: masking is a bit redundant due to PMT mask application in make_patternmap
s2cmap.data['map'] = np.sum(s2cmap.data['map'][:][:], axis=2, keepdims=True, where=pmt_mask)
# Scale by median value
s2cmap.data['map'] = s2cmap.data['map'] / np.median(s2cmap.data['map'][s2cmap.data['map'] > 0])
s2cmap.__init__(s2cmap.data)
self.s2_correction_map = s2cmap
# Garfield luminescence timing samples
# if config.get('s2_luminescence_model', False) == 'garfield':
if 'garfield_gas_gap' in config.get('s2_luminescence_model', ''):
#garfield_gas_gap option is using (x,y) -> gas gap (from the map) -> s2 luminescence
#from garfield. This s2_luminescence_gg is indexed only by the gas gap, and
#corresponds to electrons drawn directly below the anode
s2_luminescence_map = straxen.get_resource(files['s2_luminescence_gg'], fmt='npy')
self.s2_luminescence_gg = s2_luminescence_map
self.garfield_gas_gap_map = make_map(files['garfield_gas_gap_map'], fmt = 'json')
elif 'garfield' in config.get('s2_luminescence_model', ''):
#This option indexes the luminescence times using the liquid level values
#as well as the position between the full pitch of two gate wires
gf_file_name = files['s2_luminescence']
if gf_file_name.endswith('npy'):
s2_luminescence_map = straxen.get_resource(gf_file_name, fmt='npy')
self.s2_luminescence = s2_luminescence_map
elif gf_file_name.endswith('npz'):
# Backwards compatibility from before #363 / #370
s2_luminescence_map = straxen.get_resource(gf_file_name, fmt='npy_pickle')['arr_0']
# Get directly the map for the simulated level
liquid_level_available = np.unique(s2_luminescence_map['ll']) # available levels (cm)
liquid_level = config['gate_to_anode_distance'] - config['elr_gas_gap_length'] # cm
liquid_level = min(liquid_level_available, key=lambda x: abs(x - liquid_level))
self.s2_luminescence = s2_luminescence_map[s2_luminescence_map['ll'] == liquid_level]
else:
raise ValueError(f'{gf_file_name} is of unknown format')
if config.get('field_distortion_model', "none") == "inverse_fdc":
self.fdc_3d = make_map(files['fdc_3d'], fmt='json.gz')
self.fdc_3d.scale_coordinates([1., 1., - config['drift_velocity_liquid']])
if config.get('field_distortion_model', "none") == "comsol":
self.fd_comsol = make_map(config['field_distortion_comsol_map'], fmt='json.gz', method='RectBivariateSpline')
# Gas gap warping map
if config.get('enable_gas_gap_warping', False):
gas_gap_map = straxen.get_resource(files['gas_gap_map'], fmt='pkl')
self.gas_gap_length = lambda positions: gas_gap_map.lookup(*positions.T)
# Field dependencies
# This config entry a dictionary of 5 items
if any(config['enable_field_dependencies'].values()):
field_dependencies_map = make_map(files['field_dependencies_map'], fmt='json.gz', method='RectBivariateSpline')
self.drift_velocity_scaling=1.0
# calculating drift velocity scaling to match total drift time for R=0 between cathode and gate
if "norm_drift_velocity" in config['enable_field_dependencies'].keys():
if config['enable_field_dependencies']['norm_drift_velocity']:
norm_dvel = field_dependencies_map(np.array([ [0], [- config['tpc_length']]]).T,
map_name='drift_speed_map')[0]
norm_dvel*=1e-4
self.drift_velocity_scaling=config['drift_velocity_liquid']/norm_dvel
def rz_map(z, xy, **kwargs):
r = np.sqrt(xy[:, 0]**2 + xy[:, 1]**2)
return field_dependencies_map(np.array([r, z]).T, **kwargs)
self.field_dependencies_map = rz_map
# Data-driven longitudinal diffusion map
# TODO: Change to the best way to accommodate simulation/data-driven map
if config['enable_field_dependencies']["diffusion_longitudinal_map"]:
diffusion_longitudinal_map = make_map(files['diffusion_longitudinal_map'], fmt='json.gz',
method='WeightedNearestNeighbors')
def _rz_map(z, xy, **kwargs):
r = np.sqrt(xy[:, 0]**2 + xy[:, 1]**2)
return diffusion_longitudinal_map(np.array([r, z]).T, **kwargs)
self.diffusion_longitudinal_map = _rz_map
# Photon After Pulses
if config.get('enable_pmt_afterpulses', False):
self.uniform_to_pmt_ap = straxen.get_resource(files['photon_ap_cdfs'], fmt='json.gz')
# S1 photon timing splines
if config.get('s1_time_spline', False):
self.s1_optical_propagation_spline = make_map(files['s1_time_spline'], fmt='json.gz',
method='RegularGridInterpolator')
# Electron After Pulses
if config.get('enable_electron_afterpulses', False):
self.uniform_to_ele_ap = straxen.get_resource(config.get('ele_ap_pdfs', ''), fmt='dill')
# S2 photons timing optical propagation delays
if config.get('s2_time_spline', False):
self.s2_optical_propagation_spline = make_map(files['s2_time_spline'])
elif config.get('detector', 'XENONnT') == 'XENONnT_neutron_veto':
# Neutron veto PMT QE as function of wavelength
self.nv_pmt_qe = straxen.get_resource(files['nv_pmt_qe'], fmt='json')
# SPE area distributions
self.photon_area_distribution = straxen.get_resource(files['photon_area_distribution'], fmt='csv')
# Noise sample
if config.get('enable_noise', False):
self.noise_data = straxen.get_resource(files['noise_file'], fmt='npy')['arr_0']
n_channels = len(self.noise_data[0])
log.warning(f'Using noise data {files["noise_file"]} with {n_channels} channels for {config["detector"]}')
log.debug(f'{self.__class__.__name__} fully initialized')
def make_map(map_file, fmt=None, method='WeightedNearestNeighbors'):
"""Fetch and make an instance of InterpolatingMap based on map_file
Alternatively map_file can be a list of ["constant dummy", constant: int, shape: list]
return an instance of DummyMap"""
if isinstance(map_file, list):
assert map_file[0] == 'constant dummy', ('Alternative file input can only be '
'("constant dummy", constant: int, shape: list')
return DummyMap(map_file[1], map_file[2])
elif isinstance(map_file, str):
if fmt is None:
fmt = parse_extension(map_file)
log.debug(f'Initialize map interpolator for file {map_file}')
map_data = straxen.get_resource(map_file, fmt=fmt)
return straxen.InterpolatingMap(map_data, method=method)
else:
raise TypeError("Can't handle map_file except a string or a list")
[docs]
@export
def make_patternmap(map_file, fmt=None, method='WeightedNearestNeighbors', pmt_mask=None):
""" This is special interpretation of the of previous make_map(), but designed
for pattern map loading with provided PMT mask. This way simplifies both S1 and S2
cases
"""
# making tests not failing, we can probably overwrite it completel
if isinstance(map_file, list):
log.warning(f'Using dummy map with pattern mask! This has no effect here!')
assert map_file[0] == 'constant dummy', ('Alternative file input can only be '
'("constant dummy", constant: int, shape: list')
return DummyMap(map_file[1], map_file[2])
elif isinstance(map_file, str):
if fmt is None:
fmt = parse_extension(map_file)
map_data = deepcopy(straxen.get_resource(map_file, fmt=fmt))
# XXX: straxed deals with pointers and caches resources, it means that resources are global
# what is bad, so we make own copy here and modify it locally
if 'compressed' in map_data:
compressor, dtype, shape = map_data['compressed']
map_data['map'] = np.frombuffer(
strax.io.COMPRESSORS[compressor]['decompress'](map_data['map']),
dtype=dtype).reshape(*shape)
del map_data['compressed']
if 'quantized' in map_data:
map_data['map'] = map_data['quantized']*map_data['map'].astype(np.float32)
del map_data['quantized']
if not (pmt_mask is None):
assert (map_data['map'].shape[-1]==pmt_mask.shape[0]), "Error! Pattern map and PMT gains must have same dimensions!"
map_data['map'][..., ~pmt_mask]=0.0
return straxen.InterpolatingMap(map_data, method=method)
else:
raise TypeError("Can't handle map_file except a string or a list")
[docs]
@export
class DummyMap:
"""Return constant results
the length match the length of input
but from the second dimensions the shape is user defined input
"""
def __init__(self, const, shape=()):
self.const = const
self.shape = shape
def __call__(self, x, **kwargs):
shape = [len(x)] + list(self.shape)
return np.ones(shape) * self.const
[docs]
def reduce_last_dim(self):
assert len(self.shape) >= 1, 'Need at least 1 dim to reduce further'
const = self.const * self.shape[-1]
shape = list(self.shape)
shape[-1] = 1
return DummyMap(const, shape)
##
# Copy from https://github.com/XENONnT/private_nt_aux_files
##
def parse_extension(name):
"""Get the extention from a file name. If zipped or tarred, can contain a dot"""
split_name = name.split('.')
if len(split_name) == 2:
fmt = split_name[-1]
elif len(split_name) > 2 and 'gz' in name:
fmt = '.'.join(split_name[-2:])
else:
fmt = split_name[-1]
log.warning(f'Using {fmt} for unspecified {name}')
return fmt