Source code for NuRadioReco.modules.ARA.triggerSimulator

from NuRadioReco.utilities import units
from NuRadioReco.modules.base.module import register_run
import numpy as np
import logging
from NuRadioReco.framework.trigger import IntegratedPowerTrigger
from NuRadioReco.utilities.diodeSimulator import diodeSimulator

logger = logging.getLogger('NuRadioReco.ARA.triggerSimulator')


[docs]class triggerSimulator: """ Calculates the trigger of an event. Uses the ARA trigger logic of a tunnel diode. Implementation similar to PyRex by Ben Hokanson-Fasig/ """ def __init__(self): self.__t = 0 self._power_mean = None self._power_std = None self._diode = diodeSimulator() self.power_threshold = None logger.warning("This module does not contain cutting the trace to ARA specific parameters.")
[docs] def has_triggered(self, channel): """ Check if the detector system triggers on a given channel. Passes the signal through the tunnel diode. Then compares the maximum and minimum values to a tunnel diode noise signal. Triggers if one of the maximum or minimum values exceed the noise mean +/- the noise rms times the power threshold. Parameters ---------- channel : Channel `Channel` object on which to test the trigger condition. Returns ------- boolean Whether or not the antenna triggers on `channel`. """ # Send signal through tunnel_diode after_tunnel_diode = self._diode.tunnel_diode(channel) low_trigger = (self._power_mean - self._power_std * np.abs(self.power_threshold)) return np.min(after_tunnel_diode) < low_trigger
[docs] @register_run() def run(self, evt, station, det, power_threshold=6.5, coinc_window=110 * units.ns, number_concidences=3, triggered_channels=None, power_mean=None, power_std=None, trigger_name='default_integrated_power'): """ simulate ARA trigger logic Parameters ---------- evt: Event The event on which to run the module station: Station The station on which to run the module det: Detector or GenericDetector The detector description power_threshold: float The factor of sigma that the signal needs to exceed the noise coinc_window: float time window in which number_concidences channels need to trigger number_concidences: int number of channels that are requried in coincidence to trigger a station triggered_channels: array of ints channels ids that are triggered on power_mean : float Parameter extracted in ARA from noise. If not given, it is calculated from generic noise power_std : float Parameter extracted in ARA from noise. If not given, it is calculated from generic noise trigger_name: string a unique name of this particular trigger """ # if the run method specifies power mean and rms we use these values, # if the parameters are None, the power mean and rms gets calculated for # some standard assumptions on the noise RMS and it needs to be done only once if triggered_channels is None: triggered_channels = [0, 1, 2, 3, 4, 5, 6, 7] if(power_mean is not None and power_std is not None): self._power_mean = power_mean self._power_std = power_std else: error_msg = 'The power_mean and power_std parameters are not defined. ' error_msg += 'Please define them. You can use the calculate_noise_parameters ' error_msg += 'function in utilities.diodeSimulator to do so.' raise ValueError(error_msg) self.power_threshold = power_threshold # No coincidence requirement yet trigger = {} trigger_times = [] times_min = [] times_max = [] sampling_rates = [] number_triggered_channels = 0 for channel in station.iter_channels(): channel_id = channel.get_id() if channel_id not in triggered_channels: continue trigger[channel_id] = self.has_triggered(channel) if trigger[channel_id]: number_triggered_channels += 1 times = channel.get_times() trace_after_diode = self._diode.tunnel_diode(channel) arg_trigger = np.argmin(trace_after_diode) trigger_times.append(times[arg_trigger]) times_min.append(np.min(times)) times_max.append(np.max(times)) sampling_rates.append(channel.get_sampling_rate()) has_triggered = False trigger_time = None if (number_triggered_channels >= number_concidences): trace_times = np.arange(np.min(times_min), np.max(times_max), 1 / np.min(sampling_rates)) trigger_times = np.array(trigger_times) slice_left = int(coinc_window / 2 / (trace_times[1] - trace_times[0])) slice_right = len(trace_times) - slice_left for trace_time in trace_times[slice_left:slice_right]: if (np.sum(np.abs(trace_time - trigger_times) <= coinc_window / 2) >= number_concidences): has_triggered = True trigger_time = np.min(trigger_times) break trigger = IntegratedPowerTrigger(trigger_name, power_threshold, coinc_window, channels=triggered_channels, number_of_coincidences=number_concidences, power_mean=self._power_mean, power_std=self._power_std) if not has_triggered: trigger.set_triggered(False) logger.info("Station has NOT passed trigger") trigger_time = 0 trigger.set_trigger_time(trigger_time) else: trigger.set_triggered(True) trigger.set_trigger_time(trigger_time) logger.info("Station has passed trigger, trigger time is {:.1f} ns (sample {})".format( trigger.get_trigger_time() / units.ns, trigger_time)) station.set_trigger(trigger)
[docs] def end(self): from datetime import timedelta dt = timedelta(seconds=self.__t) logger.info("total time used by this module is {}".format(dt)) return dt