from NuRadioReco.modules.base.module import register_run
import h5py
import NuRadioReco.framework.event
import NuRadioReco.framework.station
import NuRadioReco.framework.radio_shower
from radiotools import coordinatesystems as cstrafo
from NuRadioReco.framework.parameters import showerParameters as shp
from NuRadioReco.modules.io.coreas import coreas
from NuRadioReco.utilities import units
import numpy as np
import numpy.random
import logging
import time
import os
[docs]class readCoREAS:
"""
coreas input module for fixed grid of stations.
This module distributes core positions randomly within a user defined area and calculates the electric field
at the detector positions as specified in the detector description by choosing the closest antenna
of the star shape pattern simulation
"""
def __init__(self):
self.__t = 0
self.__t_event_structure = 0
self.__t_per_event = 0
self.__input_files = None
self.__station_id = None
self.__n_cores = None
self.__max_distace = None
self.__current_input_file = None
self.__random_generator = None
self.logger = logging.getLogger('NuRadioReco.readCoREAS')
[docs] def begin(self, input_files, xmin, xmax, ymin, ymax, n_cores=10, seed=None, log_level=logging.NOTSET):
"""
begin method
initialize readCoREAS module
Parameters
----------
input_files: input files
list of coreas hdf5 files
xmin: float
minimum x coordinate of the area in which core positions are distributed
xmax: float
maximum x coordinate of the area in which core positions are distributed
ymin: float
minimum y coordinate of the area in which core positions are distributed
ynax: float
maximum y coordinate of the area in which core positions are distributed
n_cores: number of cores (integer)
the number of random core positions to generate for each input file
seed: int (default: None)
Seed for the random number generation. If None is passed, no seed is set
"""
self.__input_files = input_files
self.__n_cores = n_cores
self.__current_input_file = 0
self.__area = [xmin, xmax, ymin, ymax]
self.__random_generator = numpy.random.RandomState(seed)
self.logger.setLevel(log_level)
[docs] @register_run()
def run(self, detector, output_mode=0):
"""
Read in a random sample of stations from a CoREAS file.
For each position the closest observer is selected and a simulated
event is created for that observer.
Parameters
----------
detector: Detector object
Detector description of the detector that shall be simulated
output_mode: integer (default 0)
* 0: only the event object is returned
* 1: the function reuturns the event object, the current inputfilename, the distance between the choosen station and the requested core position,
and the area in which the core positions are randomly distributed
"""
while (self.__current_input_file < len(self.__input_files)):
t = time.time()
t_per_event = time.time()
filesize = os.path.getsize(self.__input_files[self.__current_input_file])
if(filesize < 18456 * 2): # based on the observation that a file with such a small filesize is corrupt
self.logger.warning("file {} seems to be corrupt, skipping to next file".format(self.__input_files[self.__current_input_file]))
self.__current_input_file += 1
continue
try:
corsika = h5py.File(self.__input_files[self.__current_input_file], "r")
except OSError as error:
self.logger.error(f"error opening file number {self.__current_input_file}: {self.__input_files[self.__current_input_file]}")
self.logger.error(error)
self.__current_input_file += 1
continue
self.logger.info(
"using coreas simulation {} with E={:2g} theta = {:.0f}".format(
self.__input_files[self.__current_input_file],
corsika['inputs'].attrs["ERANGE"][0] * units.GeV,
corsika['inputs'].attrs["THETAP"][0]
)
)
positions = []
for i, observer in enumerate(corsika['CoREAS']['observers'].values()):
position = observer.attrs['position']
positions.append(np.array([-position[1], position[0], 0]) * units.cm)
# self.logger.debug("({:.0f}, {:.0f})".format(positions[i][0], positions[i][1]))
positions = np.array(positions)
zenith, azimuth, magnetic_field_vector = coreas.get_angles(corsika)
cs = cstrafo.cstrafo(zenith, azimuth, magnetic_field_vector)
positions_vBvvB = cs.transform_from_magnetic_to_geographic(positions.T)
positions_vBvvB = cs.transform_to_vxB_vxvxB(positions_vBvvB).T
# for i, pos in enumerate(positions_vBvvB):
# self.logger.debug("star shape")
# self.logger.debug("({:.0f}, {:.0f}); ({:.0f}, {:.0f})".format(positions[i, 0], positions[i, 1], pos[0], pos[1]))
dd = (positions_vBvvB[:, 0] ** 2 + positions_vBvvB[:, 1] ** 2) ** 0.5
ddmax = dd.max()
self.logger.info("star shape from: {} - {}".format(-dd.max(), dd.max()))
# generate core positions randomly within a rectangle
cores = np.array([self.__random_generator.uniform(self.__area[0], self.__area[1], self.__n_cores),
self.__random_generator.uniform(self.__area[2], self.__area[3], self.__n_cores),
np.zeros(self.__n_cores)]).T
self.__t_per_event += time.time() - t_per_event
self.__t += time.time() - t
station_ids = detector.get_station_ids()
for iCore, core in enumerate(cores):
t = time.time()
evt = NuRadioReco.framework.event.Event(self.__current_input_file, iCore) # create empty event
sim_shower = coreas.make_sim_shower(corsika)
sim_shower.set_parameter(shp.core, core)
evt.add_sim_shower(sim_shower)
rd_shower = NuRadioReco.framework.radio_shower.RadioShower(station_ids=station_ids)
evt.add_shower(rd_shower)
for station_id in station_ids:
# convert into vxvxB frame to calculate closests simulated station to detecor station
det_station_position = detector.get_absolute_position(station_id)
det_station_position[2] = 0
core_rel_to_station = core - det_station_position
# core_rel_to_station_vBvvB = cs.transform_from_magnetic_to_geographic(core_rel_to_station)
core_rel_to_station_vBvvB = cs.transform_to_vxB_vxvxB(core_rel_to_station)
dcore = (core_rel_to_station_vBvvB[0] ** 2 + core_rel_to_station_vBvvB[1] ** 2) ** 0.5
# print(f"{core_rel_to_station}, {core_rel_to_station_vBvvB} -> {dcore}")
if(dcore > ddmax):
# station is outside of the star shape pattern, create empty station
station = NuRadioReco.framework.station.Station(station_id)
channel_ids = detector.get_channel_ids(station_id)
sim_station = coreas.make_sim_station(station_id, corsika, None, channel_ids)
station.set_sim_station(sim_station)
evt.set_station(station)
self.logger.debug(f"station {station_id} is outside of star shape, channel_ids {channel_ids}")
else:
distances = np.linalg.norm(core_rel_to_station_vBvvB[:2] - positions_vBvvB[:,:2], axis=1)
index = np.argmin(distances)
distance = distances[index]
key = list(corsika['CoREAS']['observers'].keys())[index]
self.logger.debug(
"generating core at ground ({:.0f}, {:.0f}), rel to station ({:.0f}, {:.0f}) vBvvB({:.0f}, {:.0f}), nearest simulated station is {:.0f}m away at ground ({:.0f}, {:.0f}), vBvvB({:.0f}, {:.0f})".format(
cores[iCore][0],
cores[iCore][1],
core_rel_to_station[0],
core_rel_to_station[1],
core_rel_to_station_vBvvB[0],
core_rel_to_station_vBvvB[1],
distance / units.m,
positions[index][0],
positions[index][1],
positions_vBvvB[index][0],
positions_vBvvB[index][1]
)
)
t_event_structure = time.time()
observer = corsika['CoREAS']['observers'].get(key)
station = NuRadioReco.framework.station.Station(station_id)
channel_ids = detector.get_channel_ids(station_id)
sim_station = coreas.make_sim_station(station_id, corsika, observer, channel_ids)
station.set_sim_station(sim_station)
evt.set_station(station)
if(output_mode == 0):
self.__t += time.time() - t
yield evt
elif(output_mode == 1):
self.__t += time.time() - t
self.__t_event_structure += time.time() - t_event_structure
yield evt, self.__current_input_file
else:
self.logger.debug("output mode > 1 not implemented")
raise NotImplementedError
self.__current_input_file += 1
[docs] def end(self):
from datetime import timedelta
self.logger.setLevel(logging.INFO)
dt = timedelta(seconds=self.__t)
self.logger.info("total time used by this module is {}".format(dt))
self.logger.info("\tcreate event structure {}".format(timedelta(seconds=self.__t_event_structure)))
self.logger.info("per event {}".format(timedelta(seconds=self.__t_per_event)))
return dt