Source code for NuRadioReco.modules.channelIceThermalNoiseAdder

import functools
import json
import numpy as np
import os
import warnings

import NuRadioReco.detector.antennapattern
import NuRadioReco.framework.channel
import NuRadioReco.framework.sim_station
from NuRadioReco.modules.base.module import register_run
from NuRadioReco.utilities import units, ice, geometryUtilities, signal_processing

import logging
logger = logging.getLogger('NuRadioReco.channelThermalNoiseAdder')


[docs] class channelIceThermalNoiseAdder: """ Module to generate thermal noise from ice This class is a stripped version of channelGalacticNoiseAdder Intead of using a skymap, the module uses pre generated effective antenna temperatures, generated by Steffen's effective temperature code which was modified by Felix (see NuRadioMC/examples/simulate_effective_ice_temperature ) """ def __init__(self): self.__antenna_pattern_provider = NuRadioReco.detector.antennapattern.AntennaPatternProvider()
[docs] @functools.lru_cache(maxsize=1024 * 32) def get_cached_antenna_response(self, antenna_pattern, zen, azi, *ant_orient): return antenna_pattern.get_antenna_response_vectorized(self.freqs, zen, azi, *ant_orient)
[docs] def solid_angle(self, theta, d_theta, d_phi): return np.abs(np.sin(theta) * np.sin(d_theta / 2) * 2 * d_phi)
[docs] def get_temperature_from_json(self, temperature_file): """ Function to open the effective temperature files created by NuRadioMC/examples/simulate_effective_ice_temperature The effective temperatures are generated at an antenna depth in function of incident angle by integrating ice temperature (weighted by attenuation effects) along a ray path starting from an incident angle. For more info see the example. This module was made for RNO-G but given you have an ice temperature / attenuation profile one can re-generate these files for arbitrary experiments """ with open(temperature_file, "r") as file_open: temperature_file_dict = json.load(file_open) z_antenna = temperature_file_dict["z_antenna"] theta = temperature_file_dict["theta"] * units.rad if not np.isclose(theta[-1] - theta[0], 180 * units.degree, atol=1 * units.degree): raise ValueError(f"Theta angles of {temperature_file} do not extend over 180 degrees.") if np.any([t > 10 for t in theta]): raise ValueError("Thetas seem too large to be in radians (> 10), the files should give angles in units of radians.") if z_antenna > 0: raise ValueError(f"Antenna depth {z_antenna} should be negative, the code does not account for antennas above the ice") eff_temperature = temperature_file_dict["eff_temperature"] return z_antenna, theta, eff_temperature
[docs] def begin(self, eff_temperature_files, nr_phi_bins=64, channel_depth_matching_error=10, debug=False): """ Set up important parameters for the module Parameters ---------- eff_temperature_files : str Files generated by NuRadioMC.examples.simulate_effective_ice_temperature.ice_integrator.py Files contain the effective ice noise temperatures at the antenna in function of the incidence angle nr_phi_bins : int Binning of the azimuth, this can be reduced for azimuthally symmetric antennas, such as vertically polarized antennas channel_depth_matching_error : float error the module can ake in assigning eff temperatures to channels based on channel depth e.g. if error is 2m, channel depth given by detector is 36m and effective temperatures were generated at 40m, then the module will not match the temperature at 40m to this channel and throw an error debug : bool If True removes randomization of electric field phases and polarizations to speed up testing """ self.temperature_files = eff_temperature_files self.channel_depth_matching_error = channel_depth_matching_error self.eff_temperature_depths = [] self.eff_temperature = [] for temperature_file in self.temperature_files: z_antenna, self.thetas, eff_temperature = self.get_temperature_from_json(temperature_file) self.eff_temperature_depths.append(z_antenna) self.eff_temperature.append(eff_temperature) self.eff_temperature = np.array(self.eff_temperature) self.nr_theta_bins = len(self.thetas) self.debug = debug if self.debug: nr_phi_bins = 32 self.phis = np.linspace(0 * units.degree, 360 * units.degree, nr_phi_bins) return
[docs] @register_run() def run( self, event, station, detector, passband=None ): """ Adds noise resulting from thermal emission to the channel traces Parameters ---------- event: Event object The event containing the station to whose channels noise shall be added station: Station object The station whose channels noise shall be added to detector: Detector object The detector description passband: list of float, optional Lower and upper bound of the frequency range in which noise shall be added. The default (no passband specified) is [10, 1600] MHz """ # check that for all channels channel.get_frequencies() is identical last_freqs = None for channel in station.iter_channels(): if last_freqs is not None and ( not np.allclose(last_freqs, channel.get_frequencies(), rtol=0, atol=0.1 * units.MHz)): logger.error("The frequencies of each channel must be the same, but they are not!") return last_freqs = channel.get_frequencies() freqs = last_freqs self.freqs = freqs if passband is None: passband = [10 * units.MHz, 1600 * units.MHz] passband_filter = (freqs > passband[0]) & (freqs < passband[1]) channel_spectra = {} for channel in station.iter_channels(): channel_spectra[channel.get_id()] = channel.get_frequency_spectrum() d_thetas = np.diff(self.thetas) d_phis = np.diff(self.phis) for channel in station.iter_channels(): channel_depth = detector.get_channel(station.get_id(), channel.get_id())["channel_position"]["position"][-1] depth_mask = np.isclose(self.eff_temperature_depths, channel_depth, atol=self.channel_depth_matching_error) if np.all(depth_mask is False): raise KeyError(f"No eff temperature found for depth {channel_depth}, either change the depth_error tolerance or generate effective temperatures closer to this depth") eff_temperature = self.eff_temperature[depth_mask] if len(eff_temperature) > 1: raise KeyError(f"Channel depth {channel_depth} corresponds to multiple eff temperature files, decrease the channel_depth_matching_error tolerance") eff_temperature = eff_temperature[0] for phi, d_phi in zip(self.phis, d_phis): for theta_i, (theta, d_theta) in enumerate(zip(self.thetas, d_thetas)): solid_angle = self.solid_angle(theta, d_theta, d_phi) noise_spectrum = np.zeros((3, freqs.shape[0]), dtype=complex) channel_noise_spec = np.zeros_like(noise_spectrum) # calculate spectral radiance of radio signal using rayleigh-jeans law efield_amplitude = signal_processing.get_electric_field_from_temperature( freqs[passband_filter], eff_temperature[theta_i], solid_angle) # assign random phases to electric field if self.debug: phases = np.zeros(len(efield_amplitude)) else: phases = np.random.uniform(0, 2 * np.pi, len(efield_amplitude)) noise_spectrum[1][passband_filter] = np.exp(1j * phases) * efield_amplitude noise_spectrum[2][passband_filter] = np.exp(1j * phases) * efield_amplitude antenna_pattern = self.__antenna_pattern_provider.load_antenna_pattern( detector.get_antenna_model(station.get_id(), channel.get_id()), ) antenna_orientation = detector.get_antenna_orientation(station.get_id(), channel.get_id()) # add random polarizations and phase to electric field if self.debug: polarizations = np.zeros(len(efield_amplitude)) else: polarizations = np.random.uniform(0, 2 * np.pi, len(efield_amplitude)) channel_noise_spec[1][passband_filter] = noise_spectrum[1][passband_filter] * np.cos(polarizations) channel_noise_spec[2][passband_filter] = noise_spectrum[2][passband_filter] * np.sin(polarizations) # fold electric field with antenna response antenna_response = self.get_cached_antenna_response( antenna_pattern, theta, phi, *antenna_orientation) channel_noise_spectrum = ( antenna_response['theta'] * channel_noise_spec[1] + antenna_response['phi'] * channel_noise_spec[2] ) # add noise spectrum from pixel in the sky to channel spectrum channel_spectra[channel.get_id()] += channel_noise_spectrum # store the updated channel spectra for channel in station.iter_channels(): channel.set_frequency_spectrum(channel_spectra[channel.get_id()], "same")
if __name__ == "__main__": import datetime import matplotlib.pyplot as plt from NuRadioReco.detector.RNO_G.rnog_detector import Detector from NuRadioReco.framework.event import Event from NuRadioReco.framework.station import Station from NuRadioReco.framework.channel import Channel station_id = 23 nr_samples = 2048 sampling_rate = 3.2 * units.GHz frequencies = np.fft.rfftfreq(nr_samples, d=1./sampling_rate) channel_ids = [0] detector = Detector(database_connection='RNOG_public', log_level=logging.NOTSET, select_stations=station_id) detector_time = datetime.datetime(2023, 8, 1) detector.update(detector_time) event = Event(run_number=-1, event_id=-1) station = Station(station_id) station.set_station_time(detector.get_detector_time()) for channel_id in channel_ids: channel = Channel(channel_id) channel.set_frequency_spectrum(np.zeros_like(frequencies, dtype=np.complex128), sampling_rate) station.add_channel(channel) event.set_station(station) # These files were generated using the code NuRadioMC/examples/simulate_effective_ice_temperature # see the README there for more details eff_temperature_dir = f"{NuRadioReco.__path__[0]}/examples/RNOG/thermal_noise/eff_temperatures" eff_temperature_files = ["eff_temperature_-1.0m_ntheta100_GL3.json", "eff_temperature_-40m_ntheta100.json", "eff_temperature_-100m_ntheta100_GL3.json"] eff_temperature_paths = [os.path.join(eff_temperature_dir, eff_temp_f) for eff_temp_f in eff_temperature_files] thermal_noise_adder = channelIceThermalNoiseAdder() thermal_noise_adder.begin(eff_temperature_files=eff_temperature_paths) thermal_noise_adder.run(event, station, detector) station = event.get_station() channel = station.get_channel(0) plt.plot(channel.get_times(), channel.get_trace()) plt.xlabel("time / ns") plt.ylabel("amplitude / V") plt.show() plt.clf() plt.plot(channel.get_frequencies(), np.abs(channel.get_frequency_spectrum())) plt.xlabel("freq / GHz") plt.ylabel("spectral amplitude / V/GHz") plt.savefig("test_thermal_noise")