# Source code for NuRadioReco.utilities.trace_utilities

"""
This module contains utility functions to compute various observables from waveforms.

The functions in this module can be used to compute observables from waveforms, such as the energy fluence,
the stokes parameters, the signal-to-noise ratio, the root power ratio, the Hilbert envelope, the impulsivity,
the coherent sum, the entropy, the kurtosis, and the correlation between two traces.

All functions in this module do not depend on the NuRadioReco framework
and can be used independently. The functions do not alter the input traces,
but only compute observables from them.

See Also
--------
`NuRadioReco.utilities.signal_processing`
    Module for functions that modify traces, e.g., by filtering, delaying, etc.
"""

from NuRadioReco.utilities import units, signal_processing

import numpy as np
import scipy.stats
import scipy.signal
import scipy.ndimage
import scipy.constants
import warnings

import logging
logger = logging.getLogger('NuRadioReco.trace_utilities')

# Conversion factor to turn an integrated electric-field power, V**2/m**2 * s,
# into an energy fluence (J/m**2 -> eV/m**2 via the internal `units` system).
# See Phys. Rev. D DOI: 10.1103/PhysRevD.93.122005
conversion_factor_integrated_signal = scipy.constants.c * scipy.constants.epsilon_0 * units.joule / units.s / units.volt ** 2

def get_efield_antenna_factor(*args, **kwargs):
    """
    Deprecated: use `NuRadioReco.utilities.signal_processing.get_efield_antenna_factor` instead.
    """
    # stacklevel=2 makes the DeprecationWarning point at the caller, not this shim
    warnings.warn(
        "get_efield_antenna_factor is moved to NuRadioReco.utilities.signal_processing.get_efield_antenna_factor",
        DeprecationWarning, stacklevel=2)
    return signal_processing.get_efield_antenna_factor(*args, **kwargs)
def get_channel_voltage_from_efield(*args, **kwargs):
    """
    Deprecated: use `NuRadioReco.utilities.signal_processing.get_channel_voltage_from_efield` instead.
    """
    # stacklevel=2 makes the DeprecationWarning point at the caller, not this shim
    warnings.warn(
        "get_channel_voltage_from_efield is moved to NuRadioReco.utilities.signal_processing.get_channel_voltage_from_efield",
        DeprecationWarning, stacklevel=2)
    return signal_processing.get_channel_voltage_from_efield(*args, **kwargs)
def upsampling_fir(*args, **kwargs):
    """
    Deprecated: use `NuRadioReco.utilities.signal_processing.upsampling_fir` instead.
    """
    # stacklevel=2 makes the DeprecationWarning point at the caller, not this shim
    warnings.warn(
        "upsampling_fir is moved to NuRadioReco.utilities.signal_processing.upsampling_fir",
        DeprecationWarning, stacklevel=2)
    return signal_processing.upsampling_fir(*args, **kwargs)
def butterworth_filter_trace(*args, **kwargs):
    """
    Deprecated: use `NuRadioReco.utilities.signal_processing.butterworth_filter_trace` instead.
    """
    # stacklevel=2 makes the DeprecationWarning point at the caller, not this shim
    warnings.warn(
        "butterworth_filter_trace is moved to NuRadioReco.utilities.signal_processing.butterworth_filter_trace",
        DeprecationWarning, stacklevel=2)
    return signal_processing.butterworth_filter_trace(*args, **kwargs)
def apply_butterworth(*args, **kwargs):
    """
    Deprecated: use `NuRadioReco.utilities.signal_processing.apply_butterworth` instead.
    """
    # stacklevel=2 makes the DeprecationWarning point at the caller, not this shim
    warnings.warn(
        "apply_butterworth is moved to NuRadioReco.utilities.signal_processing.apply_butterworth",
        DeprecationWarning, stacklevel=2)
    return signal_processing.apply_butterworth(*args, **kwargs)
def delay_trace(*args, **kwargs):
    """
    Deprecated: use `NuRadioReco.utilities.signal_processing.delay_trace` instead.
    """
    # stacklevel=2 makes the DeprecationWarning point at the caller, not this shim
    warnings.warn(
        "delay_trace is moved to NuRadioReco.utilities.signal_processing.delay_trace",
        DeprecationWarning, stacklevel=2)
    return signal_processing.delay_trace(*args, **kwargs)
