Source code for NuRadioReco.modules.LOFAR.beamformingDirectionFitter_LOFAR

import logging
import numpy as np
import matplotlib.pyplot as plt
import radiotools.helper as hp

from scipy import constants
from scipy.signal import hilbert
from scipy.optimize import fmin_powell

from NuRadioReco.utilities import fft
from NuRadioReco.utilities import units
from NuRadioReco.framework.parameters import stationParameters, channelParameters, showerParameters
from NuRadioReco.modules.base.module import register_run
from NuRadioReco.modules.voltageToEfieldConverter import voltageToEfieldConverter
from NuRadioReco.modules.LOFAR.beamforming_utilities import beamformer


lightspeed = constants.c * units.m / units.s


[docs]def geometric_delays(ant_positions, sky): """ Returns geometric delays in a matrix. Parameters ---------- ant_positions : np.ndarray The antenna positions to use, formatted as a (nr_of_ant, 3) shaped array. sky : np.ndarray The unit vector pointing to the arrival direction, in cartesian coordinates. Returns ------- delays : np.ndarray """ delays = np.dot(ant_positions, sky) delays /= -1 * lightspeed return delays
[docs]class beamformingDirectionFitter: """ Fits the direction per station using interferometry between all the channels with a good enough signal. """ def __init__(self): self.logger = logging.getLogger("NuRadioReco.LOFAR.beamFormingDirectionFitter") self.__max_iter = None self.__cr_snr = None
[docs] def begin(self, max_iter, cr_snr=3, logger_level=logging.NOTSET): """ Set the values for the fitting procedures. Parameters ---------- max_iter : int The maximum number of iterations to use during the fitting procedure cr_snr : float, default=3 The minimum SNR a channel should have to be considered having a CR signal. logger_level : int, default=logging.NOTSET Use this parameter to override the logging level for this module. """ self.__max_iter = max_iter self.__cr_snr = cr_snr self.logger.setLevel(logger_level)
def _direction_fit(self, station, ant_positions): """ Fit the arrival direction by iteratively beamforming the signal and maximising the peak of the time trace. Parameters ---------- station : Station object The station for which to fit the direction ant_positions : np.ndarray, 2D The array of antenna positions, to be extracted from the detector description. """ freq = station.get_electric_fields()[0].get_frequencies() # Determine dominant polarisation in Efield by looking for strongest signal in 5 randomly selected traces random_traces = np.random.choice(station.get_electric_fields(), size=5) dominant_pol_traces = [] for trace in random_traces: trace_envelope = np.abs(hilbert(trace.get_trace(), axis=0)) dominant_pol_traces.append(np.argmax(np.max(trace_envelope, axis=1))) dominant_pol = np.argmax(np.bincount(dominant_pol_traces)) self.logger.debug(f"Dominant polarisation is {dominant_pol}") # Collect the Efield traces - the e-field from the converter has eR as [0] component -> do +1 in index fft_traces = np.array([trace.get_frequency_spectrum()[dominant_pol + 1] for trace in station.get_electric_fields()]) def negative_beamed_signal(direction): theta = direction[0] phi = direction[1] direction_cartesian = hp.spherical_to_cartesian(theta, phi) delays = geometric_delays(ant_positions, direction_cartesian) out = beamformer(fft_traces, freq, delays) timeseries = fft.freq2time(out, 200 * units.MHz) # TODO: is this really necessary? return -100 * np.max(timeseries ** 2) start_direction = np.array([station.get_parameter(stationParameters.zenith) / units.rad, station.get_parameter(stationParameters.azimuth) / units.rad]) fit_direction = fmin_powell(negative_beamed_signal, start_direction, maxiter=self.__max_iter, xtol=1.0, disp=False) direction_cartesian = hp.spherical_to_cartesian(*fit_direction) delays = geometric_delays(ant_positions, direction_cartesian) out = beamformer(fft_traces, freq, delays) fit_direction = hp.cartesian_to_spherical(*direction_cartesian) theta = fit_direction[0] phi = fit_direction[1] if phi < 0: # Radiotools uses np.arctan2, which returns phi in the interval [-pi, pi] -> map to [0, 2*pi] phi += 360 * units.deg return [theta, phi], out
[docs] @register_run() def run(self, event, detector): """ reconstruct signal arrival direction for all events through beam forming. https://arxiv.org/pdf/1009.0345.pdf Parameters ---------- event: Event The event to run the module on detector: Detector The detector object """ converter = voltageToEfieldConverter() converter.begin() for station in event.get_stations(): if not station.get_parameter(stationParameters.triggered): # Not triggered means no reliable pulse found or not enough antennas to do the fit continue zenith = event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.zenith) azimuth = event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.azimuth) # TODO: get this from Detector description station_channel_group_ids = set([channel.get_group_id() for channel in station.iter_channels()]) position_array = [] good_antennas = [] for group_id in station_channel_group_ids: channels = [channel for channel in station.iter_channel_group(group_id)] # Only use the channels with an acceptable SNR good_snr = False for channel in channels: if channel.get_parameter(channelParameters.SNR) > self.__cr_snr: good_snr = True if good_snr: position_array.append( detector.get_absolute_position(station.get_id()) + detector.get_relative_position(station.get_id(), channels[0].get_id()) ) # the position are the same for every polarisation good_antennas.append((channels[0].get_id(), channels[1].get_id())) station.set_parameter(stationParameters.zenith, zenith) station.set_parameter(stationParameters.azimuth, azimuth) direction_difference = np.asarray([100, 100]) while direction_difference[0] > 0.5 * units.deg and direction_difference[1] > 0.5 * units.deg: # Make sure all the previously calculated Efields are removed station.set_electric_fields([]) # Unfold antenna response for good antennas for ant in good_antennas: converter.run(event, station, detector, use_channels=ant) # Perform the direction fit on the station direction_fit, freq_spectrum = self._direction_fit( station, position_array ) # Check if fit produced unphysical results if direction_fit[0] > 90 * units.deg: self.logger.warning(f"Zenith angle {direction_fit[0] / units.deg} is larger than 90 degrees!") self.logger.error(f"Direction fit could not be completed for station CS{station.get_id():03d}") break # See if the fit converged zenith_diff = np.abs(station.get_parameter(stationParameters.zenith) / units.rad - direction_fit[0]) azimuth_diff = np.abs(station.get_parameter(stationParameters.azimuth) / units.rad - direction_fit[1]) direction_difference = [zenith_diff, azimuth_diff] # Bookkeeping station.set_parameter(stationParameters.zenith, direction_fit[0]) station.set_parameter(stationParameters.azimuth, direction_fit[1]) self.logger.debug('Difference after another fit iteration is %s;' % direction_difference) self.logger.debug('Direction after this fit iteration is %s;' % direction_fit) self.logger.info(f"Azimuth (wrt to North) and elevation for station CS{station.get_id():03d}:") self.logger.info((90 - station.get_parameter(stationParameters.azimuth) / units.deg, 90 - station.get_parameter(stationParameters.zenith) / units.deg)) station.set_parameter(stationParameters.cr_zenith, station.get_parameter(stationParameters.zenith)) station.set_parameter(stationParameters.cr_azimuth, station.get_parameter(stationParameters.azimuth))
[docs] def end(self): pass