Source code for NuRadioMC.SignalProp.radioproparaytracing

from __future__ import absolute_import, division, print_function
import numpy as np
from radiotools import helper as hp
import logging
from NuRadioMC.utilities import attenuation as attenuation_util
from NuRadioMC.utilities import medium_base
from scipy import interpolate, optimize
import NuRadioReco.utilities.geometryUtilities
from NuRadioReco.utilities import units
from NuRadioReco.framework.parameters import electricFieldParameters as efp
from NuRadioMC.SignalProp.propagation_base_class import ray_tracing_base
from NuRadioMC.SignalProp import analyticraytracing as ana
from NuRadioMC.utilities import medium
from NuRadioMC.SignalProp.propagation import solution_types, solution_types_revert
import radiopropa
import scipy.constants 
import copy
import logging
import time
from matplotlib import pyplot as plt
logging.basicConfig()

"""
RadioPropa is a C++ module dedicated for ray tracing. It is a seperate module and
it has its own unit system. However, all object within NuRadio ecosystem are in the
NuRadio unit system. Therefore, when passing argument from NuRadio to RadioPropa, or
when receiving object from RadioPropa into NuRadio the units of object needed to be 
converted to the right unit system. Below is an example given for an object 'distance'

- from NuRadio to RadioPropa:
    distance_in_meter = distance_in_nuradio / units.meter
    --> this converts the distance from NuRadio units into SI unit meter  
    distance_in_radiopropa = distance_in_meter * radiopropa.meter
    --> this converts the distance from SI unit meter into RadioPropa units

- from RadioPropa to NuRadio:
    distance_in_meter = distance_in_radiopropa / radiopropa.meter
    --> this converts the distance from RadioPropa units into SI unit meter  
    distance_in_nuradio = distance_in_meter * units.meter
    --> this converts the distance from SI unit meter into NuRadio units
"""


