Source code for NuRadioReco.modules.beamFormingDirectionFitter
import copy
import matplotlib.pyplot as plt
import numpy as np
import scipy.optimize as opt
import logging
import NuRadioReco.framework.base_trace
from NuRadioReco.utilities import ice
from NuRadioReco.utilities import geometryUtilities as geo_utl
from NuRadioReco.utilities import units
from NuRadioReco.framework.parameters import stationParameters as stnp
import NuRadioReco.modules.voltageToEfieldConverterPerChannel
import NuRadioReco.modules.electricFieldBandPassFilter
electricFieldBandPassFilter = NuRadioReco.modules.electricFieldBandPassFilter.electricFieldBandPassFilter()
voltageToEfieldConverterPerChannel = NuRadioReco.modules.voltageToEfieldConverterPerChannel.voltageToEfieldConverterPerChannel()
voltageToEfieldConverterPerChannel.begin()
[docs]def get_array_of_channels(station, det, zenith, azimuth, polarization):
"""
Returns an array of the channel traces that is cut to the physical overlapping time
Parameters
----------
station: Station
Station from which to take the channels
det: Detector
The detector description
zenith: float
Arrival zenith angle at antenna
azimuth: float
Arrival azimuth angle at antenna
polarization: int
0: eTheta
1: ePhi
"""
time_shifts = np.zeros(8)
t_geos = np.zeros(8)
sampling_rate = next(station.iter_channels()).get_sampling_rate()
station_id = station.get_id()
site = det.get_site(station_id)
for iCh, channel in enumerate(station.get_electric_fields()):
channel_id = channel.get_channel_ids()[0]
antenna_position = det.get_relative_position(station_id, channel_id)
# determine refractive index of signal propagation speed between antennas
refractive_index = ice.get_refractive_index(1, site) # if signal comes from above, in-air propagation speed
if(zenith > 0.5 * np.pi):
refractive_index = ice.get_refractive_index(antenna_position[2], site) # if signal comes from below, use refractivity at antenna position
refractive_index = 1.353
time_shift = -geo_utl.get_time_delay_from_direction(zenith, azimuth, antenna_position, n=refractive_index)
t_geos[iCh] = time_shift
time_shift += channel.get_trace_start_time()
time_shifts[iCh] = time_shift
delta_t = time_shifts.max() - time_shifts.min()
tmin = time_shifts.min()
tmax = time_shifts.max()
trace_length = station.get_electric_fields()[0].get_times()[-1] - station.get_electric_fields()[0].get_times()[0]
traces = []
n_samples = None
for iCh, channel in enumerate(station.get_electric_fields()):
tstart = delta_t - (time_shifts[iCh] - tmin)
tstop = tmax - time_shifts[iCh] - delta_t + trace_length
iStart = int(round(tstart * sampling_rate))
iStop = int(round(tstop * sampling_rate))
if(n_samples is None):
n_samples = iStop - iStart
if(n_samples % 2):
n_samples -= 1
trace = copy.copy(channel.get_trace()[polarization]) # copy to not modify data structure
trace = trace[iStart:(iStart + n_samples)]
base_trace = NuRadioReco.framework.base_trace.BaseTrace() # create base trace class to do the fft with correct normalization etc.
base_trace.set_trace(trace, sampling_rate)
traces.append(base_trace)
return traces
[docs]class beamFormingDirectionFitter:
"""
Fits the direction using interferometry between desired channels.
"""
def __init__(self):
self.__zenith = []
self.__azimuth = []
self.__delta_zenith = []
self.__delta_azimuth = []
self.__debug = None
self.begin()
self.logger = logging.getLogger("NuRadioReco.beamFormingDirectionFitter")
[docs] def begin(self, debug=False, log_level=logging.NOTSET):
self.logger.setLevel(log_level)
self.__debug = debug
[docs] def run(self, evt, station, det, polarization, n_index=None, channels=None, ZenLim=None,
AziLim=None):
"""
reconstruct signal arrival direction for all events through beam forming.
https://arxiv.org/pdf/1009.0345.pdf
Parameters
----------
evt: Event
The event to run the module on
station: Station
The station to run the module on
det: Detector
The detector description
polarization: int
0: eTheta
1: ePhi
n_index: float
the index of refraction
channels: list
the channel ids
ZenLim: 2-dim array/list of floats
the zenith angle limits for the fit
default if 0-90deg (upward coming signal)
AziLim: 2-dim array/list of floats
the azimuth angle limits for the fit
default is 0-360deg
"""
if channels is None:
channels = [4, 5, 6, 7]
if ZenLim is None:
ZenLim = [90 * units.deg, 180 * units.deg]
if AziLim is None:
AziLim = [0 * units.deg, 360 * units.deg]
def ll_regular_station(angles, evt, station, det, polarization, sampling_rate, positions, channels):
"""
Likelihood function for a four antenna ARIANNA station, using correction.
Using correlation, has no built in wrap around, pulse needs to be in the middle
"""
zenith = angles[0]
azimuth = angles[1]
station.set_parameter(stnp.zenith, zenith)
station.set_parameter(stnp.azimuth, azimuth)
station.set_electric_fields([]) # resets EFields, necessary
voltageToEfieldConverterPerChannel.run(evt, station, det, pol=polarization, debug=False) # Antenna response
electricFieldBandPassFilter.run(evt, station, det, passband=[120 * units.MHz, 300 * units.MHz], filter_type='butterabs')
Efields_object = get_array_of_channels(station, det, zenith, azimuth, polarization + 1)
Efields = []
Efield_Times = []
maximum = 0 # location at maximum among all traces
for chn in channels:
efield = Efields_object[chn]
Efield_Times.append(efield.get_times())
Efield = efield.get_trace()
if max(Efield) > maximum:
maximum = max(Efield)
Efields.append(Efield) # np.pad(Efield, (200,200), 'constant', constant_values=(0, 0)))
Efields[0] = Efields[0] / maximum # normalize all traces to remove small antenna responses, see last line of next for loop
for j in range(len(Efields) - 1):
Efields[j + 1] = Efields[j + 1] / maximum
N = len(Efields)
N_pairs = 0.5 * np.math.factorial(N) / np.math.factorial(N - 2)
cc = np.zeros(len(Efields[0]))
for j in range(N - 1): # finds the cc-beam taken from referenced paper above
for k in range(N - 1 - j):
cc = cc + Efields[k] * Efields[k + 1 + k]
cc = cc / N_pairs
cc = np.sign(cc) * np.sqrt(np.abs(cc))
cc = np.abs(cc)
n_bins = 2000
ave_cc = np.convolve(cc, np.ones((n_bins)) / float(n_bins), mode='same')
likelihood = -1 * np.max(ave_cc)
return likelihood
station_id = station.get_id()
positions = []
for chan in channels:
positions.append(det.get_relative_position(station_id, chan))
sampling_rate = station.get_channel(channels[0]).get_sampling_rate()
ll = opt.brute(
ll_regular_station,
ranges=(slice(ZenLim[0], ZenLim[1], 1.0 * units.deg), slice(AziLim[0], AziLim[1], 1.0 * units.deg)),
args=(evt, station, det, polarization, sampling_rate, positions, channels),
full_output=True,
finish=opt.fmin
) # slow but does the trick
station[stnp.zenith] = max(ZenLim[0], min(ZenLim[1], ll[0][0]))
station[stnp.azimuth] = ll[0][1]
if self.__debug:
# Show fit space
zen = np.arange(ZenLim[0], ZenLim[1], 1 * units.deg)
az = np.arange(AziLim[0], AziLim[1], 1 * units.deg)
x_plot = np.zeros(zen.shape[0] * az.shape[0])
y_plot = np.zeros(zen.shape[0] * az.shape[0])
z_plot = np.zeros(zen.shape[0] * az.shape[0])
i = 0
for z in zen:
for a in az:
# Evaluate fit function for grid
z_plot[i] = ll_regular_station([z, a], evt, station, det, polarization, sampling_rate, positions, channels)
x_plot[i] = a
y_plot[i] = z
i += 1
fig, ax = plt.subplots(1, 1)
ax.scatter(np.asarray(x_plot) / units.deg, np.asarray(y_plot) / units.deg, c=z_plot, cmap='gnuplot2_r', lw=0)
ax.scatter(np.rad2deg(ll[0][1]), np.rad2deg(ll[0][0]), marker='o', label='Fit')
ax.colorbar(label='Fit parameter')
ax.set_ylabel('Zenith [rad]')
ax.set_xlabel('Azimuth [rad]')
plt.tight_layout()
plt.legend()
plt.show()