# Source code for wfsim.pax_interface

import logging
import time
import pickle
import zipfile
import zlib
from collections import namedtuple
import os

import numpy as np
import pandas as pd

from .core import RawData
from .pax_datastructure import datastructure
from .strax_interface import *
from strax import exporter
from straxen.common import get_resource

export, __all__ = exporter()

@export
class PaxEvents(object):
    """Group the pulses produced by ``RawData`` into pax-like events.

    Calling an instance with a set of instructions returns a generator of
    ``datastructure.Event`` objects. Pulses are collected into the current
    event until ``self.rawdata.instruction_event_number`` increases, at which
    point the collected event is yielded and a new one is started.
    """

    # Number of samples of padding placed before the first pulse and after
    # the last pulse of every event.
    _padding = 100000

    def __init__(self, config):
        """Keep ``config`` and create the pulse source and the truth buffer.

        :param config: mapping of simulation settings; this class reads the
            ``'sample_duration'`` and ``'n_channels'`` entries and passes the
            whole mapping on to ``RawData``.
        """
        self.config = config
        self.rawdata = RawData(self.config)
        # 500 s1 + 500 s2
        self.truth_buffer = np.zeros(
            100000,
            dtype=instruction_dtype + truth_extra_dtype + [('fill', bool)])

    def _finish_event(self, event, first_left, last_right):
        """Set the start and stop time of ``event`` from its pulse range."""
        sample_duration = self.config['sample_duration']
        event.start_time = (first_left - self._padding) * sample_duration
        event.stop_time = (last_right + self._padding) * sample_duration
        return event

    def __call__(self, instructions):
        """Yield one ``datastructure.Event`` per simulated event.

        :param instructions: instructions handed to ``self.rawdata`` together
            with ``self.truth_buffer``.
        :return: generator of events. The event that is still being filled
            when the pulse source is exhausted is yielded as well, and nothing
            is yielded when the source produces no pulses at all.
        """
        event = None  # Event currently being filled, None when none is open
        event_i = 0  # Number of the event currently being filled
        first_left = np.inf
        last_right = -np.inf

        for channel, left, right, data in self.rawdata(instructions,
                                                       self.truth_buffer):
            if self.rawdata.instruction_event_number > event_i:
                # The source moved on to a later event: hand out what was
                # collected so far. The guard covers sources whose first
                # event number is already larger than zero, in which case
                # there is nothing to hand out yet.
                if event is not None:
                    yield self._finish_event(event, first_left, last_right)
                    event = None
                event_i = self.rawdata.instruction_event_number

            if event is None:
                event = datastructure.Event(
                    event_number=event_i,
                    start_time=0,
                    stop_time=int(3e6),
                    n_channels=self.config['n_channels'],
                    sample_duration=self.config['sample_duration'],
                    pulses=[],
                )
                first_left = left

            if right > last_right:
                last_right = right

            # Pulse positions are stored relative to the padded event start.
            event.pulses.append(datastructure.Pulse(
                channel=channel,
                left=left - (first_left - self._padding),
                raw_data=data))

        # Without this the last event would be silently dropped, since the
        # loop only yields when a pulse of a *later* event shows up.
        if event is not None:
            yield self._finish_event(event, first_left, last_right)
# Minimal record handed from the encoder to the writer: the encoded payload,
# the number of the event it belongs to and a block id.
EventProxy = namedtuple('EventProxy', ('data', 'event_number', 'block_id'))

