Source code for NuRadioReco.modules.trigger.powerIntegration

from NuRadioReco.modules.base.module import register_run
from NuRadioReco.utilities import units
from NuRadioReco.framework.parameters import stationParameters as stnp
from NuRadioReco.framework.trigger import IntegratedPowerTrigger
from NuRadioReco.modules.trigger.highLowThreshold import get_majority_logic
import numpy as np
import time
import logging
logger = logging.getLogger('NuRadioReco.powerIntegrationTrigger')


def get_power_int_triggers(trace, threshold, window=10 * units.ns, dt=1 * units.ns, full_output=False):
    """
    Calculates a power integration trigger.

    The trace is squared, summed over a sliding window of ``window / dt``
    samples and multiplied by ``dt``. A bin satisfies the trigger condition
    when this integrated power is larger than ``threshold``.

    Parameters
    ----------
    trace: array of floats
        the signal trace
    threshold: float
        the threshold, compared against the integrated power
    window: float
        the integration window
    dt: float
        the time binning of the trace
    full_output: bool (default False)
        if True, the integrated power is returned as second argument

    Returns
    -------
    triggered bins: array of bools
        the bins where the trigger condition is satisfied. The convolution
        runs in 'valid' mode, so for a trace at least as long as the window
        there is one bin per window position that lies fully inside the trace.
    integrated power: array of floats
        the integrated power per bin (only returned if full_output is True)

    Raises
    ------
    ValueError
        if the integration window is shorter than one sample
    """
    # window / dt is frequently a whole number that floating-point round-off
    # places just below the integer (e.g. 0.3 / 0.1 == 2.9999999999999996).
    # Plain truncation would then shorten the window by one sample, so snap to
    # the nearest integer when the ratio is within round-off distance of it.
    # Genuinely fractional ratios are still truncated.
    n_samples = window / dt
    nearest = round(n_samples)
    if abs(n_samples - nearest) < 1e-6:
        i_window = int(nearest)
    else:
        i_window = int(n_samples)

    if i_window < 1:
        # np.convolve would fail on an empty kernel with an unhelpful message
        raise ValueError(
            "integration window ({}) must span at least one sample of width dt ({})".format(window, dt))

    power = trace ** 2
    int_power = np.convolve(power, np.ones(i_window, dtype=int), 'valid') * dt

    if full_output:
        return threshold < int_power, int_power
    return threshold < int_power
class triggerSimulator:
    """
    Simulates a power integration trigger on a station.

    The squared trace of every considered channel is integrated over a
    sliding window, the per-channel results are combined with a majority
    logic, and the outcome is attached to the station as an
    IntegratedPowerTrigger.
    """

    def __init__(self):
        # wall-clock time accumulated over all run() calls, reported by end()
        self.__t = 0
        self.begin()

    def begin(self):
        """
        Setup hook of the module; there is nothing to configure.
        """
        return

    @register_run()
    def run(self, evt, station, det, threshold, integration_window,
            number_concidences=1, triggered_channels=None,
            coinc_window=200 * units.ns, trigger_name='default_powerint'):
        """
        Runs the power integration trigger on one station.

        The squared voltages are integrated over a sliding window. The
        resulting trigger object is stored on the station.

        Parameters
        ----------
        evt: Event
            The event to run the module on (not accessed inside this method)
        station: Station
            The station to run the module on
        det: Detector
            The detector description (not accessed inside this method)
        threshold: float or dict of floats
            threshold in units of integrated power (V^2*time)
            a dict can be used to specify a different threshold per channel,
            where the key is the channel id
        integration_window: float
            the integration window
        number_concidences: int
            number of channels that are required in coincidence to trigger a station
        triggered_channels: array of ints or None
            channel ids that are triggered on, if None the trigger runs on all channels
        coinc_window: float
            time window in which number_concidences channels need to trigger
        trigger_name: string
            a unique name of this particular trigger
        """
        started_at = time.time()

        # the time binning is taken from the first channel of the station
        first_id = station.get_channel_ids()[0]
        dt = 1. / station.get_channel(first_id).get_sampling_rate()

        # reference trace start time: first requested channel if a selection
        # was given, otherwise the first channel the station iterates over
        restrict_channels = triggered_channels is not None
        if restrict_channels:
            reference_start_time = station.get_channel(triggered_channels[0]).get_trace_start_time()
        else:
            for first_channel in station.iter_channels():
                reference_start_time = first_channel.get_trace_start_time()
                break

        threshold_per_channel = isinstance(threshold, dict)
        bins_per_channel = []
        passed_channel_ids = []
        for channel in station.iter_channels():
            channel_id = channel.get_id()
            if restrict_channels and channel_id not in triggered_channels:
                logger.debug("skipping channel {}".format(channel_id))
                continue

            if channel.get_trace_start_time() != reference_start_time:
                logger.warning('Channel has a trace_start_time that differs from the other channels. '
                               'The trigger simulator may not work properly')

            channel_threshold = threshold[channel_id] if threshold_per_channel else threshold
            channel_bins = get_power_int_triggers(
                channel.get_trace(), channel_threshold, integration_window, dt=dt)
            bins_per_channel.append(channel_bins)
            if np.any(channel_bins):
                passed_channel_ids.append(channel_id)

        has_triggered, triggered_bins, triggered_times = get_majority_logic(
            bins_per_channel, number_concidences, coinc_window, dt)

        # set maximum signal amplitude; note that this looks at every channel
        # of the station, not only at the ones selected for the trigger
        if has_triggered:
            peaks = [np.abs(ch.get_trace()[triggered_bins]).max() for ch in station.iter_channels()]
            station.set_parameter(stnp.channels_max_amplitude, max([0] + peaks))

        trigger = IntegratedPowerTrigger(
            trigger_name, threshold, triggered_channels, number_concidences,
            integration_window=integration_window)
        trigger.set_triggered_channels(passed_channel_ids)
        if not has_triggered:
            trigger.set_triggered(False)
            logger.debug("station has NOT triggered")
        else:
            trigger.set_triggered(True)
            # earliest time from the majority logic, shifted by the reference trace start
            trigger.set_trigger_time(triggered_times.min() + reference_start_time)
            logger.debug("station has triggered")
        station.set_trigger(trigger)

        self.__t += time.time() - started_at

    def end(self):
        """
        Logs the total time spent in run() and returns it as a timedelta.
        """
        import datetime
        elapsed = datetime.timedelta(seconds=self.__t)
        logger.info("total time used by this module is {}".format(elapsed))
        return elapsed