Source code for NuRadioReco.modules.io.coreas.readCoREAS

from NuRadioReco.modules.base.module import register_run
import h5py
import NuRadioReco.framework.event
import NuRadioReco.framework.station
import NuRadioReco.framework.radio_shower
from radiotools import coordinatesystems as cstrafo
from NuRadioReco.modules.io.coreas import coreas
from NuRadioReco.utilities import units
import numpy as np
import numpy.random
import logging
import time
import os


[docs]class readCoREAS: def __init__(self): self.__t = 0 self.__t_event_structure = 0 self.__t_per_event = 0 self.__input_files = None self.__station_id = None self.__n_cores = None self.__max_distace = None self.__current_input_file = None self.__random_generator = None self.logger = logging.getLogger('NuRadioReco.readCoREAS')
[docs] def begin(self, input_files, station_id, n_cores=10, max_distance=2 * units.km, seed=None): """ begin method initialize readCoREAS module Parameters ---------- input_files: input files list of coreas hdf5 files station_id: station id id number of the station n_cores: number of cores (integer) the number of random core positions to generate for each input file max_distance: radius of random cores (double or None) if None: max distance is set to the maximum ground distance of the star pattern simulation seed: int (default: None) Seed for the random number generation. If None is passed, no seed is set """ self.__input_files = input_files self.__station_id = station_id self.__n_cores = n_cores self.__max_distace = max_distance self.__current_input_file = 0 self.__random_generator = numpy.random.RandomState(seed)
[docs] @register_run() def run(self, detector, output_mode=0): """ Read in a random sample of stations from a CoREAS file. A number of random positions is selected within a certain radius. For each position the closest observer is selected and a simulated event is created for that observer. Parameters ---------- detector: Detector object Detector description of the detector that shall be simulated output_mode: integer (default 0) * 0: only the event object is returned * 1: the function reuturns the event object, the current inputfilename, the distance between the choosen station and the requested core position, and the area in which the core positions are randomly distributed """ while (self.__current_input_file < len(self.__input_files)): t = time.time() t_per_event = time.time() filesize = os.path.getsize(self.__input_files[self.__current_input_file]) if(filesize < 18456 * 2): # based on the observation that a file with such a small filesize is corrupt self.logger.warning("file {} seems to be corrupt, skipping to next file".format(self.__input_files[self.__current_input_file])) self.__current_input_file += 1 continue corsika = h5py.File(self.__input_files[self.__current_input_file], "r") self.logger.info( "using coreas simulation {} with E={:2g} theta = {:.0f}".format( self.__input_files[self.__current_input_file], corsika['inputs'].attrs["ERANGE"][0] * units.GeV, corsika['inputs'].attrs["THETAP"][0] ) ) positions = [] for i, observer in enumerate(corsika['CoREAS']['observers'].values()): position = observer.attrs['position'] positions.append(np.array([-position[1], position[0], 0]) * units.cm) self.logger.debug("({:.0f}, {:.0f})".format(position[0], position[1])) positions = np.array(positions) max_distance = self.__max_distace if(max_distance is None): max_distance = np.max(np.abs(positions[:, 0:2])) area = np.pi * max_distance ** 2 if(output_mode == 0): n_cores = self.__n_cores * 100 # for output mode 1 we want always n_cores in star pattern. Therefore we generate more core positions to be able to select n_cores in the star pattern afterwards elif(output_mode == 1): n_cores = self.__n_cores else: raise ValueError('output mode {} not defined.'.format(output_mode)) theta = self.__random_generator.rand(n_cores) * 2 * np.pi r = (self.__random_generator.rand(n_cores)) ** 0.5 * max_distance cores = np.array([r * np.cos(theta), r * np.sin(theta), np.zeros(n_cores)]).T zenith, azimuth, magnetic_field_vector = coreas.get_angles(corsika) cs = cstrafo.cstrafo(zenith, azimuth, magnetic_field_vector) positions_vBvvB = cs.transform_from_magnetic_to_geographic(positions.T) positions_vBvvB = cs.transform_to_vxB_vxvxB(positions_vBvvB).T dd = (positions_vBvvB[:, 0] ** 2 + positions_vBvvB[:, 1] ** 2) ** 0.5 ddmax = dd.max() self.logger.info("star shape from: {} - {}".format(-dd.max(), dd.max())) cores_vBvvB = cs.transform_from_magnetic_to_geographic(cores.T) cores_vBvvB = cs.transform_to_vxB_vxvxB(cores_vBvvB).T dcores = (cores_vBvvB[:, 0] ** 2 + cores_vBvvB[:, 1] ** 2) ** 0.5 mask_cores_in_starpattern = dcores <= ddmax if((not np.sum(mask_cores_in_starpattern)) and (output_mode == 1)): # handle special case of no core position being generated within star pattern observer = corsika['CoREAS']['observers'].values()[0] evt = NuRadioReco.framework.event.Event(corsika['inputs'].attrs['RUNNR'], corsika['inputs'].attrs['EVTNR']) # create empty event station = NuRadioReco.framework.station.Station(self.__station_id) sim_station = coreas.make_sim_station(self.__station_id, corsika, observer, detector.get_channel_ids(self.__station_id)) station.set_sim_station(sim_station) evt.set_station(station) yield evt, self.__current_input_file, None, area cores_to_iterate = cores_vBvvB[mask_cores_in_starpattern] if(output_mode == 0): # select first n_cores that are in star pattern if(np.sum(mask_cores_in_starpattern) < self.__n_cores): self.logger.warning("only {0} cores contained in star pattern, returning {0} cores instead of {1} cores that were requested".format(np.sum(mask_cores_in_starpattern), self.__n_cores)) else: cores_to_iterate = cores_vBvvB[mask_cores_in_starpattern][:self.__n_cores] self.__t_per_event += time.time() - t_per_event self.__t += time.time() - t for iCore, core in enumerate(cores_to_iterate): t = time.time() # check if out of bounds distances = np.linalg.norm(core[:2] - positions_vBvvB[:, :2], axis=1) index = np.argmin(distances) distance = distances[index] key = list(corsika['CoREAS']['observers'].keys())[index] self.logger.info( "generating core at ground ({:.0f}, {:.0f}), vBvvB({:.0f}, {:.0f}), nearest simulated station is {:.0f}m away at ground ({:.0f}, {:.0f}), vBvvB({:.0f}, {:.0f})".format( cores[iCore][0], cores[iCore][1], core[0], core[1], distance / units.m, positions[index][0], positions[index][1], positions_vBvvB[index][0], positions_vBvvB[index][1] ) ) t_event_structure = time.time() observer = corsika['CoREAS']['observers'].get(key) evt = NuRadioReco.framework.event.Event(self.__current_input_file, iCore) # create empty event station = NuRadioReco.framework.station.Station(self.__station_id) channel_ids = detector.get_channel_ids(self.__station_id) sim_station = coreas.make_sim_station(self.__station_id, corsika, observer, channel_ids) station.set_sim_station(sim_station) evt.set_station(station) sim_shower = coreas.make_sim_shower(corsika, observer, detector, self.__station_id) evt.add_sim_shower(sim_shower) rd_shower = NuRadioReco.framework.radio_shower.RadioShower(station_ids=[station.get_id()]) evt.add_shower(rd_shower) if(output_mode == 0): self.__t += time.time() - t self.__t_event_structure += time.time() - t_event_structure yield evt elif(output_mode == 1): self.__t += time.time() - t self.__t_event_structure += time.time() - t_event_structure yield evt, self.__current_input_file, distance, area else: self.logger.debug("output mode > 1 not implemented") raise NotImplementedError self.__current_input_file += 1
[docs] def end(self): from datetime import timedelta self.logger.setLevel(logging.INFO) dt = timedelta(seconds=self.__t) self.logger.info("total time used by this module is {}".format(dt)) self.logger.info("\tcreate event structure {}".format(timedelta(seconds=self.__t_event_structure))) self.logger.info("per event {}".format(timedelta(seconds=self.__t_per_event))) return dt