Source code for NuRadioReco.modules.channelGalacticNoiseAdder

from NuRadioReco.utilities import units, ice, geometryUtilities
from NuRadioReco.modules.base.module import register_run
import NuRadioReco.framework.channel
import NuRadioReco.framework.sim_station
import NuRadioReco.detector.antennapattern
import logging
import warnings
import numpy as np
import scipy.constants
import scipy.interpolate
import functools
from contextlib import redirect_stdout
from numpy.random import Generator, Philox
import healpy
import astropy.coordinates
import astropy.units

logger = logging.getLogger('NuRadioReco.channelGalacticNoiseAdder')

try:
    from pygdsm import (
        GlobalSkyModel16,
        GlobalSkyModel,
        LowFrequencySkyModel,
        HaslamSkyModel,
    )
except ImportError as e:
    logger.error(
        "To use the channelGalacticNoiseAdder, 'pygdsm' needs to be installed:\n\n"
        "\t pip install git+https://github.com/telegraphic/pygdsm\n"
        )
    raise(e)

try:
    with redirect_stdout(None): # suppress (usually irrelevant) print statements from pylfmap
        from pylfmap import LFmap  # Documentation: https://github.com/F-Tomas/pylfmap needs cfitsio installation
except ImportError:
    logger.info(
        "pylfmap import failed. Consider installing it from "
        "https://github.com/F-Tomas/pylfmap to use LFmap as sky model.")
except IndexError: # this is a common error if cfitsio is not found... there are probably others
    logger.error(
        "pylfmap import failed. This might be because you do not have a working "
        "installation of cfitsio. See https://github.com/F-Tomas/pylfmap/issues/2 for potential tips")

