Source code for NuRadioReco.modules.channelGenericNoiseAdder

import logging
import numpy as np
from numpy.random import Generator, Philox
from NuRadioReco.utilities import units, fft
from NuRadioReco.modules.base.module import register_run


[docs]class channelGenericNoiseAdder: """ Module that generates noise in some generic fashion (not based on measured data), which can be added to data. """
[docs] def add_random_phases(self, amps, n_samples_time_domain): """ Adding random phase information to given amplitude spectrum. Parameters ---------- amps: array of floats Data that random phase is added to. n_samples_time_domain: int number of samples in the time domain to differentiate between odd and even number of samples """ amps = np.array(amps, dtype='complex') Np = (n_samples_time_domain - 1) // 2 phases = self.__random_generator.random(Np) * 2 * np.pi phases = np.cos(phases) + 1j * np.sin(phases) amps[1:Np + 1] *= phases # Note that the last entry of the index slice is f[Np] ! return amps
[docs] def fftnoise_fullfft(self, f): """ Adding random phase information to given amplitude spectrum. Parameters ---------- f: array of floats Data that random phase is added to. """ f = np.array(f, dtype='complex') Np = (len(f) - 1) // 2 phases = self.__random_generator.random(Np) * 2 * np.pi phases = np.cos(phases) + 1j * np.sin(phases) f[1:Np + 1] *= phases # Note that the last entry of the index slice is f[Np] ! f[-1:-1 - Np:-1] = np.conj(f[1:Np + 1]) self.logger.debug(' fftnoise: Length of frequency array = {} '.format(len(f))) self.logger.debug(' fftnoise: Number of points for unilateral spectrum = {} '.format(Np)) self.logger.debug(' fftnoise: Max index and amplitude of positive part of spectrum: index = {}, A = |{}| = {} '.format(Np, f[Np], abs(f[Np]))) self.logger.debug(' fftnoise: Min index and amplitude of negative part of spectrum: index = {}, A = |{}| '.format(len(f) - Np, f[-Np])) fftprec = max(abs(np.fft.ifft(f) - np.fft.ifft(f).real)) fftcheck = fftprec - np.finfo(float).resolution self.logger.debug(' fftnoise: fft precision {} < {} (float resolution) is : {} !'.format(fftprec, np.finfo(float).resolution, fftcheck < 0)) if fftcheck >= 0: self.logger.warning(' fftnoise: Non negligibe imagniary part of inverse FFT: {} '.format(fftcheck)) return np.fft.ifft(f).real
[docs] def bandlimited_noise(self, min_freq, max_freq, n_samples, sampling_rate, amplitude, type='perfect_white', time_domain=True, bandwidth=None): """ Generating noise of n_samples in a bandwidth [min_freq,max_freq]. Parameters ---------- min_freq: float Minimum frequency of passband for noise generation min_freq = None: Only the DC component is removed. If the DC component should be included, min_freq = 0 has to be specified max_freq: float Maximum frequency of passband for noise generation If the maximum frequency is above the Nquist frequencey (0.5 * sampling rate), the Nquist frequency is used max_freq = None: Frequencies up to Nyquist freq are used. n_samples: int number of samples in the time domain sampling_rate: float desired sampling rate of data amplitude: float desired voltage of noise as V_rms (only roughly, since bandpass limited) type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution # white: flat frequency spectrum with random jitter time_domain: bool (default True) if True returns noise in the time domain, if False it returns the noise in the frequency domain. The latter might be more performant as the noise is generated internally in the frequency domain. bandwidth: float or None (default) if this parameter is specified, the amplitude is interpreted as the amplitude for the bandwidth specified here Otherwise the amplitude is interpreted for the bandwidth of min(max_freq, 0.5 * sampling rate) - min_freq If `bandwidth` is larger then (min(max_freq, 0.5 * sampling rate) - min_freq) it has the same effect as `None` Notes ----- * Note that by design the max frequency is the Nyquist frequency, even if a bigger max_freq is implemented (RL 17-Sept-2018) * Add 'multi_white' noise option on 20-Sept-2018 (RL) """ frequencies = np.fft.rfftfreq(n_samples, 1. / sampling_rate) n_samples_freq = len(frequencies) if min_freq is None or min_freq == 0: # remove DC component; fftfreq returns the DC component as 0-th element and the negative # frequencies at the end, so frequencies[1] should be the lowest frequency; it seems safer, # to take the difference between two frequencies to determine the minimum frequency, in case # future versions of numpy change the order and maybe put the negative frequencies first min_freq = 0.5 * (frequencies[2] - frequencies[1]) self.logger.info(' Set min_freq from None to {} MHz!'.format(min_freq / units.MHz)) if max_freq is None: # sample up to Nyquist frequency max_freq = max(frequencies) self.logger.info(' Set max_freq from None to {} GHz!'.format(max_freq / units.GHz)) selection = (frequencies >= min_freq) & (frequencies <= max_freq) nbinsactive = np.sum(selection) self.logger.debug('Total number of frequency bins (bilateral spectrum) : {} , of those active: {} '.format(n_samples, nbinsactive)) # Debug plots # f1 = plt.figure() # plt.plot (frequencies/max(frequencies)) # plt.plot(fbinsactive,'kx') if(bandwidth is not None): sampling_bandwidth = min(0.5 * sampling_rate, max_freq) - min_freq amplitude *= 1. / (bandwidth / (sampling_bandwidth)) ** 0.5 # normalize noise level to the bandwidth its generated for ampl = np.zeros(n_samples_freq) sigscale = (1. * n_samples) / np.sqrt(nbinsactive) if type == 'perfect_white': ampl[selection] = amplitude * sigscale elif type == 'rayleigh': fsigma = amplitude * sigscale / np.sqrt(2.) ampl[selection] = self.__random_generator.rayleigh(fsigma, nbinsactive) # elif type == 'white': # FIXME: amplitude normalization is not correct for 'white' # ampl = np.random.rand(n_samples) * 0.05 * amplitude + amplitude * np.sqrt(2.*n_samples * 2) else: self.logger.error("Other types of noise not yet implemented.") raise NotImplementedError("Other types of noise not yet implemented.") noise = self.add_random_phases(ampl, n_samples) / sampling_rate if(time_domain): return fft.freq2time(noise, sampling_rate, n=n_samples) else: return noise
[docs] def precalculate_bandlimited_noise_parameters(self, min_freq, max_freq, n_samples, sampling_rate, amplitude, type='perfect_white', bandwidth=None): """ Generating noise of n_samples in a bandwidth [min_freq,max_freq]. Parameters ---------- min_freq: float Minimum frequency of passband for noise generation min_freq = None: Only the DC component is removed. If the DC component should be included, min_freq = 0 has to be specified max_freq: float Maximum frequency of passband for noise generation If the maximum frequency is above the Nquist frequencey (0.5 * sampling rate), the Nquist frequency is used max_freq = None: Frequencies up to Nyquist freq are used. n_samples: int number of samples in the time domain sampling_rate: float desired sampling rate of data amplitude: float desired voltage of noise as V_rms (only roughly, since bandpass limited) type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution # white: flat frequency spectrum with random jitter time_domain: bool (default True) if True returns noise in the time domain, if False it returns the noise in the frequency domain. The latter might be more performant as the noise is generated internally in the frequency domain. bandwidth: float or None (default) if this parameter is specified, the amplitude is interpreted as the amplitude for the bandwidth specified here Otherwise the amplitude is interpreted for the bandwidth of min(max_freq, 0.5 * sampling rate) - min_freq If `bandwidth` is larger then (min(max_freq, 0.5 * sampling rate) - min_freq) it has the same effect as `None` Notes ----- * Note that by design the max frequency is the Nyquist frequency, even if a bigger max_freq is implemented (RL 17-Sept-2018) * Add 'multi_white' noise option on 20-Sept-2018 (RL) """ frequencies = np.fft.rfftfreq(n_samples, 1. / sampling_rate) n_samples_freq = len(frequencies) if min_freq is None or min_freq == 0: # remove DC component; fftfreq returns the DC component as 0-th element and the negative # frequencies at the end, so frequencies[1] should be the lowest frequency; it seems safer, # to take the difference between two frequencies to determine the minimum frequency, in case # future versions of numpy change the order and maybe put the negative frequencies first min_freq = 0.5 * (frequencies[2] - frequencies[1]) self.logger.info(' Set min_freq from None to {} MHz!'.format(min_freq / units.MHz)) if max_freq is None: # sample up to Nyquist frequency max_freq = max(frequencies) self.logger.info(' Set max_freq from None to {} GHz!'.format(max_freq / units.GHz)) selection = (frequencies >= min_freq) & (frequencies <= max_freq) nbinsactive = np.sum(selection) self.logger.debug('Total number of frequency bins (bilateral spectrum) : {} , of those active: {} '.format(n_samples, nbinsactive)) if(bandwidth is not None): sampling_bandwidth = min(0.5 * sampling_rate, max_freq) - min_freq amplitude *= 1. / (bandwidth / (sampling_bandwidth)) ** 0.5 # normalize noise level to the bandwidth its generated for ampl = np.zeros(n_samples_freq) sigscale = (1. * n_samples) / np.sqrt(nbinsactive) fsigma = amplitude * sigscale / np.sqrt(2.) self.precalculated_parameters = { "n_samples_freq": n_samples_freq, "selection": selection, "nbinsactive": nbinsactive, "sigscale": sigscale, "fsigma": fsigma, "sampling_rate": sampling_rate, "frequencies": frequencies, "n_samples": n_samples }
[docs] def bandlimited_noise_from_precalculated_parameters(self, type='perfect_white', time_domain=True): """ Generating noise of n_samples in a bandwidth [min_freq,max_freq]. Parameters ---------- min_freq: float Minimum frequency of passband for noise generation min_freq = None: Only the DC component is removed. If the DC component should be included, min_freq = 0 has to be specified max_freq: float Maximum frequency of passband for noise generation If the maximum frequency is above the Nquist frequencey (0.5 * sampling rate), the Nquist frequency is used max_freq = None: Frequencies up to Nyquist freq are used. n_samples: int number of samples in the time domain sampling_rate: float desired sampling rate of data amplitude: float desired voltage of noise as V_rms (only roughly, since bandpass limited) type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution # white: flat frequency spectrum with random jitter time_domain: bool (default True) if True returns noise in the time domain, if False it returns the noise in the frequency domain. The latter might be more performant as the noise is generated internally in the frequency domain. bandwidth: float or None (default) if this parameter is specified, the amplitude is interpreted as the amplitude for the bandwidth specified here Otherwise the amplitude is interpreted for the bandwidth of min(max_freq, 0.5 * sampling rate) - min_freq If `bandwidth` is larger then (min(max_freq, 0.5 * sampling rate) - min_freq) it has the same effect as `None` Notes ----- * Note that by design the max frequency is the Nyquist frequency, even if a bigger max_freq is implemented (RL 17-Sept-2018) * Add 'multi_white' noise option on 20-Sept-2018 (RL) """ ampl = np.zeros(self.precalculated_parameters["n_samples_freq"]) if type == 'perfect_white': ampl[self.precalculated_parameters["selection"]] = self.precalculated_parameters["amplitude"] * self.precalculated_parameters["sigscale"] elif type == 'rayleigh': ampl[self.precalculated_parameters["selection"]] = self.__random_generator.rayleigh(self.precalculated_parameters["fsigma"], self.precalculated_parameters["nbinsactive"]) # elif type == 'white': # FIXME: amplitude normalization is not correct for 'white' # ampl = np.random.rand(n_samples) * 0.05 * amplitude + amplitude * np.sqrt(2.*n_samples * 2) else: self.logger.error("Other types of noise not yet implemented.") raise NotImplementedError("Other types of noise not yet implemented.") noise = self.add_random_phases(ampl, self.precalculated_parameters["n_samples"]) / self.precalculated_parameters["sampling_rate"] if(time_domain): return fft.freq2time(noise, self.precalculated_parameters["sampling_rate"], n=self.precalculated_parameters["n_samples"]) else: return noise
[docs] def bandlimited_noise_from_spectrum(self, n_samples, sampling_rate, spectrum, amplitude = None, type='perfect_white', time_domain=True): """ Generating noise of n_samples in a bandwidth [min_freq,max_freq]. Parameters ---------- n_samples: int number of samples in the time domain sampling_rate: float desired sampling rate of data spectrum: numpy.ndarray, function disired spectrum of the noise, either as a numpy.ndarray of length n_frequencies or a function that takes the frequencies as an argument and returns the amplitudes. The overall normalization of the spectrum is ignored if the paramter "amplitude" is set. amplitude: float, optional desired voltage of noise as V_rms. If set to None the power of the noise will be equal to the power of the spectrum. type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution # white: flat frequency spectrum with random jitter time_domain: bool (default True) if True returns noise in the time domain, if False it returns the noise in the frequency domain. The latter might be more performant as the noise is generated internally in the frequency domain. """ frequencies = np.fft.rfftfreq(n_samples, 1. / sampling_rate) selection = frequencies > 0 n_samples_freq = np.sum(selection) if callable(spectrum): spectrum = spectrum(frequencies) if amplitude is not None: # power = np.sum(spectrum**2) norm = np.trapz(np.abs(spectrum) ** 2, frequencies) max_freq = frequencies[-1] amplitude = amplitude / (norm / max_freq) ** 0.5 sigscale = (1. * n_samples) / np.sqrt(n_samples_freq) elif amplitude is None: amplitude = np.sqrt(n_samples) sigscale = 1 ampl = np.zeros(len(frequencies), dtype=complex) if type == 'perfect_white': ampl = amplitude * sigscale elif type == 'rayleigh': fsigma = amplitude * sigscale / np.sqrt(2.) ampl[selection] = self.__random_generator.rayleigh(fsigma, n_samples_freq) else: self.logger.error("Other types of noise not yet implemented.") raise NotImplementedError("Other types of noise not yet implemented.") noise = self.add_random_phases(ampl, n_samples) / sampling_rate noise *= spectrum if time_domain: return fft.freq2time(noise, sampling_rate, n=n_samples) else: return noise
def __init__(self): self.__debug = None self.__random_generator = None self.logger = logging.getLogger('NuRadioReco.channelGenericNoiseAdder') self.begin()
[docs] def begin(self, debug=False, seed=None): self.__debug = debug self.__random_generator = Generator(Philox(seed)) if debug: self.logger.setLevel(logging.DEBUG)
[docs] @register_run() def run(self, event, station, detector, amplitude=1 * units.mV, min_freq=50 * units.MHz, max_freq=2000 * units.MHz, type='perfect_white', excluded_channels=None, bandwidth=None): """ Add noise to given event. Parameters ---------- event station detector amplitude: float or dict of floats desired voltage of noise as V_rms for the specified bandwidth a dict can be used to specify a different amplitude per channel, the key is the channel_id min_freq: float Minimum frequency of passband for noise generation max_freq: float Maximum frequency of passband for noise generation If the maximum frequency is above the Nquist frequencey (0.5 * sampling rate), the Nquist frequency is used type: string perfect_white: flat frequency spectrum rayleigh: Amplitude of each frequency bin is drawn from a Rayleigh distribution excluded_channels: list of ints the channels ids of channels where no noise will be added, default is that no channel is excluded bandwidth: float or None (default) if this parameter is specified, the amplitude is interpreted as the amplitude for the bandwidth specified here Otherwise the amplitude is interpreted for the bandwidth of min(max_freq, 0.5 * sampling rate) - min_freq If `bandwidth` is larger then (min(max_freq, 0.5 * sampling rate) - min_freq) it has the same effect as `None` """ if excluded_channels is None: excluded_channels = [] channels = station.iter_channels() for channel in channels: if(channel.get_id() in excluded_channels): continue trace = channel.get_trace() sampling_rate = channel.get_sampling_rate() if(isinstance(amplitude, dict)): tmp_ampl = amplitude[channel.get_id()] else: tmp_ampl = amplitude noise = self.bandlimited_noise(min_freq=min_freq, max_freq=max_freq, n_samples=trace.shape[0], sampling_rate=sampling_rate, amplitude=tmp_ampl, type=type, bandwidth=bandwidth) if self.__debug: new_trace = trace + noise self.logger.debug("imput amplitude {}".format(amplitude)) self.logger.debug("voltage RMS {}".format(np.sqrt(np.mean(noise ** 2)))) import matplotlib.pyplot as plt plt.plot(trace) plt.plot(noise) plt.plot(new_trace) plt.figure() plt.plot(np.abs(fft.time2freq(trace, channel.get_sampling_rate()))) plt.plot(np.abs(fft.time2freq(noise, channel.get_sampling_rate()))) plt.plot(np.abs(fft.time2freq(new_trace, channel.get_sampling_rate()))) plt.show() new_trace = trace + noise channel.set_trace(new_trace, sampling_rate)
[docs] def end(self): pass