Source code for NuRadioReco.modules.channelGenericNoiseAdder

import functools
import json
import logging
import numpy as np
from numpy.random import Generator, Philox
import os
from scipy.interpolate import interp1d
from NuRadioReco.utilities import units, fft
from NuRadioReco.modules.base.module import register_run

[docs] @functools.lru_cache(maxsize=128) def load_scale_parameters(scale_parameter_path): """ Returns interpolated scale parameters ifo frequency per channel Parameters ---------- scale_parameter_path : str path to the scale parameter json file Returns ------- scale_parameters : list list of interpolated scale parameters per channel """ with open(scale_parameter_path, "r") as scale_parameter_file: scale_parameters_dictionary = json.load(scale_parameter_file) frequencies = scale_parameters_dictionary["freq"] scale_parameters = scale_parameters_dictionary["scale_parameters"] scale_parameters = [interp1d(frequencies, scale_parameter, bounds_error=False, fill_value=0.) for scale_parameter in scale_parameters] return scale_parameters
[docs] class channelGenericNoiseAdder: """ Module that generates noise in some generic fashion (not based on measured data), which can be added to data. """
[docs] def add_random_phases(self, amps, n_samples_time_domain): """ Adding random phase information to given amplitude spectrum. Parameters ---------- amps: array of floats Data that random phase is added to. n_samples_time_domain: int number of samples in the time domain to differentiate between odd and even number of samples """ amps = np.array(amps, dtype='complex') Np = (n_samples_time_domain - 1) // 2 phases = self.__random_generator.random(Np) * 2 * np.pi phases = np.cos(phases) + 1j * np.sin(phases) amps[1:Np + 1] *= phases # Note that the last entry of the index slice is f[Np] ! return amps
[docs] def fftnoise_fullfft(self, f): """ Adding random phase information to given amplitude spectrum. Parameters ---------- f: array of floats Data that random phase is added to. """ f = np.array(f, dtype='complex') Np = (len(f) - 1) // 2 phases = self.__random_generator.random(Np) * 2 * np.pi phases = np.cos(phases) + 1j * np.sin(phases) f[1:Np + 1] *= phases # Note that the last entry of the index slice is f[Np] ! f[-1:-1 - Np:-1] = np.conj(f[1:Np + 1]) self.logger.debug(' fftnoise: Length of frequency array = {} '.format(len(f))) self.logger.debug(' fftnoise: Number of points for unilateral spectrum = {} '.format(Np)) self.logger.debug(' fftnoise: Max index and amplitude of positive part of spectrum: index = {}, A = |{}| = {} '.format(Np, f[Np], abs(f[Np]))) self.logger.debug(' fftnoise: Min index and amplitude of negative part of spectrum: index = {}, A = |{}| '.format(len(f) - Np, f[-Np])) fftprec = max(abs(np.fft.ifft(f) - np.fft.ifft(f).real)) fftcheck = fftprec - np.finfo(float).resolution self.logger.debug(' fftnoise: fft precision {} < {} (float resolution) is : {} !'.format(fftprec, np.finfo(float).resolution, fftcheck < 0)) if fftcheck >= 0: self.logger.warning(' fftnoise: Non negligibe imagniary part of inverse FFT: {} '.format(fftcheck)) return np.fft.ifft(f).real
[docs] def add_data_driven_noise(self, ampl, selection, frequencies, station_id=None, channel_id=None): """ Function to add data driven noise to a selection range of a given array of amplitudes Parameters ---------- ampl: np.ndarray array of amplitudes to which to add noise selection: list selection of amplitudes to which to add noise in the form of a list of the same length as ampl filled with booleans frequencies: np.ndarray list of frequencies to query data-driven parameters station_id: int station from which to query data to drive noise generation channel_id: int channel from which to query data to drive noise generation Returns ------- ampl: np.ndarray array of amplitudes with data driven noise included """ if len(self.scale_parameter_paths) == 0: raise KeyError("No scale parameter files found, did you pass a directory to begin(scale_parameter_dir=...)?") if station_id is None or channel_id is None: self.logger.error("When selecting data-driven noise, the station and channel ids should be passed to bandlimeted noise") raise ValueError scale_parameter_path = f"thermal_noise_scale_parameters_s{station_id}_season23.json" if scale_parameter_path in self.scale_parameter_paths: scale_parameter_full_path = self.scale_parameter_dir + "/" + scale_parameter_path else: raise NotImplementedError("Other station parameters are being generated") nbinsactive = np.sum(selection) scale_parameters = load_scale_parameters(scale_parameter_full_path) fsigma = scale_parameters[channel_id](frequencies[selection]) ampl[selection] = self.__random_generator.rayleigh(fsigma, nbinsactive) return ampl
[docs] def bandlimited_noise(self, min_freq, max_freq, n_samples, sampling_rate, amplitude, type='perfect_white', time_domain=True, bandwidth=None, station_id=None, channel_id=None): """ Generating noise of n_samples in a bandwidth [min_freq,max_freq]. Parameters ---------- min_freq: float Minimum frequency of passband for noise generation min_freq = None: Only the DC component is removed. If the DC component should be included, min_freq = 0 has to be specified max_freq: float Maximum frequency of passband for noise generation If the maximum frequency is above the Nquist frequencey (0.5 * sampling rate), the Nquist frequency is used max_freq = None: Frequencies up to Nyquist freq are used. n_samples: int number of samples in the time domain sampling_rate: float desired sampling rate of data amplitude: float desired voltage of noise as V_rms (only roughly, since bandpass limited) type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution data-driven: Amplitude of each frequency bin is drawn from a data informed Rayleigh distribution # white: flat frequency spectrum with random jitter time_domain: bool (default True) if True returns noise in the time domain, if False it returns the noise in the frequency domain. The latter might be more performant as the noise is generated internally in the frequency domain. bandwidth: float or None (default) if this parameter is specified, the amplitude is interpreted as the amplitude for the bandwidth specified here Otherwise the amplitude is interpreted for the bandwidth of min(max_freq, 0.5 * sampling rate) - min_freq If `bandwidth` is larger then (min(max_freq, 0.5 * sampling rate) - min_freq) it has the same effect as `None` station_id: int or None (default) Only necessary when selecting data-driven noise type to determine from which station/channel data to generate noise channel_id: int or None (default) Only necessary when selecting data-driven noise type to determine from which station/channel data to generate noise Notes ----- * Note that by design the max frequency is the Nyquist frequency, even if a bigger max_freq is implemented (RL 17-Sept-2018) * Add 'multi_white' noise option on 20-Sept-2018 (RL) """ frequencies = fft.freqs(n_samples, sampling_rate) n_samples_freq = len(frequencies) if min_freq is None or min_freq == 0: # remove DC component; fftfreq returns the DC component as 0-th element and the negative # frequencies at the end, so frequencies[1] should be the lowest frequency; it seems safer, # to take the difference between two frequencies to determine the minimum frequency, in case # future versions of numpy change the order and maybe put the negative frequencies first min_freq = 0.5 * (frequencies[2] - frequencies[1]) self.logger.info('Set min_freq from None to {} MHz!'.format(min_freq / units.MHz)) if max_freq is None: # sample up to Nyquist frequency max_freq = max(frequencies) self.logger.info('Set max_freq from None to {} GHz!'.format(max_freq / units.GHz)) else: if round(max_freq, 3) > round(frequencies[-1], 3): self.logger.warning( f'max_freq ({max_freq / units.MHz:.2f} MHz) is above the Nyquist frequency ' f'({frequencies[-1] / units.MHz:.2f} MHz). This means the simulated noise ampitude ' 'might deviate from what you intended. To fix that, you either need to lower ' 'max_freq or increase the sampling_rate.') selection = (frequencies >= min_freq) & (frequencies <= max_freq) nbinsactive = np.sum(selection) self.logger.debug('Total number of frequency bins (bilateral spectrum) : {} , of those active: {} '.format(n_samples, nbinsactive)) if bandwidth is not None: sampling_bandwidth = min(0.5 * sampling_rate, max_freq) - min_freq amplitude *= 1. / (bandwidth / (sampling_bandwidth)) ** 0.5 # normalize noise level to the bandwidth its generated for ampl = np.zeros(n_samples_freq) sigscale = (1. * n_samples) / np.sqrt(nbinsactive) if type == 'perfect_white': ampl[selection] = amplitude * sigscale elif type == 'rayleigh': fsigma = amplitude * sigscale / np.sqrt(2.) ampl[selection] = self.__random_generator.rayleigh(fsigma, nbinsactive) elif type == "data-driven": ampl = self.add_data_driven_noise(ampl, selection, frequencies, station_id, channel_id) # FIXME: amplitude normalization is not correct for 'white' # elif type == 'white': # ampl = np.random.rand(n_samples) * 0.05 * amplitude + amplitude * np.sqrt(2.*n_samples * 2) else: self.logger.error("Other types of noise not yet implemented.") raise NotImplementedError("Other types of noise not yet implemented.") if type == "data-driven": # data-driven parameters were sampled from spectra that follow the NuRadio conventions # and were hence already divide by the sampling rate noise = self.add_random_phases(ampl, n_samples) else: noise = self.add_random_phases(ampl, n_samples) / sampling_rate if time_domain: return fft.freq2time(noise, sampling_rate, n=n_samples) else: return noise
[docs] def precalculate_bandlimited_noise_parameters( self, min_freq, max_freq, n_samples, sampling_rate, amplitude, type='perfect_white', bandwidth=None): """ Generating noise of n_samples in a bandwidth [min_freq,max_freq]. Parameters ---------- min_freq: float Minimum frequency of passband for noise generation min_freq = None: Only the DC component is removed. If the DC component should be included, min_freq = 0 has to be specified max_freq: float Maximum frequency of passband for noise generation If the maximum frequency is above the Nquist frequencey (0.5 * sampling rate), the Nquist frequency is used max_freq = None: Frequencies up to Nyquist freq are used. n_samples: int number of samples in the time domain sampling_rate: float desired sampling rate of data amplitude: float desired voltage of noise as V_rms (only roughly, since bandpass limited) type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution # white: flat frequency spectrum with random jitter time_domain: bool (default True) if True returns noise in the time domain, if False it returns the noise in the frequency domain. The latter might be more performant as the noise is generated internally in the frequency domain. bandwidth: float or None (default) if this parameter is specified, the amplitude is interpreted as the amplitude for the bandwidth specified here Otherwise the amplitude is interpreted for the bandwidth of min(max_freq, 0.5 * sampling rate) - min_freq If `bandwidth` is larger then (min(max_freq, 0.5 * sampling rate) - min_freq) it has the same effect as `None` Notes ----- * Note that by design the max frequency is the Nyquist frequency, even if a bigger max_freq is implemented (RL 17-Sept-2018) * Add 'multi_white' noise option on 20-Sept-2018 (RL) """ frequencies = np.fft.rfftfreq(n_samples, 1. / sampling_rate) n_samples_freq = len(frequencies) if min_freq is None or min_freq == 0: # remove DC component; fftfreq returns the DC component as 0-th element and the negative # frequencies at the end, so frequencies[1] should be the lowest frequency; it seems safer, # to take the difference between two frequencies to determine the minimum frequency, in case # future versions of numpy change the order and maybe put the negative frequencies first min_freq = 0.5 * (frequencies[2] - frequencies[1]) self.logger.info(' Set min_freq from None to {} MHz!'.format(min_freq / units.MHz)) if max_freq is None: # sample up to Nyquist frequency max_freq = max(frequencies) self.logger.info(' Set max_freq from None to {} GHz!'.format(max_freq / units.GHz)) selection = (frequencies >= min_freq) & (frequencies <= max_freq) nbinsactive = np.sum(selection) self.logger.debug('Total number of frequency bins (bilateral spectrum) : {} , of those active: {} '.format(n_samples, nbinsactive)) if(bandwidth is not None): sampling_bandwidth = min(0.5 * sampling_rate, max_freq) - min_freq amplitude *= 1. / (bandwidth / (sampling_bandwidth)) ** 0.5 # normalize noise level to the bandwidth its generated for ampl = np.zeros(n_samples_freq) sigscale = (1. * n_samples) / np.sqrt(nbinsactive) fsigma = amplitude * sigscale / np.sqrt(2.) self.precalculated_parameters = { "n_samples_freq": n_samples_freq, "selection": selection, "nbinsactive": nbinsactive, "sigscale": sigscale, "fsigma": fsigma, "sampling_rate": sampling_rate, "frequencies": frequencies, "n_samples": n_samples }
[docs] def bandlimited_noise_from_precalculated_parameters(self, type='perfect_white', time_domain=True): """ Generating noise of n_samples in a bandwidth [min_freq,max_freq]. Parameters ---------- min_freq: float Minimum frequency of passband for noise generation min_freq = None: Only the DC component is removed. If the DC component should be included, min_freq = 0 has to be specified max_freq: float Maximum frequency of passband for noise generation If the maximum frequency is above the Nquist frequencey (0.5 * sampling rate), the Nquist frequency is used max_freq = None: Frequencies up to Nyquist freq are used. n_samples: int number of samples in the time domain sampling_rate: float desired sampling rate of data amplitude: float desired voltage of noise as V_rms (only roughly, since bandpass limited) type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution # white: flat frequency spectrum with random jitter time_domain: bool (default True) if True returns noise in the time domain, if False it returns the noise in the frequency domain. The latter might be more performant as the noise is generated internally in the frequency domain. bandwidth: float or None (default) if this parameter is specified, the amplitude is interpreted as the amplitude for the bandwidth specified here Otherwise the amplitude is interpreted for the bandwidth of min(max_freq, 0.5 * sampling rate) - min_freq If `bandwidth` is larger then (min(max_freq, 0.5 * sampling rate) - min_freq) it has the same effect as `None` Notes ----- * Note that by design the max frequency is the Nyquist frequency, even if a bigger max_freq is implemented (RL 17-Sept-2018) * Add 'multi_white' noise option on 20-Sept-2018 (RL) """ ampl = np.zeros(self.precalculated_parameters["n_samples_freq"]) if type == 'perfect_white': ampl[self.precalculated_parameters["selection"]] = self.precalculated_parameters["amplitude"] * self.precalculated_parameters["sigscale"] elif type == 'rayleigh': ampl[self.precalculated_parameters["selection"]] = self.__random_generator.rayleigh(self.precalculated_parameters["fsigma"], self.precalculated_parameters["nbinsactive"]) # elif type == 'white': # FIXME: amplitude normalization is not correct for 'white' # ampl = np.random.rand(n_samples) * 0.05 * amplitude + amplitude * np.sqrt(2.*n_samples * 2) else: self.logger.error("Other types of noise not yet implemented.") raise NotImplementedError("Other types of noise not yet implemented.") noise = self.add_random_phases(ampl, self.precalculated_parameters["n_samples"]) / self.precalculated_parameters["sampling_rate"] if(time_domain): return fft.freq2time(noise, self.precalculated_parameters["sampling_rate"], n=self.precalculated_parameters["n_samples"]) else: return noise
[docs] def bandlimited_noise_from_spectrum(self, n_samples, sampling_rate, spectrum, amplitude=None, type='perfect_white', time_domain=True, station_id=None, channel_id=None): """ Generating noise of n_samples in a bandwidth [min_freq,max_freq]. Parameters ---------- n_samples: int number of samples in the time domain sampling_rate: float desired sampling rate of data spectrum: numpy.ndarray, function desired spectrum of the noise, either as a numpy.ndarray of length n_frequencies or a function that takes the frequencies as an argument and returns the amplitudes. The overall normalization of the spectrum is ignored if the paramter "amplitude" is set. amplitude: float, optional desired voltage of noise as V_rms. If set to None the power of the noise will be equal to the power of the spectrum. type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution data-driven : Amplitude of each frequency bin is drawn from a data informed Rayleigh distribution # white: flat frequency spectrum with random jitter time_domain: bool (default True) if True returns noise in the time domain, if False it returns the noise in the frequency domain. The latter might be more performant as the noise is generated internally in the frequency domain. station_id: int or None (default) Only necessary when selecting data-driven noise type to determine from which station/channel data to generate noise channel_id: int or None (default) Only necessary when selecting data-driven noise type to determine from which station/channel data to generate noise """ frequencies = np.fft.rfftfreq(n_samples, 1. / sampling_rate) selection = frequencies > 0 n_samples_freq = np.sum(selection) if callable(spectrum): spectrum = spectrum(frequencies) if amplitude is not None: # power = np.sum(spectrum**2) norm = np.trapz(np.abs(spectrum) ** 2, frequencies) max_freq = frequencies[-1] amplitude = amplitude / (norm / max_freq) ** 0.5 sigscale = (1. * n_samples) / np.sqrt(n_samples_freq) elif amplitude is None: amplitude = np.sqrt(n_samples) sigscale = 1 ampl = np.zeros(len(frequencies), dtype=complex) if type == 'perfect_white': ampl = amplitude * sigscale elif type == 'rayleigh': fsigma = amplitude * sigscale / np.sqrt(2.) ampl[selection] = self.__random_generator.rayleigh(fsigma, n_samples_freq) elif type == "data-driven": ampl = self.add_data_driven_noise(ampl, selection, frequencies, station_id, channel_id) else: self.logger.error("Other types of noise not yet implemented.") raise NotImplementedError("Other types of noise not yet implemented.") if type == "data-driven": # data-driven parameters were sampled from spectra that follow the NuRadio conventions # and were hence already divide by the sampling rate noise = self.add_random_phases(ampl, n_samples) else: noise = self.add_random_phases(ampl, n_samples) / sampling_rate noise *= spectrum if time_domain: return fft.freq2time(noise, sampling_rate, n=n_samples) else: return noise
def __init__(self): self.__debug = None self.__random_generator = None self.logger = logging.getLogger('NuRadioReco.channelGenericNoiseAdder') self.begin()
[docs] def begin(self, debug=False, seed=None, scale_parameter_dir = None): """ Parameters ---------- scale_parameter_dir : string Parameter for noise type "data-driven" Path to the directory that contains the scale parameter files. One file contains one station. The module expects the files to be named thermal_noise_scale_parameters_sXX_seasonXX.json """ self.__debug = debug self.__random_generator = Generator(Philox(seed)) if debug: self.logger.setLevel(logging.DEBUG) self.scale_parameter_paths = [] if scale_parameter_dir is not None: self.scale_parameter_dir = scale_parameter_dir self.scale_parameter_paths = [scale_param_json for scale_param_json in os.listdir(scale_parameter_dir) if (scale_param_json.endswith(".json") and scale_param_json.startswith("thermal_noise_scale_parameters"))] if len(self.scale_parameter_paths) == 0: raise OSError(f"No scale parameter json files found in {self.scale_parameter_dir}")
[docs] @register_run() def run(self, event, station, detector, amplitude=1 * units.mV, min_freq=50 * units.MHz, max_freq=2000 * units.MHz, type='perfect_white', excluded_channels=None, bandwidth=None): """ Add noise to given event. Parameters ---------- event station detector amplitude: float or dict of floats desired voltage of noise as V_rms for the specified bandwidth a dict can be used to specify a different amplitude per channel, the key is the channel_id min_freq: float Minimum frequency of passband for noise generation max_freq: float Maximum frequency of passband for noise generation If the maximum frequency is above the Nquist frequencey (0.5 * sampling rate), the Nquist frequency is used type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution data-driven: Amplitude of each frequency bin is drawn from a data-informed Rayleigh distribution excluded_channels: list of ints the channels ids of channels where no noise will be added, default is that no channel is excluded bandwidth: float or None (default) if this parameter is specified, the amplitude is interpreted as the amplitude for the bandwidth specified here Otherwise the amplitude is interpreted for the bandwidth of min(max_freq, 0.5 * sampling rate) - min_freq If `bandwidth` is larger then (min(max_freq, 0.5 * sampling rate) - min_freq) it has the same effect as `None` """ if excluded_channels is None: excluded_channels = [] station_id = station.get_id() channels = station.iter_channels() for channel in channels: channel_id = channel.get_id() if(channel_id in excluded_channels): continue trace = channel.get_trace() sampling_rate = channel.get_sampling_rate() if(isinstance(amplitude, dict)): tmp_ampl = amplitude[channel.get_id()] else: tmp_ampl = amplitude noise = self.bandlimited_noise(min_freq=min_freq, max_freq=max_freq, n_samples=trace.shape[0], sampling_rate=sampling_rate, amplitude=tmp_ampl, type=type, bandwidth=bandwidth, station_id=station_id, channel_id=channel_id) if self.__debug: new_trace = trace + noise self.logger.debug("imput amplitude {}".format(amplitude)) self.logger.debug("voltage RMS {}".format(np.sqrt(np.mean(noise ** 2)))) import matplotlib.pyplot as plt plt.plot(trace) plt.plot(noise) plt.plot(new_trace) plt.figure() plt.plot(np.abs(fft.time2freq(trace, channel.get_sampling_rate()))) plt.plot(np.abs(fft.time2freq(noise, channel.get_sampling_rate()))) plt.plot(np.abs(fft.time2freq(new_trace, channel.get_sampling_rate()))) plt.show() new_trace = trace + noise channel.set_trace(new_trace, sampling_rate)
[docs] def end(self): pass
if __name__ == "__main__": import argparse from astropy.time import Time import matplotlib.pyplot as plt from NuRadioReco.framework.event import Event from NuRadioReco.framework.station import Station from NuRadioReco.framework.channel import Channel from NuRadioReco.detector import detector parser =argparse.ArgumentParser() parser.add_argument("--station", "-s", type=int, default=11) parser.add_argument("--channel", "-c", type=int, default=0) args = parser.parse_args() def create_sim_event(station_id, channel_id, detector, frequencies, sampling_rate): event = Event(run_number=-1, event_id=-1) station = Station(station_id) station.set_station_time(detector.get_detector_time()) channel = Channel(channel_id) channel.set_frequency_spectrum(np.zeros_like(frequencies, dtype=np.complex128), sampling_rate) station.add_channel(channel) event.set_station(station) return event, station log_level = logging.DEBUG det = detector.Detector(source="rnog_mongo", always_query_entire_description=False, database_connection="RNOG_public", select_stations=args.station, log_level=log_level) det.update(Time("2023-08-01")) nr_samples = 2048 sampling_rate = 3.2 * units.GHz frequencies = np.fft.rfftfreq(nr_samples, d=1./sampling_rate) event, station = create_sim_event(args.station, args.channel, det, frequencies, sampling_rate) scale_parameter_dir = "/insert/path/to/scale/parameters/here" generic_noise_adder = channelGenericNoiseAdder() generic_noise_adder.begin(scale_parameter_dir=scale_parameter_dir) channel = station.get_channel(args.channel) # noise = generic_noise_adder.bandlimited_noise(0, 1.6, nr_samples, sampling_rate, amplitude=None, type="data-driven", time_domain=False, # station_id=args.station, channel_id=args.channel) spectrum = np.zeros_like(frequencies) spectrum[100:700] = 1 noise = generic_noise_adder.bandlimited_noise_from_spectrum(nr_samples, sampling_rate, spectrum, amplitude=None, type="data-driven", time_domain=False, station_id=args.station, channel_id=args.channel) channel.set_frequency_spectrum(noise, sampling_rate) frequency_spectrum = channel.get_frequency_spectrum() times = channel.get_times() trace = channel.get_trace() plt.plot(frequencies, np.abs(frequency_spectrum)) plt.xlabel("freq / GHz") plt.ylabel("spectral amplitude / V/GHz") plt.savefig("channelGenericNoiseAdder_spectrumtest.png") plt.close() plt.plot(times, trace) plt.xlabel("times / ns") plt.ylabel("amplitude / V") plt.savefig("channelGenericNoiseAdder_tracetest.png") plt.close()