import logging
import numpy as np
import radiotools.helper as hp
from scipy.signal import hilbert, resample
from NuRadioReco.utilities import fft
from NuRadioReco.modules.base.module import register_run
from NuRadioReco.framework.parameters import stationParameters, channelParameters, showerParameters
from NuRadioReco.modules.LOFAR.beamforming_utilities import mini_beamformer
[docs]
def find_snr_of_timeseries(timeseries, sampling_rate=None, window_start=0, window_end=-1, noise_start=0, noise_end=-1,
resample_factor=1, full_output=False):
r"""
Return the signal-to-noise ratio (SNR) of a given time trace, defined as
..math ::
\frac{max( | Hilbert(timeseries[window]) | )}{ STD( Hilbert(timeseries[noise]) ) }
The signal window and noise window are controlled through the extra parameters to the function.
Parameters
----------
timeseries: array-like
The time trace
sampling_rate : float
The sampling rate of the time trace (only needed if full_output=True)
window_start : int
window_end : int
We look for the peak inside the resampled array `timeseries[window_start:window_end]`
noise_start : int
noise_end : int
The array `timeseries[noise_start:noise_end]` is used to calculate the noise level
resample_factor : int, default=1
Factor with which the timeseries will be resampled, needs to be integer > 0
full_output : bool, default=False
If True, also the peak of the envelope, RMS and signal time are returned
Returns
-------
The SNR of the time trace
"""
resampled_window = resample(timeseries[window_start:window_end], (window_end - window_start) * resample_factor)
analytic_signal = hilbert(resampled_window)
amplitude_envelope = np.abs(analytic_signal)
peak = np.max(
amplitude_envelope
)
if full_output:
resampled_max = np.argmax(analytic_signal)
resampled_max_time = resampled_max / sampling_rate / resample_factor
window_start_time = window_start / sampling_rate
signal_time = window_start_time + resampled_max_time
# (AC) The Hilbert envelope has a mean that is nonzero
# the stddev only takes the variations around the mean
# whereas the (real) RMS takes the square of all values (incl the mean), then sqrt
# TEST if this SNR definition gives good results!
# This is what seems to have been done in PyCRTools. See pulseenvelope.py:233
# and mMath.cc function hMaxSNR
std = np.std(
np.abs(hilbert(timeseries[noise_start:noise_end]))
)
rms = np.sqrt(
np.mean(
np.abs(hilbert(timeseries[noise_start:noise_end])) ** 2
)
)
if full_output:
return peak / std, peak, rms, signal_time
return peak / std
[docs]
class stationPulseFinder:
"""
Look for significant pulses in every station. The module uses beamforming to enhance the sensitivity in
direction estimated from the LORA particle data. It also identifies the channels which have an SNR good
enough to use for direction fitting later.
"""
def __init__(self):
self.logger = logging.getLogger('NuRadioReco.stationPulseFinder')
self.__window_size = None
self.__noise_window_size = None
self.__snr_cr = None
self.__min_good_channels = None
self.direction_cartesian = None # The zenith and azimuth pointing towards where to beamform.
[docs]
def begin(self, window=256, noise_window=10000, cr_snr=6.5, good_channels=6, logger_level=logging.NOTSET):
"""
Sets the window size to use for pulse finding, as well as the number of samples away from the pulse
to use for noise measurements. The function also defines what an acceptable SNR is to consider a
cosmic-ray signal to be in the trace, as well as the number of good channels a station should have
to be kept for further processing.
Parameters
----------
window : int, default=256
Size of the window to look for pulse
noise_window : int, default=10000
The trace used for noise characterisation goes from sample 0 to the start of the pulse searching
window minus this number.
cr_snr : float, default=3
The minimum SNR a channel should have to be considered having a CR signal.
good_channels : int, default=6
The minimum number of good channels a station should have in order be "triggered".
logger_level : int, default=logging.NOTSET
Use this parameter to override the logging level for this module.
"""
# TODO: find window size used in PyCRTools
self.__window_size = window
self.__noise_window_size = noise_window
self.__snr_cr = cr_snr
self.__min_good_channels = good_channels
self.logger.setLevel(logger_level)
def _signal_windows_polarisation(self, station, channel_positions, channel_ids_per_pol):
"""
Considers the channel groups given by `channel_ids_per_pol` one by one and beamforms the traces
in the direction of `stationPulseFinder.direction_cartesian`. It then calculates the maximum of the
amplitude envelope without resampling the signal, and saves the corresponding index with the indices
for pulse finding in a tuple, which it returns.
Parameters
----------
station : Station object
The station to analyse.
channel_positions : np.ndarray
The array of channels positions, to be extracted from the detector description.
channel_ids_per_pol : list[list]
A list of channel IDs grouped per polarisation. The IDs in each sublist will be used together for
beamforming in the direction given by `beamform_direction`.
Returns
-------
tuple of floats
* The index in `channel_ids_per_pol` which contains the strongest signal
* The start index of the pulse window
* The stop index of the pulse window
"""
# Assume all the channels have the same frequency content and sampling rate
frequencies = station.get_channel(channel_ids_per_pol[0][0]).get_frequencies()
sampling_rate = station.get_channel(channel_ids_per_pol[0][0]).get_sampling_rate()
values_per_pol = []
# the first few samples are tapered with half-Hann, which would blow up the SNR
noise_window_start = 10000
noise_window_end = noise_window_start + self.__noise_window_size
for i, channel_ids in enumerate(channel_ids_per_pol):
all_spectra = np.array([station.get_channel(channel).get_frequency_spectrum() for channel in channel_ids])
beamed_fft = mini_beamformer(all_spectra, frequencies, channel_positions, self.direction_cartesian)
beamed_timeseries = fft.freq2time(beamed_fft, sampling_rate,
n=station.get_channel(channel_ids[0]).get_trace().shape[0])
analytic_signal = hilbert(beamed_timeseries)
amplitude_envelope = np.abs(analytic_signal)
signal_window_start = int(
np.argmax(amplitude_envelope) - self.__window_size / 2
)
signal_window_end = int(
np.argmax(amplitude_envelope) + self.__window_size / 2
)
snr = find_snr_of_timeseries(beamed_timeseries,
window_start=signal_window_start, window_end=signal_window_end,
noise_start=noise_window_start, noise_end=noise_window_end)
if snr > self.__snr_cr:
station.set_parameter(stationParameters.triggered, True)
else:
station.set_parameter(stationParameters.triggered, False)
values_per_pol.append([snr, signal_window_start, signal_window_end])
# SNR is technically better than just the max(envelope) as a measure for strongest polarization
values_per_pol = np.asarray(values_per_pol)
dominant = np.argmax(values_per_pol[:, 0])
window_start, window_end = values_per_pol[dominant][1:]
# Save window of strongest polarisation in all channels
for channel in station.iter_channels():
channel.set_parameter(channelParameters.signal_regions, [int(window_start), int(window_end)])
channel.set_parameter(channelParameters.noise_regions, [int(noise_window_start), int(noise_window_end)])
return dominant
def _find_good_channels(self, station):
"""
Loop over all channels in the station and return an array which contains booleans indicating whether
the channel at that index has an SNR higher than the minimum required one
(set in the `stationPulseFinder.begin()` function).
Parameters
----------
station : Station object
The station to process
"""
good_channels = []
for channel in station.iter_channels():
signal_window = channel.get_parameter(channelParameters.signal_regions)
noise_window = channel.get_parameter(channelParameters.noise_regions)
self.logger.debug(f'Channel {channel.get_id()}: looking for signal in indices {signal_window}')
self.logger.debug(f'Channel {channel.get_id()}: using {noise_window} as noise trace')
snr, peak, rms, signal_time = find_snr_of_timeseries(channel.get_trace(),
sampling_rate=channel.get_sampling_rate(),
window_start=signal_window[0], window_end=signal_window[1],
noise_start=noise_window[0], noise_end=noise_window[1],
resample_factor=16, full_output=True)
channel.set_parameter(channelParameters.SNR, snr)
channel.set_parameter(channelParameters.noise_rms, rms)
channel.set_parameter(channelParameters.signal_time, signal_time)
channel.set_parameter(channelParameters.maximum_amplitude_envelope, peak)
channel.set_parameter(channelParameters.maximum_amplitude, np.max(channel.get_trace()))
if snr > self.__snr_cr:
good_channels.append(channel.get_id())
return good_channels
def _check_station_triggered(self, station):
"""
Check if the station has been triggered by a radio signal. This functions verifies there is a significant
pulse in the radio signal after beamforming, and also checks if there enough channels (i.e. LOFAR dipoles)
that contain a good signal.
Parameters
----------
station : Station object
The station to analyse
"""
if station.get_parameter(stationParameters.triggered):
good_channels_station = self._find_good_channels(
station
)
self.logger.debug(f'Station {station.get_id()} has {len(good_channels_station)} good antennas')
if len(good_channels_station) < self.__min_good_channels:
self.logger.warning(f'Station {station.get_id()} has only {len(good_channels_station)} antennas '
f'with an SNR higher than {self.__snr_cr}, while there '
f'are at least {self.__min_good_channels} required')
station.set_parameter(stationParameters.triggered, False) # stop from further processing
[docs]
@register_run()
def run(self, event, detector):
"""
Run a beamformer on all stations in `event` to search for significant pulses.
Parameters
----------
event : Event object
The event on which to apply the pulse finding.
detector : Detector object
The detector related to the event.
"""
zenith = event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.zenith)
azimuth = event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.azimuth)
self.direction_cartesian = hp.spherical_to_cartesian(
zenith, azimuth
)
for station in event.get_stations():
station_id = station.get_id()
# Get the channel IDs grouped per polarisation (i.e. dipole orientation)
# -> take these from Event, cause some might already be thrown out!
station_even_list = []
station_odd_list = []
for channel in station.iter_channels():
if channel.get_id() == channel.get_group_id():
station_even_list.append(channel.get_id())
else:
station_odd_list.append(channel.get_id())
# Find the antenna positions by only looking at the channels from a given polarisation
position_array = [
# detector.get_absolute_position(station_id) +
# only use the relative position since the absolute position would introduce a time shift
# in the beamformed timeseries which would lead to a time shift in the signal window.
detector.get_relative_position(station_id, channel_id)
for channel_id in station_even_list
]
position_array = np.asarray(position_array)
# Find polarisation with max envelope amplitude and calculate pulse search window from it
dominant_pol = self._signal_windows_polarisation(
station, position_array, [station_even_list, station_odd_list]
)
# Save the antenna orientation which contains the strongest pulse
if dominant_pol == 0:
dominant_orientation = detector.get_antenna_orientation(station_id, station_even_list[0])
elif dominant_pol == 1:
dominant_orientation = detector.get_antenna_orientation(station_id, station_odd_list[0])
else:
raise ValueError(f"Dominant polarisation {dominant_pol} not recognised")
station.set_parameter(stationParameters.cr_dominant_polarisation, dominant_orientation)
# Go over all the channels to check if individual SNR is strong enough
self._check_station_triggered(
station
)