Source code for NuRadioReco.modules.likelihood_reconstruction.electricFieldLikelihoodReconstructor

from NuRadioReco.modules.base.module import register_run

import numpy as np
import matplotlib.pyplot as plt
import copy

from NuRadioReco.utilities.analytic_pulse import get_analytic_pulse_freq
from NuRadioReco.utilities import units, fft, minimization, matched_filter, trace_utilities
from NuRadioReco.framework.electric_field import ElectricField
from NuRadioReco.framework.sim_station import SimStation
from NuRadioReco.framework.event import Event
from NuRadioReco.framework.parameters import electricFieldParameters as efp
from NuRadioReco.framework.parameters import stationParameters as stnp
import NuRadioReco.modules.efieldToVoltageConverter
import NuRadioReco.modules.channelBandPassFilter
import NuRadioReco.modules.electricFieldBandPassFilter
import NuRadioReco.modules.channelLengthAdjuster
from NuRadioReco.modules.likelihood_reconstruction import likelihood_calculator
from radiotools import helper as hp
from radiotools import coordinatesystems

efieldToVoltageConverter = NuRadioReco.modules.efieldToVoltageConverter.efieldToVoltageConverter()
efieldToVoltageConverter.begin(pre_pulse_time=200*units.ns, post_pulse_time=200*units.ns, caching=False)
channelBandPassFilter = NuRadioReco.modules.channelBandPassFilter.channelBandPassFilter()
channelBandPassFilter.begin()
electricFieldBandPassFilter = NuRadioReco.modules.electricFieldBandPassFilter.electricFieldBandPassFilter()
channelLengthAdjuster = NuRadioReco.modules.channelLengthAdjuster.channelLengthAdjuster()

import logging
logger = logging.getLogger('NuRadioReco.electricFieldLikelihoodReconstructor')


