import functools
import json
import numpy as np
import os
import warnings
import NuRadioReco.detector.antennapattern
import NuRadioReco.framework.channel
import NuRadioReco.framework.sim_station
from NuRadioReco.modules.base.module import register_run
from NuRadioReco.utilities import units, ice, geometryUtilities, signal_processing
import logging
logger = logging.getLogger('NuRadioReco.channelThermalNoiseAdder')
[docs]
class channelIceThermalNoiseAdder:
"""
Module to generate thermal noise from ice
This class is a stripped version of channelGalacticNoiseAdder
Intead of using a skymap, the module uses pre generated effective antenna temperatures,
generated by Steffen's effective temperature code which was modified by Felix
(see NuRadioMC/examples/simulate_effective_ice_temperature )
"""
def __init__(self):
self.__antenna_pattern_provider = NuRadioReco.detector.antennapattern.AntennaPatternProvider()
[docs]
@functools.lru_cache(maxsize=1024 * 32)
def get_cached_antenna_response(self, antenna_pattern, zen, azi, *ant_orient):
return antenna_pattern.get_antenna_response_vectorized(self.freqs, zen, azi, *ant_orient)
[docs]
def solid_angle(self, theta, d_theta, d_phi):
return np.abs(np.sin(theta) * np.sin(d_theta / 2) * 2 * d_phi)
[docs]
def get_temperature_from_json(self, temperature_file):
"""
Function to open the effective temperature files created by
NuRadioMC/examples/simulate_effective_ice_temperature
The effective temperatures are generated at an antenna depth in function of incident angle
by integrating ice temperature (weighted by attenuation effects) along a ray path starting from an incident angle.
For more info see the example.
This module was made for RNO-G but given you have an ice temperature / attenuation profile one can re-generate these files
for arbitrary experiments
"""
with open(temperature_file, "r") as file_open:
temperature_file_dict = json.load(file_open)
z_antenna = temperature_file_dict["z_antenna"]
theta = temperature_file_dict["theta"] * units.rad
if not np.isclose(theta[-1] - theta[0], 180 * units.degree, atol=1 * units.degree):
raise ValueError(f"Theta angles of {temperature_file} do not extend over 180 degrees.")
if np.any([t > 10 for t in theta]):
raise ValueError("Thetas seem too large to be in radians (> 10), the files should give angles in units of radians.")
if z_antenna > 0:
raise ValueError(f"Antenna depth {z_antenna} should be negative, the code does not account for antennas above the ice")
eff_temperature = temperature_file_dict["eff_temperature"]
return z_antenna, theta, eff_temperature
[docs]
def begin(self, eff_temperature_files, nr_phi_bins=64, channel_depth_matching_error=10, debug=False):
"""
Set up important parameters for the module
Parameters
----------
eff_temperature_files : str
Files generated by NuRadioMC.examples.simulate_effective_ice_temperature.ice_integrator.py
Files contain the effective ice noise temperatures at the antenna in function of the incidence angle
nr_phi_bins : int
Binning of the azimuth, this can be reduced for azimuthally symmetric antennas,
such as vertically polarized antennas
channel_depth_matching_error : float
error the module can ake in assigning eff temperatures to channels based on channel depth
e.g. if error is 2m, channel depth given by detector is 36m and effective temperatures were generated at 40m,
then the module will not match the temperature at 40m to this channel and throw an error
debug : bool
If True removes randomization of electric field phases and polarizations to speed up testing
"""
self.temperature_files = eff_temperature_files
self.channel_depth_matching_error = channel_depth_matching_error
self.eff_temperature_depths = []
self.eff_temperature = []
for temperature_file in self.temperature_files:
z_antenna, self.thetas, eff_temperature = self.get_temperature_from_json(temperature_file)
self.eff_temperature_depths.append(z_antenna)
self.eff_temperature.append(eff_temperature)
self.eff_temperature = np.array(self.eff_temperature)
self.nr_theta_bins = len(self.thetas)
self.debug = debug
if self.debug:
nr_phi_bins = 32
self.phis = np.linspace(0 * units.degree, 360 * units.degree, nr_phi_bins)
return
[docs]
@register_run()
def run(
self,
event,
station,
detector,
passband=None
):
"""
Adds noise resulting from thermal emission to the channel traces
Parameters
----------
event: Event object
The event containing the station to whose channels noise shall be added
station: Station object
The station whose channels noise shall be added to
detector: Detector object
The detector description
passband: list of float, optional
Lower and upper bound of the frequency range in which noise shall be
added. The default (no passband specified) is [10, 1600] MHz
"""
# check that for all channels channel.get_frequencies() is identical
last_freqs = None
for channel in station.iter_channels():
if last_freqs is not None and (
not np.allclose(last_freqs, channel.get_frequencies(), rtol=0, atol=0.1 * units.MHz)):
logger.error("The frequencies of each channel must be the same, but they are not!")
return
last_freqs = channel.get_frequencies()
freqs = last_freqs
self.freqs = freqs
if passband is None:
passband = [10 * units.MHz, 1600 * units.MHz]
passband_filter = (freqs > passband[0]) & (freqs < passband[1])
channel_spectra = {}
for channel in station.iter_channels():
channel_spectra[channel.get_id()] = channel.get_frequency_spectrum()
d_thetas = np.diff(self.thetas)
d_phis = np.diff(self.phis)
for channel in station.iter_channels():
channel_depth = detector.get_channel(station.get_id(), channel.get_id())["channel_position"]["position"][-1]
depth_mask = np.isclose(self.eff_temperature_depths, channel_depth, atol=self.channel_depth_matching_error)
if np.all(depth_mask is False):
raise KeyError(f"No eff temperature found for depth {channel_depth}, either change the depth_error tolerance or generate effective temperatures closer to this depth")
eff_temperature = self.eff_temperature[depth_mask]
if len(eff_temperature) > 1:
raise KeyError(f"Channel depth {channel_depth} corresponds to multiple eff temperature files, decrease the channel_depth_matching_error tolerance")
eff_temperature = eff_temperature[0]
for phi, d_phi in zip(self.phis, d_phis):
for theta_i, (theta, d_theta) in enumerate(zip(self.thetas, d_thetas)):
solid_angle = self.solid_angle(theta, d_theta, d_phi)
noise_spectrum = np.zeros((3, freqs.shape[0]), dtype=complex)
channel_noise_spec = np.zeros_like(noise_spectrum)
# calculate spectral radiance of radio signal using rayleigh-jeans law
efield_amplitude = signal_processing.get_electric_field_from_temperature(
freqs[passband_filter], eff_temperature[theta_i], solid_angle)
# assign random phases to electric field
if self.debug:
phases = np.zeros(len(efield_amplitude))
else:
phases = np.random.uniform(0, 2 * np.pi, len(efield_amplitude))
noise_spectrum[1][passband_filter] = np.exp(1j * phases) * efield_amplitude
noise_spectrum[2][passband_filter] = np.exp(1j * phases) * efield_amplitude
antenna_pattern = self.__antenna_pattern_provider.load_antenna_pattern(
detector.get_antenna_model(station.get_id(), channel.get_id()),
)
antenna_orientation = detector.get_antenna_orientation(station.get_id(), channel.get_id())
# add random polarizations and phase to electric field
if self.debug:
polarizations = np.zeros(len(efield_amplitude))
else:
polarizations = np.random.uniform(0, 2 * np.pi, len(efield_amplitude))
channel_noise_spec[1][passband_filter] = noise_spectrum[1][passband_filter] * np.cos(polarizations)
channel_noise_spec[2][passband_filter] = noise_spectrum[2][passband_filter] * np.sin(polarizations)
# fold electric field with antenna response
antenna_response = self.get_cached_antenna_response(
antenna_pattern, theta, phi, *antenna_orientation)
channel_noise_spectrum = (
antenna_response['theta'] * channel_noise_spec[1]
+ antenna_response['phi'] * channel_noise_spec[2]
)
# add noise spectrum from pixel in the sky to channel spectrum
channel_spectra[channel.get_id()] += channel_noise_spectrum
# store the updated channel spectra
for channel in station.iter_channels():
channel.set_frequency_spectrum(channel_spectra[channel.get_id()], "same")
if __name__ == "__main__":
import datetime
import matplotlib.pyplot as plt
from NuRadioReco.detector.RNO_G.rnog_detector import Detector
from NuRadioReco.framework.event import Event
from NuRadioReco.framework.station import Station
from NuRadioReco.framework.channel import Channel
station_id = 23
nr_samples = 2048
sampling_rate = 3.2 * units.GHz
frequencies = np.fft.rfftfreq(nr_samples, d=1./sampling_rate)
channel_ids = [0]
detector = Detector(database_connection='RNOG_public', log_level=logging.NOTSET,
select_stations=station_id)
detector_time = datetime.datetime(2023, 8, 1)
detector.update(detector_time)
event = Event(run_number=-1, event_id=-1)
station = Station(station_id)
station.set_station_time(detector.get_detector_time())
for channel_id in channel_ids:
channel = Channel(channel_id)
channel.set_frequency_spectrum(np.zeros_like(frequencies, dtype=np.complex128), sampling_rate)
station.add_channel(channel)
event.set_station(station)
# These files were generated using the code NuRadioMC/examples/simulate_effective_ice_temperature
# see the README there for more details
eff_temperature_dir = f"{NuRadioReco.__path__[0]}/examples/RNOG/thermal_noise/eff_temperatures"
eff_temperature_files = ["eff_temperature_-1.0m_ntheta100_GL3.json", "eff_temperature_-40m_ntheta100.json", "eff_temperature_-100m_ntheta100_GL3.json"]
eff_temperature_paths = [os.path.join(eff_temperature_dir, eff_temp_f) for eff_temp_f in eff_temperature_files]
thermal_noise_adder = channelIceThermalNoiseAdder()
thermal_noise_adder.begin(eff_temperature_files=eff_temperature_paths)
thermal_noise_adder.run(event, station, detector)
station = event.get_station()
channel = station.get_channel(0)
plt.plot(channel.get_times(), channel.get_trace())
plt.xlabel("time / ns")
plt.ylabel("amplitude / V")
plt.show()
plt.clf()
plt.plot(channel.get_frequencies(), np.abs(channel.get_frequency_spectrum()))
plt.xlabel("freq / GHz")
plt.ylabel("spectral amplitude / V/GHz")
plt.savefig("test_thermal_noise")