[docs]class radiopropa_ray_tracing(ray_tracing_base): """ Numerical raytracing using Radiopropa. Currently this only works for icemodels that have only changing refractive index in z. More information on RadioPropa and how to install it can be found at https://github.com/nu-radio/RadioPropa""" def __init__(self, medium, attenuation_model="SP1", log_level=logging.WARNING, n_frequencies_integration=100, n_reflections=0, config=None, detector=None): """ class initilization Parameters ---------- medium: medium class class describing the index-of-refraction profile attenuation_model: string signal attenuation model log_level: logging object specify the log level of the ray tracing class * logging.ERROR * logging.WARNING * logging.INFO * logging.DEBUG default is WARNING n_frequencies_integration: int the number of frequencies for which the frequency dependent attenuation length is being calculated. The attenuation length for all other frequencies is obtained via linear interpolation. n_reflections: int (default 0) in case of a medium with a reflective layer at the bottom, how many reflections should be considered config: dict or None a dictionary with the optional config settings. If None, the default config is used: config['propagation']['attenuate_ice'] = True config['propagation']['focusing_limit'] = 2 config['propagation']['focusing'] = False config['propagation']['birefringence'] = False config['speedup']['delta_C_cut'] = 40 * units.degree config['propagation']['radiopropa']['mode'] = 'iterative' config['propagation']['radiopropa']['max_traj_length'] = 10000 config['propagation']['radiopropa']['iter_steps_channel'] = [25., 2., .5] config['propagation']['radiopropa']['auto_step_size'] = False config['propagation']['radiopropa']['iter_steps_zenith'] = [.5, .05, .005] detector: detector object """ self.__logger = logging.getLogger('radiopropa_ray_tracing') self.__logger.setLevel(log_level) try: import radiopropa except ImportError: self.__logger.error('ImportError: This raytracer depends on radiopropa which could not be imported. '+ 'Check wether all dependencies are installed correctly. More information on '+ 'https://github.com/nu-radio/RadioPropa') raise ImportError('This raytracer depends on radiopropa which could not be imported. Check wether all dependencies are installed correctly. More information on https://github.com/nu-radio/RadioPropa') super().__init__(medium=medium, attenuation_model=attenuation_model, log_level=log_level, n_frequencies_integration=n_frequencies_integration, n_reflections=n_reflections, config=config, detector=detector) self.set_config(config=config) self._ice_model = self._medium.get_ice_model_radiopropa() self.set_minimizer_tolerance() self._shower_axis = None ## this is given so we can limit the rays that are checked around the cherenkov angle self._rays = None self._source = None self._antenna = None
[docs] def reset_solutions(self): """ Resets the raytracing solutions back to None. This is useful to do in the loop before a new raytracing is prepared. """ super().reset_solutions() self._shower_axis = None self._rays = None self._source = None self._antenna = None
[docs] def set_start_and_end_point(self, x1, x2): """ Set the start and end points of the raytracing Parameters ---------- x1: 3dim np.array start point of the ray x2: 3dim np.array stop point of the ray """ self._source = x1 self._antenna = x2 super().set_start_and_end_point(x1, x2) self.set_iterative_step_sizes(step_zeniths=self._step_zeniths) #if auto is on this set the automated step size, otherwise nothing happens
[docs] def set_shower_axis(self, shower_axis): """ Set the the shower axis. This is oposite to the neutrino arrival direction Parameters ---------- shower_axis: np.array of shape (3,), default unit the direction of the shower in cartesian coordinates """ self._shower_axis = shower_axis / np.linalg.norm(shower_axis)
[docs] def set_iterative_sphere_sizes(self, sphere_sizes=None): """ Set the sphere_sizes for the iterative ray tracer Parameters ---------- sphere_sizes: np.array of size (n,), default unit the sphere size used by the iterative ray tracer iteration from big to small observer around channel """ if sphere_sizes is None: sphere_sizes = np.array(self._config['propagation']['radiopropa']['iter_steps_channel']) * units.meter if (sphere_sizes.ndim == 1): self._sphere_sizes = sphere_sizes else: self.__logger.error('sphere_sizes array should be 1 dimensional') raise ValueError('sphere_sizes array should be 1 dimensional')
[docs] def set_iterative_step_sizes(self, step_zeniths= None): """ Set the steps_sizes for the iterative ray tracer Parameters ---------- sphere_sizes: np.array of size (n,), default unit the sphere size used by the iterative ray tracer iteration from big to small observer around channel step_zeniths: np.array size (n,), default unit the step size for theta used by the iterative ray tracer corresponding to the sphere size, should have same lenght as _sphere_sizes auto_step: boolean defines whether or not an automatic step_size should be calculated for each sphere_size depending on the horizontal distance of the event """ if step_zeniths is None: step_zeniths = np.array(self._config['propagation']['radiopropa']['iter_steps_zenith']) * units.degree if self._auto_step: if (self._X1 != None) and (self._X2 != None): for s, sphere_size in enumerate(self._sphere_sizes): self._step_zeniths[s] = min(abs(self.delta_theta_reflective(dz=sphere_size, n_bottom_reflections=self._n_reflections)), self._step_zeniths[s]) else: return else: if (self._sphere_sizes.shape == step_zeniths.shape): self._step_zeniths = step_zeniths else: self.__logger.error('sphere_sizes array and step_zeniths array should have the same dimensions') raise ValueError('sphere_sizes array and step_zeniths array should have the same dimensions')
[docs] def activate_auto_step_size(self): self._auto_step = True self.set_iterative_step_sizes()
[docs] def deactivate_auto_step_size(self): self._auto_step = False
[docs] def set_cut_viewing_angle(self, cut): """ Set a cut on the viewing angle around the cherenkov angle. Rays with a viewing angle out of this range will be to dim and won't be seen --> limiting computing time Parameters ---------- cut: float, default unit range around the cherenkov angle """ self._cut_viewing_angle = cut
[docs] def set_maximum_trajectory_length(self, max_traj_length): """ Set a cut on the trajectory length. Otherwise computing infinite may be possible Parameters ---------- max_traj_length: float, default units maxmimal length to trace a ray. tracing aborted when reached """ self._max_traj_length = max_traj_length
[docs] def raytracer_iterative(self, n_reflections=0): """ Uses RadioPropa to find all the numerical ray tracing solutions between sphere X1 and X2. If reflections is bigger than 0, also bottom reflected rays are searched for with a max of n_reflections of the bottom """ try: X1 = self._X1 * (radiopropa.meter/units.meter) X2 = self._X2 * (radiopropa.meter/units.meter) except TypeError: self.__logger.error('NoneType: start or endpoint not initialized') raise TypeError('NoneType: start or endpoint not initialized') v = (self._X2 - self._X1) u = copy.deepcopy(v) u[2] = 0 theta_direct, phi_direct = hp.cartesian_to_spherical(*v) # zenith and azimuth for the direct linear ray solution (radians) cherenkov_angle = np.arccos(1. / self._medium.get_index_of_refraction(self._X1)) if np.linalg.norm(u) != 0: delta_theta = 2*abs(self.delta_theta_direct(dz=self._sphere_sizes[0])) else: delta_theta = self._step_sizes[0]/units.radian ## regions of theta with posible solutions (radians) launch_lower = [0] launch_upper = [theta_direct + delta_theta] # below theta_direct no solutions are possible without upward reflections if n_reflections > 0: if self.medium.reflection is None: self.__logger.error("a solution for {:d} reflection(s) off the bottom reflective layer is requested," +"but ice model does not specify a reflective layer".format(n_reflections)) raise AttributeError("a solution for {:d} reflection(s) off the bottom reflective layer is requested," +"but ice model does not specify a reflective layer".format(n_reflections)) else: z_refl = self._medium.reflection rho_channel = np.linalg.norm(u) if self._X2[2] > self._X1[2]: z_up = self._X2[2] z_down = self._X1[2] else: z_up = self._X1[2] z_down = self._X2[2] rho_bottom = (rho_channel * (z_refl - z_down)) / (2*z_refl - z_up - z_down) alpha = np.arctan((z_down - z_refl)/rho_bottom) ## when reflection on the bottom are allowed, a initial region for theta from 180-alpha to 180 degrees is added launch_lower.append(((np.pi/2 + alpha) - 2*abs(self.delta_theta_bottom(dz=self._sphere_sizes[0], z_refl=z_refl) / units.radian))) launch_upper.append(np.pi) for s,sphere_size in enumerate(self._sphere_sizes): sphere_size = sphere_size * (radiopropa.meter/units.meter) detected_rays = [] results = [] ##define module list for simulation sim = radiopropa.ModuleList() sim.add(radiopropa.PropagationCK(self._ice_model.get_scalar_field(), 1E-8, .001, 1.)) ## add propagation to module list for module in self._ice_model.get_modules().values(): sim.add(module) sim.add(radiopropa.MaximumTrajectoryLength(self._max_traj_length * (radiopropa.meter/units.meter))) ## define observer for detection (channel) obs = radiopropa.Observer() obs.setDeactivateOnDetection(True) channel = radiopropa.ObserverSurface(radiopropa.Sphere(radiopropa.Vector3d(*X2), sphere_size)) ## when making the radius larger than 2 meters, somethimes three solution times are found obs.add(channel) sim.add(obs) ## define observer for stopping simulation (boundaries) obs2 = radiopropa.Observer() obs2.setDeactivateOnDetection(True) w = (u / np.linalg.norm(u)) * 2*sphere_size boundary_behind_channel = radiopropa.ObserverSurface(radiopropa.Plane(radiopropa.Vector3d(*(X2 + w)), radiopropa.Vector3d(*w))) obs2.add(boundary_behind_channel) boundary_above_surface = radiopropa.ObserverSurface(radiopropa.Plane(radiopropa.Vector3d(0, 0, 1*radiopropa.meter), radiopropa.Vector3d(0, 0, 1))) obs2.add(boundary_above_surface) sim.add(obs2) #create total scanning range from the upper and lower thetas of the bundles step = self._step_zeniths[s] / units.radian theta_scanning_range = np.array([]) for iL in range(len(launch_lower)): new_scanning_range = np.arange(launch_lower[iL], launch_upper[iL]+step, step) theta_scanning_range = np.concatenate((theta_scanning_range, new_scanning_range)) for theta in theta_scanning_range: ray_dir = hp.spherical_to_cartesian(theta, phi_direct) def delta(ray_dir,shower_dir): viewing = np.arccos(np.dot(shower_dir, ray_dir)) * units.radian return viewing - cherenkov_angle if (self._shower_axis is None) or (abs(delta(ray_dir,self._shower_axis)) < self._cut_viewing_angle): source = radiopropa.Source() source.add(radiopropa.SourcePosition(radiopropa.Vector3d(*X1))) source.add(radiopropa.SourceDirection(radiopropa.Vector3d(*ray_dir))) sim.setShowProgress(True) ray = source.getCandidate() sim.run(ray, True) current_rays = [ray] while len(current_rays) > 0: next_rays = [] for ray in current_rays: if channel.checkDetection(ray.get()) == radiopropa.DETECTED: detected_rays.append(ray) result = {} if n_reflections == 0: result['reflection']=0 result['reflection_case']=1 elif self._ice_model.get_modules()["bottom reflection"].get_times_reflectedoff(ray.get()) <= n_reflections: result['reflection']=self._ice_model.get_modules()["bottom reflection"].get_times_reflectedoff(ray.get()) result['reflection_case']=int(np.ceil(theta/np.deg2rad(90))) results.append(result) for secondary in ray.secondaries: next_rays.append(secondary) current_rays = next_rays #loop over previous rays to find the upper and lower theta of each bundle of rays #uses step, but because step is initialized after this loop this ios the previous step size as intented if len(detected_rays) > 0: launch_lower.clear() launch_upper.clear() launch_theta_prev = None for iDC,DC in enumerate(detected_rays): launch_theta = DC.getLaunchVector().getTheta()/radiopropa.rad if iDC == (len(detected_rays)-1) or iDC == 0: if iDC == 0: launch_lower.append(launch_theta-step) if iDC == (len(detected_rays)-1): launch_upper.append(launch_theta+step) elif abs(launch_theta - launch_theta_prev) > 1.1*step: ##take 1.1 times the step to be sure the next ray is not in the bundle of the previous one launch_upper.append(launch_theta_prev+step) launch_lower.append(launch_theta-step) else: pass launch_theta_prev = launch_theta else: #if detected_rays is empthy, no solutions where found and the tracer is terminated break self._rays = detected_rays self._results = results self.__used_method = 'iterator' launch_bundles = np.transpose([launch_lower, launch_upper]) return launch_bundles
[docs] def set_minimizer_tolerance(self, xtol=1e-3*units.deg, ztol=1e-3*units.meter): self.__xtol = xtol self.__ztol = ztol
[docs] def raytracer_birefringence(self, launch_v, spec, s_rate, bire_model = 'A'): """ Function for the time trace propagation according to the polarization change due to birefringence. The trace propagation is explained in this paper: https://link.springer.com/article/10.1140/epjc/s10052-023-11238-y RadioPropa propagates an electric field through the correct solution of the ray path. To use add to the config.yaml file: propagation: module: radiopropa ice_model: southpole_2015 birefringence: True Parameters ---------- launch_v: 3d array launch vector calculated by a previous analytical or numerical ray tracing algorithm spec: numpy array frequency spectrum of the electric field s_rate: float sampling rate of the electric field bire_model: string choose the interpolation to fit the measured refractive index data options include (A, B, C, D, E) description can be found under: NuRadioMC/NuRadioMC/utilities/birefringence_models/model_description Returns ------- E_field: np.ndarray([eR, eTheta, ePhi]) [0] - eR - final frequency spectrum of the radial component after birefringence propagation [1] - eTheta - final frequency spectrum of the theta componentnt after birefringence propagation [2] - ePhi - final frequency spectrum of the phi component after birefringence propagation """ if (self._X1 is None) or (self._X2 is None): self.__logger.error('NoneType: start or endpoint not initialized') raise TypeError('NoneType: start or endpoint not initialized') else: X1 = self._X1 * (radiopropa.meter/units.meter) X2 = self._X2 * (radiopropa.meter/units.meter) v = (self._X2 - self._X1) u = copy.deepcopy(v) u[2] = 0 sphere_size = 0.5 * (radiopropa.meter/units.meter) ##define module list for simulation sim = radiopropa.ModuleList() sim.add(radiopropa.PropagationCK(self._ice_model.get_scalar_field(), 1E-8, .001, 1., bire_model)) ## add propagation to module list for module in self._ice_model.get_modules().values(): sim.add(module) sim.add(radiopropa.MaximumTrajectoryLength(self._max_traj_length * (radiopropa.meter/units.meter))) ## define observer for detection (channel) obs = radiopropa.Observer() obs.setDeactivateOnDetection(True) channel = radiopropa.ObserverSurface(radiopropa.Sphere(radiopropa.Vector3d(*X2), sphere_size)) ## when making the radius larger than 2 meters, somethimes three solution times are found obs.add(channel) sim.add(obs) ## define observer for stopping simulation (boundaries) obs2 = radiopropa.Observer() obs2.setDeactivateOnDetection(True) w = (u / np.linalg.norm(u)) * 2*sphere_size boundary_behind_channel = radiopropa.ObserverSurface(radiopropa.Plane(radiopropa.Vector3d(*(X2 + w)), radiopropa.Vector3d(*w))) obs2.add(boundary_behind_channel) boundary_above_surface = radiopropa.ObserverSurface(radiopropa.Plane(radiopropa.Vector3d(0, 0, 1*radiopropa.meter), radiopropa.Vector3d(0, 0, 1))) obs2.add(boundary_above_surface) sim.add(obs2) ## define electric field spec_r_real = radiopropa.DoubleVector_1D() spec_theta_real = radiopropa.DoubleVector_1D() spec_phi_real = radiopropa.DoubleVector_1D() spec_r_imag = radiopropa.DoubleVector_1D() spec_theta_imag = radiopropa.DoubleVector_1D() spec_phi_imag = radiopropa.DoubleVector_1D() for i in range(len(spec[0])): spec_r_real.push_back(spec[0][i].real) spec_theta_real.push_back(spec[1][i].real) spec_phi_real.push_back(spec[2][i].real) spec_r_imag.push_back(spec[0][i].imag) spec_theta_imag.push_back(spec[1][i].imag) spec_phi_imag.push_back(spec[2][i].imag) efield = radiopropa.ElectricField() efield.setFrequencySpectrum(spec_r_real,spec_r_imag, spec_theta_real,spec_theta_imag, spec_phi_real,spec_phi_imag, s_rate) ## define source source = radiopropa.Source() source.add(radiopropa.SourcePosition(radiopropa.Vector3d(*X1))) source.add(radiopropa.SourceDirection(radiopropa.Vector3d(*launch_v))) source.add(radiopropa.SourceElectricField(efield)) ## run simulation ray = source.getCandidate() sim.run(ray, True) ## extract electric field R_real = ray.get().current.getElectricField().getTraces()[0].getFrequencySpectrum_real() R_imag = ray.get().current.getElectricField().getTraces()[0].getFrequencySpectrum_imag() Th_real = ray.get().current.getElectricField().getTraces()[1].getFrequencySpectrum_real() Th_imag = ray.get().current.getElectricField().getTraces()[1].getFrequencySpectrum_imag() Ph_real = ray.get().current.getElectricField().getTraces()[2].getFrequencySpectrum_real() Ph_imag = ray.get().current.getElectricField().getTraces()[2].getFrequencySpectrum_imag() Radius = np.vectorize(complex)(R_real, R_imag) Th = np.vectorize(complex)(Th_real, Th_imag) Ph = np.vectorize(complex)(Ph_real, Ph_imag) end_field = np.vstack((Radius, Th, Ph)) return end_field
[docs] def raytracer_minimizer(self, n_reflections=0): """ Uses RadioPropa to find all the numerical ray tracing solutions between sphere x1 and x2. Tracer does not work for reflective bottoms or secondary creation at the moment, it only allows for 2 solutions at the moment which should also be further improved. """ try: x1 = self._X1 * (radiopropa.meter/units.meter) x2 = self._X2 * (radiopropa.meter/units.meter) except TypeError: self.__logger.error('NoneType: start or endpoint not initialized') raise TypeError('NoneType: start or endpoint not initialized') v = (self._X2-self._X1) u = copy.deepcopy(v) u[2] = 0 theta_direct, phi_direct = hp.cartesian_to_spherical(*v) ## zenith and azimuth for the direct linear ray solution (radians) if n_reflections > 0: self.__logger.error("Radiopropa minimizer tracer can not be used for reflections at the bottom") raise AttributeError("Radiopropa minimizer tracer can not be used for reflections at the bottom") ##define module list for simulation sim = radiopropa.ModuleList() sim.add(radiopropa.PropagationCK(self._ice_model.get_scalar_field(), 1E-8, .001, 1.)) ## add propagation to module list for module in self._ice_model.get_modules().values(): sim.add(module) sim.add(radiopropa.MaximumTrajectoryLength(self._max_traj_length*(radiopropa.meter/units.meter))) ## define observer obs2 = radiopropa.Observer() obs2.setDeactivateOnDetection(True) plane_channel = radiopropa.ObserverSurface(radiopropa.Plane(radiopropa.Vector3d(*x2), radiopropa.Vector3d(*u))) obs2.add(plane_channel) sim.add(obs2) detected_rays = [] detected_theta = [] results = [] res_angle = 0.001*units.degree/units.radian def get_ray(theta,phi): ray_dir = hp.spherical_to_cartesian(theta,phi) if ray_dir.shape==(3,1): ray_dir = ray_dir.T[0] source = radiopropa.Source() source.add(radiopropa.SourcePosition(radiopropa.Vector3d(*x1))) source.add(radiopropa.SourceDirection(radiopropa.Vector3d(*ray_dir))) ray = source.getCandidate() return ray def shoot_ray(theta): ray = get_ray(theta,phi_direct) sim.run(ray, True) return ray def cot(x): return 1/np.tan(x) def arccot(x): return np.arctan(-x) + np.pi/2 def delta_z(cot_theta): theta = arccot(cot_theta) ray = shoot_ray(theta) ray_endpoint = self.get_path_candidate(ray)[-1] return (ray_endpoint-self._X2)[2] def delta_z_squared(cot_theta): return delta_z(cot_theta)**2 t1 = time.time() #we minimize the cotangens of the zenith to reflect the same resolution in z to the different angles (vertical vs horizontal) root1 = optimize.minimize(delta_z_squared,x0=cot(theta_direct),method='Nelder-Mead',options={'xatol':self.__xtol**2,'fatol':self.__ztol**2}) t2 = time.time() if root1.success:# and np.sqrt(root1.fun) < 1e-2*units.meter: #if root1.fun > self.__ztol**2: print(root1,'\n',20*'#') theta1 = arccot(root1.x) detected_theta.append(theta1) detected_rays.append(shoot_ray(theta1)) theta_min = theta1 - res_angle theta_plus = theta1 + res_angle delta_z_min = delta_z(cot(theta_min)) delta_z_plus = delta_z(cot(theta_plus)) delta_z_vertical = delta_z(cot(res_angle)) delta_z_direct = delta_z(cot(theta_direct)) def find_second_root(theta_a,theta_b): try: root2 = optimize.brentq(delta_z, a=cot(theta_a), b=cot(theta_b), xtol=self.__ztol) theta2 = arccot(root2) if(np.round(theta2, 2) not in np.round(detected_theta, 2)): detected_theta.append(theta2) detected_rays.append(shoot_ray(theta2)) return root2 except RuntimeError: pass if np.sign(delta_z_min) != np.sign(delta_z_vertical): root2 = find_second_root(theta_a = theta_min, theta_b = res_angle) elif np.sign(delta_z_plus) != np.sign(delta_z_direct): root2 = find_second_root(theta_a = theta_plus, theta_b = theta_direct) self._rays = detected_rays self._results = [{'reflection':0,'reflection_case':1} for ray in detected_rays] self.__used_method = 'minimizer'
[docs] def set_solutions(self,raytracing_results): """ Read an already calculated raytracing solution from the input array Parameters ---------- raytracing_results: dict The dictionary containing the raytracing solution. """ results = [] rays = [] for iS in range(len(raytracing_results['ray_tracing_solution_type'])): results.append({'type' : raytracing_results['ray_tracing_solution_type'][iS], 'reflection' : raytracing_results['ray_tracing_reflection'][iS], 'reflection_case' : raytracing_results['ray_tracing_reflection_case'][iS] }) launch_vector = raytracing_results['launch_vector'][iS] ##use launch vector to contruct the candidate again rays.append(None) self._results = results self._rays = rays
[docs] def find_solutions(self): """ find all solutions between X1 and X2 """ results = [] rays_results = [] if self._config['propagation']['radiopropa']['mode'] == 'minimizing': has_reflec = (hasattr(self._medium,'reflection') or self.__ice_model_nuradio.reflection is not None) if isinstance(self._medium, medium_base.IceModelSimple) and not has_reflec: self.raytracer_minimizer(n_reflections=self._n_reflections) results = [] for iS in range(len(self._rays)): results.append({'type':self.get_solution_type(iS), 'reflection':0, 'reflection_case':1}) self._results = results else: self.__logger.error("at the moment the RadioPropa ray tracer in minimize mode can only handle simple ice models without a reflective bottom") else: launch_bundles = self.raytracer_iterative(self._n_reflections) launch_zeniths = [] iSs = np.array(np.arange(0, len(self._rays), 1)) for iS in iSs: launch_zeniths.append(hp.cartesian_to_spherical(*(self.get_launch_vector(iS)))[0]) mask_lower = {i: (launch_zeniths>launch_bundles[i, 0]) for i in range(len(launch_bundles))} mask_upper = {i: (launch_zeniths<launch_bundles[i, 1]) for i in range(len(launch_bundles))} for i in range(len(launch_bundles)): mask = (mask_lower[i] & mask_upper[i]) if mask.any(): delta_min = np.deg2rad(90) final_iS = None for iS in iSs[mask]: #index of rays in the bundle vector = self.get_path_candidate(self._rays[iS])[-1] - self._X2 #position of the receive vector on the sphere around the channel vector_zenith = hp.cartesian_to_spherical(vector[0],vector[1],vector[2])[0] receive_zenith = hp.cartesian_to_spherical(*(self.get_receive_vector(iS)))[0] delta = abs(vector_zenith - receive_zenith) if delta < delta_min: #select the most normal ray on the sphere in the bundle final_iS = iS delta_min = delta rays_results.append(self._rays[final_iS]) results.append({'type' : self.get_solution_type(final_iS), 'reflection' : self._results[final_iS]['reflection'], 'reflection_case' : self._results[final_iS]['reflection_case']}) self._rays = rays_results self._results = results if(self.get_number_of_solutions() > self.get_number_of_raytracing_solutions()): self.__logger.error(f"{self.get_number_of_solutions()} were found but only {self.get_number_of_raytracing_solutions()} are allowed!")
[docs] def get_path_candidate(self, candidate): """ helper function that returns the 3D ray tracing path of a candidate Parameters ---------- candidate: radiopropa.candidate Returns ------- path: 2dim np.array of shape (n,3) x, y, z coordinates along second axis """ path_x = np.array([x * (units.meter/radiopropa.meter) for x in candidate.getPathX()]) path_y = np.array([y * (units.meter/radiopropa.meter) for y in candidate.getPathY()]) path_z = np.array([z * (units.meter/radiopropa.meter) for z in candidate.getPathZ()]) return np.stack([path_x, path_y, path_z], axis=1)
[docs] def get_path(self, iS, n_points=None): """ function that returns the 3D ray tracing path of solution iS Parameters ---------- iS: int ray tracing solution n_points: int number of points of path if none, the original calculated path is returned Returns ------- path: 2dim np.array of shape (n_points,3) x, y, z coordinates along second axis """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError path = self.get_path_candidate(self._rays[iS]) if n_points != None: path_x = path[:, 0] path_y = path[:, 1] path_z = path[:, 2] phi = hp.cartesian_to_spherical(*(self._X2 - self._X1))[1] path_r = path_x / np.cos(phi) interpol = interpolate.interp1d(path_r, path_z) new_path_r = np.linspace(path_r[0], path_r[-1], num=n_points) path_x = new_path_r * np.cos(phi) path_y = new_path_r * np.sin(phi) path_z = interpol(new_path_r) path = np.stack([path_x, path_y, path_z], axis=1) return path
[docs] def get_solution_type(self, iS): """ returns the type of the solution Parameters ---------- iS: int choose for which solution to compute the solution type, counting starts at zero Returns ------- solution_type: int integer corresponding to the types in the dictionary solution_types """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError pathz = self.get_path(iS)[:, 2] if (self._results[iS]['reflection'] != 0) or (self.get_reflection_angle(iS) != None): solution_type = solution_types_revert['reflected'] elif(pathz[-1] < max(pathz)): solution_type = solution_types_revert['refracted'] else: solution_type = solution_types_revert['direct'] return solution_type
[docs] def get_launch_vector_analytic(self, iS, ref_index_model ='southpole_2015'): """ calculates the launch vector (in 3D) of solution iS (of the analytical ray tracer) Parameters ---------- iS: int choose for which solution to compute the launch vector, counting starts at zero ref_index_model: choose the ice model for the analytic ray tracer southpole_2015 Returns ------- launch_vector: np.array of shape (3,) the launch vector """ ice = medium.get_ice_model(ref_index_model) ana_rays = ana.ray_tracing(ice) ana_rays.set_start_and_end_point(self._X1, self._X2) ana_rays.find_solutions() launch_vector = ana_rays.get_launch_vector(iS) return launch_vector/np.linalg.norm(launch_vector)
[docs] def get_launch_vector(self, iS): """ calculates the launch vector (in 3D) of solution iS Parameters ---------- iS: int choose for which solution to compute the launch vector, counting starts at zero Returns ------- launch_vector: np.array of shape (3,) the launch vector """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError launch_vector = np.array([self._rays[iS].getLaunchVector().x, self._rays[iS].getLaunchVector().y, self._rays[iS].getLaunchVector().z]) return launch_vector/np.linalg.norm(launch_vector)
[docs] def get_receive_vector(self, iS): """ calculates the receive vector (in 3D) of solution iS Parameters ---------- iS: int choose for which solution to compute the receive vector, counting starts at zero Returns ------- receive_vector: np.array of shape (3,) the receive vector """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError receive_vector = np.array([self._rays[iS].getReceiveVector().x, self._rays[iS].getReceiveVector().y, self._rays[iS].getReceiveVector().z]) return receive_vector/np.linalg.norm(receive_vector)
[docs] def get_reflection_angle(self, iS): """ calculates the angle of reflection at the surface (in case of a reflected ray) Parameters ---------- iS: int choose for which solution to compute the reflection angle, counting starts at zero Returns ------- reflection_angle: 1dim np.array the reflection angle (for reflected rays) or None for direct and refracted rays """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError reflection_angles = np.array([ra * (units.degree/radiopropa.deg) for ra in self._rays[iS].getReflectionAngles()]) if len(reflection_angles) == 0: return None else: return np.squeeze(reflection_angles)
[docs] def get_correction_path_length(self, iS): """ calculates the correction of the path length of solution iS due to the sphere around the channel Parameters ---------- iS: int choose for which solution to compute the path length correction, counting starts at zero Returns ------- distance: float distance that should be added to the path length """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError end_of_path = self.get_path_candidate(self._rays[iS])[-1] #position of the receive vector on the sphere around the channel in detector coordinates receive_vector = self.get_receive_vector(iS) vector = end_of_path - self._X2 #position of the receive vector on the sphere around the channel vector_zen,vector_az = hp.cartesian_to_spherical(vector[0], vector[1], vector[2]) receive_zen,receive_az = hp.cartesian_to_spherical(receive_vector[0], receive_vector[1], receive_vector[2]) path_correction_arrival_direction = abs(np.cos(receive_zen - vector_zen)) * self._sphere_sizes[-1] if abs(receive_az - vector_az) > np.deg2rad(90): path_correction_overshoot = np.linalg.norm(vector[0: 2]) * abs(np.cos(receive_az - vector_az)) else: path_correction_overshoot = 0 return path_correction_arrival_direction - path_correction_overshoot
[docs] def get_correction_travel_time(self, iS): """ calculates the correction of the travel time of solution iS due to the sphere around the channel Parameters ---------- iS: int choose for which solution to compute the travel time correction, counting starts at zero Returns ------- distance: float distance that should be added to the path length """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError refrac_index = self._medium.get_index_of_refraction(self._X2) return self.get_correction_path_length(iS) / ((scipy.constants.c*units.meter/units.second)/refrac_index)
[docs] def get_path_length(self, iS): """ calculates the path length of solution iS Parameters ---------- iS: int choose for which solution to compute the path length, counting starts at zero Returns ------- distance: float distance from X1 to X2 along the ray path """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError path_length = self._rays[iS].getTrajectoryLength() * (units.meter/radiopropa.meter) if self.__used_method == 'iterator': path_length += self.get_correction_path_length(iS) return path_length
[docs] def get_travel_time(self, iS): """ calculates the travel time of solution iS Parameters ---------- iS: int choose for which solution to compute the travel time, counting starts at zero Returns ------- time: float travel time """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError travel_time = self._rays[iS].getPropagationTime() * (units.second/radiopropa.second) if self.__used_method == 'iterator': travel_time += self.get_correction_travel_time(iS) return travel_time
[docs] def get_frequencies_for_attenuation(self, frequency, max_detector_freq): """ helper function to get the frequencies for applying attenuation Parameters ---------- frequency: array of float of dim (n,) frequencies of the signal max_detector_freq: float or None the maximum frequency of the final detector sampling (the simulation is internally run with a higher sampling rate, but the relevant part of the attenuation length calculation is the frequency interval visible by the detector, hence a finer calculation is more important) Returns ------- freqs: array of float of dim (m,) the frequencies for which the attenuation is calculated """ mask = frequency > 0 nfreqs = min(self._n_frequencies_integration, np.sum(mask)) freqs = np.linspace(frequency[mask].min(), frequency[mask].max(), nfreqs) if(nfreqs < np.sum(mask) and max_detector_freq is not None): mask2 = frequency <= max_detector_freq nfreqs2 = min(self._n_frequencies_integration, np.sum(mask2 & mask)) freqs = np.linspace(frequency[mask2 & mask].min(), frequency[mask2 & mask].max(), nfreqs2) if(np.sum(~mask2)>1): freqs = np.append(freqs, np.linspace(frequency[~mask2].min(), frequency[~mask2].max(), nfreqs // 2)) return freqs
[docs] def get_attenuation(self, iS, frequency, max_detector_freq=None): """ calculates the signal attenuation due to attenuation in the medium (ice) Parameters ---------- iS: int choose for which solution to compute the attenuation, counting starts at zero frequency: array of floats the frequencies for which the attenuation is calculated max_detector_freq: float or None the maximum frequency of the final detector sampling (the simulation is internally run with a higher sampling rate, but the relevant part of the attenuation length calculation is the frequency interval visible by the detector, hence a finer calculation is more important) Returns ------- attenuation: array of floats the fraction of the signal that reaches the observer (only ice attenuation, the 1/R signal falloff not considered here) """ n = self.get_number_of_solutions() if(iS >= n): self.__logger.error("solution number {:d} requested but only {:d} solutions exist".format(iS + 1, n)) raise IndexError path = self.get_path(iS) mask = frequency > 0 freqs = self.get_frequencies_for_attenuation(frequency, self._max_detector_frequency) integral = np.zeros(len(freqs)) def dt(depth, freqs): ds = np.sqrt((path[:, 0][depth] - path[:, 0][depth+1])**2 + (path[:, 1][depth] - path[:, 1][depth+1])**2 + (path[:, 2][depth] - path[:, 2][depth+1])**2) # get step size return ds / attenuation_util.get_attenuation_length(path[:, 2][depth], freqs, self._attenuation_model) for z_position in range(len(path[:, 2]) - 1): integral += dt(z_position, freqs) att_func = interpolate.interp1d(freqs, integral) tmp = att_func(frequency[mask]) attenuation = np.ones_like(frequency) tmp = np.exp(-1 * tmp) attenuation[mask] = tmp return attenuation
[docs] def get_focusing(self, iS, dz=-1. * units.cm, limit=2.): """ calculate the focusing effect in the medium Parameters ---------- iS: int choose for which solution to compute the launch vector, counting starts at zero dz: float the infinitesimal change of the depth of the receiver, 1cm by default Returns ------- focusing: a float gain of the signal at the receiver due to the focusing effect """ recVec = self.get_receive_vector(iS) recVec = -1.0 * recVec recAng = np.arccos(recVec[2] / np.sqrt(recVec[0]**2 + recVec[1]**2 + recVec[2] **2)) lauVec = self.get_launch_vector(iS) lauAng = np.arccos(lauVec[2] / np.sqrt(lauVec[0] ** 2 + lauVec[1] ** 2 + lauVec[2] ** 2)) distance = self.get_path_length(iS) vetPos = copy.copy(self._X1) recPos = copy.copy(self._X2) recPos1 = np.array([self._X2[0], self._X2[1], self._X2[2] + dz]) if not hasattr(self, "_r1"): self._r1 = radiopropa_ray_tracing(self._medium, self._attenuation_model, logging.WARNING, self._n_frequencies_integration, self._n_reflections, config = self._config) self._r1.set_shower_axis(self._shower_axis) self._r1.set_start_and_end_point(vetPos, recPos1) self._r1.find_solutions() if iS < self._r1.get_number_of_solutions(): lauVec1 = self._r1.get_launch_vector(iS) lauAng1 = np.arccos(lauVec1[2] / np.sqrt(lauVec1[0] ** 2 + lauVec1[1] ** 2 + lauVec1[2] ** 2)) focusing = np.sqrt(distance / np.sin(recAng) * np.abs((lauAng1 - lauAng) / (recPos1[2] - recPos[2]))) if(self.get_solution_type(iS) != self._r1.get_solution_type(iS)): self.__logger.error("solution types are not the same") else: focusing = 1.0 self.__logger.info("too few ray tracing solutions, setting focusing factor to 1") self.__logger.debug(f'amplification due to focusing of solution {iS:d} = {focusing:.3f}') if(focusing > limit): self.__logger.info(f"amplification due to focusing is {focusing:.1f}x -> limiting amplification factor to {limit:.1f}x") focusing = limit # now also correct for differences in refractive index between emitter and receiver position n1 = self._medium.get_index_of_refraction(self._X1) # emitter n2 = self._medium.get_index_of_refraction(self._X2) # receiver return focusing * (n1 / n2) ** 0.5
[docs] def apply_propagation_effects(self, efield, i_solution): """ Apply propagation effects to the electric field Note that the 1/r weakening of the electric field is already accounted for in the signal generation Parameters ---------- efield: ElectricField object The electric field that the effects should be applied to i_solution: int Index of the raytracing solution the propagation effects should be based on Returns ------- efield: ElectricField object The modified ElectricField object """ s_rate = efield.get_sampling_rate() spec = efield.get_frequency_spectrum() ## aply attenuation apply_attenuation = self._config['propagation']['attenuate_ice'] if apply_attenuation: if self._max_detector_frequency is None: max_freq = np.max(efield.get_frequencies()) else: max_freq = self._max_detector_frequency attenuation = self.get_attenuation(i_solution, efield.get_frequencies(), max_freq) spec *= attenuation ## apply reflections zenith_reflections = np.atleast_1d(self.get_reflection_angle(i_solution)) for zenith_reflection in zenith_reflections: if (zenith_reflection is None): continue r_theta = NuRadioReco.utilities.geometryUtilities.get_fresnel_r_p(zenith_reflection, n_2=self._medium.get_index_of_refraction(np.array([self._X2[0], self._X2[1], +1 * units.cm])), n_1=self._medium.get_index_of_refraction(np.array([self._X2[0], self._X2[1], -1 * units.cm]))) r_phi = NuRadioReco.utilities.geometryUtilities.get_fresnel_r_s(zenith_reflection, n_2=self._medium.get_index_of_refraction(np.array([self._X2[0], self._X2[1], +1 * units.cm])), n_1=self._medium.get_index_of_refraction(np.array([self._X2[0], self._X2[1], -1 * units.cm]))) efield[efp.reflection_coefficient_theta] = r_theta efield[efp.reflection_coefficient_phi] = r_phi spec[1] *= r_theta spec[2] *= r_phi self.__logger.debug( "ray hits the surface at an angle {:.2f}deg -> reflection coefficient is r_theta = {:.2f}, r_phi = {:.2f}".format( zenith_reflection / units.deg, r_theta, r_phi)) i_reflections = self.get_results()[i_solution]['reflection'] if (i_reflections > 0): # take into account possible bottom reflections # each reflection lowers the amplitude by the reflection coefficient and introduces a phase shift reflection_coefficient = self._medium.reflection_coefficient ** i_reflections phase_shift = (i_reflections * self._medium.reflection_phase_shift) % (2 * np.pi) # we assume that both efield components are equally affected spec[1] *= reflection_coefficient * np.exp(1j * phase_shift) spec[2] *= reflection_coefficient * np.exp(1j * phase_shift) self.__logger.debug( f"ray is reflecting {i_reflections:d} times at the bottom -> reducing the signal by a factor of {reflection_coefficient:.2f}") ## apply focussing effect if self._config['propagation']['focusing']: focusing = self.get_focusing(i_solution, limit=float(self._config['propagation']['focusing_limit'])) spec[1:] *= focusing # apply the birefringence effect if self._config['propagation']['birefringence']: launch_v = self.get_launch_vector(i_solution) spec = self.raytracer_birefringence(launch_v, spec, s_rate) efield.set_frequency_spectrum(spec, efield.get_sampling_rate()) return efield
[docs] def get_output_parameters(self): """ Returns a list with information about parameters to include in the output data structure that are specific to this raytracer ! be sure that the first entry is specific to your raytracer ! Returns ------- list with entries of form [{'name': str, 'ndim': int}] ! be sure that the first entry is specific to your raytracer ! 'name': Name of the new parameter to include in the data structure 'ndim': Dimension of the data structure for the parameter """ return [ {'name': 'sphere_sizes','ndim':len(self._sphere_sizes)}, {'name': 'launch_vector', 'ndim': 3}, {'name': 'focusing_factor', 'ndim': 1}, {'name': 'ray_tracing_reflection', 'ndim': 1}, {'name': 'ray_tracing_reflection_case', 'ndim': 1}, {'name': 'ray_tracing_solution_type', 'ndim': 1} ]
[docs] def get_raytracing_output(self, i_solution): """ Write parameters that are specific to this raytracer into the output data. Parameters ---------- i_solution: int The index of the raytracing solution Returns ------- dictionary with the keys matching the parameter names specified in get_output_parameters and the values being the results from the raytracing """ if self._config['propagation']['focusing']: focusing = self.get_focusing(i_solution, limit=float(self._config['propagation']['focusing_limit'])) else: focusing = 1 output_dict = { 'sphere_sizes': self._sphere_sizes, 'launch_vector': self.get_launch_vector(i_solution), 'focusing_factor': focusing, 'ray_tracing_reflection': self.get_results()[i_solution]['reflection'], 'ray_tracing_reflection_case': self.get_results()[i_solution]['reflection_case'], 'ray_tracing_solution_type': self.get_solution_type(i_solution) } return output_dict
[docs] def set_config(self, config): """ Function to change the configuration file used by the raytracer Parameters ---------- config: dict The new configuration settings """ if config == None: config = dict() config['propagation'] = dict( attenuate_ice = True, focusing_limit = 2, focusing = False, birefringence = False, radiopropa = dict( mode = 'iterative', iter_steps_channel = [25., 2., .5], #unit is meter iter_steps_zenith = [.5, .05, .005], #unit is degree auto_step_size = False, max_traj_length = 10000) #unit is meter ) config['speedup'] = dict( delta_C_cut = 40 * units.degree ) super().set_config(config) self._cut_viewing_angle = self._config['speedup']['delta_C_cut'] * units.radian self._max_traj_length = self._config['propagation']['radiopropa']['max_traj_length'] * units.meter self.set_iterative_sphere_sizes(np.array(self._config['propagation']['radiopropa']['iter_steps_channel']) * units.meter) self._auto_step = self._config['propagation']['radiopropa']['auto_step_size'] self.set_iterative_step_sizes(np.array(self._config['propagation']['radiopropa']['iter_steps_zenith']) * units.degree)
## helper functions
[docs] def delta_theta_direct(self, dz): v = (self._X2 - self._X1) u = copy.deepcopy(v) u[2] = 0 rho = np.linalg.norm(u) return dz * rho / ((self._X1[2] - self._X2[2])**2 + rho**2) * units.radian
[docs] def delta_theta_bottom(self, dz, z_refl): v = (self._X2 - self._X1) u = copy.deepcopy(v) u[2] = 0 rho = np.linalg.norm(u) return dz * rho / ((self._X1[2] + self._X2[2] - 2*z_refl)**2 + rho**2) * units.radian
[docs] def delta_theta_reflective(self, dz, n_bottom_reflections): v = (self._X2 - self._X1) u = copy.deepcopy(v) u[2] = 0 rho = np.linalg.norm(u) ice_thickness = self._medium.z_air_boundary - self._medium.z_bottom if n_bottom_reflections > 0: if self.medium.reflection is None: self.__logger.error("a solution for {:d} reflection(s) off the bottom reflective layer is requested," +"but ice model does not specify a reflective layer".format(n_bottom_reflections)) raise AttributeError("a solution for {:d} reflection(s) off the bottom reflective layer is requested," +"but ice model does not specify a reflective layer".format(n_bottom_reflections)) else: ice_thickness = self._medium.z_air_boundary - self._medium.reflection return -dz * rho / ((self._X1[2] + self._X2[2] + 2*n_bottom_reflections*ice_thickness)**2 + rho**2) * units.radian