Source code for NuRadioReco.modules.io.NuRadioRecoio

from __future__ import absolute_import, division, print_function, unicode_literals
import NuRadioReco.framework.event
import NuRadioReco.detector.detector
import NuRadioReco.modules.io.event_parser_factory

import numpy as np
import astropy.time

import logging

import time
import os

VERSION = 2
VERSION_MINOR = 2


class NuRadioRecoio(object):

    def __init__(self, filenames, parse_header=True, parse_detector=True, fail_on_version_mismatch=True,
                 fail_on_minor_version_mismatch=False, max_open_files=10, log_level=logging.NOTSET,
                 buffer_size=104857600):
        """
        Initialize NuRadioReco io

        Parameters
        ----------
        filenames: string or list of strings
            the input file/files
        parse_header: boolean
            If True, the event headers are parsed and can be accessed through the get_header() function
        parse_detector: boolean
            If True, detector information in the files is parsed and can be accessed through the get_detector() function
        fail_on_version_mismatch: boolean
            Controls if the module should try to read files with a different major version
        fail_on_minor_version_mismatch: boolean
            Controls if the module should try to read files with a different minor version
        max_open_files: int
            the maximum number of files that remain open simultaneously
        log_level: int, default=logging.NOTSET
            Override the log level of this class
        buffer_size: int
            the size of the read buffer in bytes (default 100 MB)
        """
        if not isinstance(filenames, list):
            filenames = [filenames]

        self.__file_scanned = False
        self.logger = logging.getLogger('NuRadioReco.NuRadioRecoio')
        self.logger.info("initializing NuRadioRecoio with file {}".format(filenames))
        t = time.time()
        self.logger.setLevel(log_level)

        # Initialize attributes
        self._filenames = None
        self.__n_events = None
        self._bytes_start_header = None
        self._bytes_length_header = None
        self._bytes_start = None
        self._bytes_length = None
        self.__event_ids = None
        self.__open_files = None
        self._detector_dicts = None
        self.__detectors = None
        self._event_specific_detector_changes = None
        self.__event_headers = None
        self._current_event_id = None
        self._current_run_number = None

        self.__fail_on_version_mismatch = fail_on_version_mismatch
        self.__fail_on_minor_version_mismatch = fail_on_minor_version_mismatch
        self.__parse_header = parse_header
        self._parse_detector = parse_detector
        self.__read_lock = False
        self.__max_open_files = max_open_files
        self.__buffer_size = buffer_size

        self.openFile(filenames)
        self._current_file_id = 0
        self.logger.info("... finished in {:.0f} seconds".format(time.time() - t))

    def _get_file(self, iF):
        if iF not in self.__open_files:
            self.logger.debug("file {} is not yet open, opening file".format(iF))
            self.__open_files[iF] = {}
            self.__open_files[iF]['file'] = open(
                self._filenames[iF], 'rb', buffering=self.__buffer_size)  # 100 MB buffering by default
            self.__open_files[iF]['time'] = time.time()
            self.__check_file_version(iF)

            if len(self.__open_files) > self.__max_open_files:
                self.logger.debug("more than {} files are open, closing oldest file".format(self.__max_open_files))
                tnow = time.time()
                iF_close = 0
                for key, value in self.__open_files.items():
                    if value['time'] < tnow:
                        tnow = value['time']
                        iF_close = key
                self.logger.debug("closing file {} that was opened at {}".format(iF_close, tnow))
                self.__open_files[iF_close]['file'].close()
                del self.__open_files[iF_close]

        return self.__open_files[iF]['file']

    def __check_file_version(self, iF):
        self.__file_version = int.from_bytes(self._get_file(iF).read(6), 'little')
        self.__file_version_minor = int.from_bytes(self._get_file(iF).read(6), 'little')

        if self.__file_version == 0:
            self.logger.error(
                f"File might be corrupt, file has version {self.__file_version}.{self.__file_version_minor} "
                f"but current version is {VERSION}.{VERSION_MINOR}. "
                f"This might indicate the file is empty. "
                f"The file size is {os.stat(self._filenames[iF]).st_size} B.")
        elif self.__file_version > VERSION:
            self.logger.error(
                f"File might not be readable, file has version {self.__file_version}.{self.__file_version_minor} "
                f"but current version is {VERSION}.{VERSION_MINOR}. "
                f"The file was probably written with a newer version of NuRadioReco.")
        elif self.__file_version == 1:
            self.logger.error(
                "Data file not readable. File has version {}.{} but current version is {}.{}".format(
                    self.__file_version, self.__file_version_minor, VERSION, VERSION_MINOR
                )
            )
            if self.__fail_on_version_mismatch:
                raise IOError
        elif self.__file_version_minor != VERSION_MINOR:
            self.logger.error(
                "Data file might not be readable. File has version {}.{} but current version is {}.{}".format(
                    self.__file_version, self.__file_version_minor, VERSION, VERSION_MINOR
                )
            )
            if self.__fail_on_minor_version_mismatch:
                raise IOError

        self.__scan_files_versioned = NuRadioReco.modules.io.event_parser_factory.scan_files_function(
            self.__file_version, self.__file_version_minor)
        self.__iter_events = NuRadioReco.modules.io.event_parser_factory.iter_events_function(
            self.__file_version, self.__file_version_minor)
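    # Note on the on-disk layout assumed by this reader (inferred from the code above, not a
    # formal specification): a .nur file starts with a 12-byte header, namely the major and
    # minor file version, each stored as a 6-byte little-endian integer. __check_file_version()
    # reads these 12 bytes, which is why __scan_files() and get_events() seek to byte 12 before
    # parsing the first event record; everything after the header is handled by the
    # version-specific functions returned by event_parser_factory.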
    def openFile(self, filenames):
        self._filenames = filenames
        self.__n_events = 0
        self.__event_ids = []
        self._bytes_start_header = [[]]
        self._bytes_length_header = [[]]
        self._bytes_start = [[]]
        self._bytes_length = [[]]
        self.__open_files = {}
        self._detector_dicts = {}
        self.__detectors = {}
        self._event_specific_detector_changes = {}
        self.__event_headers = {}

        if self.__parse_header:
            self.__scan_files()
    def close_files(self):
        for f in self.__open_files.values():
            f['file'].close()
    def get_filenames(self):
        return self._filenames
    def _parse_event_header(self, evt_header):
        from NuRadioReco.framework.parameters import stationParameters as stnp
        self.__event_ids.append(evt_header['event_id'])
        for station_id, station in evt_header['stations'].items():
            if station_id not in self.__event_headers:
                self.__event_headers[station_id] = {}

            for key, value in station.items():
                # treat sim_station differently
                if key != 'sim_station':
                    if key not in self.__event_headers[station_id]:
                        self.__event_headers[station_id][key] = []

                    if key == stnp.station_time:
                        station_time = None
                        if value is not None:
                            if isinstance(value, dict):
                                station_time = astropy.time.Time(value["value"], format=value["format"])
                            # For backward compatibility, we also keep supporting station times stored
                            # as astropy.time objects
                            elif isinstance(value, astropy.time.Time):
                                station_time = value
                            else:
                                err = f"Station time not stored as dict or astropy.time.Time: ({type(value)})"
                                self.logger.error(err)
                                raise ValueError(err)

                            try:
                                station_time.format = 'isot'
                            except AttributeError:
                                try:
                                    station_time.precision = station_time._time.__dict__["precision"]
                                    station_time.format = 'isot'
                                except AttributeError:
                                    self.logger.warning("setting format to 'isot' resulted in error.")

                        self.__event_headers[station_id][key].append(station_time)
                    else:
                        self.__event_headers[station_id][key].append(value)

    def __scan_files(self):
        current_byte = 12  # skip datafile header
        iF = 0
        iF_prev = None
        while True:
            if iF_prev != iF:
                self.logger.debug(f"Start scanning file {iF} ...")
            iF_prev = iF

            self._get_file(iF).seek(current_byte)
            continue_loop, iF, current_byte = self.__scan_files_versioned(self, iF, current_byte)
            if iF_prev != iF:
                self.logger.debug(f"Finished scanning file {iF_prev}.")

            if not continue_loop:
                break

        self.logger.debug(f"Finished scanning file {iF}. Finished all")

        self.__event_ids = np.array(self.__event_ids)
        self.__file_scanned = True

        # compute number of events
        n = 0
        for b in self._bytes_start:
            n += len(b)
        self.__n_events = n

        # convert lists to numpy arrays for convenience
        for station_id, station in self.__event_headers.items():
            for key, value in station.items():
                self.__event_headers[station_id][key] = np.array(value)
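    # The header cache filled by _parse_event_header() and __scan_files() has the (inferred)
    # shape {station_id: {parameter_key: np.array([value_of_event_0, value_of_event_1, ...])}},
    # with one entry per scanned event and per station parameter (sim_station entries are
    # skipped). get_header() below returns this dictionary; a hypothetical access could look like
    #
    #   from NuRadioReco.framework.parameters import stationParameters as stnp
    #   header = io.get_header()
    #   station_times = header[station_id][stnp.station_time]
    #
    # where `io` and `station_id` are placeholders for a NuRadioRecoio instance and a station
    # id present in the file.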
    def get_header(self):
        if not self.__file_scanned:
            self.__scan_files()
        return self.__event_headers
    def get_event_ids(self):
        """
        returns a list of (run, event_id) tuples of all events contained in the data file
        """
        if not self.__file_scanned:
            self.__scan_files()
        return self.__event_ids
    def get_event_i(self, event_number):
        while self.__read_lock:
            time.sleep(1)
            self.logger.debug("read lock waiting 1s")
        self.__read_lock = True

        if not self.__file_scanned:
            self.__scan_files()

        if event_number < 0 or event_number >= self.get_n_events():
            self.logger.error(
                'event number {} out of bounds, only {} present in file'.format(
                    event_number, self.get_n_events()))
            self.__read_lock = False
            return None

        # determine in which file event i is
        istart = 0
        file_id = 0
        event_id = 0
        for iF in range(len(self._filenames)):
            istop = istart + len(self._bytes_start[iF])
            if (event_number >= istart) and (event_number < istop):
                file_id = iF
                event_id = event_number - istart
                break
            else:
                istart = istop

        self._get_file(file_id).seek(self._bytes_start[file_id][event_id])
        evtstr = self._get_file(file_id).read(self._bytes_length[file_id][event_id])
        event = NuRadioReco.framework.event.Event(0, 0)
        event.deserialize(evtstr)
        self.__read_lock = False

        self._current_file_id = file_id
        self._current_event_id = event.get_id()
        self._current_run_number = event.get_run_number()
        self.__set_event_to_detector()
        return event
    def get_event(self, event_id):
        if not self.__file_scanned:
            self.__scan_files()

        mask = (self.__event_ids[:, 0] == event_id[0]) & (self.__event_ids[:, 1] == event_id[1])
        if np.sum(mask) == 0:
            self.logger.error('event number {} not found in file'.format(event_id))
            return None
        elif np.sum(mask) > 1:
            self.logger.warning(
                f"{np.sum(mask):d} events with the same (run, event_id) pair found. Returning first occurrence.")

        self._current_run_number = event_id[0]
        self._current_event_id = event_id[1]

        i = np.argwhere(mask)[0][0]
        return self.get_event_i(i)
    def get_events(self):
        self._current_file_id = 0
        self._get_file(self._current_file_id).seek(12)  # skip file header
        for event in self.__iter_events(self):
            self._current_event_id = event.get_id()
            self._current_run_number = event.get_run_number()
            self.__set_event_to_detector()
            yield event
    def __set_event_to_detector(self):
        # If the event file contains a detector description that is a
        # generic detector, it might have event-specific properties and we
        # need to set the detector to the right event
        if self._current_file_id in self.__detectors.keys():
            if 'generic_detector' in self._detector_dicts[self._current_file_id]:
                if self._detector_dicts[self._current_file_id]['generic_detector']:
                    self.__detectors[self._current_file_id].set_event(
                        self._current_run_number, self._current_event_id)
    def get_detector(self):
        """
        If parse_detector was set to True in the __init__() function, this function returns
        the detector description (assuming there is one in the files). If several files with
        different detectors are read, the detector for the last returned event is given.
        """
        if not self._parse_detector:
            self.logger.warning("You called \"get_detector\", however, \"parse_detector\" is set to False. Returning None!")
            return None

        # Check if a detector object for the current file already exists
        if self._current_file_id not in self.__detectors.keys():
            # Detector object for current file does not exist, so we create it
            if self._current_file_id not in self._detector_dicts:
                if not self.__file_scanned:
                    self.__scan_files()  # Maybe we just forgot to scan the file

                if self._current_file_id not in self._detector_dicts:
                    self.logger.warning('The current file does not contain a detector description. Returning None')
                    return None

            detector_dict = self._detector_dicts[self._current_file_id]

            # Extract keywords from detector dict (if not present in the nur file, "detector_parameters" is an empty dict)
            assume_inf = detector_dict['detector_parameters'].get('assume_inf', None)
            antenna_by_depth = detector_dict['detector_parameters'].get('antenna_by_depth', None)

            if 'generic_detector' in detector_dict:
                if detector_dict['generic_detector']:
                    # Detector is a generic detector, so we have to consider default
                    # station/channel and event-specific changes.
                    # The use of default_station and default_channel is deprecated. Allow to
                    # set it for now, to ensure backward compatibility.
                    self.__detectors[self._current_file_id] = NuRadioReco.detector.detector.Detector(
                        source='dictionary', dictionary=detector_dict,
                        default_station=detector_dict.get('default_station', None),
                        default_channel=detector_dict.get('default_channel', None),
                        assume_inf=assume_inf, antenna_by_depth=antenna_by_depth, create_new=True)

                    if self._current_file_id in self._event_specific_detector_changes.keys():
                        for change in self._event_specific_detector_changes[self._current_file_id]:
                            self.__detectors[self._current_file_id].add_station_properties_for_event(
                                properties=change['properties'],
                                station_id=change['station_id'],
                                run_number=change['run_number'],
                                event_id=change['event_id'])

                    self.__detectors[self._current_file_id].set_event(
                        self._current_run_number, self._current_event_id)
                    return self.__detectors[self._current_file_id]

            # Detector is a normal detector
            self.__detectors[self._current_file_id] = NuRadioReco.detector.detector.Detector(
                source='dictionary', dictionary=self._detector_dicts[self._current_file_id],
                assume_inf=assume_inf, antenna_by_depth=antenna_by_depth, create_new=True
            )

        # Detector object for current file already exists. If it is a generic detector,
        # we update it to the run number and ID of the last event that was requested
        # (in case there are event-specific changes) and return it
        if 'generic_detector' in self._detector_dicts[self._current_file_id].keys():
            if self._detector_dicts[self._current_file_id]['generic_detector']:
                self.__detectors[self._current_file_id].set_event(
                    self._current_run_number, self._current_event_id)

        return self.__detectors[self._current_file_id]
    def get_n_events(self):
        if not self.__file_scanned:
            self.__scan_files()
        return self.__n_events
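# Minimal usage sketch (not part of the module; "events.nur" is a placeholder filename):
#
#   from NuRadioReco.modules.io.NuRadioRecoio import NuRadioRecoio
#
#   io = NuRadioRecoio("events.nur")              # a single filename or a list of filenames
#   print(io.get_n_events())                      # number of events in the file(s)
#   for event in io.get_events():                 # sequential access
#       detector = io.get_detector()              # detector for the current event (may be None)
#   event = io.get_event_i(0)                     # random access by index
#   event = io.get_event(io.get_event_ids()[0])   # access by (run, event_id) pair
#   io.close_files()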