# Settings used by PaxEventSimulator unless overridden by the caller.
default_config = {
    'fax_file': None,
    'detector': 'XENON1T',
    'field_distortion_on': True,
    # Must be set to one so a chunk can be interpreted as an event
    'event_rate': 1,
    # Must be set to one so a chunk can be interpreted as an event
    'chunk_size': 1,
    # Number of events
    'nchunk': 200,
    'fax_config': (
        'https://raw.githubusercontent.com/XENONnT/'
        'strax_auxiliary_files/master/fax_files/fax_config_1t.json'
    ),
    'samples_to_store_before': 2,
    'samples_to_store_after': 20,
    'right_raw_extension': 50000,
    'trigger_window': 50,
    'zle_threshold': 0,
    # Change run_number to prevent overwriting
    'run_number': 10000,
    'events_per_file': 1000,
    # Each run will be saved to a subfolder under output_name
    'output_name': './pax_data',
}
@export
class PaxEventSimulator(object):
    """Simulate waveforms from instructions and save them as pax-like raw data.

    Events are stored in wfsim.pax_datastructure.datastructure.Event,
    mimicking pax.datastructure.Event, then pickled, compressed and saved
    mimicking pax raw data zips.

    Call compute to start the simulation process.
    """

    def __init__(self, config=None):
        """Build the configuration, load the instructions and set up plugins.

        The configuration is assembled from ``default_config``, then the JSON
        resource named by ``'fax_config'``, then the entries of ``config``
        (later sources win).

        :param config: optional mapping with settings overriding the defaults.
        """
        overrides = dict(config) if config else {}

        # Work on a private copy: updating ``default_config`` itself would
        # change the module-level defaults and leak settings from one
        # simulator instance into the next.
        self.config = dict(default_config)
        # Resolve the resource location from the caller's settings first so
        # that a user supplied 'fax_config' is the one that gets loaded.
        fax_config = overrides.get('fax_config', self.config['fax_config'])
        self.config.update(get_resource(fax_config, fmt='json'))
        self.config.update(overrides)

        fax_file = self.config['fax_file']
        if fax_file:
            if fax_file.endswith('.root'):
                self.instructions = read_g4(fax_file)
            else:
                self.instructions = instruction_from_csv(fax_file)
            self.config['nevents'] = np.max(self.instructions['event_number'])
        else:
            self.instructions = rand_instructions(self.config)

        self.pax_event = PaxEvents(self.config)
        self.transfer_plugin = self.WriteZippedEncoder(self.config)
        self.output_plugin = self.WriteZipped(self.config)

    class WriteZippedEncoder():
        """Pax WriteZippedEncoder plugin with all parent class methods extracted."""

        def __init__(self, config):
            """Keep a reference to the simulator configuration."""
            self.config = config

        @staticmethod
        def make_event_proxy(event, data, block_id=None):
            """Wrap ``data`` in an ``EventProxy`` for ``event``.

            :param event: object providing ``event_number`` and, when
                ``block_id`` is not given, ``block_id``.
            :param data: payload stored in the proxy.
            :param block_id: block id to store; taken from ``event`` if None.
            """
            if block_id is None:
                block_id = event.block_id
            return EventProxy(data=data,
                              event_number=event.event_number,
                              block_id=block_id)

        def transfer_event(self, event):
            """Pickle and zlib-compress ``event`` and return it as a proxy."""
            blob = zlib.compress(pickle.dumps(event), 4)
            # We also add start and stop time to the data, for use in
            # MongoDBClearUntriggered
            payload = dict(blob=blob,
                           start_time=event.start_time,
                           stop_time=event.stop_time)
            return self.make_event_proxy(event, data=payload)

    class WriteZipped():
        """Pax WriteZipped plugin with all parent class methods extracted."""

        file_extension = 'zip'

        def __init__(self, config):
            """Create the output directory and prepare the writer state.

            :param config: mapping providing ``'output_name'``, ``'detector'``
                and ``'run_number'``, and optionally ``'events_per_file'``
                (50 is used when it is missing).
            """
            self.config = config
            self.events_per_file = self.config.get('events_per_file', 50)
            self.first_event_in_current_file = None
            self.last_event_written = None
            # Defined up front so the attributes always exist; they are set
            # for real by open_new_file.
            self.events_written_to_current_file = 0
            self.current_file = None

            self.output_dir = os.path.join(
                self.config['output_name'],
                '%s_MC_%d' % (self.config['detector'],
                              self.config['run_number']))
            os.makedirs(self.output_dir, exist_ok=True)

            # Start the temporary file. Events will first be written here,
            # until events_per_file is reached
            self.tempfile = os.path.join(self.output_dir,
                                         'temp.' + self.file_extension)

        def open_new_file(self, first_event_number):
            """Opens a new file, closing any old open ones"""
            if self.last_event_written is not None:
                self.close_current_file()
            self.first_event_in_current_file = first_event_number
            self.events_written_to_current_file = 0
            self.current_file = zipfile.ZipFile(self.tempfile, mode='w')

        def write_event(self, event_proxy):
            """Write one more event to the folder, opening/closing files as needed"""
            nothing_written_yet = self.last_event_written is None
            if nothing_written_yet \
                    or self.events_written_to_current_file >= self.events_per_file:
                self.open_new_file(first_event_number=event_proxy.event_number)

            # Each event is stored under its event number as the member name.
            self.current_file.writestr(str(event_proxy.event_number),
                                       event_proxy.data['blob'])
            self.events_written_to_current_file += 1
            self.last_event_written = event_proxy.event_number

        def close_current_file(self):
            """Closes the currently open file, if there is one.

            Also handles temporary file renaming.
            """
            if self.last_event_written is None:
                print("You didn't write any events... Did you crash pax?")
                return

            self.current_file.close()

            # Rename the temporary file to reflect the events we've written
            # to it
            final_name = '%s-%d-%09d-%09d-%09d.%s' % (
                self.config['detector'],
                self.config['run_number'],
                self.first_event_in_current_file,
                self.last_event_written,
                self.events_written_to_current_file,
                self.file_extension)
            os.rename(self.tempfile, os.path.join(self.output_dir, final_name))

    def compute(self):
        """Run the simulation, write all events and save the truth table.

        Every event produced by ``self.pax_event`` is encoded and written by
        the output plugin. Afterwards the filled rows of the truth buffer are
        written to ``<detector>-<run_number>-truth.csv`` in the output
        directory.
        """
        for event in self.pax_event(self.instructions):
            proxy = self.transfer_plugin.transfer_event(event)
            self.output_plugin.write_event(proxy)
        self.output_plugin.close_current_file()

        # Save truth file as well
        truth_file_path = os.path.join(
            self.output_plugin.output_dir,
            '%s-%d-truth.csv' % (self.config['detector'],
                                 self.config['run_number']))
        truth_buffer = self.pax_event.truth_buffer
        # Only rows flagged as filled hold truth information.
        truth = pd.DataFrame(truth_buffer[truth_buffer['fill']])
        truth.drop(columns='fill', inplace=True)
        truth.to_csv(truth_file_path, index=False)