Source code for NuRadioReco.utilities.trace_utilities

import numpy as np
import scipy.constants
import scipy.signal
from NuRadioReco.utilities import units
from NuRadioReco.utilities import ice
from NuRadioReco.utilities import geometryUtilities as geo_utl
from NuRadioReco.utilities import fft
import logging
logger = logging.getLogger('NuRadioReco.trace_utilities')

conversion_factor_integrated_signal = scipy.constants.c * scipy.constants.epsilon_0 * units.joule / units.s / units.volt ** 2

# see Phys. Rev. D DOI: 10.1103/PhysRevD.93.122005
# to convert V**2/m**2 * s -> J/m**2 -> eV/m**2


[docs]def get_efield_antenna_factor(station, frequencies, channels, detector, zenith, azimuth, antenna_pattern_provider): """ Returns the antenna response to a radio signal coming from a specific direction Parameters ---------- station: Station frequencies: array of complex frequencies of the radio signal for which the antenna response is needed channels: array of int IDs of the channels detector: Detector zenith, azimuth: float, float incoming direction of the signal. Note that refraction and reflection at the ice/air boundary are taken into account antenna_pattern_provider: AntennaPatternProvider """ n_ice = ice.get_refractive_index(-0.01, detector.get_site(station.get_id())) efield_antenna_factor = np.zeros((len(channels), 2, len(frequencies)), dtype=complex) # from antenna model in e_theta, e_phi for iCh, channel_id in enumerate(channels): zenith_antenna = zenith t_theta = 1. t_phi = 1. # first check case if signal comes from above if zenith <= 0.5 * np.pi and station.is_cosmic_ray(): # is antenna below surface? position = detector.get_relative_position(station.get_id(), channel_id) if position[2] <= 0: zenith_antenna = geo_utl.get_fresnel_angle(zenith, n_ice, 1) t_theta = geo_utl.get_fresnel_t_p(zenith, n_ice, 1) t_phi = geo_utl.get_fresnel_t_s(zenith, n_ice, 1) logger.info("channel {:d}: electric field is refracted into the firn. theta {:.0f} -> {:.0f}. Transmission coefficient p (eTheta) {:.2f} s (ePhi) {:.2f}".format(iCh, zenith / units.deg, zenith_antenna / units.deg, t_theta, t_phi)) else: # now the signal is coming from below, do we have an antenna above the surface? position = detector.get_relative_position(station.get_id(), channel_id) if(position[2] > 0): zenith_antenna = geo_utl.get_fresnel_angle(zenith, 1., n_ice) if(zenith_antenna is None): logger.warning("fresnel reflection at air-firn boundary leads to unphysical results, no reconstruction possible") return None logger.debug("angles: zenith {0:.0f}, zenith antenna {1:.0f}, azimuth {2:.0f}".format(np.rad2deg(zenith), np.rad2deg(zenith_antenna), np.rad2deg(azimuth))) antenna_model = detector.get_antenna_model(station.get_id(), channel_id, zenith_antenna) antenna_pattern = antenna_pattern_provider.load_antenna_pattern(antenna_model) ori = detector.get_antenna_orientation(station.get_id(), channel_id) VEL = antenna_pattern.get_antenna_response_vectorized(frequencies, zenith_antenna, azimuth, *ori) efield_antenna_factor[iCh] = np.array([VEL['theta'] * t_theta, VEL['phi'] * t_phi]) return efield_antenna_factor
[docs]def get_channel_voltage_from_efield(station, electric_field, channels, detector, zenith, azimuth, antenna_pattern_provider, return_spectrum=True): """ Returns the voltage traces that would result in the channels from the station's E-field. Parameters ---------- station: Station electric_field: ElectricField channels: array of int IDs of the channels for which the expected voltages should be calculated detector: Detector zenith, azimuth: float incoming direction of the signal. Note that reflection and refraction at the air/ice boundary are already being taken into account. antenna_pattern_provider: AntennaPatternProvider return_spectrum: boolean if True, returns the spectrum, if False return the time trace """ frequencies = electric_field.get_frequencies() spectrum = electric_field.get_frequency_spectrum() efield_antenna_factor = get_efield_antenna_factor(station, frequencies, channels, detector, zenith, azimuth, antenna_pattern_provider) if return_spectrum: voltage_spectrum = np.zeros((len(channels), len(frequencies)), dtype=complex) for i_ch, ch in enumerate(channels): voltage_spectrum[i_ch] = np.sum(efield_antenna_factor[i_ch] * np.array([spectrum[1], spectrum[2]]), axis=0) return voltage_spectrum else: voltage_trace = np.zeros((len(channels), 2 * (len(frequencies) - 1)), dtype=complex) for i_ch, ch in enumerate(channels): voltage_trace[i_ch] = fft.freq2time(np.sum(efield_antenna_factor[i_ch] * np.array([spectrum[1], spectrum[2]]), axis=0), electric_field.get_sampling_rate()) return np.real(voltage_trace)
[docs]def get_electric_field_energy_fluence(electric_field_trace, times, signal_window_mask=None, noise_window_mask=None): if signal_window_mask is None: f_signal = np.sum(electric_field_trace ** 2, axis=1) else: f_signal = np.sum(electric_field_trace[:, signal_window_mask] ** 2, axis=1) dt = times[1] - times[0] if noise_window_mask is not None: f_noise = np.sum(electric_field_trace[:, noise_window_mask] ** 2, axis=1) f_signal -= f_noise * np.sum(signal_window_mask) / np.sum(noise_window_mask) return f_signal * dt * conversion_factor_integrated_signal
[docs]def get_stokes(trace_u, trace_v, window_samples=128, squeeze=True): """ Compute the stokes parameters for electric field traces Parameters ---------- trace_u : 1d array (float) The u component of the electric field trace trace_v : 1d array (float) The v component of the electric field trace. The two components should have equal lengths, and the (u, v) coordinates should be perpendicular. Common choices are (theta, phi) or (vxB, vxvxB) window_samples : int | None, default: 128 If not None, return a running average of the stokes parameters over ``window_samples``. If None, compute the stokes parameters over the whole trace (equivalent to ``window_samples=len(trace_u)``). squeeze : bool, default: True Only relevant if ``window_samples=None``. Squeezes out the second axis (which has a length of one) and returns an array of shape (4,) Returns ------- stokes : 2d array of floats The stokes parameters I, Q, U, V. The shape of the returned array is ``(4, len(trace_u) - window_samples +1)``, i.e. stokes[0] returns the I parameter, stokes[1] corresponds to Q, and so on. Examples -------- For an electric field defined in (eR, eTheta, ePhi) components, the stokes parameters can be given simply by: .. code-block:: get_stokes(electric_field.get_trace()[1], electric_field.get_trace()[2]) To instead get the stokes parameters in vxB and vxvxB, we need to first obtain the appropriate electric field components .. code-block:: cs = radiotools.coordinatesystems.cstrafo(zenith, azimuth, magnetic_field_vector) efield_trace_vxB_vxvxB = cs.transform_to_vxB_vxvxB( cs.transform_from_onsky_to_ground(efield.get_trace()) ) """ assert len(trace_u) == len(trace_v) h1 = scipy.signal.hilbert(trace_u) h2 = scipy.signal.hilbert(trace_v) stokes_i = np.abs(h1)**2 + np.abs(h2)**2 stokes_q = np.abs(h1)**2 - np.abs(h2)**2 uv = 2 * h1 * np.conjugate(h2) stokes_u = np.real(uv) stokes_v = np.imag(uv) stokes = np.array([stokes_i, stokes_q, stokes_u, stokes_v]) if window_samples == 1: # no need to average return stokes elif window_samples is None: window_samples = len(h1) stokes = np.asarray([ scipy.signal.convolve(i, np.ones(window_samples), mode='valid') for i in stokes ]) stokes /= window_samples if squeeze: return np.squeeze(stokes) return stokes
[docs]def upsampling_fir(trace, original_sampling_frequency, int_factor=2, ntaps=2 ** 7): """ This function performs an upsampling by inserting a number of zeroes between samples and then applying a finite impulse response (FIR) filter. Parameters ---------- trace: array of floats Trace to be upsampled original_sampling_frequency: float Sampling frequency of the input trace int_factor: integer Upsampling factor. The resulting trace will have a sampling frequency int_factor times higher than the original one ntaps: integer Number of taps (order) of the FIR filter Returns ------- upsampled_trace: array of floats The upsampled trace """ if (np.abs(int(int_factor) - int_factor) > 1e-3): warning_msg = "The input upsampling factor does not seem to be close to an integer." warning_msg += "It has been rounded to {}".format(int(int_factor)) logger.warning(warning_msg) int_factor = int(int_factor) if (int_factor <= 1): error_msg = "Upsampling factor is less or equal to 1. Upsampling will not be performed." raise ValueError(error_msg) zeroed_trace = np.zeros(len(trace) * int_factor) for i_point, point in enumerate(trace[:-1]): zeroed_trace[i_point * int_factor] = point upsampled_delta_time = 1 / (int_factor * original_sampling_frequency) upsampled_times = np.arange(0, len(zeroed_trace) * upsampled_delta_time, upsampled_delta_time) cutoff = 1. / int_factor fir_coeffs = scipy.signal.firwin(ntaps, cutoff, window='boxcar') upsampled_trace = np.convolve(zeroed_trace, fir_coeffs)[:len(upsampled_times)] * int_factor return upsampled_trace
[docs]def butterworth_filter_trace(trace, sampling_frequency, passband, order=8): """ Filters a trace using a Butterworth filter. Parameters ---------- trace: array of floats Trace to be filtered sampling_frequency: float Sampling frequency passband: (float, float) tuple Tuple indicating the cutoff frequencies order: integer Filter order Returns ------- filtered_trace: array of floats The filtered trace """ n_samples = len(trace) spectrum = fft.time2freq(trace, sampling_frequency) frequencies = np.fft.rfftfreq(n_samples, 1 / sampling_frequency) filtered_spectrum = apply_butterworth(spectrum, frequencies, passband, order) filtered_trace = fft.freq2time(filtered_spectrum, sampling_frequency) return filtered_trace
[docs]def apply_butterworth(spectrum, frequencies, passband, order=8): """ Calculates the response from a Butterworth filter and applies it to the input spectrum Parameters ---------- spectrum: array of complex Fourier spectrum to be filtere frequencies: array of floats Frequencies of the input spectrum passband: (float, float) tuple Tuple indicating the cutoff frequencies order: integer Filter order Returns ------- filtered_spectrum: array of complex The filtered spectrum """ f = np.zeros_like(frequencies, dtype=complex) mask = frequencies > 0 b, a = scipy.signal.butter(order, passband, 'bandpass', analog=True) w, h = scipy.signal.freqs(b, a, frequencies[mask]) f[mask] = h filtered_spectrum = f * spectrum return filtered_spectrum
[docs]def delay_trace(trace, sampling_frequency, time_delay, delayed_samples=None): """ Delays a trace by transforming it to frequency and multiplying by phases. Since this method is cyclic, the trace has to be cropped. It only accepts positive delays, so some samples from the beginning are thrown away and then some samples from the end so that the total number of samples is equal to the argument delayed samples. Parameters ---------- trace: array of floats Array containing the trace sampling_frequency: float Sampling rate for the trace time_delay: float Time delay used for transforming the trace. Must be positive or 0 delayed_samples: integer or None Number of samples that the delayed trace must contain if None: the trace is not cut Returns ------- delayed_trace: array of floats The delayed, cropped trace """ if time_delay < 0: msg = 'Time delay must be positive' raise ValueError(msg) n_samples = len(trace) spectrum = fft.time2freq(trace, sampling_frequency) frequencies = np.fft.rfftfreq(n_samples, 1 / sampling_frequency) spectrum *= np.exp(-1j * 2 * np.pi * frequencies * time_delay) delayed_trace = fft.freq2time(spectrum, sampling_frequency) init_sample = int(time_delay * sampling_frequency) + 1 if delayed_samples is not None: delayed_trace = delayed_trace[init_sample:None] delayed_trace = delayed_trace[:delayed_samples] return delayed_trace