[docs] class channelGalacticNoiseAdder: """ Class that simulates the noise produced by galactic radio emission Uses the pydgsm package (https://github.com/telegraphic/pygdsm), which provides radio background data based on Oliveira-Costa et al. (2008) (https://arxiv.org/abs/0802.1525) and Zheng et al. (2016) (https://arxiv.org/abs/1605.04920) The radio sky model is evaluated on a number of points above the horizon folded with the antenna response. Since evaluating every frequency individually would be too slow, the model is evaluated for a few frequencies and the log10 of the brightness temperature is interpolated in between. """ def __init__(self): self.__n_side = None self.__interpolation_frequencies = None self.__radio_sky = None self.__noise_temperatures = None self.__antenna_pattern_provider = NuRadioReco.detector.antennapattern.AntennaPatternProvider()
[docs] def begin( self, skymodel=None, debug=False, n_side=4, freq_range=None, interpolation_frequencies=None, seed=None, caching=True ): """ Set up important parameters for the module Parameters ---------- skymodel: {'gsm2008', 'lfmap', 'lfss', 'gsm2016', 'haslam'}, optional Choose the sky model to use. If none is provided, the Global Sky Model (2008) is used as a default. debug: bool, default: False Deprecated. Will be removed in future versions. n_side: int, default: 4 The n_side parameter of the healpix map. Has to be power of 2 The radio skymap is downsized to the resolution specified by the n_side parameter and for every pixel above the horizon the radio noise coming from that direction is calculated. The number of pixels used is 12 * n_side ** 2, so a larger value for n_side will result better accuracy but also greatly increase computing time. freq_range: array of len=2, default: [10, 1000] * units.MHZ The sky brightness temperature will be evaluated for the frequencies within this limit. Brightness temperature for frequencies in between are calculated by interpolation the log10 of the temperature The interpolation_frequencies have to cover the entire passband specified in the run method. interpolation_frequencies: array of frequencies to interpolate to. Kept for historic purposes with intention to deprecate in the future. seed : {None, int, array_like[ints], SeedSequence}, optional The seed that is passed on to the `numpy.random.Philox` bitgenerator used for random number generation. caching: bool, default: True If True, the antenna response is cached for each channel. This can speed up this module by a lot. If the frequencies of the channels change, the cache is cleared. """ if debug: warnings.warn("This argument is deprecated and will be removed in future versions.", DeprecationWarning) self.__random_generator = Generator(Philox(seed)) self.__n_side = n_side self.solid_angle = healpy.pixelfunc.nside2pixarea(self.__n_side, degrees=False) self.__caching = caching self.__freqs = None if self.__caching and self.__n_side >= 10: logger.warning( "Caching for the vector effective length is enabled (with `maxsize=1024`) and `n_side >= 10`, and thus " "it produces to many different caching entries for two antenna models to be stored of one `station_time`. " "Either decrease `n_side` or increase `maxsize` (has to be done in the source code).") if interpolation_frequencies is None: if freq_range is None: freq_range = np.array([10, 1000]) * units.MHz # define interpolation frequencies. Set in logarithmic range from freq_range[0] to freq_range[1], # rounded to MHz to avoid import errors from LFmap and tabulated models. self.__interpolation_frequencies = np.around(np.logspace(*np.log10(freq_range), num=15), 3) else: self.__interpolation_frequencies = interpolation_frequencies logger.warning("DeprecationWarning: Optional argument 'interpolation_frequencies' was replaced by 'freq_range'.") # initialise sky model try: if skymodel is None: sky_model = GlobalSkyModel(freq_unit="MHz") logger.info("No sky model specified. Using standard: Global Sky Model (2008). Available models: " "gsm2008, lfmap, lfss, gsm2016, haslam") elif skymodel.lower() == 'lfss': sky_model = LowFrequencySkyModel(freq_unit="MHz") logger.info("Using LFSS as sky model") elif skymodel.lower() == 'gsm2008': sky_model = GlobalSkyModel(freq_unit="MHz") logger.info("Using GSM2008 as sky model") elif skymodel.lower() == 'gsm2016': sky_model = GlobalSkyModel16(freq_unit="MHz") logger.info("Using GSM2016 as sky model") elif skymodel.lower() == 'haslam': sky_model = HaslamSkyModel(freq_unit="MHz", spectral_index=-2.53) logger.info("Using Haslam as sky model") elif skymodel.lower() == 'lfmap': sky_model = LFmap() logger.info("Using LFmap as sky model") else: logger.error(f"Sky model {skymodel} unknown. Defaulting to Global Sky Model (2008).") sky_model = GlobalSkyModel(freq_unit="MHz") except NameError: logger.error(f"Could not find {skymodel} skymodel. Do you have the correct package installed? \n" f"Defaulting to Global Sky Model (2008) as sky model.") sky_model = GlobalSkyModel(freq_unit="MHz") self.__noise_temperatures = np.zeros( (len(self.__interpolation_frequencies), healpy.pixelfunc.nside2npix(self.__n_side)) ) logger.info("generating noise temperatures") # generating sky maps and noise temperatures from chosen sky model in given frequency range for i_freq, noise_freq in enumerate(self.__interpolation_frequencies): self.__radio_sky = sky_model.generate(noise_freq / units.MHz) self.__radio_sky = healpy.pixelfunc.ud_grade(self.__radio_sky, self.__n_side) self.__noise_temperatures[i_freq] = self.__radio_sky
@functools.lru_cache(maxsize=1024) def _get_cached_antenna_response(self, ant_pattern, zen, azi, *ant_orient): """ Returns the cached antenna reponse for a given antenna patter, antenna orientation and signal arrival direction. This wrapper is necessary as arrays and list are not hashable (i.e., can not be used as arguments in functions one wants to cache). This module ensures that the cache is clearied if the vector `self.__freqs` changes. """ return ant_pattern.get_antenna_response_vectorized(self.__freqs, zen, azi, *ant_orient)
[docs] @register_run() def run( self, event, station, detector, passband=None ): """ Adds noise resulting from galactic radio emission to the channel traces Parameters ---------- event: Event object The event containing the station to whose channels noise shall be added station: Station object The station whose channels noise shall be added to detector: Detector object The detector description passband: list of float, optional Lower and upper bound of the frequency range in which noise shall be added. The default (no passband specified) is [10, 1000] MHz """ if self.__noise_temperatures is None: # check if .begin has been called, give helpful error message if not msg = "channelGalacticNoiseAdder was not initialized correctly. Maybe you forgot to call `.begin()`?" logger.error(msg) raise ValueError(msg) # check that or all channels channel.get_frequencies() is identical last_freqs = None for channel in station.iter_channels(): if last_freqs is not None and ( not np.allclose(last_freqs, channel.get_frequencies(), rtol=0, atol=0.1 * units.MHz)): logger.error("The frequencies of each channel must be the same, but they are not!") return last_freqs = channel.get_frequencies() freqs = last_freqs d_f = freqs[2] - freqs[1] # If we cache the antenna pattern, we need to make sure that the frequencies have not changed # between stations. If they have, we need to clear the cache. if self.__caching: if self.__freqs is None: self.__freqs = freqs else: if not np.allclose(self.__freqs, freqs, rtol=0, atol=0.01 * units.MHz): self.__freqs = freqs self._get_cached_antenna_response.cache_clear() logger.warning( "Frequencies have changed. Clearing antenna response cache. " "(If this happens often, something might be wrong...") if passband is None: passband = [10 * units.MHz, 1000 * units.MHz] passband_filter = (freqs > passband[0]) & (freqs < passband[1]) site_latitude, site_longitude = detector.get_site_coordinates(station.get_id()) station_time = station.get_station_time() local_coordinates = get_local_coordinates((site_latitude, site_longitude), station_time, self.__n_side) n_ice = ice.get_refractive_index(-0.01, detector.get_site(station.get_id())) n_air = ice.get_refractive_index(depth=1, site=detector.get_site(station.get_id())) c_vac = scipy.constants.c * units.m / units.s channel_spectra = {} for channel in station.iter_channels(): channel_spectra[channel.get_id()] = channel.get_frequency_spectrum() for i_pixel in range(healpy.pixelfunc.nside2npix(self.__n_side)): azimuth = local_coordinates[i_pixel].az.rad zenith = np.pi / 2. - local_coordinates[i_pixel].alt.rad # this is the in-air zenith if zenith > 90. * units.deg: continue # consider signal reflection at ice surface t_theta = geometryUtilities.get_fresnel_t_p(zenith, n_ice, 1) t_phi = geometryUtilities.get_fresnel_t_s(zenith, n_ice, 1) fresnel_zenith = geometryUtilities.get_fresnel_angle(zenith, n_ice, 1.) if fresnel_zenith is None: continue temperature_interpolator = scipy.interpolate.interp1d( self.__interpolation_frequencies, np.log10(self.__noise_temperatures[:, i_pixel]), kind='quadratic') noise_temperature = np.power(10, temperature_interpolator(freqs[passband_filter])) # calculate spectral radiance of radio signal using rayleigh-jeans law spectral_radiance = (2. * (scipy.constants.Boltzmann * units.joule / units.kelvin) * freqs[passband_filter] ** 2 * noise_temperature * self.solid_angle / c_vac ** 2) spectral_radiance[np.isnan(spectral_radiance)] = 0 # calculate radiance per energy bin spectral_radiance_per_bin = spectral_radiance * d_f # calculate electric field per frequency bin from the radiance per bin efield_amplitude = np.sqrt( spectral_radiance_per_bin / (c_vac * scipy.constants.epsilon_0 * ( units.coulomb / units.V / units.m))) / d_f # assign random phases to electric field noise_spectrum = np.zeros((3, freqs.shape[0]), dtype=complex) phases = self.__random_generator.uniform(0, 2. * np.pi, len(spectral_radiance)) noise_spectrum[1][passband_filter] = np.exp(1j * phases) * efield_amplitude noise_spectrum[2][passband_filter] = np.exp(1j * phases) * efield_amplitude channel_noise_spec = np.zeros_like(noise_spectrum) for channel in station.iter_channels(): channel_pos = detector.get_relative_position(station.get_id(), channel.get_id()) if channel_pos[2] < 0: curr_t_theta = t_theta curr_t_phi = t_phi curr_fresnel_zenith = fresnel_zenith curr_n = n_ice else: # we are in air curr_t_theta = 1 curr_t_phi = 1 curr_fresnel_zenith = zenith curr_n = n_air antenna_pattern = self.__antenna_pattern_provider.load_antenna_pattern( detector.get_antenna_model(station.get_id(), channel.get_id())) antenna_orientation = detector.get_antenna_orientation(station.get_id(), channel.get_id()) # calculate the phase offset in comparison to station center # consider additional distance in air & ice # assume for air & ice constant index of refraction dt = geometryUtilities.get_time_delay_from_direction( curr_fresnel_zenith, azimuth, channel_pos, n=curr_n) if channel_pos[2] < -5: logger.warning( "Galactic noise cannot be simulated accurately for deep in-ice channels. " "Coherence and arrival direction of noise are probably inaccurate.") delta_phases = -2 * np.pi * freqs[passband_filter] * dt # add random polarizations and phase to electric field polarizations = self.__random_generator.uniform(0, 2. * np.pi, len(spectral_radiance)) channel_noise_spec[1][passband_filter] = noise_spectrum[1][passband_filter] * np.exp( 1j * delta_phases) * np.cos(polarizations) channel_noise_spec[2][passband_filter] = noise_spectrum[2][passband_filter] * np.exp( 1j * delta_phases) * np.sin(polarizations) # fold electric field with antenna response if self.__caching: antenna_response = self._get_cached_antenna_response( antenna_pattern, curr_fresnel_zenith, azimuth, *antenna_orientation) else: antenna_response = antenna_pattern.get_antenna_response_vectorized( freqs, curr_fresnel_zenith, azimuth, *antenna_orientation) channel_noise_spectrum = ( antenna_response['theta'] * channel_noise_spec[1] * curr_t_theta + antenna_response['phi'] * channel_noise_spec[2] * curr_t_phi ) # add noise spectrum from pixel in the sky to channel spectrum channel_spectra[channel.get_id()] += channel_noise_spectrum # store the updated channel spectra for channel in station.iter_channels(): channel.set_frequency_spectrum(channel_spectra[channel.get_id()], "same")
[docs] @functools.lru_cache(maxsize=1) def get_local_coordinates(coordinates, time, n_side): """ Calculates the local coordinates of the pixels of a healpix map given the site coordinates and time. Parameters ---------- coordinates: tuple of float The latitude and longitude of the site time: astropy.time.Time The time at which the observation is made (station time) n_side: int The n_side parameter of the healpix map Returns ------- local_coordinates: astropy.coordinates.SkyCoord The local coordinates of the pixels of the healpix map """ site_latitude, site_longitude = coordinates site_location = astropy.coordinates.EarthLocation(lat=site_latitude * astropy.units.deg, lon=site_longitude * astropy.units.deg) local_cs = astropy.coordinates.AltAz(location=site_location, obstime=time) pixel_longitudes, pixel_latitudes = healpy.pixelfunc.pix2ang( n_side, range(healpy.pixelfunc.nside2npix(n_side)), lonlat=True) pixel_longitudes *= units.deg pixel_latitudes *= units.deg galactic_coordinates = astropy.coordinates.Galactic(l=pixel_longitudes * astropy.units.rad, b=pixel_latitudes * astropy.units.rad) local_coordinates = galactic_coordinates.transform_to(local_cs) return local_coordinates