Source code for NuRadioReco.modules.phasedarray.phasedArrayBase

import logging
import functools
import numpy as np
from scipy import constants
from scipy.signal import hilbert, firwin

from NuRadioReco.utilities import units, signal_processing
from NuRadioReco.modules.analogToDigitalConverter import analogToDigitalConverter


logger = logging.getLogger('NuRadioReco.PhasedArrayTriggerBase')
cspeed = constants.c * units.m / units.s

main_low_angle = np.deg2rad(-55.0)
main_high_angle = -1.0 * main_low_angle
default_angles = np.arcsin(np.linspace(np.sin(main_low_angle), np.sin(main_high_angle), 11))

[docs] class PhasedArrayBase(): """ Base class for all phased array trigger modules. """ def __init__(self, log_level=logging.NOTSET): self.__pre_trigger_time = None self.__debug = None logger.setLevel(log_level) self._adc_to_digital_converter = analogToDigitalConverter() self.buffered_delays = {} self.begin()
[docs] def begin(self, debug=False, pre_trigger_time=100 * units.ns): self.__pre_trigger_time = pre_trigger_time self.__debug = debug
def _get_antenna_positions(self, station, det, triggered_channels, component=2): """ Calculates the vertical coordinates of the antennas of the detector Parameters ---------- station: Station object Description of the current station det: Detector object Description of the current detector triggered_channels: array of ints Channels ids of the channels that form the primary phasing array. component: int (default: 2) Which cartesian coordinate to return Returns ------- ant_pos: dict Dictionary of keys=antenna and content=position """ return np.array([det.get_relative_position(station.get_id(), channel_id)[component] for channel_id in triggered_channels])
[docs] def calculate_time_delays( self, station, det, triggered_channels, phasing_angles=None, ref_index=1.75, sampling_frequency=None): """ Calculates the delays needed for phasing the array. Parameters ---------- station: Station object Description of the current station det: Detector object Description of the current detector triggered_channels: array of ints channels ids of the channels that form the primary phasing array if None, all channels are taken phasing_angles: array of float pointing angles for the primary beam ref_index: float refractive index for beam forming sampling_frequency: float Rate of the ADC used Returns ------- beam_rolls: array of dicts of keys=antenna and content=delay """ if station.get_id() in self.buffered_delays: return self.buffered_delays[station.get_id()] if triggered_channels is None: triggered_channels = [channel.get_id() for channel in station.iter_trigger_channels()] ant_z = self._get_antenna_positions(station, det, triggered_channels, 2) self.check_vertical_string(station, det, triggered_channels) ref_z = np.max(ant_z) try: # RNO-G Detector cable_delays = np.array([det.get_cable_delay(station.get_id(), channel_id, trigger=True) for channel_id in triggered_channels]) except: # Others without trigger kwarg cable_delays = np.array([det.get_cable_delay(station.get_id(), channel_id) for channel_id in triggered_channels]) group_delays = np.zeros(len(triggered_channels)) for i, channel in enumerate(triggered_channels): try: resp = det.get_signal_chain_response(station.get_id(), channel, trigger=True) group_delays[i] = resp.calculate_time_delay() except: pass beam_rolls = [] for angle in phasing_angles: delays = (ant_z - ref_z) / cspeed * ref_index * np.sin(angle) - cable_delays - group_delays delays -= np.min(delays) roll = np.array(np.round(delays * sampling_frequency)).astype(int) subbeam_rolls = dict(zip(triggered_channels, roll)) beam_rolls.append(subbeam_rolls) self.buffered_delays[station.get_id()] = beam_rolls return beam_rolls
[docs] def get_channel_trace_start_time(self, station, triggered_channels): """ Finds the start time of the desired traces. Throws an error if all the channels dont have the same start time. Parameters ---------- station: Station object Description of the current station triggered_channels: array of ints channels ids of the channels that form the primary phasing array if None, all channels are taken Returns ------- channel_trace_start_time: float Channel start time """ channel_trace_start_time = None for channel in station.iter_trigger_channels(use_channels=triggered_channels): if channel_trace_start_time is None: channel_trace_start_time = channel.get_trace_start_time() elif channel_trace_start_time != channel.get_trace_start_time(): raise ValueError( 'Phased array channels do not have matching trace start times. ' 'This module is not prepared for this case.') return channel_trace_start_time
[docs] def check_vertical_string(self, station, det, triggered_channels): """ Checks if the triggering antennas lie in a straight vertical line Throws error if not. Parameters ---------- station: Station object Description of the current station det: Detector object Description of the current detector triggered_channels: array of ints channels ids of the channels that form the primary phasing array if None, all channels are taken """ cut = 1.e-3 * units.m ant_x = self._get_antenna_positions(station, det, triggered_channels, 0) diff_x = np.abs(ant_x - ant_x[0]) ant_y = self._get_antenna_positions(station, det, triggered_channels, 1) diff_y = np.abs(ant_y - ant_y[0]) if sum(diff_x) > cut or sum(diff_y) > cut: raise NotImplementedError('The phased triggering array should lie on a vertical line')
[docs] def phase_signals(self, traces, beam_rolls, adc_output="voltage", saturation_bits=None): """ Phase signals together given the rolls Parameters ---------- traces: 2D array of floats Signals from the antennas to be phased together. beam_rolls: 2D array of floats The amount to shift each signal before phasing the traces together Returns ------- phased_traces: array of arrays """ phased_traces = [[] for i in range(len(beam_rolls))] for beam_i, subbeam_rolls in enumerate(beam_rolls): phased_trace = np.zeros(len(list(traces.values())[0])) for channel_id in traces: trace = traces[channel_id] phased_trace += np.roll(trace, int(subbeam_rolls[channel_id])) if adc_output == 'counts' and saturation_bits is not None: phased_trace[phased_trace>2**(saturation_bits-1)-1] = 2**(saturation_bits-1) - 1 phased_trace[phased_trace<-2**(saturation_bits-1)] = -2**(saturation_bits-1) phased_traces[beam_i] = phased_trace return phased_traces
[docs] def power_sum(self, coh_sum, window, step, adc_output='voltage', averaging_divisor=None): """ Calculate power summed over a length defined by 'window', overlapping at intervals defined by 'step' Parameters ---------- coh_sum: array of floats Phased signal to be integrated over window: int Power integral window Units of ADC time ticks step: int Time step in power integral. If equal to window, there is no time overlap in between neighboring integration windows Units of ADC time ticks. adc_output: string Options: - 'voltage' to store the ADC output as discretised voltage trace - 'counts' to store the ADC output in ADC counts averaging_divisor: int (default None) Power integral divisor for averaging. If not specified, the divisor is the same as the summation window. Returns ------- power: Integrated power in each integration window num_frames Number of integration windows calculated """ # If not specified, the divisor is the same as the summation window. if averaging_divisor is None: averaging_divisor = window if adc_output not in ['voltage', 'counts']: raise ValueError(f'ADC output type must be "counts" or "voltage". Currently set to: {adc_output}') num_frames = int(np.floor((len(coh_sum) - window) / step)) coh_sum_squared = (coh_sum * coh_sum) coh_sum_windowed = np.lib.stride_tricks.as_strided( coh_sum_squared, (num_frames, window), (coh_sum_squared.strides[0] * step, coh_sum_squared.strides[0])) power = np.sum(coh_sum_windowed, axis=1) return_power = power.astype(float) / averaging_divisor if adc_output == 'counts': return_power = np.round(return_power) return return_power, num_frames
[docs] def get_traces(self, station, det, triggered_channels=None, apply_digitization=False, adc_kwargs={}, upsampling_kwargs={}): """ Get the traces from the station object Parameters ---------- station: Station object Description of the current station det: Detector object Description of the current detector triggered_channels: array of ints (default: None) channels ids of the channels that form the primary phasing array if None, all channels are taken apply_digitization: bool If True, the traces are digitized adc_kwargs: dict Keyword arguments for the ADC upsampling_kwargs: dict Keyword arguments for the upsampling function Returns ------- traces: 2D array of floats Signals from the antennas to be phased together. sampling_frequency: float Sampling frequency of the traces """ adc_output = adc_kwargs.get('adc_output') if adc_output not in ['voltage', 'counts']: raise ValueError(f'ADC output type must be "counts" or "voltage". Currently set to: {adc_output}') traces = {} final_sampling_frequency = None for channel in station.iter_trigger_channels(use_channels=triggered_channels): if apply_digitization: trace, adc_sampling_frequency = self._adc_to_digital_converter.get_digital_trace( station, det, channel, return_sampling_frequency=True, **adc_kwargs) else: adc_sampling_frequency = channel.get_sampling_rate() trace = channel.get_trace() if upsampling_kwargs.get("upsampling_factor") >= 2: trace, adc_sampling_frequency = signal_processing.digital_upsampling( trace, adc_sampling_frequency, **upsampling_kwargs ) if final_sampling_frequency is None: final_sampling_frequency = adc_sampling_frequency elif final_sampling_frequency != adc_sampling_frequency: raise ValueError( 'Phased array channels do not have matching sampling frequencies. ' 'This module is not prepared for this case.') traces[channel.get_id()] = trace logger.debug("trigger channels: {}".format(traces.keys())) return traces, final_sampling_frequency
[docs] def hilbert_envelope(self, coh_sum, adc_output='voltage', ideal_transformer=False, hilbert_n_taps=31, hilbert_coeff_gain=1): if ideal_transformer: imag_an = np.imag(hilbert(coh_sum)) if adc_output=='counts': imag_an = np.round(imag_an) envelope = np.sqrt(coh_sum**2 + imag_an**2) else: assert hilbert_n_taps % 2 != 0, "Num taps MUST be odd for a hilbert transformer" sin_factor = np.sin(np.linspace(-(hilbert_n_taps-1)/2, (hilbert_n_taps-1)/2, hilbert_n_taps)) lp_filter = -1 * firwin(hilbert_n_taps, cutoff=0.25, pass_zero=False, fs=1) hil = 2 * sin_factor * lp_filter if hilbert_coeff_gain != 1: hil = np.round(hil * hilbert_coeff_gain) / hilbert_coeff_gain imag_an = np.convolve(coh_sum, hil, mode='full')[len(hil) // 2 : len(coh_sum) + len(hil) // 2] if adc_output == 'counts': imag_an = np.rint(imag_an) # simple square root calculation taken from section 13.2 in Understanding Digital Signal Processing by Richard Lyons envelope = np.max(np.array((coh_sum, imag_an)), axis=0) + (3 / 8) * np.min(np.array((coh_sum, imag_an)), axis=0) if adc_output == 'counts': envelope = np.rint(envelope) return envelope
[docs] def phased_trigger( self, station, det, threshold=60 * units.mV, triggered_channels=None, phasing_angles=default_angles, ref_index=1.75, apply_digitization=False, adc_kwargs=dict( adc_output='voltage'), upsampling_kwargs=dict( upsampling_factor=1, upsampling_method='fft', coeff_gain=128, filter_taps=31), saturation_bits=8, window=32, step=16, averaging_divisor=None, hilbert_transformer_kwargs=dict( ideal_transformer=False, hilbert_n_taps=31, hilbert_coeff_gain=128), mode="power_sum", ): """ simulates phased array trigger for each event Several channels are phased by delaying their signals by an amount given by a pointing angle. Several pointing angles are possible in order to cover the sky. The array triggered_channels controls the channels that are phased, according to the angles phasing_angles. Parameters ---------- station: Station object Description of the current station det: Detector object Description of the current detector threshold: float threshold above (or below) a trigger is issued, absolute amplitude triggered_channels: array of ints channels ids of the channels that form the primary phasing array if None, all channels are taken phasing_angles: array of float pointing angles for the primary beam ref_index: float (default 1.75) refractive index for beam forming apply_digitization: bool (default False) Perform the quantization with the analogToDigitalConverter module. adc_kwargs: dict Keyword arguments for the ADC module. Only used if `apply_digitization == True`. For arguments, see the documentation of the analogToDigitalConverter module. upsampling_kwargs: dict For arguments, see the documentation of the digital_upsampling function saturation_bits: int (default None) Determines what the coherenty summed waveforms will saturate to if using adc counts window: int (default 32) Power integration window for averaging Units of ADC time ticks step: int (default 16) Time step in power integral. If equal to window, there is no time overlap in between neighboring integration windows. Units of ADC time ticks averaging_divisor: int (default 32) Power integral divisor for averaging (division by 2^n much easier in firmware) Units of ADC time ticks ideal_transformer: bool (default False) If true, use scipy.hilbert to compute an exact Hilbert envelope with scipy.hilbert and np.sqrt. Otherwise, a firmware-like FIR-based Hilbert transformer will be used and with an approximation function to a square root to calculate the envelope. hilbert_n_taps: int (defult 31) Number of taps used to construct the FIR-based Hilbert transformer hilbert_coeff_gain: int (default 128) Rescaling factor to quantize the FIR-based Hilbert transformer mode: string (default: "power_sum") The mode of the trigger. Can be either "power_sum" or "hilbert_env". - "power_sum": uses the power_sum method to calculate the trigger - "hilbert_env": uses the hilbert envelope method to calculate the trigger Returns ------- is_triggered: bool True if the triggering condition is met trigger_delays: dictionary the delays for the primary channels that have caused a trigger. If there is no trigger, it's an empty dictionary trigger_time: float the earliest trigger time with respect to first interaction time. trigger_times: dictionary all time bins that fulfil the trigger condition per beam. The key is the beam number. Time with respect to first interaction time. maximum_amps: list of floats (length equal to that of `phasing_angles`) the maximum value of all the integration windows for each of the phased waveforms n_trigs: int total number of triggers that happened for all beams across the full traces triggered_beams: list list of bools for which beams triggered """ adc_output = adc_kwargs.get('adc_output') traces, adc_sampling_frequency = self.get_traces( station, det, triggered_channels=triggered_channels, apply_digitization=apply_digitization, adc_kwargs=adc_kwargs, upsampling_kwargs=upsampling_kwargs ) triggered_channels = np.array(list(traces.keys())) time_step = 1.0 / adc_sampling_frequency beam_rolls = self.calculate_time_delays( station, det, triggered_channels, phasing_angles, ref_index=ref_index, sampling_frequency=adc_sampling_frequency) phased_traces = self.phase_signals(traces, beam_rolls, adc_output=adc_output, saturation_bits=saturation_bits) if adc_output == "counts": threshold = np.trunc(threshold) channel_trace_start_time = self.get_channel_trace_start_time(station, triggered_channels) maximum_amps = np.zeros(len(phased_traces)) trigger_delays = {} n_trigs = 0 triggered_beams = [] trigger_time = None trigger_times = {} for iTrace, phased_trace in enumerate(phased_traces): is_triggered = False if mode == "power_sum": # Create a sliding window sig_trace, _ = self.power_sum( coh_sum=phased_trace, window=window, step=step, averaging_divisor=averaging_divisor, adc_output=adc_output) elif mode == "hilbert_env": coeff_gain = upsampling_kwargs.get("coeff_gain") sig_trace = self.hilbert_envelope( coh_sum=phased_trace, adc_output=adc_output, **hilbert_transformer_kwargs) else: raise ValueError("mode must be either 'power_sum' or 'hilbert_env'") maximum_amps[iTrace] = np.max(sig_trace) if np.any(sig_trace > threshold): is_triggered = True n_trigs += np.sum(sig_trace > threshold) trigger_delays[iTrace] = {channel_id: beam_rolls[iTrace][channel_id] * time_step for channel_id in beam_rolls[iTrace]} triggered_bins = np.atleast_1d(np.squeeze(np.argwhere(sig_trace > threshold))) trigger_times[iTrace] = np.abs(np.min(list(trigger_delays[iTrace]))) + triggered_bins * step * time_step + channel_trace_start_time logger.debug( "Station has triggered, at bins {}\n".format(triggered_bins) + "Trigger delays: {}\n".format(trigger_delays[iTrace][triggered_channels[0]]) + "Trigger time is {}ns\n".format(trigger_times[iTrace]) ) triggered_beams.append(is_triggered) is_triggered = np.any(triggered_beams) if is_triggered: trigger_time = np.amin([x.min() for x in trigger_times.values()]) logger.debug("Trigger condition satisfied!\n" "All trigger times: {}\n".format(trigger_times) + "Minimum trigger time is {:.0f}ns".format(trigger_time)) return is_triggered, trigger_delays, trigger_time, trigger_times, maximum_amps, n_trigs, triggered_beams