Source code for wfsim.pax_interface

import logging
import time
import pickle
import zipfile
import zlib
from collections import namedtuple
import os

import numpy as np
import pandas as pd

import wfsim
from .pax_datastructure import datastructure
from .strax_interface import *
from strax import exporter
from straxen.common import get_resource

export, __all__ = exporter()


[docs] @export class PaxEvents(object): def __init__(self, config): self.config = config self.rawdata = wfsim.RawData(self.config) self.truth_buffer = np.zeros(100000, dtype=instruction_dtype + truth_extra_dtype + [('fill', bool)]) # 500 s1 + 500 s2 def __call__(self, instructions): event_i = 0 # Indices of event new_event = True first_left = np.inf last_right = -np.inf for channel, left, right, data in self.rawdata(instructions, self.truth_buffer): if self.rawdata.instruction_event_number > event_i: event.start_time = (first_left - 100000) * self.config['sample_duration'] event.stop_time = (last_right + 100000) * self.config['sample_duration'] yield event event_i = self.rawdata.instruction_event_number new_event = True if new_event: event = datastructure.Event(event_number=event_i, start_time=0, stop_time=int(3e6), n_channels=self.config['n_channels'], sample_duration=self.config['sample_duration'], pulses=[],) new_event = False first_left = left if right > last_right: last_right = right event.pulses.append(datastructure.Pulse( channel=channel, left=left - (first_left - 100000), raw_data=data))
EventProxy = namedtuple('EventProxy', ['data', 'event_number', 'block_id']) default_config = { 'fax_file': None, 'detector': 'XENON1T', 'field_distortion_on': True, 'event_rate': 1, # Must set to one so chunk can be interpret as an event 'chunk_size': 1, # Must set to one so chunk can be interpret as an event 'nchunk': 200, # Number of events 'fax_config': ('https://raw.githubusercontent.com/XENONnT/' 'strax_auxiliary_files/master/fax_files/fax_config_1t.json'), 'samples_to_store_before': 2, 'samples_to_store_after': 20, 'right_raw_extension': 50000, 'trigger_window': 50, 'zle_threshold': 0, 'run_number': 10000, # Change run_number to prevent overwritting 'events_per_file': 1000, 'output_name': './pax_data' # Each run will be saved to a subfolder under output_name }
[docs] @export class PaxEventSimulator(object): """ Simulate wf from instruction and stored in wfsim.pax_datastructure.datastructure.Event mimicing pax.datastructure.Event Then pickled, compressed and saved mimicing pax raw data zips. Call compute to start the simulation process. """ def __init__(self, config=None): if config is None: config = {} self.config = default_config self.config.update(get_resource(self.config['fax_config'], fmt='json')) self.config.update(config) if self.config['fax_file']: if self.config['fax_file'][-5:] == '.root': self.instructions = read_g4(self.config['fax_file']) self.config['nevents'] = np.max(self.instructions['event_number']) else: self.instructions = instruction_from_csv(self.config['fax_file']) self.config['nevents'] = np.max(self.instructions['event_number']) else: self.instructions = rand_instructions(self.config) self.pax_event = PaxEvents(self.config) self.transfer_plugin = self.WriteZippedEncoder(self.config) self.output_plugin = self.WriteZipped(self.config)
[docs] class WriteZippedEncoder(object): # Pax WriteZippedEncoder plugin with all parent class method extraced def __init__(self, config): self.config = config
[docs] @staticmethod def make_event_proxy(event, data, block_id=None): if block_id is None: block_id = event.block_id return EventProxy(data=data, event_number=event.event_number, block_id=block_id)
[docs] def transfer_event(self, event): data = pickle.dumps(event) data = zlib.compress(data, 4) # We also add start and stop time to the data, for use in MongoDBClearUntriggered return self.make_event_proxy(event, data=dict(blob=data, start_time=event.start_time, stop_time=event.stop_time))
[docs] class WriteZipped(object): # Pax WriteZipped plugin with all parent class method extraced file_extension = 'zip' def __init__(self, config): self.config = config self.events_per_file = self.config.get('events_per_file', 50) self.first_event_in_current_file = None self.last_event_written = None self.output_dir = os.path.join(self.config['output_name'], '%s_MC_%d' % (self.config['detector'], self.config['run_number'])) os.makedirs(self.output_dir, exist_ok=True) # Start the temporary file. Events will first be written here, until events_per_file is reached self.tempfile = os.path.join(self.output_dir, 'temp.' + self.file_extension)
[docs] def open_new_file(self, first_event_number): """Opens a new file, closing any old open ones""" if self.last_event_written is not None: self.close_current_file() self.first_event_in_current_file = first_event_number self.events_written_to_current_file = 0 self.current_file = zipfile.ZipFile(self.tempfile, mode='w')
[docs] def write_event(self, event_proxy): """Write one more event to the folder, opening/closing files as needed""" if self.last_event_written is None \ or self.events_written_to_current_file >= self.events_per_file: self.open_new_file(first_event_number=event_proxy.event_number) self.current_file.writestr(str(event_proxy.event_number), event_proxy.data['blob']) self.events_written_to_current_file += 1 self.last_event_written = event_proxy.event_number
[docs] def close_current_file(self): """Closes the currently open file, if there is one. Also handles temporary file renaming. """ if self.last_event_written is None: print("You didn't write any events... Did you crash pax?") return self.current_file.close() # Rename the temporary file to reflect the events we've written to it os.rename(self.tempfile, os.path.join(self.output_dir, '%s-%d-%09d-%09d-%09d.%s' % (self.config['detector'], self.config['run_number'], self.first_event_in_current_file, self.last_event_written, self.events_written_to_current_file, self.file_extension)))
[docs] def compute(self): for event in self.pax_event(self.instructions): event = self.transfer_plugin.transfer_event(event) self.output_plugin.write_event(event) self.output_plugin.close_current_file() # Save truth file as well truth_file_path = os.path.join(self.output_plugin.output_dir, '%s-%d-truth.csv' % (self.config['detector'], self.config['run_number'])) truth = pd.DataFrame(self.pax_event.truth_buffer[self.pax_event.truth_buffer['fill']]) truth.drop(columns='fill', inplace=True) truth.to_csv(truth_file_path, index=False)