Source code for NuRadioReco.modules.trigger.envelopeTrigger

import logging
import time

import numpy as np
import scipy.signal

from NuRadioReco.framework.trigger import EnvelopeTrigger
from NuRadioReco.modules.base.module import register_run
from NuRadioReco.modules.trigger.highLowThreshold import get_majority_logic

logger = logging.getLogger('NuRadioReco.envelopeTrigger')


[docs]def get_envelope_triggers(trace, threshold): # define trigger constraint for each channel """ calculates a Hilbert-envelope based trigger Parameters ---------- trace: array of floats the signal trace threshold: float the threshold Returns ------- triggered bins: array of bools the bins where the trigger condition is satisfied """ return np.abs(scipy.signal.hilbert(trace)) > threshold
[docs]class triggerSimulator: """ Calculate a simple amplitude trigger depending on the Hilbert-envelope. """ def __init__(self): self.__t = 0 self.begin()
[docs] def begin(self): return
[docs] @register_run() def run(self, evt, station, det, passband, order, threshold, coinc_window, number_coincidences=2, triggered_channels=None, trigger_name='envelope_trigger'): """ Simulates simple threshold trigger based on a Hilbert-envelope of the trace. Passband of the trigger, coincidence window within different channels should have triggered, and the number of channels needed to trigger can be specified. Parameters ---------- evt: Event Event to run the module on station: Station Station to run the module on det: Detector The detector description passband: list Passband of the filter to apply before the trigger order: int Order of the butterworth filter to apply before the trigger threshold: float or dict of floats threshold above (or below) a trigger is issued, absolute amplitude a dict can be used to specify a different threshold per channel where the key is the channel id number_coincidences: int number of channels that are required in coincidence to trigger a station triggered_channels: array of ints or None channels ids that are triggered on, if None trigger will run on all channels coinc_window: float time window in which number_coincidences channels need to trigger trigger_name: string a unique name of this particular trigger """ t = time.time() # absolute time of system sampling_rate = station.get_channel(det.get_channel_ids(station.get_id())[0]).get_sampling_rate() dt = 1. / sampling_rate triggered_bins_channels = [] channels_that_passed_trigger = [] if triggered_channels is None: # caveat: all channels start at the same time for channel in station.iter_channels(): channel_trace_start_time = channel.get_trace_start_time() break else: channel_trace_start_time = station.get_channel(triggered_channels[0]).get_trace_start_time() for channel in station.iter_channels(): channel_id = channel.get_id() if triggered_channels is not None and channel_id not in triggered_channels: logger.debug("skipping channel {}".format(channel_id)) continue trace = channel.get_filtered_trace(passband, 'butter', order) if channel.get_trace_start_time() != channel_trace_start_time: logger.warning('Channel has a trace_start_time that differs from ' ' the other channels. The trigger simulator may not work properly') if(isinstance(threshold, dict)): threshold_tmp = threshold[channel_id] else: threshold_tmp = threshold triggered_bins = get_envelope_triggers(trace, threshold_tmp) triggered_bins_channels.append(triggered_bins) if True in triggered_bins: channels_that_passed_trigger.append(channel.get_id()) # check for coincidences with get_majority_logic(tts, number_of_coincidences=2, # time_coincidence=32 * units.ns, dt=1 * units.ns) # returns: # triggered: bool; returns True if majority logic is fulfilled --> has_triggered # triggered_bins: array of ints; the bins that fulfilled the trigger --> triggered_bins # triggered_times = triggered_bins * dt: array of floats; # the trigger times relative to the trace --> triggered_times has_triggered, triggered_bins, triggered_times = get_majority_logic(triggered_bins_channels, number_coincidences, coinc_window, dt) trigger = EnvelopeTrigger(trigger_name, passband, order, threshold, number_coincidences, coinc_window, triggered_channels) trigger.set_triggered_channels(channels_that_passed_trigger) if has_triggered: trigger.set_triggered(True) trigger.set_trigger_time(triggered_times.min() + channel_trace_start_time) # trigger_time = time from the beginning of the trace logger.debug("station has triggered") else: trigger.set_triggered(False) trigger.set_trigger_time(None) logger.debug("station has NOT triggered") station.set_trigger(trigger) self.__t += time.time() - t
[docs] def end(self): from datetime import timedelta dt = timedelta(seconds=self.__t) logger.info("total time used by this module is {}".format(dt)) return dt