[docs] class electricFieldLikelihoodReconstructor: """ Class for reconstructing electric fields in a station, e.g., a dual polarized antenna or the upwardfacing LPDAs in an RNO-G shallow station. This class forward-folds an analytical electric field, assumed to be the same in all channels, and compares it to a measured set of data traces in a likelihood objective function. The -2DeltaLLH is minimized in two stages, first using a matched filter to fit the shape of the signal and second a -2DeltaLLH minimization to fine-tune the reconstructed parameters. The likelihood is calculated using the spectrum of the noise, which enables correct error estimates of reconstructed parameters. This class is similar to voltageToAnalyticEfieldConverter, but uses a likelihood based on the noise spectrum instead of a chi-square and has an improved minimization strategy. The class assumes that the hardware response is subtracted from the data, e.g., hardwareResponseIncorporator.run(event, station, det, sim_to_data=False, mingainlin=0.001) has been run. For a full description of the method, see Section III.C in https://arxiv.org/abs/2510.21925. """ def __init__(self): pass
[docs] def begin(self, n_channels, n_samples, sampling_rate, noise_spectra, Vrms, filter_settings_list, use_chi2=False, zenith_azimuth_free=False, debug=False, travel_time_shifts=None): """ Parameters ---------- n_channels: int Number of channels to be used in the reconstruction n_samples: int Number of samples in the traces sampling_rate: float Sampling rate of the traces noise_spectra: np.ndarray Noise spectrum for each channel to be used for the likelihood calculation, i.e., sqrt(mean(abs(rfft(noise_traces))^2)). The overall normalizations of the spectra are ignored and set through the parameter Vrms. Vrms: float RMS of the noise in each channel. Used for the likelihood calculation. filter_settings_list: list of dicts, optional List of filter settings to be applied to the electric field signal. The same filters must have been applied to the data and noise before this module is run. use_chi2: bool, optional Whether to use chi2 minimization instead of likelihood. Mostly used for debugging and method comparison. zenith_azimuth_free: bool, optional Whether to reconstruct the zenith and azimuth arrival direction of the electric field in the minimization, or keep them fixed. The initial (or fixed) value used is the reconstructed values present in the station object or the MC values in the sim_station object. travel_time_shifts: np.ndarray Travel times for the electric field to reach each antenna. If no travel time shifts are provided, they are calculated in efieldToVoltageConverter, which assumes the electric field is a plane wave. It is useful to provide ray-traced travel time shifts e.g. for deep in-ice antennas, where the plane wave approximation doesn't hold. debug: bool, optional Extra plots and printouts for debugging """ self.n_channels = n_channels self.n_samples = n_samples self.sampling_rate = sampling_rate self.Vrms = Vrms self.filter_settings_list = filter_settings_list self.use_chi2 = use_chi2 self.zenith_azimuth_free = zenith_azimuth_free self.debug = debug self.travel_time_shifts = travel_time_shifts self.delta_t = 1/self.sampling_rate self.t_array_matched_filter = np.arange(0, self.n_samples) * self.delta_t - self.n_samples * self.delta_t/ 2 self.i_shift_cc = np.arange(0, self.n_samples) self.frequencies = np.fft.rfftfreq(self.n_samples, 1. / self.sampling_rate) # initialize likelihood calculator: self.likelihood_calculator = likelihood_calculator.LikelihoodCalculator( n_antennas = self.n_channels, n_samples = self.n_samples, sampling_rate = self.sampling_rate, matrix_inversion_method = "pseudo_inv", threshold_amplitude = 0.1 ) self.likelihood_calculator.initialize_with_spectra(noise_spectra, self.Vrms) self.noise_psd = self.likelihood_calculator.noise_psd # initialize matched filter: self.matched_filter = matched_filter.MatchedFilter( n_samples = self.n_samples, sampling_rate = self.sampling_rate, n_antennas = self.n_channels, noise_power_spectral_density = self.noise_psd, spectra_threshold_fraction = 0.1 )
[docs] @register_run() def run(self, evt, station, det, use_channels=None, signal_search_window=None, use_MC_direction=False, second_order=False, full_output=False, save_filtered_efield=True): """ Run the likelihood reconstruction of electric field. Parameters ---------- evt: NuRadioReco.framework.event.Event The event to run the module on. station: NuRadioReco.framework.station.Station The station object containing the channels with the data traces. det: NuRadioReco.framework.detector.Detector The detector description. use_channels: list, optional List of channel IDs to be used for the reconstruction. If None, all channels are used. signal_search_window: tuple, optional Time window (start, end) to search for the signal in the traces. use_MC_direction: bool, optional Whether to use the Monte Carlo true arrival direction for the reconstruction if it is present in the sim_station object. Default: False second_order: bool, optional If True, fit include the second order term in the frequency domain of the electric field as a fitting parameter. Otherwise, the second order term is fixed to 0. Default: False full_output: bool, optional If True, return the reconstructed signal, the signal parameters and the minus two log-likelihood of the reconstructed signal. Default: False save_filtered_efield: bool, optional Wether saved efield and its fluence are for the filtered or unfiltered electric field. If True, the module works in a way that is consistent with voltageToEfieldConverter and voltageToAnalyticEfieldConverter. Default: True Returns ------- fitted_signal: np.ndarray The reconstructed signal in the readout traces. Only returned if `full_output` enabled fitted_params_best: np.ndarray The best fit parameters of the signal model. Only returned if `full_output` enabled minus_two_llh_best: float The minus two log-likelihood value of the reconstructed signal. Only returned if `full_output` enabled """ if use_channels is None: use_channels = station.get_channel_ids() if use_MC_direction and (station.get_sim_station() is not None): zenith = station.get_sim_station()[stnp.zenith] azimuth = station.get_sim_station()[stnp.azimuth] else: logger.warning("Using reconstructed angles as no simulation present") zenith = station[stnp.zenith] azimuth = station[stnp.azimuth] self.reference_antenna_position = det.get_relative_position(station.get_id(), use_channels[0]) traces = [] trace_start_times = [] for channel in station.iter_channels(): if channel.get_id() not in use_channels: continue traces.append(channel.get_trace()) trace_start_times.append(channel.get_trace_start_time()) traces = np.array(traces) assert len(use_channels) == self.n_channels, "Number of channels in use_channels does not match n_channels in begin()" assert traces.shape[-1] == self.n_samples, "Number of samples in traces does not match n_samples in begin()" assert channel.get_sampling_rate() == self.sampling_rate, "Sampling rate of channel does not match sampling rate in begin()" def signal_function(parameters): return self._get_signal(parameters, det, station.get_id(), use_channels, trace_start_times, filter_before_det_resp=save_filtered_efield) self.matched_filter.set_data(traces) minus_two_llh_best = None f_theta_f_phi_initial = [[0.5, 1], [0.5, -1], [-0.5, 1], [-0.5, -1]] for i_fit in range(4): parameters_initial = np.array([f_theta_f_phi_initial[i_fit][0], f_theta_f_phi_initial[i_fit][1], -1, np.pi/2, trace_start_times[0] + 300, -10 if second_order else 0, zenith, azimuth]) minus_two_llh, polarization_reco, polarization_uncertainty, fluence_reco, fluence_uncertainty, fitted_params, fitted_params_uncertainties = self._reconstruct_signal( traces, signal_function, parameters_initial, trace_start_times, second_order=second_order, signal_search_window=signal_search_window) if minus_two_llh_best is None or minus_two_llh < minus_two_llh_best: minus_two_llh_best = np.copy(minus_two_llh) polarization_reco_best = np.copy(polarization_reco) fluence_reco_best = np.copy(fluence_reco) polarization_uncertainty_best = np.copy(polarization_uncertainty) fluence_uncertainty_best = np.copy(fluence_uncertainty) fitted_params_best = np.copy(fitted_params) fitted_params_uncertainties_best = np.copy(fitted_params_uncertainties) fitted_signal = signal_function(fitted_params_best) # save results to station object: if self.travel_time_shifts is None: efield_time = fitted_params_best[4] - det.get_cable_delay(station.get_id(), use_channels[0]) elif self.travel_time_shifts is not None: efield_time = fitted_params_best[4] - det.get_cable_delay(station.get_id(), use_channels[0]) - self.travel_time_shifts[0] efield_parameters = np.array([fitted_params_best[0], fitted_params_best[1], fitted_params_best[2], fitted_params_best[3], efield_time, fitted_params_best[5]]) electric_field = self._get_efield(efield_parameters, fitted_params_best[6], fitted_params_best[7], use_channels, apply_filter=save_filtered_efield) electric_field.set_parameter(efp.signal_energy_fluence, fluence_reco_best) electric_field.set_parameter_error(efp.signal_energy_fluence, fluence_uncertainty_best) electric_field.set_parameter(efp.polarization_angle, polarization_reco_best) electric_field.set_parameter_error(efp.polarization_angle, polarization_uncertainty_best) electric_field.set_parameter(efp.cr_spectrum_slope, fitted_params_best[2]) electric_field.set_parameter(efp.signal_time, trace_start_times[0] + fitted_params_best[3]) electric_field.set_parameter(efp.cr_spectrum_quadratic_term, fitted_params_best[5]) electric_field.set_parameter(efp.zenith, fitted_params_best[6]) electric_field.set_parameter(efp.azimuth, fitted_params_best[7]) # compute expected polarization site = det.get_site(station.get_id()) exp_efield = hp.get_lorentzforce_vector(fitted_params_best[6], fitted_params_best[7], hp.get_magnetic_field_vector(site)) cs = coordinatesystems.cstrafo(fitted_params_best[6], fitted_params_best[7], site=site) exp_efield_onsky = cs.transform_from_ground_to_onsky(exp_efield) exp_pol_angle = np.arctan2(exp_efield_onsky[2], exp_efield_onsky[1]) electric_field.set_parameter(efp.polarization_angle_expectation, exp_pol_angle) station.add_electric_field(electric_field) if full_output: return fitted_signal, fitted_params_best, minus_two_llh_best
def _function_to_minimize_mf(self, data, signal): """ Calculate the objective function for the first minimization. """ if not self.use_chi2: self.matched_filter.set_template(signal) t_best, x_best = self.matched_filter.matched_filter_search(time_shift_array=self.t_array_matched_filter) llh_mf = self.matched_filter.calculate_matched_filter_delta_log_likelihood() return -2 * llh_mf elif self.use_chi2: i_max, cross = self._cross_correlation(data, signal, shift_array=self.i_shift_cc) return -cross def _function_to_minimize_llh(self, data, signal): """ Calculate the log-likelihood objective function of the 2nd minimization. """ if not self.use_chi2: minus_two_llh = self.likelihood_calculator.calculate_minus_two_delta_llh(data, signal) return minus_two_llh elif self.use_chi2: return self._chi2(data, signal) def _cross_correlation(self, data, signal, shift_array): """ Calculate the cross-correlation between the data and the signal. Parameters ---------- data: np.ndarray Data from the two antennas signal: np.ndarray Signal from the two antennas shift_array: np.ndarray Array of shift indicies to calculate the cross-correlation for Returns ------- float Normalized cross-correlation between the data and the signal """ cross_correlation_array = np.zeros(len(shift_array)) for i, shift in enumerate(shift_array): cross_correlation_array[i] = np.sum(data[0,:] * np.roll(signal[0,:], shift)) + np.sum(data[1,:] * np.roll(signal[1,:], shift)) / np.sqrt(np.sum(data[0,:]**2) * np.sum(signal[0,:]**2) + np.sum(data[1,:]**2) * np.sum(signal[1,:]**2)) cross = np.max(cross_correlation_array) i_max = shift_array[np.argmax(cross_correlation_array)] return i_max, cross def _chi2(self, data, signal): """ Calculate the chi2 value between the data and the signal. """ if isinstance(self.Vrms, np.ndarray) or isinstance(self.Vrms, list): sigma = np.array(self.Vrms)[:,None] else: sigma = self.Vrms chi2 = np.sum((data - signal)**2 / sigma**2) return chi2 def _get_efield(self, parameters, zenith_arrival, azimuth_arrival, use_channels, apply_filter=False): """ Get the electric field for the given parameters. Parameters ---------- parameters: np.ndarray Parameters of the signal model 0: fluence theta 1: fluence phi 2: slope of the pulse 3: phase of the pulse 4: time of the electric field pulse (maximum of absolute hilbert envelope) at the 0th antenna 5: quadratic term zenith_arrival: float Zenith angle of the incoming electric field azimuth_arrival: float Azimuth angle of the incoming electric field use_channels: list List of channels that the electric field applies to apply_filter: bool Whether to apply the filter to the electric field Returns ------- np.ndarray Electric field for given parameters """ n_samples_time = self.n_samples sampling_rate = self.sampling_rate amp_p0 = 1 amp_p1 = parameters[2] phase_p0 = parameters[3] pulse_time = 0.5 * self.n_samples / self.sampling_rate # middle of efield trace phase_p1 = - pulse_time * 2*np.pi quadratic_term = parameters[5] quadratic_term_offset = 100 * units.MHz # Calculate the electric field: efield_norm = get_analytic_pulse_freq(amp_p0, amp_p1, phase_p0, n_samples_time, sampling_rate, phase_p1=phase_p1, bandpass=None, quadratic_term=quadratic_term, quadratic_term_offset=quadratic_term_offset) # Set the electric field: electric_field = ElectricField(use_channels, position=None, shower_id=None, ray_tracing_id=None) electric_field.set_frequency_spectrum(np.array([np.zeros_like(efield_norm), efield_norm, efield_norm]), self.sampling_rate) electric_field.set_trace_start_time(0) electric_field[efp.zenith] = zenith_arrival electric_field[efp.azimuth] = azimuth_arrival electric_field[efp.ray_path_type] = "direct" # Make a copy of the electric field and apply filter: efield_filtered = copy.copy(electric_field) sim_station = SimStation(0) sim_station.add_electric_field(efield_filtered) evt = Event(1, 1) for filter_settings in self.filter_settings_list: electricFieldBandPassFilter.run(evt, sim_station, det=None, **filter_settings) # Normalize the electric field to specified fluence (filtered): f_R, f_theta, f_phi = trace_utilities.get_electric_field_energy_fluence(efield_filtered.get_trace(), efield_filtered.get_times()) if apply_filter: trace = efield_filtered.get_frequency_spectrum() else: trace = electric_field.get_frequency_spectrum() trace[1] *= np.sign(parameters[0]) * np.sqrt(np.abs(parameters[0]) / f_theta) trace[2] *= np.sign(parameters[1]) * np.sqrt(np.abs(parameters[1]) / f_phi) electric_field.set_frequency_spectrum(trace, self.sampling_rate) electric_field.set_position(self.reference_antenna_position) electric_field.set_trace_start_time(parameters[4] - pulse_time) # efield pulse at provided time return electric_field def _get_signal(self, parameters, det, station_id, use_channels, trace_start_times, filter_before_det_resp=True): """ Get the signal in the antennas for the given parameters. Parameters ---------- parameters: np.ndarray Parameters of the signal model 0: fluence theta 1: fluence phi 2: slope of the pulse 3: phase of the pulse 4: global time of the pulse in the 0'th trace (minus antenna group delay) 5: quadratic term 6: zenith_arrival 7: azimuth_arrival det: NuRadioReco.framework.detector.Detector The detector description to use for the simulation station_id: int The ID of the station to use for the simulation use_channels: list List of channels to calculate the signal for trace_start_times: list List of start times for data traces. The 4th parameter is relative to the first time in this list. filter_before_det_resp: bool Whether to apply the filter to the efield (before detector response; default) or to the channel traces (after detector response). Returns ------- np.ndarray Signal for given parameters """ # Subtract the cable delay (and travel time) of the reference channel, to # get the rough efield time for the desired readout pulse time. These times # are added again in efieldToVoltageConverter. Antenna group delay is not # taken into account here: if self.travel_time_shifts is None: efield_time = parameters[4] - det.get_cable_delay(station_id, use_channels[0]) elif self.travel_time_shifts is not None: efield_time = parameters[4] - det.get_cable_delay(station_id, use_channels[0]) - self.travel_time_shifts[0] efield_parameters = np.array([parameters[0], parameters[1], parameters[2], parameters[3], efield_time, parameters[5]]) zenith_arrival = parameters[6] azimuth_arrival = parameters[7] electric_field = self._get_efield(efield_parameters, zenith_arrival, azimuth_arrival, use_channels, apply_filter=filter_before_det_resp) sim_station = SimStation(station_id) if self.travel_time_shifts is None: # If no travel times are provided, potential time delays are # handled in efieldToVoltageConverter: sim_station.add_electric_field(electric_field) else: for i_ch, channel_id in enumerate(use_channels): # If travel times are provided, we copy the efield to each # channel and add the travel times to the trace start times: electric_field_channel = copy.copy(electric_field) electric_field_channel.set_channel_ids([channel_id]) electric_field_channel.set_position(det.get_relative_position(station_id, channel_id)) electric_field_channel.set_trace_start_time( electric_field.get_trace_start_time() + self.travel_time_shifts[i_ch] ) sim_station.add_electric_field(electric_field_channel) sim_station.set_is_cosmic_ray() sim_station[stnp.zenith] = zenith_arrival sim_station[stnp.azimuth] = azimuth_arrival evt = Event(1, 1) station = NuRadioReco.framework.station.Station(station_id) station.add_sim_station(sim_station) station[stnp.zenith] = zenith_arrival station[stnp.azimuth] = azimuth_arrival efieldToVoltageConverter.run(evt, station, det, channel_ids=use_channels) for i_ch, channel_id in enumerate(use_channels): channel = station.get_channel(channel_id) # Make new channel which is the signal in the readout windows of the data trace: signal_channel = NuRadioReco.framework.channel.Channel(channel_id) signal_channel.set_trace(np.zeros(self.n_samples), self.sampling_rate) signal_channel.set_trace_start_time(trace_start_times[i_ch]) signal_channel.add_to_trace(channel, raise_error=False) station.add_channel(signal_channel, overwrite=True) # Apply bandpass filter. It is safest to apply rectangular filters again, in case of FFT or trace cutting artefacts: for filter_settings in self.filter_settings_list: if not filter_before_det_resp or (filter_settings["filter_type"]=="rectangular"): channelBandPassFilter.run(evt, station, det, **filter_settings) traces = [] for i_ch in range(self.n_channels): traces.append(station.get_channel(use_channels[i_ch]).get_trace()) return np.array(traces) def _reconstruct_signal(self, data, signal_function, parameters_initial, trace_start_times, second_order=False, signal_search_window=None): """ Reconstruct the signal from the given data. Parameters ---------- data: np.ndarray Data traces for the channels to be used in the reconstruction signal_function: callable Function to model the signal parameters_initial: np.ndarray Initial parameters for the reconstruction trace_start_times: np.ndarray Start times of the channels to be used in the reconstruction. Used to bound the time parameter of the signal in the reconstruction. second_order: bool, default: True If True, the second order correction to the analytical efield spectrum is fitted signal_search_window: tuple, optional The time window to search for the signal in the data. The window is the global time of the pulse in the readout window minus antenna group delay. If None, the entire time range is used. Returns ------- minus_two_llh_fit_2: float The negative log-likelihood value for the reconstructed signal polarization: float The polarization angle of the reconstructed signal fluence: float The fluence of the reconstructed signal fitted_params_2: np.ndarray The fitted parameters for the reconstructed signal uncertainties_fit: np.ndarray The uncertainties on the fitted parameters error_polarization: float The error on the polarization angle error_fluence: float The error on the fluence """ dx_array = np.array([1e-3, 1e-5, 1e-5, 1e-5, 1e-4, 1e-5, 1e-5, 1e-5]) fisher_information_matrix = self.likelihood_calculator.calculate_fisher_information_matrix(signal_function, parameters_initial, dx_array, ignore_parameters = [6,7] if not self.zenith_azimuth_free else []) f_i = np.linalg.pinv(fisher_information_matrix) uncertainties_1 = np.sqrt(np.diag(f_i)) normalization = np.append(uncertainties_1, [1, 1]) if not self.zenith_azimuth_free else uncertainties_1 bounds = np.array([ (-10000, 10000), (-10000, 10000), (-100, -0.0001), (-3*np.pi, 3*np.pi), (np.min(trace_start_times), np.max(trace_start_times) + (self.n_samples-1)*self.delta_t), (-500, 0), (0, np.pi/2), (-2*np.pi, 2*np.pi) ]) # Set matched filter scan bounds and initial signal to match signal_search_window: if signal_search_window is not None: search_window_length = signal_search_window[1] - signal_search_window[0] parameters_initial[4] = signal_search_window[0] + search_window_length / 2 if not self.use_chi2: self.t_array_matched_filter = np.arange(-search_window_length/2, search_window_length/2, self.delta_t/2) elif self.use_chi2: self.i_shift_cc = (self.t_array_matched_filter / self.sampling_rate).astype(int) minimizer_mf = minimization.Minimizer( signal_function = signal_function, objective_function = self._function_to_minimize_mf, parameters_initial = parameters_initial, parameters_bounds = bounds, normalization = normalization, ) if self.zenith_azimuth_free: minimizer_mf.fix_parameters([True, False, False, False, True, not(second_order), False, False]) else: minimizer_mf.fix_parameters([True, False, False, False, True, not(second_order), True, True]) m = minimizer_mf.run_minimization(data=data, method="minuit") fitted_params_1 = minimizer_mf.parameters signal_fit = signal_function(fitted_params_1) # get time: if not self.use_chi2: self.matched_filter.set_template(signal_fit) t_offset, x = self.matched_filter.matched_filter_search(time_shift_array=self.t_array_matched_filter) else: i_max, cross = self.cross_correlation(data, signal_fit, shift_array=self.i_shift_cc) t_offset = i_max / self.sampling_rate parameters_adjusted = np.array([fitted_params_1[0], fitted_params_1[1], fitted_params_1[2], fitted_params_1[3], (fitted_params_1[4]+t_offset), parameters_initial[5], fitted_params_1[6], fitted_params_1[7]]) signal_fit_adjusted = signal_function(parameters_adjusted) signal_fit_adjusted = signal_fit_adjusted / np.max(signal_fit_adjusted) * np.max(data) # Get best matching time and amplitude of the fit: amplitude_correction = (np.max(data) / np.max(signal_fit))**2 parameters_initial_2 = np.array([fitted_params_1[0]*amplitude_correction, fitted_params_1[1]*amplitude_correction, fitted_params_1[2], fitted_params_1[3], (parameters_initial[4]+t_offset), parameters_initial[5], parameters_initial[6], parameters_initial[7]]) if signal_search_window is not None: bounds[4] = np.array([signal_search_window[0], signal_search_window[1]]) # A wider window often results in more stable minimization: bounds[4][0] = bounds[4][0] - (bounds[4][1] - bounds[4][0]) / 2 bounds[4][1] = bounds[4][1] + (bounds[4][1] - bounds[4][0]) / 2 fisher_information_matrix2 = self.likelihood_calculator.calculate_fisher_information_matrix(signal_function, fitted_params_1, dx_array, ignore_parameters = [6,7] if not self.zenith_azimuth_free else []) f_i_2 = np.linalg.pinv(fisher_information_matrix2) uncertainties_2 = np.sqrt(np.diag(f_i_2)) normalization_2 = np.append(uncertainties_2, [1, 1]) if not self.zenith_azimuth_free else uncertainties_2 minimizer_llh = minimization.Minimizer( signal_function = signal_function, objective_function = self._function_to_minimize_llh, parameters_initial = parameters_initial_2, parameters_bounds = bounds, normalization = normalization_2 ) if self.zenith_azimuth_free: minimizer_llh.fix_parameters([False, False, False, False, False, not(second_order), False, False]) else: minimizer_llh.fix_parameters([False, False, False, False, False, not(second_order), True, True]) m = minimizer_llh.run_minimization(data=data, method="minuit") fitted_params_2 = minimizer_llh.parameters minus_two_llh_fit_2 = minimizer_llh.result f_theta = np.abs(fitted_params_2[0]) f_phi = np.abs(fitted_params_2[1]) fluence = f_theta + f_phi A_theta = np.sign(fitted_params_2[0]) * f_theta**0.5 A_phi = np.sign(fitted_params_2[1]) * f_phi**0.5 polarization = np.arctan2(A_phi, A_theta) phi = fitted_params_2[3] # If phi is greater than pi, the polarity is opposite of what A_theta and A_phi implies: if phi % (2 * np.pi) > np.pi: if polarization > 0: polarization -= 180 * units.deg elif polarization <= 0: polarization += 180 * units.deg fisher_information_matrix_fit = self.likelihood_calculator.calculate_fisher_information_matrix(signal_function, fitted_params_2, dx_array, ignore_parameters = [6,7] if not self.zenith_azimuth_free else []) f_i_fit = np.linalg.pinv(fisher_information_matrix_fit) uncertainties_fit = np.sqrt(np.diag(f_i_fit)) f_theta_uncertainty = uncertainties_fit[0] f_phi_uncertainty = uncertainties_fit[1] uncertainty_fluence = np.sqrt(f_theta_uncertainty**2 + f_phi_uncertainty**2) uncertainty_polarization = np.sqrt( (np.sqrt(f_theta) / (2 * np.sqrt(f_phi) * (f_phi+f_theta)) )**2 * f_phi_uncertainty**2 + ( -np.sqrt(f_phi) / (2 * np.sqrt(f_theta) * (f_phi+f_theta)) )**2 * f_theta_uncertainty**2) if self.debug: # plot results for debugging: signal_initial = signal_function(parameters_initial) signal_initial_2 = signal_function(parameters_initial_2) signal_fit_2 = signal_function(fitted_params_2) fig, ax = plt.subplots(self.n_channels, 1, figsize=(10, self.n_channels*3)) for i_ch in range(self.n_channels): t_array = trace_start_times[i_ch] + np.arange(0, self.n_samples) * self.delta_t ax[i_ch].plot(t_array, data[i_ch], label="data") ax[i_ch].plot(t_array, signal_initial[i_ch], ls="--", label="initial") ax[i_ch].plot(t_array, signal_fit[i_ch], label="fit") ax[i_ch].plot(t_array, signal_fit_adjusted[i_ch], "--", label="fit adjusted") ax[i_ch].plot(t_array, signal_initial_2[i_ch], "y:", label="initial 2") ax[i_ch].plot(t_array, signal_fit_2[i_ch], "k:", label="fit 2") # Plot bounds (matched filter): t_max = t_array[np.argmax(signal_fit[i_ch])] ax[i_ch].vlines([t_max+self.t_array_matched_filter[0], t_max+self.t_array_matched_filter[-1]], np.min(data[i_ch]*2), np.max(data[i_ch]*2), color="r", ls="--", label="Bounds (matched filter)") # Plot bounds (LLH reconstruction): s0 = signal_function(np.array([fitted_params_2[i_ch], fitted_params_2[1], fitted_params_2[2], fitted_params_2[3], bounds[4][0], fitted_params_2[5], fitted_params_2[6], fitted_params_2[7]])) t_max_bound_0 = t_array[np.argmax(s0[i_ch])] s1 = signal_function(np.array([fitted_params_2[i_ch], fitted_params_2[1], fitted_params_2[2], fitted_params_2[3], bounds[4][1], fitted_params_2[5], fitted_params_2[6], fitted_params_2[7]])) t_max_bound_1 = t_array[np.argmax(s1[i_ch])] ax[i_ch].vlines([t_max_bound_0, t_max_bound_1], np.min(data[i_ch]*2), np.max(data[i_ch]*2), color="b", ls="--", label="Bounds (LLH fit)") ax[i_ch].set_ylabel("Voltage [V]") ax[0].legend() if not self.use_chi2: ax[0].set_title(r"$-2\Delta$LLH: " f"{minus_two_llh_fit_2} \n parameters: {fitted_params_2}") else: ax[0].set_title(r"$\chi^2$: " f"{minus_two_llh_fit_2} \n parameters: {fitted_params_2}") ax[-1].set_xlabel("Time [s]") plt.tight_layout() plt.savefig("debug_StationElectricFieldReconstructor.png") plt.show() plt.close() # Plot spectra of (assumed) noise and data: fig, ax = plt.subplots(self.n_channels, 1, figsize=(10, self.n_channels*3)) for i_ch in range(self.n_channels): ax[i_ch].plot(self.frequencies, self.likelihood_calculator.spectra[i_ch], "k-", label="Likelihood noise spectrum") ax[i_ch].plot(self.frequencies, np.abs(fft.time2freq(data[i_ch], sampling_rate=self.sampling_rate)), "b-", label="data") ax[i_ch].plot(self.frequencies, np.abs(fft.time2freq(signal_initial[i_ch], sampling_rate=self.sampling_rate)), "r-", label="initial") ax[i_ch].plot(self.frequencies, np.abs(fft.time2freq(signal_fit_2[i_ch], sampling_rate=self.sampling_rate)), "g-", label="fit") ax[i_ch].hlines( np.max(self.likelihood_calculator.spectra[i_ch])/100, 0, max(self.frequencies), "m", "--", label="threshold") ax[i_ch].set_ylabel("Amplitude [V/GHz]") #ax[i].set_yscale("log") ax[0].legend() ax[-1].set_xlabel("Frequency [GHz]") fig.tight_layout() plt.savefig("debug_StationElectricFieldReconstructor_spectra.png") plt.show() plt.close() return minus_two_llh_fit_2, polarization, uncertainty_polarization, fluence, uncertainty_fluence, fitted_params_2, uncertainties_fit