from NuRadioReco.modules.base.module import register_run
from NuRadioReco.utilities import units
from NuRadioReco.framework.trigger import SimplePhasedTrigger
from NuRadioReco.modules.analogToDigitalConverter import analogToDigitalConverter
import logging
import scipy
import numpy as np
from scipy import constants
logger = logging.getLogger('NuRadioReco.phasedTriggerSimulator')
cspeed = constants.c * units.m / units.s
main_low_angle = np.deg2rad(-55.0)
main_high_angle = -1.0 * main_low_angle
default_angles = np.arcsin(np.linspace(np.sin(main_low_angle), np.sin(main_high_angle), 11))
[docs]
class triggerSimulator:
"""
Calculates the trigger for a phased array with a primary beam.
The channels that participate in both beams and the pointing angle for each
subbeam can be specified.
See https://arxiv.org/pdf/1809.04573.pdf
"""
def __init__(self, log_level=logging.NOTSET):
self.__t = 0
self.__pre_trigger_time = None
self.__debug = None
logger.setLevel(log_level)
self.begin()
[docs]
def begin(self, debug=False):
self.__debug = debug
[docs]
def get_antenna_positions(self, station, det, triggered_channels=None, component=2):
"""
Calculates the vertical coordinates of the antennas of the detector
Parameters
----------
station: Station object
Description of the current station
det: Detector object
Description of the current detector
triggered_channels: array of ints
channels ids of the channels that form the primary phasing array
if None, all channels are taken
component: int
Which cartesian coordinate to return
Returns
-------
ant_pos: array of floatss
Desired antenna position in requested coordinate
"""
ant_pos = {}
for channel in station.iter_channels(use_channels=triggered_channels):
ant_pos[channel.get_id()] = det.get_relative_position(station.get_id(), channel.get_id())[component]
return ant_pos
[docs]
def calculate_time_delays(self, station, det,
triggered_channels,
phasing_angles=default_angles,
ref_index=1.75,
sampling_frequency=None):
"""
Calculates the delays needed for phasing the array.
Parameters
----------
station: Station object
Description of the current station
det: Detector object
Description of the current detector
triggered_channels: array of ints
channels ids of the channels that form the primary phasing array
if None, all channels are taken
phasing_angles: array of float
pointing angles for the primary beam
ref_index: float
refractive index for beam forming
sampling_frequency: float
Rate of the ADC used
Returns
-------
beam_rolls: array of dicts of keys=antenna and content=delay
"""
if(triggered_channels is None):
triggered_channels = [channel.get_id() for channel in station.iter_channels()]
time_step = 1. / sampling_frequency
ant_z = self.get_antenna_positions(station, det, triggered_channels, 2)
self.check_vertical_string(station, det, triggered_channels)
ref_z = np.max(np.fromiter(ant_z.values(), dtype=float))
# Need to add in delay for trigger delay
cable_delays = {}
for channel in station.iter_channels(use_channels=triggered_channels):
cable_delays[channel.get_id()] = det.get_cable_delay(station.get_id(), channel.get_id())
beam_rolls = []
for angle in phasing_angles:
delays = []
for key in ant_z:
delays += [-(ant_z[key] - ref_z) / cspeed * ref_index * np.sin(angle) - cable_delays[key]]
delays -= np.max(delays)
roll = np.array(np.round(np.array(delays) / time_step)).astype(int)
subbeam_rolls = dict(zip(triggered_channels, roll))
# logger.debug("angle:", angle / units.deg)
# logger.debug(subbeam_rolls)
beam_rolls.append(subbeam_rolls)
return beam_rolls
[docs]
def get_channel_trace_start_time(self, station, triggered_channels):
"""
Finds the start time of the desired traces.
Throws an error if all the channels dont have the same start time.
Parameters
----------
station: Station object
Description of the current station
triggered_channels: array of ints
channels ids of the channels that form the primary phasing array
if None, all channels are taken
Returns
-------
channel_trace_start_time: float
Channel start time
"""
channel_trace_start_time = None
for channel in station.iter_channels(use_channels=triggered_channels):
if channel_trace_start_time is None:
channel_trace_start_time = channel.get_trace_start_time()
elif channel_trace_start_time != channel.get_trace_start_time():
error_msg = 'Phased array channels do not have matching trace start times. '
error_msg += 'This module is not prepared for this case.'
raise ValueError(error_msg)
return channel_trace_start_time
[docs]
def check_vertical_string(self, station, det, triggered_channels):
"""
Checks if the triggering antennas lie in a straight vertical line
Throws error if not.
Parameters
----------
station: Station object
Description of the current station
det: Detector object
Description of the current detector
triggered_channels: array of ints
channels ids of the channels that form the primary phasing array
if None, all channels are taken
"""
cut = 1.e-3 * units.m
ant_x = np.fromiter(self.get_antenna_positions(station, det, triggered_channels, 0).values(), dtype=float)
diff_x = np.abs(ant_x - ant_x[0])
ant_y = np.fromiter(self.get_antenna_positions(station, det, triggered_channels, 1).values(), dtype=float)
diff_y = np.abs(ant_y - ant_y[0])
if (sum(diff_x) > cut or sum(diff_y) > cut):
raise NotImplementedError('The phased triggering array should lie on a vertical line')
[docs]
def power_sum(self, coh_sum, window, step, adc_output='voltage'):
"""
Calculate power summed over a length defined by 'window', overlapping at intervals defined by 'step'
Parameters
----------
coh_sum: array of floats
Phased signal to be integrated over
window: int
Power integral window
Units of ADC time ticks
step: int
Time step in power integral. If equal to window, there is no time overlap
in between neighboring integration windows
Units of ADC time ticks.
adc_output: string
- 'voltage' to store the ADC output as discretised voltage trace
- 'counts' to store the ADC output in ADC counts
Returns
-------
power:
Integrated power in each integration window
num_frames
Number of integration windows calculated
"""
if(adc_output != 'voltage' and adc_output != 'counts'):
error_msg = 'ADC output type must be "counts" or "voltage". Currently set to:' + str(adc_output)
raise ValueError(error_msg)
num_frames = int(np.floor((len(coh_sum) - window) / step))
if(adc_output == 'voltage'):
coh_sum_squared = (coh_sum * coh_sum).astype(float)
elif(adc_output == 'counts'):
coh_sum_squared = (coh_sum * coh_sum).astype(int)
coh_sum_windowed = np.lib.stride_tricks.as_strided(coh_sum_squared, (num_frames, window),
(coh_sum_squared.strides[0] * step, coh_sum_squared.strides[0]))
power = np.sum(coh_sum_windowed, axis=1)
return power.astype(float) / window, num_frames
[docs]
def phase_signals(self, traces, beam_rolls):
"""
Phase signals together given the rolls
Parameters
----------
traces: 2D array of floats
Signals from the antennas to be phased together.
beam_rolls: 2D array of floats
The amount to shift each signal before phasing the
traces together
Returns
-------
phased_traces: array of arrays
"""
phased_traces = [[] for i in range(len(beam_rolls))]
running_i = 0
for subbeam_rolls in beam_rolls:
phased_trace = np.zeros(len(list(traces.values())[0]))
for channel_id in traces:
trace = traces[channel_id]
phased_trace += np.roll(trace, subbeam_rolls[channel_id])
phased_traces[running_i] = phased_trace
running_i += 1
return phased_traces
[docs]
def phased_trigger(
self, station, det,
Vrms=None,
threshold=60 * units.mV,
triggered_channels=None,
phasing_angles=default_angles,
ref_index=1.75,
trigger_adc=False, # by default, assumes the trigger ADC is the same as the channels ADC
clock_offset=0,
adc_output='voltage',
trigger_filter=None,
upsampling_factor=1,
window=32,
step=16,
apply_digitization=True,
):
"""
simulates phased array trigger for each event
Several channels are phased by delaying their signals by an amount given
by a pointing angle. Several pointing angles are possible in order to cover
the sky. The array triggered_channels controls the channels that are phased,
according to the angles phasing_angles.
Parameters
----------
station: Station object
Description of the current station
det: Detector object
Description of the current detector
Vrms: float
RMS of the noise on a channel, used to automatically create the digitizer
reference voltage. If set to None, tries to use reference voltage as defined
int the detector description file.
threshold: float
threshold above (or below) a trigger is issued, absolute amplitude
triggered_channels: array of ints
channels ids of the channels that form the primary phasing array
if None, all channels are taken
phasing_angles: array of float
pointing angles for the primary beam
ref_index: float (default 1.75)
refractive index for beam forming
trigger_adc: bool (default True)
If True, uses the ADC settings from the trigger. It must be specified in the
detector file. See analogToDigitalConverter module for information
(see option `apply_digitization`)
clock_offset: float (default 0)
Overall clock offset, for adc clock jitter reasons (see `apply_digitization`)
adc_output: string (default 'voltage')
- 'voltage' to store the ADC output as discretised voltage trace
- 'counts' to store the ADC output in ADC counts
trigger_filter: array floats (default None)
Freq. domain of the response to be applied to post-ADC traces
Must be length for "MC freq"
upsampling_factor: integer (default 1)
Upsampling factor. The trace will be a upsampled to a
sampling frequency int_factor times higher than the original one
after conversion to digital
window: int (default 32)
Power integral window
Units of ADC time ticks
step: int (default 16)
Time step in power integral. If equal to window, there is no time overlap
in between neighboring integration windows.
Units of ADC time ticks
apply_digitization: bool (default True)
Perform the quantization of the ADC. If set to true, should also set options
`trigger_adc`, `adc_output`, `clock_offset`
Returns
-------
is_triggered: bool
True if the triggering condition is met
trigger_delays: dictionary
the delays for the primary channels that have caused a trigger.
If there is no trigger, it's an empty dictionary
trigger_time: float
the earliest trigger time with respect to first interaction time.
trigger_times: dictionary
all time bins that fulfil the trigger condition per beam. The key is the beam number. Time with respect to first interaction time.
maximum_amps: list of floats (length equal to that of `phasing_angles`)
the maximum value of all the integration windows for each of the phased waveforms
"""
if(triggered_channels is None):
triggered_channels = [channel.get_id() for channel in station.iter_channels()]
if(adc_output != 'voltage' and adc_output != 'counts'):
error_msg = 'ADC output type must be "counts" or "voltage". Currently set to:' + str(adc_output)
raise ValueError(error_msg)
ADC = analogToDigitalConverter()
is_triggered = False
trigger_delays = {}
logger.debug(f"trigger channels: {triggered_channels}")
traces = {}
for channel in station.iter_channels(use_channels=triggered_channels):
channel_id = channel.get_id()
trace = np.array(channel.get_trace())
if apply_digitization:
trace, adc_sampling_frequency = ADC.get_digital_trace(station, det, channel,
Vrms=Vrms,
trigger_adc=trigger_adc,
clock_offset=clock_offset,
return_sampling_frequency=True,
adc_type='perfect_floor_comparator',
adc_output=adc_output,
trigger_filter=None)
else:
adc_sampling_frequency = channel.get_sampling_rate()
# Upsampling here, linear interpolate to mimic an FPGA internal upsampling
if not isinstance(upsampling_factor, int):
try:
upsampling_factor = int(upsampling_factor)
except:
raise ValueError("Could not convert upsampling_factor to integer. Exiting.")
if(upsampling_factor >= 2):
# FFT upsampling
new_len = len(trace) * upsampling_factor
upsampled_trace = scipy.signal.resample(trace, new_len)
# If upsampled is performed, the final sampling frequency changes
trace = upsampled_trace[:]
if(len(trace) % 2 == 1):
trace = trace[:-1]
adc_sampling_frequency *= upsampling_factor
time_step = 1.0 / adc_sampling_frequency
traces[channel_id] = trace[:]
beam_rolls = self.calculate_time_delays(station, det,
triggered_channels,
phasing_angles,
ref_index=ref_index,
sampling_frequency=adc_sampling_frequency)
phased_traces = self.phase_signals(traces, beam_rolls)
trigger_time = None
trigger_times = {}
channel_trace_start_time = self.get_channel_trace_start_time(station, triggered_channels)
trigger_delays = {}
maximum_amps = np.zeros_like(phased_traces)
for iTrace, phased_trace in enumerate(phased_traces):
# Create a sliding window
squared_mean, num_frames = self.power_sum(coh_sum=phased_trace, window=window, step=step, adc_output=adc_output)
maximum_amps[iTrace] = np.max(squared_mean)
if True in (squared_mean > threshold):
trigger_delays[iTrace] = {}
for channel_id in beam_rolls[iTrace]:
trigger_delays[iTrace][channel_id] = beam_rolls[iTrace][channel_id] * time_step
triggered_bins = np.atleast_1d(np.squeeze(np.argwhere(squared_mean > threshold)))
logger.debug(f"Station has triggered, at bins {triggered_bins}")
logger.debug(trigger_delays)
logger.debug(f"trigger_delays {trigger_delays[iTrace][triggered_channels[0]]}")
is_triggered = True
trigger_times[iTrace] = trigger_delays[iTrace][triggered_channels[0]] + triggered_bins * step * time_step + channel_trace_start_time
logger.debug(f"trigger times = {trigger_times[iTrace]}")
if is_triggered:
logger.debug("Trigger condition satisfied!")
logger.debug("all trigger times", trigger_times)
trigger_time = min([x.min() for x in trigger_times.values()])
logger.debug(f"minimum trigger time is {trigger_time:.0f}ns")
return is_triggered, trigger_delays, trigger_time, trigger_times, maximum_amps
[docs]
@register_run()
def run(self, evt, station, det,
Vrms=None,
threshold=60 * units.mV,
triggered_channels=None,
trigger_name='simple_phased_threshold',
phasing_angles=default_angles,
set_not_triggered=False,
ref_index=1.75,
trigger_adc=False, # by default, assumes the trigger ADC is the same as the channels ADC
clock_offset=0,
adc_output='voltage',
trigger_filter=None,
upsampling_factor=1,
window=32,
step=16,
apply_digitization=True,
pre_trigger_time=None
):
"""
Simulates phased array trigger for each event.
Several channels are phased by delaying their signals by an amount given
by a pointing angle. Several pointing angles are possible in order to cover
the sky. The array triggered_channels controls the channels that are phased,
according to the angles phasing_angles.
Parameters
----------
evt: Event object
Description of the current event
station: Station object
Description of the current station
det: Detector object
Description of the current detector
Vrms: float
RMS of the noise on a channel, used to automatically create the digitizer
reference voltage. If set to None, tries to use reference voltage as defined
int the detector description file.
threshold: float
threshold above (or below) a trigger is issued, absolute amplitude
triggered_channels: array of ints
channels ids of the channels that form the primary phasing array
if None, all channels are taken
trigger_name: string
name for the trigger
phasing_angles: array of float
pointing angles for the primary beam
set_not_triggered: bool (default False)
If True, no trigger simulation will be performed and this trigger will be set to not_triggered
ref_index: float (default 1.75)
refractive index for beam forming
trigger_adc: bool, (default True)
If True, uses the ADC settings from the trigger. It must be specified in the
detector file. See analogToDigitalConverter module for information
clock_offset: float (default 0)
Overall clock offset, for adc clock jitter reasons
adc_output: string (default 'voltage')
- 'voltage' to store the ADC output as discretised voltage trace
- 'counts' to store the ADC output in ADC counts
trigger_filter: array floats (default None)
Freq. domain of the response to be applied to post-ADC traces
Must be length for "MC freq"
upsampling_factor: integer (default 1)
Upsampling factor. The trace will be a upsampled to a
sampling frequency int_factor times higher than the original one
after conversion to digital
window: int (default 32)
Power integral window
Units of ADC time ticks
step: int (default 16)
Time step in power integral. If equal to window, there is no time overlap
in between neighboring integration windows.
Units of ADC time ticks
apply_digitization: bool (default True)
Perform the quantization of the ADC. If set to true, should also set options
`trigger_adc`, `adc_output`, `clock_offset`
pre_trigger_time: float or dict of floats
Defines the amount of trace recorded before the trigger time. This module does not cut the traces,
but this trigger property is later used to trim traces accordingly.
if a dict is given, the keys are the channel_ids, and the value is the pre_trigger_time between the
start of the trace and the trigger time.
if only a float is given, the same pre_trigger_time is used for all channels
If none, the default value of the Trigger class is used, which is currently 55ns.
Returns
-------
is_triggered: bool
True if the triggering condition is met
"""
if triggered_channels is None:
triggered_channels = [channel.get_id() for channel in station.iter_channels()]
if adc_output != 'voltage' and adc_output != 'counts':
error_msg = 'ADC output type must be "counts" or "voltage". Currently set to:' + str(adc_output)
raise ValueError(error_msg)
is_triggered = False
trigger_delays = {}
if set_not_triggered:
is_triggered = False
trigger_delays = {}
maximum_amps = np.zeros_like(phasing_angles)
else:
is_triggered, trigger_delays, trigger_time, trigger_times, maximum_amps = self.phased_trigger(
station=station,
det=det,
Vrms=Vrms,
threshold=threshold,
triggered_channels=triggered_channels,
phasing_angles=phasing_angles,
ref_index=ref_index,
trigger_adc=trigger_adc,
clock_offset=clock_offset,
adc_output=adc_output,
trigger_filter=trigger_filter,
upsampling_factor=upsampling_factor,
window=window,
step=step,
apply_digitization=apply_digitization,
)
kwargs = {}
if pre_trigger_time is not None:
kwargs['pre_trigger_times'] = pre_trigger_time
# Create a trigger object to be returned to the station
trigger = SimplePhasedTrigger(
trigger_name,
threshold,
channels=triggered_channels,
primary_angles=phasing_angles,
trigger_delays=trigger_delays,
window_size=window,
step_size=step,
maximum_amps=maximum_amps,
**kwargs
)
trigger.set_triggered(is_triggered)
if is_triggered:
# trigger_time(s)= time(s) from start of trace + start time of trace with respect to moment of first
# interaction = trigger time from moment of first interaction; time offset to interaction time
# (channel_trace_start_time) already recognized in self.phased_trigger
trigger.set_trigger_time(trigger_time)
trigger.set_trigger_times(trigger_times)
else:
trigger.set_trigger_time(None)
station.set_trigger(trigger)
return is_triggered