from NuRadioReco.modules.base.module import register_run
import numpy as np
import copy
from NuRadioReco.utilities import geometryUtilities as geo_utl
from NuRadioReco.utilities import units
from NuRadioReco.utilities import ice
from NuRadioReco.detector import antennapattern
from NuRadioReco.utilities import trace_utilities
import NuRadioReco.framework.base_trace
import NuRadioReco.framework.electric_field
import matplotlib.pyplot as plt
import logging
from NuRadioReco.framework.parameters import stationParameters as stnp
from NuRadioReco.framework.parameters import electricFieldParameters as efp
logger = logging.getLogger('NuRadioReco.voltageToEfieldConverter')
[docs]def get_array_of_channels(station, use_channels, det, zenith, azimuth,
antenna_pattern_provider, time_domain=False):
time_shifts = np.zeros(len(use_channels))
t_cables = np.zeros(len(use_channels))
t_geos = np.zeros(len(use_channels))
station_id = station.get_id()
site = det.get_site(station_id)
for iCh, channel in enumerate(station.iter_channels(use_channels)):
channel_id = channel.get_id()
antenna_position = det.get_relative_position(station_id, channel_id)
# determine refractive index of signal propagation speed between antennas
refractive_index = ice.get_refractive_index(1, site) # if signal comes from above, in-air propagation speed
if station.is_cosmic_ray():
if(zenith > 0.5 * np.pi):
refractive_index = ice.get_refractive_index(antenna_position[2], site) # if signal comes from below, use refractivity at antenna position
if station.is_neutrino():
refractive_index = ice.get_refractive_index(antenna_position[2], site)
time_shift = -geo_utl.get_time_delay_from_direction(zenith, azimuth, antenna_position, n=refractive_index)
t_geos[iCh] = time_shift
t_cables[iCh] = channel.get_trace_start_time()
logger.debug("time shift channel {}: {:.2f}ns (signal prop), {:.2f}ns (trace start time)".format(channel.get_id(), time_shift, channel.get_trace_start_time()))
time_shift += channel.get_trace_start_time()
time_shifts[iCh] = time_shift
delta_t = time_shifts.max() - time_shifts.min()
tmin = time_shifts.min()
tmax = time_shifts.max()
logger.debug("adding relative station time = {:.0f}ns".format((t_cables.min() + t_geos.max()) / units.ns))
logger.debug("delta t is {:.2f}".format(delta_t / units.ns))
trace_length = station.get_channel(use_channels[0]).get_times()[-1] - station.get_channel(use_channels[0]).get_times()[0]
debug_cut = 0
if(debug_cut):
fig, ax = plt.subplots(len(use_channels), 1)
traces = []
n_samples = None
for iCh, channel in enumerate(station.iter_channels(use_channels)):
tstart = delta_t - (time_shifts[iCh] - tmin)
tstop = tmax - time_shifts[iCh] - delta_t + trace_length
iStart = int(round(tstart * channel.get_sampling_rate()))
iStop = int(round(tstop * channel.get_sampling_rate()))
if(n_samples is None):
n_samples = iStop - iStart
if(n_samples % 2):
n_samples -= 1
trace = copy.copy(channel.get_trace()) # copy to not modify data structure
trace = trace[iStart:(iStart + n_samples)]
if(debug_cut):
ax[iCh].plot(trace)
base_trace = NuRadioReco.framework.base_trace.BaseTrace() # create base trace class to do the fft with correct normalization etc.
base_trace.set_trace(trace, channel.get_sampling_rate())
traces.append(base_trace)
times = traces[0].get_times() # assumes that all channels have the same sampling rate
if(time_domain): # save time domain traces first to avoid extra fft
V_timedomain = np.zeros((len(use_channels), len(times)))
for iCh, trace in enumerate(traces):
V_timedomain[iCh] = trace.get_trace()
frequencies = traces[0].get_frequencies() # assumes that all channels have the same sampling rate
V = np.zeros((len(use_channels), len(frequencies)), dtype=complex)
for iCh, trace in enumerate(traces):
V[iCh] = trace.get_frequency_spectrum()
efield_antenna_factor = trace_utilities.get_efield_antenna_factor(station, frequencies, use_channels, det, zenith, azimuth, antenna_pattern_provider)
if(debug_cut):
plt.show()
if(time_domain):
return efield_antenna_factor, V, V_timedomain
return efield_antenna_factor, V
[docs]def stacked_lstsq(L, b, rcond=1e-10):
"""
Solve L x = b, via SVD least squares cutting of small singular values
L is an array of shape (..., M, N) and b of shape (..., M).
Returns x of shape (..., N)
Note that if L is symmetric, it is inverted analytically instead.
"""
if L.shape[-2] == L.shape[-1]: # try analytic matrix inversion if possible
if L.shape[-1] == 2: # use explicit formula for matrix inverse
denom = (L[:,0,0] * L[:,1,1] - L[:,0,1] * L[:,1,0])
e_theta = (b[:,0] * L[:,1,1] - b[:,1] * L[:,0,1]) / denom
e_phi = (b[:,1] - L[:,1,0] * e_theta) / L[:,1,1]
return np.stack((e_theta, e_phi), axis=-1)
else:
return np.sum(np.linalg.inv(L) * b[:, None], axis=-1)
u, s, v = np.linalg.svd(L, full_matrices=False)
s_max = s.max(axis=-1, keepdims=True)
s_min = rcond * s_max
inv_s = np.zeros_like(s)
inv_s[s >= s_min] = 1 / s[s >= s_min]
x = np.einsum('...ji,...j->...i', v,
inv_s * np.einsum('...ji,...j->...i', u, b.conj()))
return np.conj(x, x)
[docs]class voltageToEfieldConverter:
"""
Unfold the electric field from the channel voltages
This module reconstructs the electric field by solving the system of equation
that relate the incident electric field via the antenna response functions
to the measured voltages (see Eq. 4 of the NuRadioReco paper
https://link.springer.com/article/10.1140/epjc/s10052-019-6971-5).
The module assumed that the electric field is identical at the antennas/channels
that are used for the reconstruction. Furthermore, at least two antennas with
orthogonal polarization response are needed to reconstruct the 3dim electric field.
Alternatively, the polarization of the resulting efield could be forced to a
single polarization component. In that case, a single antenna is sufficient.
"""
def __init__(self):
self.antenna_provider = None
self.begin()
[docs] def begin(self):
self.antenna_provider = antennapattern.AntennaPatternProvider()
pass
[docs] @register_run()
def run(self, evt, station, det, use_channels=None, use_MC_direction=False, force_Polarization=''):
"""
run method. This function is executed for each event
Parameters
----------
evt : `NuRadioReco.framework.event.Event`
station : `NuRadioReco.framework.base_station.BaseStation`
det : Detector object
use_channels: array of ints (default: [0, 1, 2, 3])
the channel ids to use for the electric field reconstruction
use_MC_direction: bool, default: False
If True uses zenith and azimuth direction from simulated station.
Otherwise, uses reconstructed direction from station parameters.
force_Polarization: str, optional
If eTheta or ePhi, then only reconstructs chosen polarization of electric field,
assuming the other is 0. Otherwise (default), reconstructs electric field for both eTheta and ePhi
"""
if use_channels is None:
use_channels = [0, 1, 2, 3]
station_id = station.get_id()
if use_MC_direction:
zenith = station.get_sim_station()[stnp.zenith]
azimuth = station.get_sim_station()[stnp.azimuth]
else:
logger.info("Using reconstructed (or starting) angles as no signal arrival angles are present")
zenith = station[stnp.zenith]
azimuth = station[stnp.azimuth]
efield_antenna_factor, V = get_array_of_channels(station, use_channels, det, zenith, azimuth, self.antenna_provider)
n_frequencies = len(V[0])
denom = (efield_antenna_factor[0][0] * efield_antenna_factor[-1][1] - efield_antenna_factor[0][1] * efield_antenna_factor[-1][0])
mask = np.abs(denom) != 0
# solve it in a vectorized way
efield3_f = np.zeros((3, n_frequencies), dtype=complex)
if force_Polarization == 'eTheta':
efield3_f[1:2, mask] = np.moveaxis(stacked_lstsq(np.moveaxis(efield_antenna_factor[:, 0, mask], 1, 0)[:, :, np.newaxis], np.moveaxis(V[:, mask], 1, 0)), 0, 1)
elif force_Polarization == 'ePhi':
efield3_f[2:, mask] = np.moveaxis(stacked_lstsq(np.moveaxis(efield_antenna_factor[:, 1, mask], 1, 0)[:, :, np.newaxis], np.moveaxis(V[:, mask], 1, 0)), 0, 1)
else:
efield3_f[1:, mask] = np.moveaxis(stacked_lstsq(np.moveaxis(efield_antenna_factor[:, :, mask], 2, 0), np.moveaxis(V[:, mask], 1, 0)), 0, 1)
efield_position = np.mean([
det.get_relative_position(station.get_id(), channel_id)
for channel_id in use_channels], axis=0)
electric_field = NuRadioReco.framework.electric_field.ElectricField(use_channels, efield_position)
electric_field.set_frequency_spectrum(efield3_f, station.get_channel(use_channels[0]).get_sampling_rate())
electric_field.set_parameter(efp.zenith, zenith)
electric_field.set_parameter(efp.azimuth, azimuth)
# figure out the timing of the E-field
t_shifts = np.zeros(V.shape[0])
site = det.get_site(station_id)
if(zenith > 0.5 * np.pi):
logger.warning("Module has not been optimized for neutrino reconstruction yet. Results may be nonsense.")
refractive_index = ice.get_refractive_index(-1, site) # if signal comes from below, use refractivity at antenna position
else:
refractive_index = ice.get_refractive_index(1, site) # if signal comes from above, in-air propagation speed
for i_ch, channel_id in enumerate(use_channels):
antenna_position = det.get_relative_position(station.get_id(), channel_id)
t_shifts[i_ch] = station.get_channel(channel_id).get_trace_start_time() - geo_utl.get_time_delay_from_direction(zenith, azimuth, antenna_position, n=refractive_index)
electric_field.set_trace_start_time(t_shifts.max())
station.add_electric_field(electric_field)