def get_electric_field_energy_fluence(electric_field_trace, times, signal_window_mask=None, noise_window_mask=None):
    """
    Compute the energy fluence of an electric field trace.

    Parameters
    ----------
    electric_field_trace : 2d array
        The electric field trace. The first dimension corresponds to the number of
        traces/polarizations, the second dimension to the number of samples.
    times : 1d array
        The times corresponding to the electric field trace.
    signal_window_mask : 1d array, default=None
        A boolean mask to select the signal window. If None, the full trace is used.
    noise_window_mask : 1d array, default=None
        A boolean mask to select the noise window. If given (and non-empty), the noise
        contribution, scaled to the size of the signal window, is subtracted.

    Returns
    -------
    energy_fluence : 1d array
        The energy fluence of the electric field per polarization.
    """
    electric_field_trace = np.asarray(electric_field_trace)

    if signal_window_mask is None:
        f_signal = np.sum(electric_field_trace ** 2, axis=1)
        # no mask: the "signal window" is the entire trace
        n_signal_samples = electric_field_trace.shape[1]
    else:
        f_signal = np.sum(electric_field_trace[:, signal_window_mask] ** 2, axis=1)
        n_signal_samples = np.sum(signal_window_mask)

    if noise_window_mask is not None and np.sum(noise_window_mask) > 0:
        # subtract the expected noise contribution, scaled to the signal-window length
        # (previously this crashed with a TypeError when signal_window_mask was None)
        f_noise = np.sum(electric_field_trace[:, noise_window_mask] ** 2, axis=1)
        f_signal -= f_noise * n_signal_samples / np.sum(noise_window_mask)

    # assumes a uniform sampling interval (taken from the first two time stamps)
    dt = times[1] - times[0]
    return f_signal * dt * conversion_factor_integrated_signal
def get_stokes(trace_u, trace_v, window_samples=128, squeeze=True):
    """
    Compute the Stokes parameters of a pair of electric-field traces.

    Parameters
    ----------
    trace_u : 1d array (float)
        The u component of the electric field trace.
    trace_v : 1d array (float)
        The v component of the electric field trace. Must have the same length as
        ``trace_u``; (u, v) should be perpendicular, e.g. (theta, phi) or (vxB, vxvxB).
    window_samples : int | None, default: 128
        If not None, return a running average of the Stokes parameters over
        ``window_samples``. If None, average over the whole trace
        (equivalent to ``window_samples=len(trace_u)``).
    squeeze : bool, default: True
        Only relevant if ``window_samples=None``; removes the length-one second axis
        so an array of shape (4,) is returned.

    Returns
    -------
    stokes : 2d array of floats
        The Stokes parameters I, Q, U, V, in that order along the first axis. With
        averaging, the second axis has length ``len(trace_u) - window_samples + 1``.
    """
    assert len(trace_u) == len(trace_v)

    # analytic signals of both polarizations
    analytic_u = scipy.signal.hilbert(trace_u)
    analytic_v = scipy.signal.hilbert(trace_v)

    power_u = np.abs(analytic_u) ** 2
    power_v = np.abs(analytic_v) ** 2
    cross = 2 * analytic_u * np.conjugate(analytic_v)

    stokes = np.array([
        power_u + power_v,   # I
        power_u - power_v,   # Q
        np.real(cross),      # U
        np.imag(cross),      # V
    ])

    if window_samples == 1:
        # per-sample parameters requested - no averaging needed
        return stokes

    if window_samples is None:
        # average over the full trace
        window_samples = len(analytic_u)

    kernel = np.ones(window_samples)
    stokes = np.asarray([
        scipy.signal.convolve(component, kernel, mode='valid')
        for component in stokes
    ])
    stokes /= window_samples

    return np.squeeze(stokes) if squeeze else stokes
def peak_to_peak_amplitudes(trace, coincidence_window_size):
    """
    Calculate all local peak-to-peak amplitudes of a given trace.

    Parameters
    ----------
    trace: array of floats
        Array containing the trace
    coincidence_window_size: int
        Length of the sliding window over which the local extrema are taken

    Returns
    -------
    amplitudes: array of floats (same length as the input trace)
        Local peak to peak amplitudes
    """
    # running maximum minus running minimum over the coincidence window
    local_max = scipy.ndimage.maximum_filter1d(trace, coincidence_window_size)
    local_min = scipy.ndimage.minimum_filter1d(trace, coincidence_window_size)
    return local_max - local_min
