Source code for NuRadioReco.modules.io.coreas.readCoREASShower

import NuRadioReco.framework.event
import NuRadioReco.framework.station
from NuRadioReco.framework.parameters import showerParameters as shp
from NuRadioReco.modules.base.module import register_run
from NuRadioReco.modules.io.coreas import coreas
from NuRadioReco.utilities import units
from radiotools import coordinatesystems
import h5py
import numpy as np
import time
import re
import os
import logging
logger = logging.getLogger('NuRadioReco.coreas.readCoREASShower')


[docs] class readCoREASShower: """ This module can be used to read in all simulated "observers" from a CoREAS simulation and return them as stations. This is in particular useful for air shower array experiments like Auger, LOFAR or SKA. However, it is important to stress that this module will return a `station` object per simulated observer. That fits well the terminology used in Auger were a "Station" is a dual-polerized antenna. However, for other experiments like LOFAR or SKA where a "Station" is a cluster of (dual-polerized) antennas, the user has to be aware that this module will return a `station` object per antenna. """ def __init__(self): self.__t = 0 self.__t_event_structure = 0 self.__t_per_event = 0 self.__input_files = None self.__current_input_file = None self.__det = None self.__ascending_run_and_event_number = None
[docs] def begin(self, input_files, det=None, logger_level=logging.NOTSET, set_ascending_run_and_event_number=False): """ begin method initialize readCoREASShower module Parameters ---------- input_files: input files list of coreas hdf5 files det: genericDetector object If a genericDetector is passed, the stations from the CoREAS file will be added to it and the run method returns both the event and the detector logger_level: string or logging variable Set verbosity level for logger (default: logging.NOTSET) set_ascending_run_and_event_number: bool If set to True the run number and event id is set to self.__ascending_run_and_event_number instead of beeing taken from the simulation file. The value is increases monoton. This can be used to avoid ambiguities values (default: False) """ self.__input_files = input_files self.__current_input_file = 0 self.__det = det logger.setLevel(logger_level) self.__ascending_run_and_event_number = 1 if set_ascending_run_and_event_number else 0
[docs] @register_run() def run(self): """ Reads in CoREAS file(s) and returns one event containing all simulated observer positions as stations. Yields ------ evt : `NuRadioReco.framework.event.Event` The event containing the simulated observer as sim. stations. det : `NuRadioReco.detector.generic_detector.GenericDetector` Optional, only if a detector description is passed to the begin method. Contains the detector description with the on-the-fly added stations. """ while self.__current_input_file < len(self.__input_files): t = time.time() t_per_event = time.time() filesize = os.path.getsize( self.__input_files[self.__current_input_file]) if filesize < 18456 * 2: # based on the observation that a file with such a small filesize is corrupt logger.warning( "file {} seems to be corrupt, skipping to next file".format( self.__input_files[self.__current_input_file] ) ) self.__current_input_file += 1 continue logger.info('Reading %s ...' % self.__input_files[self.__current_input_file]) corsika = h5py.File( self.__input_files[self.__current_input_file], "r") logger.info("using coreas simulation {} with E={:2g} theta = {:.0f}".format( self.__input_files[self.__current_input_file], corsika['inputs'].attrs["ERANGE"][0] * units.GeV, corsika['inputs'].attrs["THETAP"][0])) f_coreas = corsika["CoREAS"] if self.__ascending_run_and_event_number: evt = NuRadioReco.framework.event.Event(self.__ascending_run_and_event_number, self.__ascending_run_and_event_number) self.__ascending_run_and_event_number += 1 else: evt = NuRadioReco.framework.event.Event( corsika['inputs'].attrs['RUNNR'], corsika['inputs'].attrs['EVTNR']) evt.__event_time = f_coreas.attrs["GPSSecs"] # create sim shower, no core is set since no external detector description is given sim_shower = coreas.make_sim_shower(corsika) sim_shower.set_parameter(shp.core, np.array( [-f_coreas.attrs["CoreCoordinateWest"], f_coreas.attrs["CoreCoordinateNorth"], f_coreas.attrs["CoreCoordinateVertical"]] ) * units.cm) # set core evt.add_sim_shower(sim_shower) # initialize coordinate transformation cs = coordinatesystems.cstrafo( sim_shower.get_parameter(shp.zenith), sim_shower.get_parameter(shp.azimuth), magnetic_field_vector=sim_shower.get_parameter(shp.magnetic_field_vector)) # add simulated pulses as sim station for idx, (name, observer) in enumerate(f_coreas['observers'].items()): # returns proper station id if possible station_id = antenna_id(name, idx) station = NuRadioReco.framework.station.Station(station_id) if self.__det is None: sim_station = coreas.make_sim_station( station_id, corsika, observer, channel_ids=[0, 1]) else: sim_station = coreas.make_sim_station( station_id, corsika, observer, channel_ids=self.__det.get_channel_ids(self.__det.get_default_station_id())) station.set_sim_station(sim_station) evt.set_station(station) if self.__det is not None: position = observer.attrs['position'] antenna_position = np.array([-position[1], position[0], position[2]]) * units.cm antenna_position = cs.transform_from_magnetic_to_geographic(antenna_position) if not self.__det.has_station(station_id): self.__det.add_generic_station({ 'station_id': station_id, 'pos_easting': antenna_position[0], 'pos_northing': antenna_position[1], 'pos_altitude': antenna_position[2], 'reference_station': self.__det.get_reference_station_ids()[0] }) else: self.__det.add_station_properties_for_event({ 'pos_easting': antenna_position[0], 'pos_northing': antenna_position[1], 'pos_altitude': antenna_position[2] }, station_id, evt.get_run_number(), evt.get_id()) self.__t_per_event += time.time() - t_per_event self.__t += time.time() - t self.__current_input_file += 1 if self.__det is None: yield evt else: self.__det.set_event(evt.get_run_number(), evt.get_id()) yield evt, self.__det
[docs] def end(self): from datetime import timedelta dt = timedelta(seconds=self.__t) logger.info("total time used by this module is {}".format(dt)) logger.info("\tcreate event structure {}".format( timedelta(seconds=self.__t_event_structure))) logger.info("per event {}".format( timedelta(seconds=self.__t_per_event))) return dt
[docs] def antenna_id(antenna_name, default_id): """ This function parses the antenna names given in a CoREAS simulation and tries to find an ID It can be extended to other name patterns """ if re.match("AERA_", antenna_name): new_id = int(antenna_name.strip("AERA_")) return new_id else: return default_id