from NuRadioReco.modules.base.module import register_run
import numpy as np
from scipy import signal
import time
from NuRadioReco.utilities import units
from NuRadioReco.utilities import trace_utilities
from NuRadioReco.framework.parameters import channelParameters as chp
from NuRadioReco.framework.parameters import stationParameters as stnp
import logging
logger = logging.getLogger('NuRadioReco.channelSignalReconstructor')
[docs]class channelSignalReconstructor:
"""
Calculates basic signal parameters.
"""
def __init__(self, log_level=logging.NOTSET):
self.__t = 0
logger.setLevel(log_level)
self.__conversion_factor_integrated_signal = trace_utilities.conversion_factor_integrated_signal
self.__signal_window_start = None
self.__signal_window_stop = None
self.__noise_window_start = None
self.__noise_window_stop = None
self.__signal_window_length = None
self.__noise_window_length = None
self.__debug = None
self.begin()
[docs] def begin(
self,
debug=False,
signal_window_start=None,
signal_window_length=120 * units.ns,
noise_window_start=None,
noise_window_length=None
):
"""
Parameters
----------
debug: bool
Set module to debug output
signal_window_start: float or None
Start time (relative to the trace start time) of the window in which signal quantities will be calculated, with time units
If None is passed as a parameter, the signal window is laid around the trace maximum
signal_window_length: float
Length of the signal window, with time units
noise_window_start: float or None
Start time (relative to the trace start time) of the window in which noise quantities will be calculated, with time units
If noise_window_start or noise_window_length are None, the noise window is the part of the trace outside the signal window
noise_window_length: float or None
Length of the noise window, with time units
If noise_window_start or noise_window_length are None, the noise window is the part of the trace outside the signal window
"""
self.__signal_window_start = signal_window_start
self.__signal_window_length = signal_window_length
self.__noise_window_start = noise_window_start
self.__noise_window_length = noise_window_length
self.__debug = debug
[docs] def get_SNR(self, station_id, channel, det, stored_noise=False, rms_stage=None):
"""
Parameters
----------
station_id: int
ID of the station
channel, det
Channel, Detector
stored_noise: bool
Calculates noise from pre-computed forced triggers
rms_stage: string
See functionality of det.get_noise_RMS
Returns
-------
SNR: dict
dictionary of various SNR parameters
"""
trace = channel.get_trace()
times = channel.get_times() - channel.get_trace_start_time()
if self.__signal_window_start is not None:
signal_window_start = self.__signal_window_start
signal_window_mask = (times > self.__signal_window_start) & (times < self.__signal_window_start + self.__signal_window_length)
else:
signal_window_start = times[np.argmax(np.abs(trace))] - .5 * self.__signal_window_length
signal_window_mask = (times > signal_window_start) & (times < signal_window_start + self.__signal_window_length)
if self.__noise_window_start is not None and self.__noise_window_length is not None:
noise_window_mask = (times > self.__noise_window_start) & (times < self.__noise_window_start + self.__noise_window_length)
noise_window_length = self.__noise_window_length
else:
noise_window_mask = ~signal_window_mask
noise_window_length = len(trace[noise_window_mask]) / channel.get_sampling_rate()
# Various definitions
noise_int = np.sum(np.square(trace[noise_window_mask]))
if(noise_window_length > 0):
noise_int *= (self.__signal_window_length) / float(noise_window_length)
else:
logger.warning(f"Noise window length is zero. This likely indicates that the tracelength is too small. Noise quantities can not be calcualted.")
if stored_noise:
# we use the RMS from forced triggers
noise_rms = det.get_noise_RMS(station_id, channel.get_id(), stage=rms_stage)
else:
noise_rms = np.sqrt(np.mean(np.square(trace[noise_window_mask])))
if self.__debug:
import matplotlib.pyplot as plt
plt.figure()
plt.plot(times[signal_window_mask], np.square(trace[signal_window_mask]))
plt.plot(times[noise_window_mask], np.square(trace[noise_window_mask]), c='k', label='noise')
plt.xlabel("Times [ns]")
plt.ylabel("Power")
plt.legend()
# Calculating SNR
SNR = {}
if (noise_rms == 0) or (noise_int == 0):
logger.info("RMS of noise is zero, calculating an SNR is not useful. All SNRs are set to infinity.")
SNR['peak_2_peak_amplitude'] = np.inf
SNR['peak_amplitude'] = np.inf
SNR['integrated_power'] = np.inf
else:
SNR['integrated_power'] = np.sum(np.square(trace[signal_window_mask])) - noise_int
if SNR['integrated_power'] <= 0:
logger.debug("Integrated signal {0} smaller than noise {1}, power SNR 0".format(SNR['integrated_power'], noise_int))
SNR['integrated_power'] = 0.
else:
SNR['integrated_power'] /= (noise_int)
SNR['integrated_power'] = np.sqrt(SNR['integrated_power'])
# calculate amplitude values
SNR['peak_2_peak_amplitude'] = np.max(trace[signal_window_mask]) - np.min(trace[signal_window_mask])
SNR['peak_2_peak_amplitude'] /= noise_rms
SNR['peak_2_peak_amplitude'] /= 2
SNR['peak_amplitude'] = np.max(np.abs(trace[signal_window_mask])) / noise_rms
# SCNR
SNR['Seckel_2_noise'] = 5
if self.__debug:
plt.figure()
plt.plot(times, trace)
plt.fill_between(times, 1.1 * np.max(trace), 1.1 * np.min(trace), where=noise_window_mask, color='k', alpha=.2, label='noise window')
plt.fill_between(times, 1.1 * np.max(trace), 1.1 * np.min(trace), where=signal_window_mask, color='r', alpha=.2, label='signal window')
plt.legend()
plt.show()
return SNR, noise_rms
[docs] @register_run()
def run(self, evt, station, det, stored_noise=False, rms_stage='amp'):
"""
Parameters
----------
evt, station, det
Event, Station, Detector
stored_noise: bool
Calculates noise from pre-computed forced triggers
rms_stage: string
See functionality of det.get_noise_RMS
"""
t = time.time()
max_amplitude_station = 0
for channel in station.iter_channels():
times = channel.get_times()
trace = channel.get_trace()
h = np.abs(signal.hilbert(trace))
max_amplitude = np.max(np.abs(trace))
logger.info(f"event {evt.get_run_number()}.{evt.get_id()} station {station.get_id()} channel {channel.get_id()} max amp = {max_amplitude:.6g} max amp env {h.max():.6g}")
if(logger.level >= logging.DEBUG):
tmp = ""
for amp in trace:
tmp += f"{amp:.6g}, "
logger.debug(tmp)
channel[chp.signal_time] = times[np.argmax(h)]
max_amplitude_station = max(max_amplitude_station, max_amplitude)
channel[chp.maximum_amplitude] = max_amplitude
channel[chp.maximum_amplitude_envelope] = h.max()
channel[chp.P2P_amplitude] = np.max(trace) - np.min(trace)
# Use noise precalculated from forced triggers
signal_to_noise, noise_rms = self.get_SNR(station.get_id(), channel, det, stored_noise=stored_noise, rms_stage=rms_stage)
channel[chp.SNR] = signal_to_noise
channel[chp.noise_rms] = noise_rms
station[stnp.channels_max_amplitude] = max_amplitude_station
self.__t = time.time() - t
[docs] def end(self):
from datetime import timedelta
dt = timedelta(seconds=self.__t)
logger.info("total time used by this module is {}".format(dt))
return dt