Source code for NuRadioReco.modules.electricFieldSignalReconstructor

from NuRadioReco.modules.base.module import register_run
import numpy as np
import copy
from scipy import signal
import matplotlib.pyplot as plt
from radiotools import helper as hp
from radiotools import coordinatesystems
from NuRadioReco.utilities import trace_utilities
from NuRadioReco.utilities import units
from NuRadioReco.framework.parameters import electricFieldParameters as efp

import logging
logger = logging.getLogger('NuRadioReco.stationSignalReconstructor')


[docs] class electricFieldSignalReconstructor: """ Calculates basic signal parameters. """ def __init__(self): self.__signal_window_pre = None self.__signal_window_post = None self.__noise_window = None self.begin()
[docs] def begin(self, signal_window_pre=10 * units.ns, signal_window_post=40 * units.ns, noise_window=100 * units.ns, log_level=logging.NOTSET): self.__signal_window_pre = signal_window_pre self.__signal_window_post = signal_window_post self.__noise_window = noise_window logger.setLevel(log_level)
[docs] @register_run() def run(self, evt, station, det, signal_search_window=None, fluence_method="noise_subtraction", fluence_estimator_kwargs={}, debug=False): """ reconstructs quantities for electric field Parameters ---------- evt: event station: station det: detector signal_search_window: tuple (optional) search window for signal in ns fluence_method: str (optional) method to calculate fluence ("noise_subtraction" or "rice_distribution") fluence_estimator_kwargs: dict (optional) kwargs for fluence estimator. Only used if fluence_method is "rice_distribution" debug: bool (optional) set debug """ for electric_field in station.get_electric_fields(): trace_copy = copy.copy(electric_field.get_trace()) if signal_search_window is not None: times = electric_field.get_times() signal_search_window_mask = (times > signal_search_window[0]) & (times < signal_search_window[1]) times_masked = times[signal_search_window_mask] trace_copy = trace_copy[:, signal_search_window_mask] else: times_masked = electric_field.get_times() # calculate hilbert envelope envelope = np.abs(signal.hilbert(trace_copy)) envelope_mag = np.linalg.norm(envelope, axis=0) signal_time_bin = np.argmax(envelope_mag) signal_time = times_masked[signal_time_bin] electric_field[efp.signal_time] = signal_time # # low_pos = int(130 * units.ns * electric_field.get_sampling_rate()) # up_pos = int(210 * units.ns * electric_field.get_sampling_rate()) # if(debug): # fig, ax = plt.subplots(1, 1) # sc = ax.scatter(trace_copy[1, low_pos:up_pos], trace_copy[2, low_pos:up_pos], c=electric_field.get_times()[low_pos:up_pos], s=5) # fig.colorbar(sc, ax=ax) # ax.set_aspect('equal') # ax.set_xlabel("eTheta") # ax.set_ylabel("ePhi") # fig.tight_layout() low_pos, up_pos = hp.get_interval(envelope_mag, scale=0.5) v_start = trace_copy[:, signal_time_bin] v_avg = np.zeros(3) for i in range(low_pos, up_pos + 1): v = trace_copy[:, i] alpha = hp.get_angle(v_start, v) if(alpha > 0.5 * np.pi): v *= -1 v_avg += v pole = np.arctan2(np.abs(v_avg[2]), np.abs(v_avg[1])) electric_field[efp.polarization_angle] = pole logger.info("average e-field vector = {:.4g}, {:.4g}, {:.4g} -> polarization = {:.1f}deg".format(v_avg[0], v_avg[1], v_avg[2], pole / units.deg)) trace = electric_field.get_trace() if(debug): fig, ax = plt.subplots(1, 1) tt = electric_field.get_times() dt = 1. / electric_field.get_sampling_rate() ax.plot(tt / units.ns, trace[1] / units.mV * units.m) ax.plot(tt / units.ns, trace[2] / units.mV * units.m) ax.plot(times_masked / units.ns, envelope_mag / units.mV * units.m) ax.vlines([signal_search_window[0] + low_pos * dt, signal_search_window[0] + up_pos * dt], 0, envelope_mag.max() / units.mV * units.m, linestyles='dotted') ax.vlines([signal_time - self.__signal_window_pre, signal_time + self.__signal_window_post], 0, envelope_mag.max() / units.mV * units.m, linestyles='dashed') plt.show() times = electric_field.get_times() mask_signal_window = (times > (signal_time - self.__signal_window_pre)) & (times < (signal_time + self.__signal_window_post)) mask_noise_window = np.zeros_like(mask_signal_window, dtype=bool) if(self.__noise_window > 0): # set the noise window to the first "self.__noise_window" ns of the trace. If this cuts into the signal window, the noise window is reduced to not overlap with the signal window mask_noise_window = times < min(times[0] + self.__noise_window, signal_time - self.__signal_window_pre) signal_energy_fluence, signal_energy_fluence_error = trace_utilities.get_electric_field_energy_fluence(trace, times, mask_signal_window, mask_noise_window, return_uncertainty=True, method=fluence_method, estimator_kwargs=fluence_estimator_kwargs) electric_field.set_parameter(efp.signal_energy_fluence, signal_energy_fluence) electric_field.set_parameter_error(efp.signal_energy_fluence, signal_energy_fluence_error) logger.info("f = {} +- {}".format(signal_energy_fluence / units.eV * units.m2, signal_energy_fluence_error / units.eV * units.m2)) # calculate polarization angle from energy fluence x = np.abs(signal_energy_fluence[1]) ** 0.5 y = np.abs(signal_energy_fluence[2]) ** 0.5 sx = signal_energy_fluence_error[1] * 0.5 sy = signal_energy_fluence_error[2] * 0.5 pol_angle = np.arctan2(y, x) pol_angle_error = 1. / (x ** 2 + y ** 2) * (y ** 2 * sx ** 2 + x ** 2 * sy ** 2) ** 0.5 # gaussian error propagation logger.info("polarization angle = {:.1f} +- {:.1f}".format(pol_angle / units.deg, pol_angle_error / units.deg)) electric_field.set_parameter(efp.polarization_angle, pol_angle) electric_field.set_parameter_error(efp.polarization_angle, pol_angle_error) # compute expeted polarization if det is not None: site = det.get_site(station.get_id()).lower() exp_efield = hp.get_lorentzforce_vector(electric_field[efp.zenith], electric_field[efp.azimuth], hp.get_magnetic_field_vector(site)) cs = coordinatesystems.cstrafo(electric_field[efp.zenith], electric_field[efp.azimuth], site=site) exp_efield_onsky = cs.transform_from_ground_to_onsky(exp_efield) exp_pol_angle = np.arctan2(np.abs(exp_efield_onsky[2]), np.abs(exp_efield_onsky[1])) logger.info("expected polarization angle = {:.1f}".format(exp_pol_angle / units.deg)) electric_field.set_parameter(efp.polarization_angle_expectation, exp_pol_angle)
[docs] def end(self): pass