Source code for NuRadioReco.modules.LOFAR.beamformingDirectionFitter_LOFAR

import logging
import numpy as np
import matplotlib.pyplot as plt
import radiotools.helper as hp

from scipy import constants
from scipy.signal import hilbert
from scipy.optimize import minimize, fmin_powell, Bounds

from NuRadioReco.utilities import fft
from NuRadioReco.utilities import units
from NuRadioReco.framework.parameters import stationParameters, channelParameters, showerParameters
from NuRadioReco.modules.base.module import register_run
from NuRadioReco.modules.voltageToEfieldConverter import voltageToEfieldConverter
from NuRadioReco.modules.LOFAR.beamforming_utilities import mini_beamformer

lightspeed = constants.c * units.m / units.s


[docs] class beamformingDirectionFitter: """ Fits the direction per station using interferometry between all the channels with a good enough signal. """ def __init__(self): self.logger = logging.getLogger("NuRadioReco.LOFAR.beamFormingDirectionFitter") self.__max_iter = None self.__cr_snr = None
[docs] def begin(self, max_iter, cr_snr=3, logger_level=logging.NOTSET): """ Set the values for the fitting procedures. Parameters ---------- max_iter : int The maximum number of iterations to use during the fitting procedure cr_snr : float, default=3 The minimum SNR a channel should have to be considered having a CR signal. logger_level : int, default=logging.NOTSET Use this parameter to override the logging level for this module. """ self.__max_iter = max_iter self.__cr_snr = cr_snr self.logger.setLevel(logger_level)
def _direction_fit(self, station, ant_positions): """ Fit the arrival direction by iteratively beamforming the signal and maximising the peak of the time trace. Parameters ---------- station : Station object The station for which to fit the direction ant_positions : np.ndarray, 2D The array of antenna positions, to be extracted from the detector description. """ freq = station.get_electric_fields()[0].get_frequencies() # Determine dominant polarisation in Efield by looking for strongest signal in 5 randomly selected traces random_traces = np.random.choice(station.get_electric_fields(), size=5) dominant_pol_traces = [] for trace in random_traces: trace_envelope = np.abs(hilbert(trace.get_trace(), axis=0)) dominant_pol_traces.append(np.argmax(np.max(trace_envelope, axis=1))) dominant_pol = np.argmax(np.bincount(dominant_pol_traces)) self.logger.debug(f"Dominant polarisation is {dominant_pol}") # Collect the Efield traces fft_traces = np.array([trace.get_frequency_spectrum()[dominant_pol] for trace in station.get_electric_fields()]) def negative_beamed_signal(direction): theta = direction[0] phi = direction[1] my_direction_cartesian = hp.spherical_to_cartesian(theta, phi) my_out = mini_beamformer(fft_traces, freq, ant_positions, my_direction_cartesian) timeseries = fft.freq2time(my_out, 200 * units.MHz) return -100 * np.max(timeseries ** 2) start_direction = np.array([station.get_parameter(stationParameters.zenith) / units.rad, station.get_parameter(stationParameters.azimuth) / units.rad]) self.logger.debug(f"Initial guess for fit routine is {start_direction}") # Using "minimize" with the "Powell" method allows to use the same minimization routine as before, # but also add bounds to the fit. However, this changes the result of the fitting routine, even if # without bounds the fit converges within the bounds. So currently we stick to the "fmin_powell" method. # fit_result = minimize(negative_beamed_signal, start_direction, # method='Powell', bounds=Bounds((0, -np.pi), (np.pi / 2, np.pi), (True, True)), # options={'maxiter': self.__max_iter, 'xtol': 1.0, 'disp': False, # 'direc': np.array([[0.1, 0], [0, 0.1]])}) # # self.logger.debug(f"Fit returned the following message: {fit_result.message}") # if not fit_result.success: # self.logger.warning(f"The fit failed with the following message: {fit_result.message}") # raise RuntimeError # # fit_direction = fit_result.x fit_result = fmin_powell(negative_beamed_signal, start_direction, maxiter=self.__max_iter, xtol=1.0 * units.deg, direc=np.array([[2.0 * units.deg, 0], [0, 2.0 * units.deg]]), disp=False, full_output=True) fit_direction = fit_result[0] self.logger.debug(f"Fit finished with following direc vector: {fit_result[2]}") direction_cartesian = hp.spherical_to_cartesian(*fit_direction) out = mini_beamformer(fft_traces, freq, ant_positions, direction_cartesian) return fit_direction, out
[docs] @register_run() def run(self, event, detector): """ reconstruct signal arrival direction for all events through beam forming. https://arxiv.org/pdf/1009.0345.pdf Parameters ---------- event: Event The event to run the module on detector: Detector The detector object """ converter = voltageToEfieldConverter() converter.begin() for station in event.get_stations(): if not station.get_parameter(stationParameters.triggered): # Not triggered means no reliable pulse found or not enough antennas to do the fit continue zenith = event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.zenith) azimuth = event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.azimuth) # Get all group IDs which are still present in the Station station_channel_group_ids = set([channel.get_group_id() for channel in station.iter_channels()]) position_array = [] good_antennas = [] for group_id in station_channel_group_ids: channels = [channel for channel in station.iter_channel_group(group_id)] # Only use the channels with an acceptable SNR good_snr = False for channel in channels: if channel.get_parameter(channelParameters.SNR) > self.__cr_snr: good_snr = True if good_snr: position_array.append( detector.get_absolute_position(station.get_id()) + detector.get_relative_position(station.get_id(), channels[0].get_id()) ) # the position are the same for every polarisation good_antennas.append((channels[0].get_id(), channels[1].get_id())) station.set_parameter(stationParameters.zenith, zenith) station.set_parameter(stationParameters.azimuth, azimuth) direction_difference = np.asarray([90, 90]) * units.deg while direction_difference[0] > 0.5 * units.deg or direction_difference[1] > 0.5 * units.deg: # Make sure all the previously calculated Efields are removed station.set_electric_fields([]) # Unfold antenna response for good antennas for ant in good_antennas: converter.run(event, station, detector, use_channels=ant) # Perform the direction fit on the station try: direction_fit, freq_spectrum = self._direction_fit( station, position_array ) except RuntimeError: self.logger.error(f"Direction fit could not be completed for station CS{station.get_id():03d}") break # Check if fit produced unphysical results if direction_fit[0] > 90 * units.deg: self.logger.warning(f"Zenith angle {direction_fit[0] / units.deg} is larger than 90 degrees!") self.logger.error(f"Direction fit could not be completed for station CS{station.get_id():03d}") break # See if the fit converged zenith_diff = np.abs(station.get_parameter(stationParameters.zenith) / units.rad - direction_fit[0]) azimuth_diff = np.abs(station.get_parameter(stationParameters.azimuth) / units.rad - direction_fit[1]) direction_difference = [zenith_diff, azimuth_diff] # Bookkeeping station.set_parameter(stationParameters.zenith, direction_fit[0]) station.set_parameter(stationParameters.azimuth, direction_fit[1]) self.logger.debug('Difference after another fit iteration is %s;' % direction_difference) self.logger.debug('Direction after this fit iteration is %s;' % direction_fit) self.logger.info(f"Azimuth (wrt to North) and elevation for station CS{station.get_id():03d}:") self.logger.info((90 - station.get_parameter(stationParameters.azimuth) / units.deg, 90 - station.get_parameter(stationParameters.zenith) / units.deg)) station.set_parameter(stationParameters.cr_zenith, station.get_parameter(stationParameters.zenith)) station.set_parameter(stationParameters.cr_azimuth, station.get_parameter(stationParameters.azimuth))
[docs] def end(self): pass