Source code for NuRadioReco.modules.LOFAR.stationPulseFinder

import logging
import numpy as np
import radiotools.helper as hp

from scipy.signal import hilbert

from NuRadioReco.utilities import fft
from NuRadioReco.modules.base.module import register_run
from NuRadioReco.framework.parameters import stationParameters, channelParameters, showerParameters
from NuRadioReco.modules.LOFAR.beamforming_utilities import mini_beamformer


[docs]def find_snr_of_timeseries(timeseries, window_start=0, window_end=-1, noise_start=0, noise_end=-1): """ Return the signal-to-noise ratio (SNR) of a given time trace. The additional parameters allow to select a window where the signal is and a separate window where there is only noise. Parameters ---------- timeseries: list or np.ndarray The time trace window_start : int window_end : int We look for the peak inside the array `timeseries[window_start:window_end]` noise_start : int noise_end : int The array `timeseries[noise_start:noise_end]` is used to calculate the noise level Returns ------- The SNR of the time trace """ analytic_signal = hilbert(timeseries) amplitude_envelope = np.abs(analytic_signal) rms = np.sqrt( np.mean(amplitude_envelope[noise_start:noise_end] ** 2) ) peak = np.max( amplitude_envelope[window_start:window_end] ) return peak / rms
[docs]class stationPulseFinder: """ Look for significant pulses in every station. The module uses beamforming to enhance the sensitivity in direction estimated from the LORA particle data. It also identifies the channels which have an SNR good enough to use for direction fitting later. """ def __init__(self): self.logger = logging.getLogger('NuRadioReco.LOFAR.stationPulseFinder') self.__window_size = None self.__noise_away_from_pulse = None self.__snr_cr = None self.__min_good_channels = None self.direction_cartesian = None # The zenith and azimuth pointing towards where to beamform.
[docs] def begin(self, window=500, noise_window=10000, cr_snr=3, good_channels=6, logger_level=logging.NOTSET): """ Sets the window size to use for pulse finding, as well as the number of samples away from the pulse to use for noise measurements. The function also defines what an acceptable SNR is to consider a cosmic-ray signal to be in the trace, as well as the number of good channels a station should have to be kept for further processing. Parameters ---------- window : int, default=500 Size of the window to look for pulse noise_window : int, default=10000 The trace used for noise characterisation goes from sample 0 to the start of the pulse searching window minus this number. cr_snr : float, default=3 The minimum SNR a channel should have to be considered having a CR signal. good_channels : int, default=6 The minimum number of good channels a station should have in order be "triggered". logger_level : int, default=logging.NOTSET Use this parameter to override the logging level for this module. """ # TODO: find window size used in PyCRTools self.__window_size = window self.__noise_away_from_pulse = noise_window self.__snr_cr = cr_snr self.__min_good_channels = good_channels self.logger.setLevel(logger_level)
def _signal_windows_polarisation(self, station, channel_positions, channel_ids_per_pol): """ Considers the channel groups given by `channel_ids_per_pol` one by one and beamforms the traces in the direction of `stationPulseFinder.direction_cartesian`. It then calculates the maximum of the amplitude envelope, and saves the corresponding index with the indices for pulse finding in a tuple, which it returns. Parameters ---------- station : Station object The station to analyse. channel_positions : np.ndarray The array of channels positions, to be extracted from the detector description. channel_ids_per_pol : list[list] A list of channel IDs grouped per polarisation. The IDs in each sublist will be used together for beamforming in the direction given by `beamform_direction`. Returns ------- tuple of floats * The index in `channel_ids_per_pol` which contains the strongest signal * The start index of the pulse window * The stop index of the pulse window """ # Assume all the channels have the same frequency content and sampling rate frequencies = station.get_channel(channel_ids_per_pol[0][0]).get_frequencies() sampling_rate = station.get_channel(channel_ids_per_pol[0][0]).get_sampling_rate() values_per_pol = [] for channel_ids in channel_ids_per_pol: all_traces = np.array([station.get_channel(channel).get_frequency_spectrum() for channel in channel_ids]) beamed_fft = mini_beamformer(all_traces, frequencies, channel_positions, self.direction_cartesian) beamed_timeseries = fft.freq2time(beamed_fft, sampling_rate, n=all_traces.shape[1]) analytic_signal = hilbert(beamed_timeseries) amplitude_envelope = np.abs(analytic_signal) signal_window_start = int( np.argmax(amplitude_envelope) - self.__window_size / 2 ) signal_window_end = int( np.argmax(amplitude_envelope) + self.__window_size / 2 ) values_per_pol.append([np.max(amplitude_envelope), signal_window_start, signal_window_end]) values_per_pol = np.asarray(values_per_pol) dominant = np.argmax(values_per_pol[:, 0]) window_start, window_end = values_per_pol[dominant][1:] return dominant, window_start, window_end def _station_has_cr(self, station, channel_positions, channel_ids_per_pol, signal_window=None, noise_window=None): """ Beamform the station towards the direction given by `stationPulseFinder.direction_cartesian` and check if there is any significant signal in the trace. If this is the case, the `stationParameter.triggered` value is set to `True`. Parameters ---------- station : Station object The station to process channel_positions : np.ndarray The array of channels positions, to be extracted from the detector description channel_ids_per_pol : list[list] A list of channel IDs grouped per polarisation. signal_window : array-like, default=[0, -1] A list containing the first and last index of the trace where to look for a pulse noise_window : array-like, default=[0, -1] A list containing the first and last index of the trace to use for noise characterisation """ if signal_window is None: signal_window = [0, -1] if noise_window is None: noise_window = [0, -1] frequencies = station.get_channel(channel_ids_per_pol[0][0]).get_frequencies() sampling_rate = station.get_channel(channel_ids_per_pol[0][0]).get_sampling_rate() for channel_ids in channel_ids_per_pol: all_traces = np.array([station.get_channel(channel).get_frequency_spectrum() for channel in channel_ids]) beamed_fft = mini_beamformer(all_traces, frequencies, channel_positions, self.direction_cartesian) beamed_timeseries = fft.freq2time(beamed_fft, sampling_rate, n=all_traces.shape[1]) snr = find_snr_of_timeseries(beamed_timeseries, window_start=signal_window[0], window_end=signal_window[1], noise_start=noise_window[0], noise_end=noise_window[1]) if snr > self.__snr_cr: station.set_parameter(stationParameters.triggered, True) return True # no need to check second polarisation if CR found return False def _find_good_channels(self, station, signal_window=None, noise_window=None): """ Loop over all channels in the station and return an array which contains booleans indicating whether the channel at that index has an SNR higher than the minimum required one (set in the `stationPulseFinder.begin()` function). Parameters ---------- station : Station object The station to process signal_window : array-like, default=[0, -1] A list containing the first and last index of the trace where to look for a pulse noise_window : array-like, default=[0, -1] A list containing the first and last index of the trace to use for noise characterisation """ if signal_window is None: signal_window = [0, -1] if noise_window is None: noise_window = [0, -1] good_channels = [] for channel in station.iter_channels(): snr = find_snr_of_timeseries(channel.get_trace(), window_start=signal_window[0], window_end=signal_window[1], noise_start=noise_window[0], noise_end=noise_window[1]) channel.set_parameter(channelParameters.SNR, snr) if snr > self.__snr_cr: good_channels.append(channel.get_id()) return good_channels def _check_station_triggered(self, station, channel_positions, channel_ids_per_pol, signal_window, noise_window): """ Check if the station has been triggered by a radio signal. This functions verifies there is a significant pulse in the radio signal after beamforming, and also checks if there enough channels (i.e. LOFAR dipoles) that contain a good signal. Parameters ---------- station : Station object The station to analyse """ self.logger.debug(f'Looking for signal in indices {signal_window}') self.logger.debug(f'Using {noise_window} as noise trace') cr_found = self._station_has_cr( station, channel_positions, channel_ids_per_pol, signal_window=signal_window, noise_window=noise_window ) if cr_found: good_channels_station = self._find_good_channels( station, signal_window=signal_window, noise_window=noise_window ) self.logger.debug(f'Station {station.get_id()} has {len(good_channels_station)} good antennas') if len(good_channels_station) < self.__min_good_channels: self.logger.warning(f'There are only {len(good_channels_station)} antennas ' f'with an SNR higher than {self.__snr_cr}, while there ' f'are at least {self.__min_good_channels} required') station.set_parameter(stationParameters.triggered, False) # stop from further processing
[docs] @register_run() def run(self, event, detector): """ Run a beamformer on all stations in `event` to search for significant pulses. Parameters ---------- event : Event object The event on which to apply the pulse finding. detector : Detector object The detector related to the event. """ zenith = event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.zenith) azimuth = event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.azimuth) self.direction_cartesian = hp.spherical_to_cartesian( zenith, azimuth ) for station in event.get_stations(): station_id = station.get_id() # Get the channel IDs grouped per polarisation (i.e. dipole orientation) ant_same_orientation = detector.get_parallel_channels(station_id) # Find the antenna positions by only looking at the channels from a given polarisation position_array = [ detector.get_absolute_position(station_id) + detector.get_relative_position(station_id, channel_id) for channel_id in ant_same_orientation[0] ] position_array = np.asarray(position_array) # Find polarisation with max envelope amplitude and calculate pulse search window from it dominant_pol, pulse_window_start, pulse_window_end = self._signal_windows_polarisation( station, position_array, ant_same_orientation ) # Save the antenna orientation which contains the strongest pulse station.set_parameter(stationParameters.cr_dominant_polarisation, detector.get_antenna_orientation(station_id, ant_same_orientation[dominant_pol][0])) # Check if the station has a strong enough signal signal_window = [int(pulse_window_start), int(pulse_window_end)] noise_window = [0, int(pulse_window_start - self.__noise_away_from_pulse)] self._check_station_triggered(station, position_array, ant_same_orientation, signal_window, noise_window)
[docs] def end(self): pass