def get_split_trace_noise_RMS(trace, segments=4, lowest=2):
    """
    Calculate the noise root mean square (RMS) of a given trace.

    This method splits the trace into segments, calculates the RMS of each segment,
    and takes the mean of the ``lowest`` smallest segment RMS values. Using only the
    quietest segments suppresses the bias from segments that contain signal.

    Parameters
    ----------
    trace: array of floats
        Array containing the trace
    segments: int
        Number of segments to cut the trace into
    lowest: int
        Number of lowest segment RMS values to average for the end result

    Returns
    -------
    noise_root_mean_square: float
        The mean of the lowest few segments' RMS values
    """
    # np.array_split handles traces whose length is not divisible by `segments`,
    # so the intermediate object-dtype array conversion of the old code is unneeded.
    # Note: np.std subtracts each segment's mean, i.e. this is the RMS of the fluctuations.
    segment_rms = np.sort([np.std(segment) for segment in np.array_split(trace, segments)])
    return np.mean(segment_rms[:lowest])
def get_signal_to_noise_ratio(trace, noise_rms, window_size=3):
    """
    Compute the Signal to Noise Ratio (SNR) of a given trace.

    The SNR is the maximum local peak-to-peak amplitude within the coincidence
    window, divided by twice the noise root mean square (RMS).

    Parameters
    ----------
    trace : array of floats
        Trace of a waveform
    noise_rms: float
        Noise root mean square (RMS)
    window_size: int
        Coincidence window size (default: 3)

    Returns
    -------
    signal_to_noise_ratio: float
        Signal to Noise Ratio (SNR) value
    """
    if not window_size >= 2:
        logger.error(f"Window size must be greater-equal 2 (but is {window_size})")
        raise ValueError(f"Window size must be greater-equal 2 (but is {window_size})")

    # local peak-to-peak amplitude within the coincidence window
    local_p2p = (scipy.ndimage.maximum_filter1d(trace, window_size)
                 - scipy.ndimage.minimum_filter1d(trace, window_size))

    return np.amax(local_p2p) / (2 * noise_rms)
def get_root_power_ratio(trace, times, noise_rms):
    """
    Compute the Root Power Ratio (RPR) of a given trace.

    The waveform's instantaneous power is smoothed with a 25 ns sliding window;
    the square root of the smoothed power is the rolling root power. The RPR is
    the maximum rolling root power divided by the noise RMS.

    Parameters
    ----------
    trace: array of floats
        Trace of a waveform
    times: array of floats
        Times of a waveform
    noise_rms: float
        noise root mean square (RMS)

    Returns
    -------
    root_power_ratio: float
        Root Power Ratio (RPR) value
    """
    if noise_rms == 0:
        # avoid division by zero - a noiseless trace has infinite RPR
        return np.inf

    dt = times[1] - times[0]
    smoothing_window = 25  # smoothing window length in ns (assumes `times` is in ns - TODO confirm)
    n_smoothing_samples = int(np.round(smoothing_window / dt))

    # rolling root power: sqrt of the window-averaged instantaneous power
    rolling_root_power = np.sqrt(
        scipy.ndimage.uniform_filter1d(trace ** 2, size=n_smoothing_samples, mode='constant'))

    return np.max(rolling_root_power) / noise_rms
def get_hilbert_envelope(trace):
    """
    Return the Hilbert envelope of a waveform trace.

    The envelope is the magnitude of the analytic signal obtained via the
    Hilbert transform.

    Parameters
    ----------
    trace: array of floats
        Trace of a waveform

    Returns
    -------
    envelope: array of floats
        Hilbert envelope of the waveform trace
    """
    return np.abs(scipy.signal.hilbert(trace))
def get_impulsivity(trace):
    """
    Calculate the impulsivity of a signal (trace).

    The Hilbert envelope of the trace is computed, its squared values (power) are
    ordered by how close each sample is to the envelope maximum, and the cumulative
    distribution function (CDF) of that ordered power is formed. The scaled mean of
    the CDF is the impulsivity: an impulsive signal concentrates its power near the
    peak and yields a value close to 1.

    Parameters
    ----------
    trace: array of floats
        Trace of a waveform

    Returns
    -------
    impulsivity: float
        Impulsivity of the signal (scaled between 0 and 1)
    """
    # Hilbert envelope (magnitude of the analytic signal)
    envelope = np.abs(scipy.signal.hilbert(trace))
    peak_index = np.argmax(envelope)

    # distance (in samples) of every sample from the envelope maximum
    distances = np.abs(np.arange(len(envelope)) - peak_index)

    # envelope values ordered by closeness to the peak
    # (ties in distance are broken by envelope value, as tuple sorting does)
    ordered = np.array([value for _, value in sorted(zip(list(distances), envelope))])

    cdf = np.cumsum(ordered ** 2)
    cdf = cdf / cdf[-1]

    impulsivity = 2.0 * np.mean(cdf) - 1.0
    # clamp: diffuse signals can give slightly negative values
    return max(impulsivity, 0.0)
