from NuRadioReco.modules.base.module import register_run
from NuRadioReco.utilities import units
from NuRadioReco.framework.parameters import stationParameters as stnp
from NuRadioReco.framework.trigger import HighLowTrigger
from NuRadioReco.modules.analogToDigitalConverter import analogToDigitalConverter
import numpy as np
import time
import logging
logger = logging.getLogger('NuRadioReco.HighLowTriggerSimulator')
[docs]
def get_high_low_triggers(trace, high_threshold, low_threshold,
time_coincidence=5 * units.ns, dt=1 * units.ns,
step=1, align_strides_to_start=False):
"""
calculates a high low trigger in a time coincidence window
Parameters
----------
trace: array of floats
the signal trace
high_threshold: float
the high threshold
low_threshold: float
the low threshold
time_coincidence: float (default: 5 ns)
the time coincidence window between a high + low
dt: float (default: 1 ns)
the width of a time bin (inverse of sampling rate)
step: int (default: 1_
stride length for sampling rate and clock rate mismatch in trigger logic
align_strides_to_start: bool (default: False)
If true, the trace represents real detector data and will force the striding
to start at the beginning of the trace without padding. If false, the traces
will be zero-padded at the beginning of the trace. This allows a trigger at
beginning of the trace to be associated with the correct trigger time.
Returns
-------
triggered bins: array of bools
the bins where the trigger condition is satisfied
"""
if trace.dtype != type(high_threshold):
logger.error(f"The trace ({trace.dtype}) and the threshold ({type(high_threshold)}) must have the same type")
raise TypeError(f"The trace ({trace.dtype}) and the threshold ({type(high_threshold)}) must have the same type")
n_bins_coincidence = int(np.round(time_coincidence / dt))
if not align_strides_to_start:
# Pad trace so trigger bin matches with sample index
padded_trace = np.pad(trace, (n_bins_coincidence - 1, 0), "constant")
else:
padded_trace = trace
num_frames = int((len(padded_trace) - n_bins_coincidence) / step)
num_real_frames = int(len(trace) / step)
logger.debug("length of trace {} samples, coincidence window {}, num window bins {}".format(
len(padded_trace), n_bins_coincidence, num_frames))
# This transforms the trace (n samples) to an array with shape (num_frames, window) where each frame
# is extracted from the trace in steps of sample intervals
# Ex. step=2, window=4, trace=[1, 2, 3, 4, 5, 6, 7, 8], trace_windowed=[[1, 2, 3, 4], [3, 4, 5, 6], ...]
trace_windowed = np.lib.stride_tricks.as_strided(
padded_trace, (num_frames, n_bins_coincidence),
(padded_trace.strides[0] * step, padded_trace.strides[0]),
writeable=False)
# Find high and low triggering windows
trace_high = np.any(trace_windowed >= high_threshold, axis=1)
trace_low = np.any(trace_windowed <= low_threshold, axis=1)
trace_high_low = trace_high & trace_low
# Keep as many samples as the original trace or cut short to keep triggers in the trace length.
trace_high_low = trace_high_low[:num_real_frames]
return trace_high_low
[docs]
def get_majority_logic(tts, number_of_coincidences=2, time_coincidence=32 * units.ns, dt=1 * units.ns,
step=1, align_strides_to_start=False):
"""
calculates a majority logic trigger
Parameters
----------
tts: array/list of array of bools
an array of bools that indicate a single channel trigger per channel
number_of_coincidences: int (default: 2)
the number of coincidences between channels
time_coincidence: float (default: 32 ns)
the time coincidence window between channels
dt: float (default: 1ns)
the width of a time bin (inverse of sampling rate)
step: int (default: 1)
stride length for sampling rate and clock rate mismatch in trigger logic
align_strides_to_start: bool (default: False)
If true, the trace represents real detector data and will force the striding
to start at the beginning of the trace without padding. If false, the traces
will be zero-padded at the beginning of the trace. This allows a trigger at
beginning of the trace to be associated with the correct trigger time.
Returns
-------
triggerd: bool
returns True if majority logic is fulfilled
triggerd_bins: array of ints
the bins that fulfilled the trigger
triggered_times: array of floats
the trigger times
"""
n_bins_coincidence = int(np.round(time_coincidence / dt))
n = len(tts[0])
if n_bins_coincidence > n: # reduce coincidence window to maximum trace length
n_bins_coincidence = n
logger.debug("specified coincidence window longer than tracelength, reducing coincidence window to trace length")
for i in range(len(tts)):
if not align_strides_to_start:
trace = np.pad(tts[i], (n_bins_coincidence - 1, 0), "constant")
else:
trace = tts[i]
num_frames = int((len(trace) - n_bins_coincidence) / step)
# Use the stride trick again.
trace_windowed = np.lib.stride_tricks.as_strided(
trace, (num_frames, n_bins_coincidence),
(trace.strides[0] * step, trace.strides[0]),
writeable=False)
tts[i] = np.any(trace_windowed, axis=1)
tt = np.array(tts)
ttt = np.sum(np.array(tt), axis=0) >= number_of_coincidences
triggered_bins = np.atleast_1d(np.squeeze(np.argwhere(ttt))) * step
return np.any(ttt), triggered_bins, triggered_bins * dt
[docs]
class triggerSimulator:
"""
Calculates the trigger of an event.
Uses the ARIANNA trigger logic, that a single antenna needs to cross a high and a low threshold value,
and then coincidences between antennas can be required.
"""
def __init__(self):
self.__t = 0
self.begin()
[docs]
def begin(self, log_level=logging.NOTSET):
logger.setLevel(log_level)
[docs]
@register_run()
def run(self, evt, station, det,
use_digitization=False, # Only active if use_digitization is set to True
threshold_high=60 * units.mV,
threshold_low=-60 * units.mV,
high_low_window=5 * units.ns,
coinc_window=200 * units.ns,
number_concidences=2,
triggered_channels=None,
trigger_name="default_high_low",
set_not_triggered=False,
Vrms=None,
trigger_adc=True,
clock_offset=0,
adc_output='voltage',
step=1,
align_strides_to_start=False):
"""
simulate ARIANNA trigger logic
Parameters
----------
evt: Event
The event to run the module on
station: Station
The station to run the module on
det: Detector
The detector description
use_digitization: bool
If True, traces will be digitized
threshold_high: float or dict of floats
the threshold voltage that needs to be crossed on a single channel on the high side
a dict can be used to specify a different threshold per channel where the key is the channel id
threshold_low: float or dict of floats
the threshold voltage that needs to be crossed on a single channel on the low side
a dict can be used to specify a different threshold per channel where the key is the channel id
high_low_window: float
time window in which a high+low crossing needs to occur to trigger a channel
coinc_window: float
time window in which number_concidences channels need to trigger
number_concidences: int
number of channels that are requried in coincidence to trigger a station
triggered_channels: array of ints or None
channels ids that are triggered on, if None trigger will run on all channels
trigger_name: string
a unique name of this particular trigger
set_not_triggered: bool (default: False)
if True not trigger simulation will be performed and this trigger will be set to not_triggered
Vrms: float
If supplied, overrides adc_voltage_range as supplied in the detector description file
trigger_adc: bool
If True, the relevant ADC parameters in the config file are the ones
that start with `'trigger_'`
clock_offset: float
adc_output: string
Options:
* 'voltage' to store the ADC output as discretised voltage trace
* 'counts' to store the ADC output in ADC counts
step: int
stride length for sampling rate and clock rate mismatch in trigger logic
align_strides_to_start: bool (default: False)
If true, the trace represents real detector data and will force the striding
to start at the beginning of the trace without padding. If false, the traces
will be zero-padded at the beginning of the trace. This allows a trigger at
beginning of the trace to be associated with the correct trigger time.
"""
t = time.time()
if use_digitization:
adcConverter = analogToDigitalConverter()
channels_that_passed_trigger = []
if not set_not_triggered:
triggerd_bins_channels = []
if triggered_channels is None:
for channel in station.iter_trigger_channels():
channel_trace_start_time = channel.get_trace_start_time()
break
else:
channel_trace_start_time = station.get_trigger_channel(triggered_channels[0]).get_trace_start_time()
for channel in station.iter_trigger_channels():
channel_id = channel.get_id()
if triggered_channels is not None and channel_id not in triggered_channels:
continue
if channel.get_trace_start_time() != channel_trace_start_time:
logger.warning('Channel has a trace_start_time that differs from the other channels. The trigger simulator may not work properly')
dt = 1. / channel.get_sampling_rate()
trace = np.array(channel.get_trace())
if use_digitization:
trace, trigger_sampling_rate = adcConverter.get_digital_trace(
station, det, channel,
Vrms=Vrms,
trigger_adc=trigger_adc,
clock_offset=clock_offset,
return_sampling_frequency=True,
adc_type='perfect_floor_comparator',
adc_output=adc_output,
trigger_filter=None
)
# overwrite the dt defined for the original trace by the digitized one
dt = 1. / trigger_sampling_rate
triggerd_bins = get_high_low_triggers(
trace,
_get_threshold_channel(threshold_high, channel_id),
_get_threshold_channel(threshold_low, channel_id),
high_low_window, dt, step, align_strides_to_start)
if np.any(triggerd_bins):
channels_that_passed_trigger.append(channel.get_id())
triggerd_bins_channels.append(triggerd_bins)
logger.debug("channel {}, len(triggerd_bins) = {}".format(channel_id, len(triggerd_bins)))
if len(triggerd_bins_channels):
has_triggered, triggered_bins, triggered_times = get_majority_logic(
triggerd_bins_channels, number_concidences, coinc_window, dt * step, 1, align_strides_to_start)
else:
has_triggered = False
# set maximum signal aplitude
max_signal = 0
if has_triggered:
for channel in station.iter_trigger_channels():
max_signal = max(max_signal, np.abs(channel.get_trace()[triggered_bins]).max())
station.set_parameter(stnp.channels_max_amplitude, max_signal)
else:
logger.info("set_not_triggered flag True, setting triggered to False.")
has_triggered = False
trigger = HighLowTrigger(trigger_name, threshold_high, threshold_low, high_low_window,
coinc_window, channels=triggered_channels, number_of_coincidences=number_concidences)
trigger.set_triggered_channels(channels_that_passed_trigger)
if not has_triggered:
trigger.set_triggered(False)
logger.info("Station has NOT passed trigger")
else:
trigger.set_triggered(True)
# trigger_time= time from start of trace + start time of trace with respect to moment of first interaction = trigger time from moment of first interaction
trigger.set_trigger_time(triggered_times.min() + channel_trace_start_time)
trigger.set_trigger_times(triggered_times + channel_trace_start_time)
logger.info("Station has passed trigger, trigger time is {:.1f} ns".format(
trigger.get_trigger_time() / units.ns))
station.set_trigger(trigger)
self.__t += time.time() - t
[docs]
def end(self):
from datetime import timedelta
dt = timedelta(seconds=self.__t)
logger.info("total time used by this module is {}".format(dt))
return dt
def _get_threshold_channel(threshold, channel_id):
""" Returns channel specific threshold if threshold is a dict, otherwise returns threshold """
if isinstance(threshold, dict):
return threshold[channel_id]
elif isinstance(threshold, (int, float)):
return threshold
else:
raise TypeError(f"Threshold must be a int/float or dict, not {type(threshold)}")