Source code for NuRadioReco.modules.iftElectricFieldReconstructor.iftElectricFieldReconstructor

import numpy as np
from NuRadioReco.utilities import units, fft, trace_utilities, bandpass_filter
import NuRadioReco.utilities.trace_utilities
import NuRadioReco.detector.antennapattern
import NuRadioReco.detector.RNO_G.analog_components
import NuRadioReco.detector.ARIANNA.analog_components
from NuRadioReco.framework.parameters import electricFieldParameters as efp
from NuRadioReco.framework.parameters import channelParameters as chp
import NuRadioReco.modules.iftElectricFieldReconstructor.operators
import NuRadioReco.framework.base_trace
import NuRadioReco.framework.electric_field
import scipy
import nifty5 as ift
import matplotlib.pyplot as plt
import scipy.signal
import radiotools.helper


[docs]class IftElectricFieldReconstructor: """ Module that uses Information Field Theory to reconstruct the electric field. A description how this method works can be found at https://arxiv.org/abs/2102.00258 """ def __init__(self): self.__antenna_pattern_provider = NuRadioReco.detector.antennapattern.AntennaPatternProvider() self.__passband = None self.__filter_type = None self.__debug = False self.__efield_scaling = None self.__amp_dct = None self.__phase_dct = None self.__used_channel_ids = None self.__trace_samples = None self.__fft_operator = None self.__n_shifts = None self.__trace_start_times = None self.__n_iterations = None self.__n_samples = None self.__polarization = None self.__electric_field_template = None self.__convergence_level = None self.__relative_tolerance = None self.__use_sim = False self.__pulse_time_prior = None self.__pulse_time_uncertainty = None self.__phase_slope = None self.__slope_passbands = None self.__energy_fluence_passbands = None return
[docs] def begin( self, electric_field_template, passband=None, filter_type='butter', amp_dct=None, pulse_time_prior=20. * units.ns, pulse_time_uncertainty=5. * units.ns, n_iterations=5, n_samples=20, polarization='pol', relative_tolerance=1.e-7, convergence_level=3, energy_fluence_passbands=None, slope_passbands=None, phase_slope='both', debug=False ): """ Define settings for the reconstruction. Parameters ---------- electric_field_template: NuRadioReco.framework.base_trace.BaseTrace object BaseTrace (or child class) object containing an electric field template that is used to determine the position of the radio pulse in the channel waveforms. passband: list of floats or None Lower and upper bound of the filter that should be applied to the channel waveforms and the IFT model. If None is passed, no filter is applied filter_type: string Name of the filter type to be used. Has to be implemented in the NuRadioReco.utilities. bandpass_filter.get_filter_response function. Only used if passband is not None amp_dct: dictionary Dictionary containing the prior settings for the electric field spectrum pulse_time_prior: float Expected pulse time relative to the trace start time. Note that this is the time of the electric field pulse, not the voltage pulse pulse_time_uncertainty: float Uncertainty on the pulse time n_iterations: integer Number of times the IFT minimizer iterates. More iterations lead to better results, but increase run time. n_samples: integer Number of prior samples the IFT minimizer uses to find the maximum prior. Also the number of samples used to estimate uncertainties polarization: string Polarization of the reconstructed radio signal. If set to "theta" or "phi", only that component of the electric field is reconstructed. If set to "pol", both components are reconstructed. relative_tolerance: float Relative improvement for the minimizer in a cycle for the optimization to finish. convergence_level: integer Number of cycles the relative improvement of the minimizer has to be below relative_tolerance for the optimization to finish. energy_fluence_passbands: list of floats List of passbands for which the energy fluence is calculated slope_passbands: list of floats List of passbands to calculate the ratio of the energy fluences in different passbands. phase_slope: string Specifies the sign of the slope of the linear function describing the phase of the electric field. Options are "negative", "positive" and "both". If "both" is selected, positive and negative slopes are used and the best fit is selected. debug: bool If true, debug plots are drawn. """ self.__passband = passband self.__filter_type = filter_type self.__debug = debug self.__n_iterations = n_iterations self.__n_samples = n_samples self.__trace_samples = len(electric_field_template.get_times()) self.__polarization = polarization self.__electric_field_template = electric_field_template self.__convergence_level = convergence_level self.__relative_tolerance = relative_tolerance self.__pulse_time_prior = pulse_time_prior self.__pulse_time_uncertainty = pulse_time_uncertainty if phase_slope not in ['both', 'negative', 'positive']: raise ValueError('Phase slope has to be either both, negative of positive.') self.__phase_slope = phase_slope if slope_passbands is None: self.__slope_passbands = [ [ (130. * units.MHz, 200 * units.MHz), (200. * units.MHz, 350. * units.MHz) ] ] if energy_fluence_passbands is None: self.__energy_fluence_passbands = [ (130. * units.MHz, 500. * units.MHz) ] else: self.__slope_passbands = slope_passbands if amp_dct is None: self.__amp_dct = { 'n_pix': 64, # spectral bins # Spectral smoothness (affects Gaussian process part) 'a': .01, 'k0': 2., # Power-law part of spectrum: 'sm': -4.9, 'sv': .5, 'im': 2., 'iv': .5 } else: self.__amp_dct = amp_dct return
[docs] def make_priors_plot(self, event, station, detector, channel_ids): """ Plots samples from the prior distribution of the electric field. Parameters ---------- event: NuRadioReco.framework.event.Event object station: NuRadioReco.framework.station.Station object detector: NuRadioReco.detector.detector.Detector object or child object channel_ids: list of floats IDs of the channels to use for the electric field reconstruction """ self.__used_channel_ids = [] self.__efield_scaling = False self.__used_channel_ids = channel_ids self.__prepare_traces(event, station, detector) ref_channel = station.get_channel(self.__used_channel_ids[0]) sampling_rate = ref_channel.get_sampling_rate() time_domain = ift.RGSpace(self.__trace_samples) frequency_domain = time_domain.get_default_codomain() self.__fft_operator = ift.FFTOperator(frequency_domain.get_default_codomain()) amp_operators, filter_operator = self.__get_detector_operators( station, detector, frequency_domain, sampling_rate, ) self.__draw_priors(event, station, frequency_domain)
[docs] def run(self, event, station, detector, channel_ids, efield_scaling, use_sim=False): """ Run the electric field reconstruction Parameters ---------- event: NuRadioReco.framework.event.Event object station: NuRadioReco.framework.station.Station object detector: NuRadioReco.detector.detector.Detector object or child object channel_ids: list of integers IDs of the channels to be used for the electric field reconstruction efield_scaling: boolean If true, a small variation in the amplitude between channels is included in the IFT model. use_sim: boolean If true, the simChannels are used to identify the position of the radio pulse. """ self.__used_channel_ids = [] # only use channels with associated E-field and zenith self.__efield_scaling = efield_scaling self.__used_channel_ids = channel_ids self.__use_sim = use_sim self.__prepare_traces(event, station, detector) ref_channel = station.get_channel(self.__used_channel_ids[0]) sampling_rate = ref_channel.get_sampling_rate() time_domain = ift.RGSpace(self.__trace_samples) frequency_domain = time_domain.get_default_codomain() large_frequency_domain = ift.RGSpace(self.__trace_samples * 2, harmonic=True) self.__fft_operator = ift.FFTOperator(frequency_domain.get_default_codomain()) amp_operators, filter_operator = self.__get_detector_operators( station, detector, frequency_domain, sampling_rate, ) final_KL = None positive_reco_KL = None negative_reco_KL = None # Run Positive Phase Slope # if self.__phase_slope == 'both' or self.__phase_slope == 'positive': phase_slope = 2. * np.pi * self.__pulse_time_prior * self.__electric_field_template.get_sampling_rate() / self.__trace_samples phase_uncertainty = 2. * np.pi * self.__pulse_time_uncertainty * self.__electric_field_template.get_sampling_rate() / self.__trace_samples self.__phase_dct = { 'sm': phase_slope, 'sv': phase_uncertainty, 'im': 0., 'iv': 10. } likelihood = self.__get_likelihood_operator( frequency_domain, large_frequency_domain, amp_operators, filter_operator ) self.__draw_priors(event, station, frequency_domain) ic_sampling = ift.GradientNormController(1E-8, iteration_limit=min(1000, likelihood.domain.size)) H = ift.StandardHamiltonian(likelihood, ic_sampling) ic_newton = ift.DeltaEnergyController(name='newton', iteration_limit=200, tol_rel_deltaE=self.__relative_tolerance, convergence_level=self.__convergence_level) minimizer = ift.NewtonCG(ic_newton) median = ift.MultiField.full(H.domain, 0.) min_energy = None best_reco_KL = None for k in range(self.__n_iterations): print('----------->>> {} <<<-----------'.format(k)) KL = ift.MetricGaussianKL(median, H, self.__n_samples, mirror_samples=True) KL, convergence = minimizer(KL) median = KL.position if min_energy is None or KL.value < min_energy: min_energy = KL.value print('New min Energy', KL.value) best_reco_KL = KL if self.__phase_slope == 'both': suffix = '_positive_phase' else: suffix = '' if self.__debug: self.__draw_reconstruction( event, station, KL, suffix ) positive_reco_KL = best_reco_KL final_KL = best_reco_KL # Run Negative Phase Slope ### if self.__phase_slope == 'both' or self.__phase_slope == 'negative': phase_slope = 2. * np.pi * (self.__pulse_time_prior * self.__electric_field_template.get_sampling_rate() - self.__trace_samples) / self.__trace_samples phase_uncertainty = 2. * np.pi * self.__pulse_time_uncertainty * self.__electric_field_template.get_sampling_rate() / self.__trace_samples self.__phase_dct = { 'sm': phase_slope, 'sv': phase_uncertainty, 'im': 0., 'iv': 10. } likelihood = self.__get_likelihood_operator( frequency_domain, large_frequency_domain, amp_operators, filter_operator ) # self.__draw_priors(event, station, frequency_domain) ic_sampling = ift.GradientNormController(1E-8, iteration_limit=min(1000, likelihood.domain.size)) H = ift.StandardHamiltonian(likelihood, ic_sampling) ic_newton = ift.DeltaEnergyController(name='newton', iteration_limit=200, tol_rel_deltaE=self.__relative_tolerance, convergence_level=self.__convergence_level) minimizer = ift.NewtonCG(ic_newton) median = ift.MultiField.full(H.domain, 0.) min_energy = None best_reco_KL = None for k in range(self.__n_iterations): print('----------->>> {} <<<-----------'.format(k)) KL = ift.MetricGaussianKL(median, H, self.__n_samples, mirror_samples=True) KL, convergence = minimizer(KL) median = KL.position if min_energy is None or KL.value < min_energy: min_energy = KL.value print('New min Energy', KL.value) best_reco_KL = KL if self.__phase_slope == 'both': suffix = '_negative_phase' else: suffix = '' if self.__debug: self.__draw_reconstruction( event, station, KL, suffix ) negative_reco_KL = best_reco_KL final_KL = best_reco_KL if self.__phase_slope == 'both': if negative_reco_KL.value < positive_reco_KL.value: final_KL = negative_reco_KL else: final_KL = positive_reco_KL self.__store_reconstructed_efields( event, station, final_KL ) if self.__debug: self.__draw_reconstruction( event, station, final_KL, '' ) return True
def __prepare_traces( self, event, station, det ): """ Prepares the channel waveforms for the reconstruction by correcting for time differences between channels, cutting them to the right size and locating the radio pulse. """ if self.__debug: plt.close('all') fig1 = plt.figure(figsize=(18, 12)) ax1_1 = fig1.add_subplot(len(self.__used_channel_ids), 2, (1, 2 * len(self.__used_channel_ids) - 1)) fig2 = plt.figure(figsize=(18, 12)) self.__noise_levels = np.zeros(len(self.__used_channel_ids)) self.__n_shifts = np.zeros_like(self.__used_channel_ids) self.__trace_start_times = np.zeros(len(self.__used_channel_ids)) self.__data_traces = np.zeros((len(self.__used_channel_ids), self.__trace_samples)) max_channel_length = 0 passband = [100. * units.MHz, 200 * units.MHz] sim_channel_traces = [] for channel_id in self.__used_channel_ids: channel = station.get_channel(channel_id) if self.__use_sim: sim_channel_sum = NuRadioReco.framework.base_trace.BaseTrace() sim_channel_sum.set_trace(np.zeros_like(channel.get_trace()), channel.get_sampling_rate()) sim_channel_sum.set_trace_start_time(channel.get_trace_start_time()) for sim_channel in station.get_sim_station().get_channels_by_channel_id(channel_id): sim_channel_sum += sim_channel if sim_channel_sum.get_number_of_samples() > max_channel_length: max_channel_length = sim_channel_sum.get_number_of_samples() sim_channel_traces.append(sim_channel_sum) else: if channel.get_number_of_samples() > max_channel_length: max_channel_length = channel.get_number_of_samples() correlation_sum = np.zeros(self.__electric_field_template.get_number_of_samples() + max_channel_length) if self.__debug: plt.close('all') fig1 = plt.figure(figsize=(16, 8)) ax1_1 = fig1.add_subplot(121) ax1_1.grid() ax1_2 = fig1.add_subplot(122) ax1_2.grid() fig2 = plt.figure(figsize=(12, 12)) channel_trace_templates = np.zeros((len(self.__used_channel_ids), len(self.__electric_field_template.get_trace()))) for i_channel, channel_id in enumerate(self.__used_channel_ids): channel = station.get_channel(channel_id) amp_response = det.get_amplifier_response(station.get_id(), channel_id, self.__electric_field_template.get_frequencies()) antenna_orientation = det.get_antenna_orientation(station.get_id(), channel_id) antenna_pattern = self.__antenna_pattern_provider.load_antenna_pattern(det.get_antenna_model(station.get_id(), channel_id)) antenna_response = antenna_pattern.get_antenna_response_vectorized( self.__electric_field_template.get_frequencies(), channel.get_parameter(chp.signal_receiving_zenith), 0., antenna_orientation[0], antenna_orientation[1], antenna_orientation[2], antenna_orientation[3] ) channel_spectrum_template = fft.time2freq( self.__electric_field_template.get_filtered_trace(passband, filter_type='butterabs'), self.__electric_field_template.get_sampling_rate() ) * amp_response * (antenna_response['theta'] + antenna_response['phi']) channel_trace_template = fft.freq2time(channel_spectrum_template, self.__electric_field_template.get_sampling_rate()) channel_trace_templates[i_channel] = channel_trace_template channel.apply_time_shift(-channel.get_parameter(chp.signal_time_offset), True) if self.__use_sim: sim_channel_traces[i_channel].apply_time_shift(-channel.get_parameter(chp.signal_time_offset), True) channel_trace = sim_channel_traces[i_channel].get_filtered_trace(passband, filter_type='butterabs') else: channel_trace = channel.get_filtered_trace(passband, filter_type='butterabs') if self.__use_sim: correlation = radiotools.helper.get_normalized_xcorr(np.abs(scipy.signal.hilbert(channel_trace_template)), np.abs(scipy.signal.hilbert(channel_trace))) else: correlation = radiotools.helper.get_normalized_xcorr(channel_trace_template, channel_trace) correlation = np.abs(correlation) correlation_sum[:len(correlation)] += correlation toffset = -(np.arange(0, correlation.shape[0]) - len(channel_trace)) / channel.get_sampling_rate() # - propagation_times[i_channel, i_solution] - channel.get_trace_start_time() if self.__use_sim: sim_channel_traces[i_channel].apply_time_shift(channel.get_parameter(chp.signal_time_offset), True) # else: # channel.apply_time_shift(channel.get_parameter(chp.signal_time_offset), True) if self.__debug: ax1_1.plot(toffset, correlation) for i_channel, channel_id in enumerate(self.__used_channel_ids): channel = station.get_channel(channel_id) time_offset = channel.get_parameter(chp.signal_time_offset) channel_trace = channel.get_filtered_trace(passband, filter_type='butterabs') toffset = -(np.arange(0, correlation_sum.shape[0]) - len(channel_trace)) / channel.get_sampling_rate() if self.__debug: ax2_1 = fig2.add_subplot(len(self.__used_channel_ids), 2, 2 * i_channel + 1) ax2_1.grid() ax2_1.plot(channel.get_times(), channel_trace / units.mV, c='C0', alpha=1.) ax2_1.set_title('Channel {}'.format(channel_id)) ax2_1.plot(self.__electric_field_template.get_times() + channel.get_trace_start_time() + toffset[np.argmax(correlation_sum)], channel_trace_templates[i_channel] / np.max(channel_trace_templates[i_channel]) * np.max(channel_trace) / units.mV, c='C1') sim_channel_sum = None for sim_channel in station.get_sim_station().iter_channels(): if sim_channel.get_id() == channel_id: if sim_channel_sum is None: sim_channel_sum = sim_channel else: sim_channel_sum += sim_channel if sim_channel_sum is not None: sim_channel_sum.apply_time_shift(-channel.get_parameter(chp.signal_time_offset), True) ax2_1.plot(sim_channel_sum.get_times(), sim_channel_sum.get_filtered_trace(passband, filter_type='butterabs') / units.mV, c='k', alpha=.5) ax2_1.set_xlim([sim_channel_sum.get_trace_start_time() - 50, sim_channel_sum.get_times()[-1] + 50]) sim_channel_sum.apply_time_shift(channel.get_parameter(chp.signal_time_offset), True) channel.apply_time_shift(-toffset[np.argmax(correlation_sum)]) self.__data_traces[i_channel] = channel.get_trace()[:self.__trace_samples] self.__noise_levels[i_channel] = np.sqrt(np.mean(channel.get_trace()[self.__trace_samples + 1:]**2)) self.__n_shifts[i_channel] = int((toffset[np.argmax(correlation_sum)] + time_offset) * channel.get_sampling_rate()) self.__trace_start_times[i_channel] = channel.get_trace_start_time() + (toffset[np.argmax(correlation_sum)] + time_offset) if self.__debug: ax2_2 = fig2.add_subplot(len(self.__used_channel_ids), 2, 2 * i_channel + 2) ax2_2.grid() ax2_2.plot(np.arange(len(self.__data_traces[i_channel])) / channel.get_sampling_rate(), self.__data_traces[i_channel]) channel.apply_time_shift(channel.get_parameter(chp.signal_time_offset) + toffset[np.argmax(correlation_sum)], True) self.__scaling_factor = np.max(self.__data_traces) self.__data_traces /= self.__scaling_factor self.__noise_levels /= self.__scaling_factor if self.__debug: ax1_2.plot(correlation_sum) fig2.tight_layout() fig2.savefig('{}_{}_traces.png'.format(event.get_run_number(), event.get_id())) def __get_detector_operators( self, station, detector, frequency_domain, sampling_rate ): """ Creates the operators to simulate the detector response. """ amp_operators = [] self.__gain_scaling = [] self.__classic_efield_recos = [] frequencies = frequency_domain.get_k_length_array().val / self.__trace_samples * sampling_rate hardware_responses = np.zeros((len(self.__used_channel_ids), 2, len(frequencies)), dtype=complex) if self.__passband is not None: b, a = scipy.signal.butter(10, self.__passband, 'bandpass', analog=True) w, h = scipy.signal.freqs(b, a, frequencies) filter_field = ift.Field(ift.DomainTuple.make(frequency_domain), np.abs(h)) filter_operator = ift.DiagonalOperator(filter_field, frequency_domain) if self.__filter_type == 'butter': filter_phase = np.unwrap(np.angle(h)) else: filter_phase = 0 else: filter_operator = ift.ScalingOperator(1., frequency_domain) filter_phase = 0 for i_channel, channel_id in enumerate(self.__used_channel_ids): channel = station.get_channel(channel_id) receiving_zenith = channel.get_parameter(chp.signal_receiving_zenith) if channel.has_parameter(chp.signal_receiving_azimuth): receive_azimuth = channel.get_parameter(chp.signal_receiving_azimuth) else: receive_azimuth = 0. antenna_response = NuRadioReco.utilities.trace_utilities.get_efield_antenna_factor(station, frequencies, [channel_id], detector, receiving_zenith, receive_azimuth, self.__antenna_pattern_provider)[0] amp_response = detector.get_amplifier_response(station.get_id(), channel_id, frequencies) amp_gain = np.abs(amp_response) amp_phase = np.unwrap(np.angle(amp_response)) total_gain = np.abs(amp_gain) * np.abs(antenna_response) total_phase = np.unwrap(np.angle(antenna_response)) + amp_phase + filter_phase total_phase[:, total_phase.shape[1] // 2:] *= -1 total_phase[:, total_phase.shape[1] // 2 + 1] = 0 total_phase *= -1 hardware_responses[i_channel, 0] = (total_gain * np.exp(1.j * total_phase))[0] hardware_responses[i_channel, 1] = (total_gain * np.exp(1.j * total_phase))[1] max_gain = np.max(np.abs(hardware_responses)) self.__gain_scaling = max_gain hardware_responses /= max_gain for i_channel, channel_id in enumerate(self.__used_channel_ids): amp_field_theta = ift.Field(ift.DomainTuple.make(frequency_domain), hardware_responses[i_channel][0]) amp_field_phi = ift.Field(ift.DomainTuple.make(frequency_domain), hardware_responses[i_channel][1]) amp_operators.append([ift.DiagonalOperator(amp_field_theta), ift.DiagonalOperator(amp_field_phi)]) return amp_operators, filter_operator def __get_likelihood_operator( self, frequency_domain, large_frequency_domain, hardware_operators, filter_operator ): """ Creates the IFT model from which the maximum posterior is calculated """ power_domain = ift.RGSpace(large_frequency_domain.get_default_codomain().shape[0], harmonic=True) power_space = ift.PowerSpace(power_domain) self.__amp_dct['target'] = power_space A = ift.SLAmplitude(**self.__amp_dct) self.__power_spectrum_operator = A correlated_field = ift.CorrelatedField(large_frequency_domain.get_default_codomain(), A) realizer = ift.Realizer(self.__fft_operator.domain) realizer2 = ift.Realizer(self.__fft_operator.target) inserter = NuRadioReco.modules.iftElectricFieldReconstructor.operators.Inserter(realizer.target) large_sp = correlated_field.target small_sp = ift.RGSpace(large_sp.shape[0] // 2, large_sp[0].distances) zero_padder = ift.FieldZeroPadder(small_sp, large_sp.shape, central=False) domain_flipper = NuRadioReco.modules.iftElectricFieldReconstructor.operators.DomainFlipper(zero_padder.domain, target=ift.RGSpace(small_sp.shape, harmonic=True)) mag_S_h = (domain_flipper @ zero_padder.adjoint @ correlated_field) mag_S_h = NuRadioReco.modules.iftElectricFieldReconstructor.operators.SymmetrizingOperator(mag_S_h.target) @ mag_S_h subtract_one = ift.Adder(ift.Field(mag_S_h.target, -6)) mag_S_h = realizer2.adjoint @ (subtract_one @ mag_S_h).exp() fft_operator = ift.FFTOperator(frequency_domain.get_default_codomain()) scaling_domain = ift.UnstructuredDomain(1) add_one = ift.Adder(ift.Field(inserter.domain, 1)) polarization_domain = ift.UnstructuredDomain(1) likelihood = None self.__efield_trace_operators = [] self.__efield_spec_operators = [] self.__channel_trace_operators = [] self.__channel_spec_operators = [] polarization_inserter = NuRadioReco.modules.iftElectricFieldReconstructor.operators.Inserter(mag_S_h.target) polarization_field = realizer2 @ polarization_inserter @ (2. * ift.FieldAdapter(polarization_domain, 'pol')) for i_channel, channel_id in enumerate(self.__used_channel_ids): phi_S_h = NuRadioReco.modules.iftElectricFieldReconstructor.operators.SlopeSpectrumOperator(frequency_domain.get_default_codomain(), self.__phase_dct['sm'], self.__phase_dct['im'], self.__phase_dct['sv'], self.__phase_dct['iv']) phi_S_h = realizer2.adjoint @ phi_S_h scaling_field = (inserter @ add_one @ (.1 * ift.FieldAdapter(scaling_domain, 'scale{}'.format(i_channel)))) if self.__polarization == 'theta': efield_spec_operator_theta = ((filter_operator @ (mag_S_h * (1.j * phi_S_h).exp()))) efield_spec_operator_phi = None channel_spec_operator = (hardware_operators[i_channel][0] @ efield_spec_operator_theta) elif self.__polarization == 'phi': efield_spec_operator_theta = None efield_spec_operator_phi = ((filter_operator @ (mag_S_h * (1.j * phi_S_h).exp()))) channel_spec_operator = (hardware_operators[i_channel][1] @ efield_spec_operator_phi) elif self.__polarization == 'pol': efield_spec_operator_theta = ((filter_operator @ ((mag_S_h * polarization_field.cos()) * (1.j * phi_S_h).exp()))) efield_spec_operator_phi = ((filter_operator @ ((mag_S_h * polarization_field.sin()) * (1.j * phi_S_h).exp()))) channel_spec_operator = (hardware_operators[i_channel][0] @ efield_spec_operator_theta) + (hardware_operators[i_channel][1] @ efield_spec_operator_phi) else: raise ValueError('Unrecognized polarization setting {}. Possible values are theta, phi and pol'.format(self.__polarization)) efield_spec_operators = [ efield_spec_operator_theta, efield_spec_operator_phi ] efield_trace_operator = [] if self.__efield_scaling: for efield_spec_operator in efield_spec_operators: if efield_spec_operator is not None: efield_trace_operator.append(((realizer @ fft_operator.inverse @ efield_spec_operator)) * scaling_field) else: efield_trace_operator.append(None) channel_trace_operator = ((realizer @ fft_operator.inverse @ (channel_spec_operator))) * scaling_field else: for efield_spec_operator in efield_spec_operators: if efield_spec_operator is not None: efield_trace_operator.append(((realizer @ fft_operator.inverse @ efield_spec_operator))) else: efield_trace_operator.append(None) channel_trace_operator = ((realizer @ fft_operator.inverse @ (channel_spec_operator))) noise_operator = ift.ScalingOperator(self.__noise_levels[i_channel]**2, frequency_domain.get_default_codomain()) data_field = ift.Field(ift.DomainTuple.make(frequency_domain.get_default_codomain()), self.__data_traces[i_channel]) self.__efield_spec_operators.append(efield_spec_operators) self.__efield_trace_operators.append(efield_trace_operator) self.__channel_spec_operators.append(channel_spec_operator) self.__channel_trace_operators.append(channel_trace_operator) if likelihood is None: likelihood = ift.GaussianEnergy(mean=data_field, inverse_covariance=noise_operator.inverse)(self.__channel_trace_operators[i_channel]) else: likelihood += ift.GaussianEnergy(mean=data_field, inverse_covariance=noise_operator.inverse)(self.__channel_trace_operators[i_channel]) return likelihood def __store_reconstructed_efields( self, event, station, KL ): """ Ads electric fields containing the reconstruction results to the station """ if self.__efield_scaling: for i_channel, channel_id in enumerate(self.__used_channel_ids): efield = self.__get_reconstructed_efield(KL, i_channel) station.add_electric_field(efield) else: # The reconstructed electric field is the same for all channels, so it does not matter what we pick for # i_channel efield = self.__get_reconstructed_efield(KL, 0) efield.set_channel_ids(self.__used_channel_ids) station.add_electric_field(efield) def __get_reconstructed_efield( self, KL, i_channel ): """ Creates an electric field object containing the reconstruction results. """ median = KL.position efield_stat_calculators = [ift.StatCalculator(), ift.StatCalculator()] polarization_stat_calculator = ift.StatCalculator() energy_fluence_stat_calculator = ift.StatCalculator() slope_parameter_stat_calculator = ift.StatCalculator() rec_efield = np.zeros((3, self.__electric_field_template.get_number_of_samples())) sampling_rate = self.__electric_field_template.get_sampling_rate() times = np.arange(self.__data_traces.shape[1]) / sampling_rate freqs = np.fft.rfftfreq(rec_efield.shape[1], 1. / sampling_rate) for sample in KL.samples: efield_sample_pol = np.zeros_like(rec_efield) if self.__efield_trace_operators[i_channel][0] is not None: efield_sample_theta = self.__efield_trace_operators[i_channel][0].force(median + sample).val efield_stat_calculators[0].add(efield_sample_theta) efield_sample_pol[1] = efield_sample_theta if self.__efield_trace_operators[i_channel][1] is not None: efield_sample_phi = self.__efield_trace_operators[i_channel][1].force(median + sample).val efield_stat_calculators[1].add(efield_sample_phi) efield_sample_pol[2] = efield_sample_phi if self.__polarization == 'pol': energy_fluences = trace_utilities.get_electric_field_energy_fluence( efield_sample_pol, times ) polarization_stat_calculator.add(np.arctan(np.sqrt(energy_fluences[2]) / np.sqrt(energy_fluences[1]))) e_fluences = np.zeros((len(self.__energy_fluence_passbands), 3)) for i_passband, passband in enumerate(self.__energy_fluence_passbands): filter_response = bandpass_filter.get_filter_response(freqs, passband, 'butter', 10) e_fluence = trace_utilities.get_electric_field_energy_fluence( fft.freq2time(fft.time2freq(efield_sample_pol, sampling_rate) * filter_response, sampling_rate) * self.__scaling_factor / self.__gain_scaling, times ) e_fluence[0] = np.sum(np.abs(e_fluence)) e_fluences[i_passband] = e_fluence energy_fluence_stat_calculator.add(e_fluences) slopes = np.zeros((len(self.__slope_passbands), 3)) for i_passband, passbands in enumerate(self.__slope_passbands): filter_response_1 = bandpass_filter.get_filter_response(freqs, passbands[0], 'butter', 10) e_fluence_1 = trace_utilities.get_electric_field_energy_fluence( fft.freq2time(fft.time2freq(efield_sample_pol, sampling_rate) * filter_response_1, sampling_rate) * self.__scaling_factor / self.__gain_scaling, times ) e_fluence_1[0] = np.sum(np.abs(e_fluence_1)) filter_response_2 = bandpass_filter.get_filter_response(freqs, passbands[1], 'butter', 10) e_fluence_2 = trace_utilities.get_electric_field_energy_fluence( fft.freq2time(fft.time2freq(efield_sample_pol, sampling_rate) * filter_response_2, sampling_rate) * self.__scaling_factor / self.__gain_scaling, times ) e_fluence_2[0] = np.sum(np.abs(e_fluence_2)) if self.__polarization == 'pol': slopes[i_passband] = e_fluence_1[0] / e_fluence_2[0] elif self.__polarization == 'theta': slopes[i_passband] = e_fluence_1[1] / e_fluence_2[1] else: slopes[i_passband] = e_fluence_1[2] / e_fluence_2[2] slope_parameter_stat_calculator.add(slopes) if self.__efield_trace_operators[i_channel][0] is not None: rec_efield[1] = efield_stat_calculators[0].mean * self.__scaling_factor / self.__gain_scaling if self.__efield_trace_operators[i_channel][1] is not None: rec_efield[2] = efield_stat_calculators[1].mean * self.__scaling_factor / self.__gain_scaling efield = NuRadioReco.framework.electric_field.ElectricField([self.__used_channel_ids[i_channel]]) efield.set_trace(rec_efield, self.__electric_field_template.get_sampling_rate()) if self.__polarization == 'pol': efield.set_parameter(efp.polarization_angle, polarization_stat_calculator.mean) efield.set_parameter_error(efp.polarization_angle, np.sqrt(polarization_stat_calculator.var)) energy_fluence_dict = {} slope_dict = {} for i_passband, passband in enumerate(self.__energy_fluence_passbands): energy_fluence_dict['{:.0f}-{:.0f}'.format(passband[0] / units.MHz, passband[1] / units.MHz)] = energy_fluence_stat_calculator.mean[i_passband] for i_passband, passbands in enumerate(self.__slope_passbands): slope_dict['{:.0f}-{:.0f}, {:.0f}-{:.0f}'.format(passbands[0][0], passbands[0][1], passbands[1][0], passbands[1][1])] = slope_parameter_stat_calculator.mean[i_passband] energy_fluence_error = np.sqrt(energy_fluence_stat_calculator.var) efield.set_parameter(efp.signal_energy_fluence, energy_fluence_dict) efield.set_parameter_error(efp.signal_energy_fluence, energy_fluence_error) efield.set_parameter(efp.energy_fluence_ratios, slope_dict) efield.set_parameter_error(efp.energy_fluence_ratios, np.sqrt(slope_parameter_stat_calculator.var)) return efield def __draw_priors( self, event, station, freq_space ): """ Draws samples from the prior distribution of the electric field spectrum. """ plt.close('all') fig1 = plt.figure(figsize=(12, 8)) ax1_0 = fig1.add_subplot(3, 2, (1, 2)) ax1_1 = fig1.add_subplot(323) ax1_2 = fig1.add_subplot(324) ax1_3 = fig1.add_subplot(325) ax1_4 = fig1.add_subplot(326) sampling_rate = station.get_channel(self.__used_channel_ids[0]).get_sampling_rate() times = np.arange(self.__data_traces.shape[1]) / sampling_rate freqs = freq_space.get_k_length_array().val / self.__data_traces.shape[1] * sampling_rate alpha = .8 for i in range(8): x = ift.from_random('normal', self.__efield_trace_operators[0][0].domain) efield_spec_sample = self.__efield_spec_operators[0][0].force(x) ax1_1.plot(freqs / units.MHz, np.abs(efield_spec_sample.val) / np.max(np.abs(efield_spec_sample.val)), c='C{}'.format(i), alpha=alpha) efield_trace_sample = self.__efield_trace_operators[0][0].force(x) ax1_2.plot(times, efield_trace_sample.val / np.max(np.abs(efield_trace_sample.val))) channel_spec_sample = self.__channel_spec_operators[0].force(x) ax1_3.plot(freqs / units.MHz, np.abs(channel_spec_sample.val)) # / np.max(np.abs(channel_spec_sample.val)), c='C{}'.format(i), alpha=alpha) channel_trace_sample = self.__channel_trace_operators[0].force(x) ax1_4.plot(times, channel_trace_sample.val / np.max(np.abs(channel_trace_sample.val)), c='C{}'.format(i), alpha=alpha) a = self.__power_spectrum_operator.force(x).val power_freqs = self.__power_spectrum_operator.target[0].k_lengths / self.__data_traces.shape[1] * sampling_rate ax1_0.plot(power_freqs, a, c='C{}'.format(i), alpha=alpha) ax1_0.grid() ax1_0.set_xscale('log') ax1_0.set_yscale('log') ax1_0.set_title('Power Spectrum') ax1_0.set_xlabel('k') ax1_0.set_ylabel('A') ax1_1.grid() ax1_1.set_xlim([50, 750]) ax1_2.grid() ax1_2.set_xlabel('t [ns]') ax1_2.set_ylabel('E [a.u.]') ax1_2.set_title('E-Field Trace') ax1_3.grid() ax1_3.set_xlim([50, 750]) ax1_4.grid() ax1_4.set_xlim([0, 150]) ax1_1.set_xlabel('f [MHz]') # ax1_2.set_xlabel('t [ns]') ax1_3.set_xlabel('f [MHz]') ax1_4.set_xlabel('t [ns]') ax1_1.set_ylabel('E [a.u.]') # ax1_2.set_ylabel('E [a.u.]') ax1_3.set_ylabel('U [a.u.]') ax1_4.set_ylabel('U [a.u.]') ax1_1.set_title('E-Field Spectrum') # ax1_2.set_title('E-Field Trace') ax1_3.set_title('Channel Spectrum') ax1_4.set_title('Channel Trace') fig1.tight_layout() fig1.savefig('priors_{}_{}.png'.format(event.get_id(), event.get_run_number())) def __draw_reconstruction( self, event, station, KL, suffix='' ): """ Draw plots showing the results of the reconstruction. """ plt.close('all') fontsize = 16 n_channels = len(self.__used_channel_ids) median = KL.position sampling_rate = station.get_channel(self.__used_channel_ids[0]).get_sampling_rate() fig1 = plt.figure(figsize=(16, 4 * n_channels)) fig2 = plt.figure(figsize=(16, 4 * n_channels)) freqs = np.fft.rfftfreq(self.__data_traces.shape[1], 1. / sampling_rate) classic_mean_efield_spec = np.zeros_like(freqs) classic_mean_efield_spec /= len(self.__used_channel_ids) for i_channel, channel_id in enumerate(self.__used_channel_ids): times = np.arange(self.__data_traces.shape[1]) / sampling_rate + self.__trace_start_times[i_channel] trace_stat_calculator = ift.StatCalculator() amp_trace_stat_calculator = ift.StatCalculator() efield_stat_calculators = [ift.StatCalculator(), ift.StatCalculator()] amp_efield_stat_calculators = [ift.StatCalculator(), ift.StatCalculator()] if self.__polarization == 'pol': ax1_1 = fig1.add_subplot(n_channels, 3, 3 * i_channel + 1) ax1_2 = fig1.add_subplot(n_channels, 3, 3 * i_channel + 2) ax1_3 = fig1.add_subplot(n_channels, 3, 3 * i_channel + 3, sharey=ax1_2) ax1_2.set_title(r'$\theta$ component', fontsize=fontsize) ax1_3.set_title(r'$\varphi$ component', fontsize=fontsize) else: ax1_1 = fig1.add_subplot(n_channels, 2, 2 * i_channel + 1) ax1_2 = fig1.add_subplot(n_channels, 2, 2 * i_channel + 2) ax2_1 = fig2.add_subplot(n_channels, 1, i_channel + 1) for sample in KL.samples: for i_pol, efield_stat_calculator in enumerate(efield_stat_calculators): channel_sample_trace = self.__channel_trace_operators[i_channel].force(median + sample).val trace_stat_calculator.add(channel_sample_trace) amp_trace = np.abs(fft.time2freq(channel_sample_trace, sampling_rate)) amp_trace_stat_calculator.add(amp_trace) ax2_1.plot(times, channel_sample_trace * self.__scaling_factor / units.mV, c='k', alpha=.2) ax1_1.plot(freqs / units.MHz, amp_trace * self.__scaling_factor / units.mV, c='k', alpha=.2) if self.__efield_trace_operators[i_channel][i_pol] is not None: efield_sample_trace = self.__efield_trace_operators[i_channel][i_pol].force(median + sample).val efield_stat_calculator.add(efield_sample_trace) amp_efield = np.abs(fft.time2freq(efield_sample_trace, sampling_rate)) amp_efield_stat_calculators[i_pol].add(amp_efield) if self.__polarization == 'pol': if i_pol == 0: ax1_2.plot(freqs / units.MHz, amp_efield * self.__scaling_factor / self.__gain_scaling / (units.mV / units.m / units.GHz), c='k', alpha=.2) else: ax1_3.plot(freqs / units.MHz, amp_efield * self.__scaling_factor / self.__gain_scaling / (units.mV / units.m / units.GHz), c='k', alpha=.2) else: ax1_2.plot(freqs / units.MHz, amp_efield * self.__scaling_factor / self.__gain_scaling / (units.mV / units.m / units.GHz), c='k', alpha=.2) ax1_1.plot(freqs / units.MHz, np.abs(fft.time2freq(self.__data_traces[i_channel], sampling_rate)) * self.__scaling_factor / units.mV, c='C0', label='data') sim_efield_max = None channel_snr = None if station.has_sim_station(): sim_station = station.get_sim_station() n_drawn_sim_channels = 0 for ray_tracing_id in sim_station.get_ray_tracing_ids(): sim_channel_sum = None for sim_channel in sim_station.get_channels_by_ray_tracing_id(ray_tracing_id): if sim_channel.get_id() == channel_id: if sim_channel_sum is None: sim_channel_sum = sim_channel else: sim_channel_sum += sim_channel ax1_1.plot(sim_channel.get_frequencies() / units.MHz, np.abs(sim_channel.get_frequency_spectrum()) / units.mV, c='C1', linestyle='--', alpha=.5) ax2_1.plot(sim_channel.get_times(), sim_channel.get_trace() / units.mV, c='C1', linewidth=6, zorder=1, linestyle='--', alpha=.5) if sim_channel_sum is not None: if n_drawn_sim_channels == 0: sim_channel_label = 'MC truth' else: sim_channel_label = None snr = .5 * (np.max(sim_channel_sum.get_trace()) - np.min(sim_channel_sum.get_trace())) / (self.__noise_levels[i_channel] * self.__scaling_factor) if channel_snr is None or snr > channel_snr: channel_snr = snr ax1_1.plot( sim_channel_sum.get_frequencies() / units.MHz, np.abs(sim_channel_sum.get_frequency_spectrum()) / units.mV, c='C1', label=sim_channel_label, alpha=.8, linewidth=2 ) ax2_1.plot( sim_channel_sum.get_times(), sim_channel_sum.get_trace() / units.mV, c='C1', linewidth=6, zorder=1, label=sim_channel_label ) n_drawn_sim_channels += 1 efield_sum = None for efield in station.get_sim_station().get_electric_fields_for_channels([channel_id]): if efield.get_ray_tracing_solution_id() == ray_tracing_id: if self.__polarization == 'theta': ax1_2.plot(efield.get_frequencies() / units.MHz, np.abs(efield.get_frequency_spectrum()[1]) / (units.mV / units.m / units.GHz), c='C1', alpha=.2, linestyle='--') if self.__polarization == 'phi': ax1_2.plot(efield.get_frequencies() / units.MHz, np.abs(efield.get_frequency_spectrum()[2]) / (units.mV / units.m / units.GHz), c='C1', alpha=.2, linestyle='--') if self.__polarization == 'pol': ax1_2.plot(efield.get_frequencies() / units.MHz, np.abs(efield.get_frequency_spectrum()[1]) / (units.mV / units.m / units.GHz), c='C1', alpha=.2, linestyle='--') ax1_3.plot(efield.get_frequencies() / units.MHz, np.abs(efield.get_frequency_spectrum()[2]) / (units.mV / units.m / units.GHz), c='C1', alpha=.2, linestyle='--') if efield_sum is None: efield_sum = efield else: efield_sum += efield if efield_sum is not None: if self.__polarization == 'theta': ax1_2.plot(efield_sum.get_frequencies() / units.MHz, np.abs(efield_sum.get_frequency_spectrum()[1]) / (units.mV / units.m / units.GHz), c='C1', alpha=1.) if self.__polarization == 'phi': ax1_2.plot(efield_sum.get_frequencies() / units.MHz, np.abs(efield_sum.get_frequency_spectrum()[2]) / (units.mV / units.m / units.GHz), c='C1', alpha=1.) if self.__polarization == 'pol': ax1_2.plot(efield_sum.get_frequencies() / units.MHz, np.abs(efield_sum.get_frequency_spectrum()[1]) / (units.mV / units.m / units.GHz), c='C1', alpha=1.) ax1_3.plot(efield_sum.get_frequencies() / units.MHz, np.abs(efield_sum.get_frequency_spectrum()[2]) / (units.mV / units.m / units.GHz), c='C1', alpha=1.) if sim_efield_max is None or np.max(np.abs(efield_sum.get_frequency_spectrum())) > sim_efield_max: sim_efield_max = np.max(np.abs(efield_sum.get_frequency_spectrum())) else: channel_snr = .5 * (np.max(station.get_channel(channel_id).get_trace()) - np.min(station.get_channel(channel_id).get_trace())) / (self.__noise_levels * self.__scaling_factor) ax2_1.plot(times, self.__data_traces[i_channel] * self.__scaling_factor / units.mV, c='C0', alpha=1., zorder=5, label='data') ax1_1.plot(freqs / units.MHz, amp_trace_stat_calculator.mean * self.__scaling_factor / units.mV, c='C2', label='IFT reco', linewidth=3, alpha=.6) ax2_1.plot(times, trace_stat_calculator.mean * self.__scaling_factor / units.mV, c='C2', linestyle='-', zorder=2, linewidth=4, label='IFT reconstruction') ax2_1.set_xlim([times[0], times[-1]]) if channel_snr is not None: textbox = dict(boxstyle='round', facecolor='white', alpha=.5) ax2_1.text(.9, .05, 'SNR={:.1f}'.format(channel_snr), transform=ax2_1.transAxes, bbox=textbox, fontsize=18) if self.__polarization == 'theta': ax1_2.plot(freqs / units.MHz, amp_efield_stat_calculators[0].mean * self.__scaling_factor / self.__gain_scaling / (units.mV / units.m / units.GHz), c='C2', alpha=.6) if self.__polarization == 'phi': ax1_2.plot(freqs / units.MHz, amp_efield_stat_calculators[1].mean * self.__scaling_factor / self.__gain_scaling / (units.mV / units.m / units.GHz), c='C2', alpha=.6) if self.__polarization == 'pol': ax1_2.plot(freqs / units.MHz, np.abs(fft.time2freq(efield_stat_calculators[0].mean, sampling_rate)) * self.__scaling_factor / self.__gain_scaling / (units.mV / units.m / units.GHz), c='C2', alpha=.6) ax1_3.plot(freqs / units.MHz, np.abs(fft.time2freq(efield_stat_calculators[1].mean, sampling_rate)) * self.__scaling_factor / self.__gain_scaling / (units.mV / units.m / units.GHz), c='C2', alpha=.6) ax1_1.axvline(self.__passband[0] / units.MHz, c='k', alpha=.5, linestyle=':') ax1_1.axvline(self.__passband[1] / units.MHz, c='k', alpha=.5, linestyle=':') ax1_2.axvline(self.__passband[0] / units.MHz, c='k', alpha=.5, linestyle=':') ax1_2.axvline(self.__passband[1] / units.MHz, c='k', alpha=.5, linestyle=':') ax1_1.grid() ax1_2.grid() ax2_1.grid() if self.__polarization == 'pol': ax1_3.axvline(self.__passband[0] / units.MHz, c='k', alpha=.5, linestyle=':') ax1_3.axvline(self.__passband[1] / units.MHz, c='k', alpha=.5, linestyle=':') ax1_3.grid() ax1_3.set_xlim([0, 750]) ax1_3.set_xlabel('f [MHz]') if i_channel == 0: ax2_1.legend(fontsize=fontsize) ax1_1.legend(fontsize=fontsize) ax1_1.set_xlim([0, 750]) ax1_2.set_xlim([0, 750]) ax1_1.set_title('Channel {}'.format(channel_id), fontsize=fontsize) ax2_1.set_title('Channel {}'.format(channel_id), fontsize=fontsize) ax1_1.set_xlabel('f [MHz]', fontsize=fontsize) ax1_2.set_xlabel('f [MHz]', fontsize=fontsize) ax1_1.set_ylabel('channel voltage [mV/GHz]', fontsize=fontsize) ax1_2.set_ylabel('E-Field [mV/m/GHz]', fontsize=fontsize) ax2_1.set_xlabel('t [ns]', fontsize=fontsize) ax2_1.set_ylabel('U [mV]', fontsize=fontsize) ax2_1_dummy = ax2_1.twiny() ax2_1_dummy.set_xlim(ax2_1.get_xlim()) ax2_1_dummy.set_xticks(np.arange(times[0], times[-1], 10)) def get_ticklabels(ticks): return ['{:.0f}'.format(tick) for tick in np.arange(times[0], times[-1], 10) - times[0]] ax2_1_dummy.set_xticklabels(get_ticklabels(np.arange(times[0], times[-1], 10)), fontsize=fontsize) ax1_1.tick_params(axis='both', labelsize=fontsize) ax1_2.tick_params(axis='both', labelsize=fontsize) ax2_1.tick_params(axis='both', labelsize=fontsize) if self.__polarization == 'pol': ax1_3.tick_params(axis='both', labelsize=fontsize) if sim_efield_max is not None: ax1_2.set_ylim([0, 1.2 * sim_efield_max / (units.mV / units.m / units.GHz)]) fig1.tight_layout() fig1.savefig('{}_{}_spec_reco{}.png'.format(event.get_run_number(), event.get_id(), suffix)) fig2.tight_layout() fig2.savefig('{}_{}_trace_reco{}.png'.format(event.get_run_number(), event.get_id(), suffix))