def get_coherent_sum(trace_set, ref_trace, use_envelope=False):
    """
    Generate the coherently-summed waveform (CSW) of a set of traces.

    Each trace of the set is cross-correlated with the reference trace, rolled by
    the lag that maximizes the correlation, and added to the reference trace.

    Parameters
    ----------
    trace_set: 2-D array of floats
        Traces of multiple channel waveforms without the reference trace
    ref_trace: 1-D array of floats
        Trace of the reference channel. The input is not modified.
    use_envelope: bool
        If True, correlate the Hilbert envelopes instead of the raw traces
        (default: False)

    Returns
    -------
    sum_trace: 1-D array of floats
        CSW of the set of traces

    Notes
    -----
    Two defects of the previous implementation are fixed here:

    * the reference trace was aliased instead of copied, so ``+=`` modified the
      caller's array in place (the module contract is that inputs are not altered);
    * the lag was computed as ``int(argmax - size/2.)``; the zero lag of a full
      correlation sits at index ``len(sig_i) - 1``, so positive lags came out one
      sample too small.
    """
    # copy so the caller's reference trace is not modified in place by "+="
    sum_trace = np.array(ref_trace)

    # the reference signal is loop-invariant - compute it once
    if use_envelope:
        sig_ref = get_hilbert_envelope(ref_trace)
    else:
        sig_ref = ref_trace

    for trace in trace_set:
        if use_envelope:
            sig_i = get_hilbert_envelope(trace)
        else:
            sig_i = trace

        cor = scipy.signal.correlate(sig_ref, sig_i, mode="full")
        # zero lag of a 'full' correlation is at index len(sig_i) - 1
        lag = int(np.argmax(cor)) - (len(sig_i) - 1)
        sum_trace += np.roll(trace, lag)

    return sum_trace
def get_entropy(trace, n_hist_bins=50):
    """
    Calculate the Shannon entropy (randomness measurement) of a trace.

    Parameters
    ----------
    trace: array of floats
        Trace of a waveform
    n_hist_bins: int
        Number of bins for the histogram (default: 50)

    Returns
    -------
    entropy: float
        Shannon entropy of the signal (trace), in bits
    """
    # discretize the amplitude distribution; density=True normalizes the bin
    # values such that the integral over the histogram range is 1
    hist, _ = np.histogram(trace, bins=n_hist_bins, density=True)

    # renormalize to a probability mass function and compute the entropy in bits
    probabilities = hist / np.sum(hist)
    return scipy.stats.entropy(probabilities, base=2)
def get_kurtosis(trace):
    """
    Calculate the kurtosis (tailedness) of a trace.

    Uses scipy's default (Fisher) definition, i.e. the excess kurtosis,
    which is 0 for a normal distribution.

    Parameters
    ----------
    trace: array of floats
        Trace of a waveform

    Returns
    -------
    kurtosis: float
        Kurtosis of the signal (trace)
    """
    return scipy.stats.kurtosis(trace)
def is_NAN_or_INF(trace):
    """
    Check whether a trace contains any NaN or infinite samples.

    Parameters
    ----------
    trace: array of floats
        Trace of a waveform

    Returns
    -------
    is_bad_trace: bool
        True if the trace contains at least one NaN or infinite sample
    npoints_NAN: int
        Number of NaN points
    npoints_INF: int
        Number of infinite points
    """
    arr = np.asarray(trace)
    # np.count_nonzero counts directly instead of materializing index
    # arrays with np.argwhere just to take their length
    npoints_NAN = int(np.count_nonzero(np.isnan(arr)))
    npoints_INF = int(np.count_nonzero(np.isinf(arr)))
    is_bad_trace = bool(npoints_NAN or npoints_INF)
    return is_bad_trace, npoints_NAN, npoints_INF
