import numpy as np
from NuRadioReco.modules import channelGenericNoiseAdder
from NuRadioReco.utilities import units, fft
from NuRadioReco.modules.trigger.highLowThreshold import get_high_low_triggers
from NuRadioReco.detector import generic_detector as detector
from scipy import constants
import datetime
import scipy
import scipy.signal
import copy
import time
import logging
logger = logging.getLogger('noiseTriggerSimulation')
[docs]
def rolled_sum_roll(traces, rolling):
"""
calculates rolled sum via np.roll
Parameters
----------
traces: list
list containing 1D traces
rolling: list
list of offsets per trace for rolling
Returns
-------
sumtr: np.array
1D array of summed traces
"""
# assume first trace always has no rolling
sumtr = traces[0].copy()
for i in range(1, len(traces)):
sumtr += np.roll(traces[i], rolling[i])
return sumtr
[docs]
def rolling_indices(traces, rolling):
"""
pre calculates rolling index array for rolled sum via take
Parameters
----------
traces: list
list containing (at least one) 1D trace
rolling: list
list of offsets per trace for rolling
"""
rolling_indices = []
idx = np.arange(len(traces[0]))
for roll in rolling:
rolling_indices.append(np.roll(idx, roll))
return np.array(rolling_indices).astype(int)
[docs]
def rolled_sum_take(traces, rolling_indices):
"""
calculates rolled sum via np.take
Parameters
----------
traces: list
list containing 1D traces
rolling_indices: list
list of pre-calculated 1D index arrays for rolled sum
Returns
-------
sumtr: np.array
1D array of summed traces
"""
# assume first trace always has no rolling
sumtr = traces[0].copy()
for i in range(1, len(traces)):
sumtr += np.take(traces[i], rolling_indices[i])
return sumtr
[docs]
def rolled_sum_slicing(traces, rolling):
"""
calculates rolled sum via slicing
Parameters
----------
traces: list
list containing 1D traces
rolling: list
list of offsets per trace for rolling
Returns
-------
sumtr: np.array
1D array of summed traces
"""
if rolling[0]:
raise RuntimeError(f"Cannot have a roll value of {rolling[0]}!=0 for channel 0")
# assume first trace always has no rolling
sumtr = traces[0].copy()
for i in range(1, len(traces)):
r = rolling[i]
if r != 0:
sumtr[:r] += traces[i][-r:]
sumtr[r:] += traces[i][:-r]
else:
sumtr += traces[i]
return sumtr
[docs]
class thermalNoiseGenerator():
def __init__(self, n_samples, sampling_rate, Vrms, threshold, time_coincidence, n_majority, time_coincidence_majority,
n_channels, trigger_time, filt, noise_type="rayleigh", keep_full_band=False):
"""
Efficient algorithms to generate thermal noise fluctuations that fulfill a high/low trigger + a majority
coincidence logic (as used by ARIANNA)
Parameters
----------
n_samples: int
the number of samples of the trace
sampling_rate: float
the sampling rate
Vrms: float or list of floats
the RMS noise
if only one value is supplied, it is assumed to be the same for all channels
threshold: float or list of floats
the trigger threshold (assuming a symmetric high and low threshold)
if only one value is supplied, it is assumed to be the same for all channels
time_coincidence: float
the high/low coincidence time
n_majority: int
specifies how many channels need to have a trigger
time_coincidence_majority: float
multi channel coincidence window
n_channels: int
number of channels to generate
trigger_time: float
the trigger time (time when the trigger completes)
filt: array of floats or list of array of floats
the filter that should be applied after noise generation (needs to match frequency binning)
if only one array is supplied, it is assumed to be the same for all channels
noise_type: string
the type of the noise, can be
* "rayleigh" (default)
* "noise"
keep_full_band: bool
Flag to keep the full-band waveform instead of the one with the filter applied. Allows for
triggering on a different (i.e. `filt`) band than what is returned, (default is False)
"""
self.n_samples = n_samples
self.sampling_rate = sampling_rate
self.time_coincidence = time_coincidence
self.n_majority = n_majority
self.time_coincidence_majority = time_coincidence_majority
self.trigger_time = trigger_time
self.n_channels = n_channels
self.noise_type = noise_type
self.min_freq = 0 * units.MHz
self.max_freq = 0.5 * self.sampling_rate
self.dt = 1. / self.sampling_rate
self.ff = np.fft.rfftfreq(self.n_samples, 1. / self.sampling_rate)
self.Vrms = {}
if isinstance(Vrms, float):
for i in range(n_channels):
self.Vrms[i] = Vrms
else:
if len(Vrms) != n_channels:
raise ValueError(f"Vrms has {len(Vrms)} values, but {n_channels} channels are requested")
for i in range(n_channels):
self.Vrms[i] = Vrms[i]
self.threshold = {}
if isinstance(threshold, float):
for i in range(n_channels):
self.threshold[i] = threshold
else:
if len(threshold) != n_channels:
raise ValueError(f"threshold has {len(threshold)} values, but {n_channels} channels are requested")
for i in range(n_channels):
self.threshold[i] = threshold[i]
self.filt = {}
if isinstance(filt, np.ndarray):
if filt.ndim == 1:
for i in range(n_channels):
self.filt[i] = filt
elif filt.ndim == 2:
if filt.shape[0] != n_channels:
raise ValueError(f"filt has {filt.shape[0]} values, but {n_channels} channels are requested")
for i in range(n_channels):
self.filt[i] = filt[i]
else:
raise ValueError("filt must be a 1D or 2D numpy array")
elif isinstance(filt, list):
if len(filt) != n_channels:
raise ValueError(f"filt has {len(filt)} values, but {n_channels} channels are requested")
for i in range(n_channels):
self.filt[i] = np.array(filt[i])
else:
raise ValueError("filt must be a numpy array or a list of numpy arrays")
self.trigger_bin = int(self.trigger_time / self.dt)
self.trigger_bin_low = int((self.trigger_time - self.time_coincidence_majority) / self.dt)
self.norm = {}
self.amplitude = {}
for i in range(n_channels):
self.norm[i] = np.trapz(np.abs(self.filt[i]) ** 2, self.ff)
self.amplitude[i] = (self.max_freq - self.min_freq) ** 0.5 / self.norm[i] ** 0.5 * self.Vrms[i]
self.noise = channelGenericNoiseAdder.channelGenericNoiseAdder()
self.keep_full_band = keep_full_band
[docs]
def generate_noise(self):
"""
generates noise traces for all channels that will cause a high/low majority logic trigger
Returns np.array of shape (n_channels, n_samples)
"""
n_traces = [None] * self.n_channels
t_bins = [None] * self.n_channels
number_of_triggers = 0
while number_of_triggers < self.n_majority:
for iCh in range(self.n_channels):
if n_traces[iCh] is None:
spec = self.noise.bandlimited_noise(self.min_freq, self.max_freq, self.n_samples, self.sampling_rate,
self.amplitude[iCh], self.noise_type, time_domain=False)
if self.keep_full_band:
trace_copy = fft.freq2time(spec, self.sampling_rate)
spec *= self.filt[iCh]
trace = fft.freq2time(spec, self.sampling_rate)
if np.any(trace > self.threshold[iCh]) and np.any(trace < -self.threshold[iCh]):
triggered_bins = get_high_low_triggers(trace, self.threshold[iCh], -self.threshold[iCh], self.time_coincidence, self.dt)
if np.any(triggered_bins):
number_of_triggers += 1
t_bins[iCh] = triggered_bins
trace_to_keep = trace if not self.keep_full_band else trace_copy
if number_of_triggers == 1:
n_traces[iCh] = np.roll(trace_to_keep, self.trigger_bin - np.argwhere(triggered_bins == True)[0])
else:
tmp = np.random.randint(self.trigger_bin_low, self.trigger_bin)
n_traces[iCh] = np.roll(trace_to_keep, tmp - np.argwhere(triggered_bins == True)[0])
if number_of_triggers == self.n_majority:
# this additional break is needed because another channel might trigger
# within the channel loop before the while statement is checked
break
traces = np.zeros((self.n_channels, self.n_samples))
for iCh in range(self.n_channels):
if(n_traces[iCh] is not None):
traces[iCh] = n_traces[iCh]
else:
spec = self.noise.bandlimited_noise(self.min_freq, self.max_freq, self.n_samples, self.sampling_rate,
self.amplitude[iCh], type=self.noise_type, time_domain=False)
if self.keep_full_band:
traces[iCh] = fft.freq2time(spec, self.sampling_rate)
else:
traces[iCh] = fft.freq2time(spec * self.filt[iCh], self.sampling_rate)
return traces
from NuRadioReco.modules.analogToDigitalConverter import perfect_floor_comparator
[docs]
class thermalNoiseGeneratorPhasedArray():
def __init__(self, detector_filename, station_id, triggered_channels,
Vrms, threshold, ref_index,
noise_type="rayleigh", log_level=logging.NOTSET,
pre_trigger_time=100 * units.ns, trace_length=512 * units.ns, filt=None,
upsampling=2, window_length=16 * units.ns, step_size=8 * units.ns,
main_low_angle=np.deg2rad(-59.54968597864437),
main_high_angle=np.deg2rad(59.54968597864437),
n_beams=11, quantize=True):
"""
Efficient algorithms to generate thermal noise fluctuations that fulfill a phased array trigger
Parameters
----------
detector_filename: string
the filename to the detector description
station_id: int
the station id of the station from the detector file
triggered_channels: array of ints
list of channel ids that are part of the trigger
Vrms: float
the RMS noise
threshold: float
the trigger threshold (assuming a symmetric high and low threshold)
ref_index: float
reference refractive index for calculating time delays of the beams
Other Parameters
----------------
noise_type: string
the type of the noise, can be
* "rayleigh" (default)
* "noise"
log_level: logging enum, default warn
the print level for this module
pre_trigger_time: float, default 100 ns
the time in the trace before the trigger happens
trace_length: float, default 512 ns
the total trace length
filt: array of complex values, default None
the filter that should be applied after noise generation (needs to match frequency binning in upsampled domain)
if `None`, a default filter is calculated from 96 to 220 MHz
upsampling: int, default 2
factor by which the waveforms will be upsampled before calculating time delays and beamforming
window_length: float, default 16 ns
time interval of the integration window
step_size: float, default 8 ns
duration of a stride between window calcuations
main_low_angle: float, default -59.5 deg
angle (radians) of the lowest beam
main_high_angle: float 59.5 deg
angle (radians) of the highest beam
n_beams: int, default 11
number of beams to calculate
quantize: bool, default True
If set to true, the conversion to and from ADC will be performed to mimic digitizations
"""
logger.setLevel(log_level)
self.debug = False
self.max_amp = 0
self.upsampling = upsampling
self.det = detector.GenericDetector(json_filename=detector_filename)
self.det.update(datetime.datetime(2018, 10, 1))
self.n_samples = self.det.get_number_of_samples(station_id, triggered_channels[0]) # assuming same settings for all channels
self.sampling_rate = self.det.get_sampling_frequency(station_id, triggered_channels[0])
self.pre_trigger_bins = int(pre_trigger_time * self.sampling_rate)
self.n_samples_trigger = int(trace_length * self.sampling_rate)
if self.n_samples_trigger > self.n_samples:
raise ValueError(f"Requested `trace_length` of {trace_length/units.ns:.1f}ns ({self.n_samples_trigger} bins)" +
f" is longer than the number of samples specified in the detector file ({self.n_samples} bins)")
self.quantize = quantize
if self.quantize:
det_channel = self.det.get_channel(station_id, triggered_channels[0])
self.adc_n_bits = det_channel["trigger_adc_nbits"]
self.adc_noise_count = det_channel["trigger_adc_noise_count"]
self.n_channels = len(triggered_channels)
self.triggered_channels = triggered_channels
self.ant_z = {}
for i, channel_id in enumerate(self.triggered_channels):
self.ant_z[channel_id] = self.det.get_relative_position(station_id, channel_id)[2]
ref_z = np.max(np.fromiter(self.ant_z.values(), dtype=float))
# Need to add in delay for trigger delay
cable_delays = {}
for channel_id in triggered_channels:
cable_delays[channel_id] = self.det.get_cable_delay(station_id, channel_id)
phasing_angles = np.arcsin(np.linspace(np.sin(main_low_angle), np.sin(main_high_angle), n_beams))
cspeed = constants.c * units.m / units.s
self.beam_time_delays = np.zeros((len(phasing_angles), self.n_channels), dtype=np.int)
for iBeam, angle in enumerate(phasing_angles):
delays = []
for key in self.ant_z:
delays += [-(self.ant_z[key] - ref_z) / cspeed * ref_index * np.sin(angle) - cable_delays[key]]
delays -= np.max(delays)
roll = np.array(np.round(np.array(delays) * self.sampling_rate * self.upsampling)).astype(int)
# subtract off the delay of antenna 0 because `rolled_sum_slicing` assumes this is the case
self.beam_time_delays[iBeam] = roll - roll[0]
self.Vrms = Vrms
self.threshold = threshold
self.noise_type = noise_type
self.min_freq = 0 * units.MHz
self.max_freq = 0.5 * self.sampling_rate * self.upsampling
self.dt = 1. / self.sampling_rate
self.ff = np.fft.rfftfreq(self.n_samples * self.upsampling, 1. / (self.sampling_rate * self.upsampling))
# Construct a default filter if one is not supplied
if filt is None:
import NuRadioReco.modules.channelBandPassFilter
channelBandPassFilter = NuRadioReco.modules.channelBandPassFilter.channelBandPassFilter()
self.filt = channelBandPassFilter.get_filter(self.ff, station_id, channel_id, self.det,
passband=[96 * units.MHz, 100 * units.GHz], filter_type='cheby1', order=4, rp=0.1)
self.filt *= channelBandPassFilter.get_filter(self.ff, station_id, channel_id, self.det,
passband=[1 * units.MHz, 220 * units.MHz], filter_type='cheby1', order=7, rp=0.1)
else:
if len(filt) != len(self.ff):
raise ValueError(f"Frequency filter supplied has {len(filt)} bins. It should match the upsampled" +
f" frequency binning of {len(self.ff)} bins from {self.ff[0] / units.MHz:.0f} to {self.ff[-1] / units.MHz:.0f} MHz")
self.filt = np.array(filt)
self.norm = np.trapz(np.abs(self.filt) ** 2, self.ff)
self.amplitude = (self.max_freq - self.min_freq) ** 0.5 / self.norm ** 0.5 * self.Vrms
logger.info(f"Vrms = {self.Vrms:.3g}V, noise amplitude = {self.amplitude:.3g}V, bandwidth = {self.norm / units.MHz:.0f}MHz")
logger.info(f"frequency range {self.min_freq / units.MHz}MHz - {self.max_freq / units.MHz}MHz")
if self.quantize:
self.adc_ref_voltage = self.Vrms * (2 ** self.adc_n_bits - 1) / self.adc_noise_count
self.window = int(window_length * self.sampling_rate * self.upsampling)
self.step = int(step_size * self.sampling_rate * self.upsampling)
if self.window >= self.pre_trigger_bins:
logger.warning(f"Pre-trigger time ({pre_trigger_time / units.ns:0.2} ns, {self.pre_trigger_bins} bins)" +
f" is within one window ({self.window} bins) of the beginning of the waveform" +
" it is recommended to choose a larger value to avoid clipping effects")
self.noise = channelGenericNoiseAdder.channelGenericNoiseAdder()
# pre-calculate all parameters which will be used to simulate each triggered noise event to avoid re-calculation
self.noise.precalculate_bandlimited_noise_parameters(self.min_freq, self.max_freq, self.n_samples * self.upsampling,
self.sampling_rate * self.upsampling,
self.amplitude, self.noise_type)
def __generation(self):
""" separated trace generation part for PA noise trigger """
for iCh in range(self.n_channels):
# spec = self.noise.bandlimited_noise(self.min_freq, self.max_freq, self.n_samples * self.upsampling,
# self.sampling_rate * self.upsampling,
# self.amplitude, self.noise_type, time_domain=False)
# function that does not re-calculate parameters in each simulated trace
spec = self.noise.bandlimited_noise_from_precalculated_parameters(self.noise_type, time_domain=False)
spec *= self.filt
trace = fft.freq2time(spec, self.sampling_rate * self.upsampling)
if self.quantize:
self._traces[iCh] = perfect_floor_comparator(trace, self.adc_n_bits, self.adc_ref_voltage)
else:
self._traces[iCh] = trace
def __phasing(self):
""" separated phasing part for PA noise trigger """
self._phased_traces = np.zeros((len(self.beam_time_delays), self.n_samples * self.upsampling))
for iBeam, beam_time_delay in enumerate(self.beam_time_delays):
self._phased_traces[iBeam] += rolled_sum_slicing(self._traces, beam_time_delay)
def __phasing_roll(self):
""" separated phasing part for PA noise trigger via np.roll """
self._phased_traces = np.zeros((len(self.beam_time_delays), self.n_samples * self.upsampling))
for iBeam, beam_time_delay in enumerate(self.beam_time_delays):
for iCh in range(self.n_channels):
self._phased_traces[iBeam] += np.roll(self._traces[iCh], beam_time_delay[iCh])
def __triggering(self):
""" separated trigger part for PA noise trigger """
self.max_amp = 0
# take square over entire array
coh_sum_squared = self._phased_traces ** 2
# bin the data into windows of length self.step and normalise to step length
reduced_array = np.add.reduceat(coh_sum_squared.T, np.arange(0, np.shape(coh_sum_squared)[1], self.step)).T / self.step
sliding_windows = []
# self.window can extend over multiple steps,
# assuming self.window being an integer multiple of self.step the reduction sums over subsequent steps
steps_per_window = self.window // self.step
# better extend the array in order to also trigger on sum of last/first (matching a previous implementation)
extended_reduced_array = np.column_stack([reduced_array, reduced_array[:, 0:steps_per_window]])
for offset in range(steps_per_window):
# sum over steps_per_window adjacent steps
window_sum = np.add.reduceat(extended_reduced_array.T, np.arange(offset, np.shape(extended_reduced_array)[1], steps_per_window)).T / steps_per_window
sliding_windows.append(window_sum)
# self.max_amp = max(np.array(sliding_windows).max(), self.max_amp)
self.max_amp = np.array(sliding_windows).max()
# check if trigger condition is fulfilled anywhere
if self.max_amp > self.threshold:
sliding_windows = np.array(sliding_windows)
tmp = np.argwhere(sliding_windows > self.threshold)
triggered_step, triggered_beam, triggered_bin = tmp[0]
triggered_bin *= (self.window + triggered_step * steps_per_window)
if(self.debug):
# check in which beam the trigger condition was fulfilled
sliding_windows = np.concatenate(sliding_windows, axis=1)
triggered_beams = np.amax(sliding_windows, axis=1) > self.threshold
for iBeam, is_triggered in enumerate(triggered_beams):
# print out each beam that has triggered
if is_triggered:
logger.info(f"triggered at beam {iBeam}")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(self.n_channels + 1, 1, sharex=True)
for iCh in range(self.n_channels):
ax[iCh].plot(self._traces[iCh])
logger.info(f"{self._traces[iCh].std():.2f}")
ax[self.n_channels].plot(self._phased_traces[iBeam])
fig.tight_layout()
plt.show()
return True, triggered_bin, triggered_beam
return False, None, None
def __triggering_strided(self):
""" separated trigger part for PA noise trigger using np.lib.stride_tricks.as_strided """
self.max_amp = 0
for iBeam, phased_trace in enumerate(self._phased_traces):
# Create a sliding window
coh_sum_squared = phased_trace ** 2
num_frames = int(np.floor((len(phased_trace) - self.window) / self.step))
coh_sum_windowed = np.lib.stride_tricks.as_strided(coh_sum_squared, (num_frames, self.window),
(coh_sum_squared.strides[0] * self.step, coh_sum_squared.strides[0]))
squared_mean = np.sum(coh_sum_windowed, axis=1) / self.window
# self.max_amp = max(squared_mean.max(), self.max_amp)
self.max_amp = max(squared_mean.max(), self.max_amp)
if True in (squared_mean > self.threshold):
triggered_bin = np.where(squared_mean > self.threshold)[0,0]
logger.debug(f"triggered at beam {iBeam}")
if(self.debug):
import matplotlib.pyplot as plt
fig, ax = plt.subplots(self.n_channels+1, 1, sharex=True)
for iCh in range(self.n_channels):
ax[iCh].plot(self._traces[iCh])
logger.info(f"{self._traces[iCh].std():.2f}")
ax[self.n_channels].plot(self._phased_traces[iBeam])
fig.tight_layout()
plt.show()
return True, triggered_bin, iBeam
return False, None, None
[docs]
def generate_noise(self, phasing_mode="slice", trigger_mode="binned_sum", debug=False):
"""
generates noise traces for all channels that will cause a high/low majority logic trigger
Parameters
----------
phasing_mode: string (default: "slice")
"slice" or "roll", two implementations for phasing by either slicing the array or using
np.roll. The default shows better performance, but "roll" is kept as an alternative
trigger_mode: string (default: "binned_sum")
"binned_sum" or "stride", two implementations for triggering
The default shows better performance, but "stride" is kept as an alternative
debug:
generate debug plot
Returns
-------
np.array of shape (n_channels, n_samples), index of triggered bin, index of triggered beam
"""
self.debug = debug
# some variables for profiling code
dt_generation = 0
dt_phasing = 0
dt_triggering = 0
# generate empty trace array
self._traces = np.zeros((self.n_channels, self.n_samples * self.upsampling))
counter = 0
while True:
counter += 1
if(counter % 1000 == 0):
logger.info(f"{counter:d}, {self.max_amp:.2g}, threshold = {self.threshold:.2g}")
# some printout for profiling
logger.info(f"Time consumption: GENERATION: {dt_generation:.4f}, PHASING: {dt_phasing:.4f}, TRIGGER: {dt_triggering:.4f}")
tstart = time.process_time()
self.__generation()
# time profiling generation
dt_generation += time.process_time() - tstart
tstart = time.process_time()
if phasing_mode == "slice":
self.__phasing()
elif phasing_mode == "roll":
# more time consuming attempt to do phasing compared to slicing the array
self.__phasing_roll()
else:
logger.error(f"Requested phasing_mode {phasing_mode}. Only 'slice' and 'roll' are allowed")
raise NotImplementedError(f"Requested phasing_mode {phasing_mode}. Only 'slice' and 'roll' are allowed")
# time profiling phasing
dt_phasing += time.process_time() - tstart
tstart = time.process_time()
if trigger_mode == "binned_sum":
is_triggered, triggered_bin, triggered_beam = self.__triggering()
elif trigger_mode == "stride":
# more time consuming attempt to do triggering compared to taking binned sums
is_triggered, triggered_bin, triggered_beam = self.__triggering_strided()
else:
logger.error(f"Requested trigger_mode {trigger_mode}. Only 'binned_sum' and 'stride' are allowed")
raise NotImplementedError(f"Requested trigger_mode {trigger_mode}. Only 'binned_sum' and 'stride' are supported")
# time profiling trigger
dt_triggering += time.process_time() - tstart
if is_triggered:
triggered_bin = triggered_bin // self.upsampling # the trace is cut in the downsampled version. Therefore, triggered bin is factor of two smaller.
i_low = triggered_bin - self.pre_trigger_bins
i_high = i_low + self.n_samples_trigger
# traces need to be downsampled
# resample and use axis -1 since trace might be either shape (N) for analytic trace or shape (3,N) for E-field
self._traces = scipy.signal.resample(self._traces, np.shape(self._traces)[-1] // self.upsampling, axis=-1)
if (i_low >= 0) and (i_high < self.n_samples): # If range is directly a subset of the waveform
return self._traces[:, i_low:i_high], self._phased_traces, triggered_beam
# Otherwise, roll the waveforms. Safe as long as noise is generated in the freq domain
self._phased_traces = np.roll(self._phased_traces, -i_low * self.upsampling, axis=-1)
self._traces = np.roll(self._traces, -i_low, axis=-1)
return self._traces[:, :self.n_samples_trigger], self._phased_traces, triggered_beam
[docs]
def generate_noise2(self, debug=False):
"""
generates noise traces for all channels that will cause a high/low majority logic trigger
This implementation is slow due to nested python loops each rolling the trace over and over again
Returns np.array of shape (n_channels, n_samples)
"""
traces = np.zeros((self.n_channels, self.n_samples * self.upsampling))
counter = 0
max_amp = 0
while True:
counter += 1
if(counter % 1000 == 0):
print(f"{counter:d}, {max_amp:.3g}, threshold = {self.threshold:.3g}")
for iCh in range(self.n_channels):
spec = self.noise.bandlimited_noise(self.min_freq, self.max_freq, self.n_samples * self.upsampling,
self.sampling_rate * self.upsampling,
self.amplitude, self.noise_type, time_domain=False)
spec *= self.filt
trace = fft.freq2time(spec, self.sampling_rate * self.upsampling)
if self.quantize:
self._traces[iCh] = perfect_floor_comparator(trace, self.adc_n_bits, self.adc_ref_voltage)
else:
self._traces[iCh] = trace
shifts = np.zeros(self.n_channels, dtype=np.int)
shifted_traces = copy.copy(traces)
for shift1 in np.arange(-100, 100, 4, dtype=np.int):
shifted_traces[1] = np.roll(traces[1], shift1)
shifts[1] = shift1
for shift2 in np.arange(-100, 100, 4, dtype=np.int):
shifts[2] = shift2
shifted_traces[2] = np.roll(traces[2], shift2)
for shift3 in np.arange(-100, 100, 4, dtype=np.int):
shifts[3] = shift3
shifted_traces[3] = np.roll(traces[3], shift3)
phased_trace = np.zeros(self.n_samples * self.upsampling)
for iCh in range(self.n_channels):
phased_trace += shifted_traces[iCh]
# Create a sliding window
coh_sum_squared = phased_trace ** 2
num_frames = int(np.floor((len(phased_trace) - self.window) / self.step))
coh_sum_windowed = np.lib.stride_tricks.as_strided(coh_sum_squared, (num_frames, self.window),
(coh_sum_squared.strides[0] * self.step, coh_sum_squared.strides[0]))
squared_mean = np.sum(coh_sum_windowed, axis=1) / self.window
max_amp = max(squared_mean.max(), max_amp)
if True in (squared_mean > self.threshold):
logger.info(f"triggered at beam {shifts}")
if(debug):
import matplotlib.pyplot as plt
fig, ax = plt.subplots(5, 1, sharex=True)
for iCh in range(self.n_channels):
ax[iCh].plot(traces[iCh])
logger.info(f"{traces[iCh].std():.2f}")
ax[4].plot(phased_trace)
fig.tight_layout()
plt.show()
return traces, phased_trace