def get_variable_window_size_correlation(data_trace, template_trace, window_size, sampling_rate=3.2*units.GHz, return_time_difference=False, debug=False):
    """
    Calculate the correlation between two traces using a variable window size and matrix multiplication

    Parameters
    ----------
    data_trace: array
        full trace of the data event
    template_trace: array
        full trace of the template
    window_size: float
        Size of the template window in nanoseconds
    sampling_rate: float
        sampling rate of the data and template trace
    return_time_difference: boolean
        if true, the time difference (for the maximal correlation value) between the starting of the data trace
        and the starting of the (cut) template trace is returned (returned time is in units.ns)
    debug: boolean
        if true, debug plots are created

    Returns
    -------
    correlation : array of floats
    time_diff : float, optional
        The time difference of the maximal correlation value. Returned only if ``return_time_difference==True``
    """
    # preparing the traces
    data_trace = np.asarray(data_trace, dtype=float)
    template_trace = np.asarray(template_trace, dtype=float)

    # create the template window: cut a window of `window_size` ns around the
    # absolute maximum of the template, with 1/3 of the window before the peak
    # and 2/3 after it
    window_steps = int(window_size * sampling_rate)

    max_amp = np.max(abs(template_trace))
    max_amp_i = np.where(abs(template_trace) == max_amp)[0][0]
    # NOTE(review): if the template maximum sits closer than window_steps/3 to the
    # trace start, lower_bound becomes negative and the slice silently shortens the
    # window - confirm that templates are always padded accordingly
    lower_bound = int(max_amp_i - window_steps / 3)
    upper_bound = int(max_amp_i + 2 * window_steps / 3)
    template_trace = template_trace[lower_bound:upper_bound]

    # zero padding on the data trace (len(template)-1 on both sides), so the
    # template can slide fully past both edges of the data
    data_trace = np.append(np.zeros(len(template_trace) - 1), data_trace)
    data_trace = np.append(data_trace, np.zeros(len(template_trace) - 1))

    if debug:
        # keep an unclipped copy for the debug plot below
        plot_data_trace = data_trace.copy()

    # only calculate the correlation of the part of the trace where at least 10% of the maximum is visible
    # (fastens the calculation); the bounds are widened by len(template)-1 so no
    # window containing an above-threshold sample is lost
    max_amp_data = np.max(abs(data_trace))
    help_val = np.where(abs(data_trace) >= 0.1 * max_amp_data)[0]
    lower_bound_data = help_val[0] - (len(template_trace) - 1)
    upper_bound_data = help_val[len(help_val) - 1] + (len(template_trace) - 1)
    data_trace = data_trace[lower_bound_data:upper_bound_data]

    # run the correlation using matrix multiplication: each row of dataMatrix is
    # one sliding window of the data trace, so the dot product with the template
    # gives all correlation numerators at once
    dataMatrix = np.lib.stride_tricks.sliding_window_view(data_trace, len(template_trace))
    corr_numerator = dataMatrix.dot(template_trace)
    # normalize each window so the correlation values lie in [-1, 1]
    norm_dataMatrix = np.linalg.norm(dataMatrix, axis=1)
    norm_template_trace = np.linalg.norm(template_trace)
    corr_denominator = norm_dataMatrix * norm_template_trace
    correlation = corr_numerator / corr_denominator

    max_correlation = np.max(abs(correlation))
    max_corr_i = np.where(abs(np.asarray(correlation)) == max_correlation)[0][0]

    if return_time_difference:
        # calculate the time difference between the beginning of the template and data trace
        # for the largest correlation value; the offset accounts for the zero
        # padding and the clipping applied above. time difference is given in ns
        time_diff = (max_corr_i + (lower_bound_data - len(template_trace))) / sampling_rate

    logger.debug('max correlation: {}'.format(max_correlation))
    if return_time_difference:
        logger.debug('time difference: {:.2f} ns'.format(time_diff))

    if debug:
        # visualize the correlation curve and the best-match template position
        import matplotlib.pyplot as plt
        fig, axs = plt.subplots(2)
        axs[0].plot(correlation)
        axs[0].plot(np.array([np.where(abs(correlation) == max(abs(correlation)))[0][0]]), np.array([max_correlation]), marker="x", markersize=12, color='tab:red')
        axs[0].set_ylim(-1.1, 1.1)
        axs[0].set_ylabel(r"$\chi$")
        axs[0].set_xlabel('N')
        axs[1].plot(plot_data_trace, label='complete data trace')
        x_data = np.arange(0, len(data_trace), 1)
        x_data = x_data + lower_bound_data
        axs[1].plot(x_data, data_trace, label='scanned data trace')
        x_template = np.arange(0, len(template_trace), 1)
        x_template = x_template + max_corr_i + lower_bound_data
        axs[1].plot(x_template, template_trace, label='template')
        axs[1].set_xlabel('time')
        axs[1].set_ylabel('amplitude')
        axs[1].legend()
        fig.savefig('debug_plots_get_variable_window_size_correlation.png')

    if return_time_difference:
        return correlation, time_diff
    else:
        return correlation