Source code for NuRadioReco.modules.LOFAR.pipelineVisualizer_LOFAR

"""
This module contains the pipelineVisualizer class for LOFAR.

.. moduleauthor:: Karen Terveer <karen.terveer@fau.de>
"""

import logging
import numpy as np
import matplotlib.pyplot as plt
import radiotools
from matplotlib.cm import get_cmap
from matplotlib.colors import Normalize

from NuRadioReco.utilities import units
from NuRadioReco.framework.parameters import stationParameters, channelParameters, showerParameters
from NuRadioReco.modules.base.module import register_run


[docs] def check_for_good_ant(event, detector): """ Create a dictionary which for every station in the event contains a list of antennas which have not been flagged. """ good_antennas_dict = {} for station in event.get_stations(): if station.get_parameter(stationParameters.triggered): good_antennas_dict[station.get_id()] = [] flagged_channels = station.get_parameter(stationParameters.flagged_channels) # Get all group IDs which are still present in the station station_channel_group_ids = set([channel.get_group_id() for channel in station.iter_channels()]) # Get the dominant polarisation orientation as calculated by stationPulseFinder dominant_orientation = station.get_parameter(stationParameters.cr_dominant_polarisation) good_channel_pair_ids = np.zeros((len(station_channel_group_ids), 2), dtype=int) for ind, channel_group_id in enumerate(station_channel_group_ids): for channel in station.iter_channel_group(channel_group_id): if np.all(detector.get_antenna_orientation(station.get_id(), channel.get_id()) == dominant_orientation): good_channel_pair_ids[ind, 0] = channel.get_id() else: good_channel_pair_ids[ind, 1] = channel.get_id() # Check if dominant channel has been flagged channel = station.get_channel(good_channel_pair_ids[ind, 0]) if channel.get_id() not in flagged_channels: good_antennas_dict[station.get_id()].append(channel.get_id()) return good_antennas_dict
[docs] class pipelineVisualizer: """ Creates debug plots from the LOFAR pipeline - This is the pipelineVisualizerTM for LOFAR. Any significant plots resulting from the pipeline should be added here by creating a function for them, and calling all functions sequentially. """ def __init__(self): self.plots = None self.logger = logging.getLogger("NuRadioReco.pipelineVisualizer")
[docs] def begin(self, logger_level=logging.NOTSET): self.logger.setLevel(logger_level)
[docs] def plot_polarization(self, event, detector): """ Plot the polarization of the electric field. This method calculates the stokes parameters of the pulse using get_stokes from framework.electric_field, and determines the polarization angle and degree, plotting them as arrows in the vxB and vxvxB plane. It estimates uncertainties by picking a pure noise value of stokes parameters, propagating through the angle and degree formulas and plotting them as arrows with reduced opacity. Author: Karen Terveer Parameters ---------- event : Event object The event containing the stations and electric fields. detector : Detector object The detector object containing information about the detector. Returns ------- fig_pol : matplotlib Figure object The generated figure object containing the polarization plot. """ from NuRadioReco.framework.electric_field import get_stokes fig_pol, ax = plt.subplots(figsize=(8,7)) triggered_station_ids = [ station.get_id() for station in event.get_stations() if station.get_parameter(stationParameters.triggered) ] num_stations = len(triggered_station_ids) cmap = get_cmap('jet') norm = Normalize(vmin=0, vmax=num_stations-1) lora_core = event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.core) try: core = event.get_first_shower().get_parameter(showerParameters.core) except KeyError: self.logger.warning("No radio core found, using LORA core instead") core = lora_core for i, station in enumerate(event.get_stations()): if station.get_parameter(stationParameters.triggered): zenith = station.get_parameter(stationParameters.cr_zenith) / units.rad azimuth = station.get_parameter(stationParameters.cr_azimuth) / units.rad cs = radiotools.coordinatesystems.cstrafo( zenith, azimuth, magnetic_field_vector=None, site="lofar" ) efields = station.get_electric_fields() station_pos = detector.get_absolute_position(station.get_id()) station_pos_vB = cs.transform_to_vxB_vxvxB(station_pos, core=core)[0] station_pos_vvB = cs.transform_to_vxB_vxvxB(station_pos, core=core)[1] ax.scatter( station_pos_vB, station_pos_vvB, color=cmap(norm(i)), s=20, label=f'Station CS{station.get_id():03d}' ) for field in efields: ids = field.get_channel_ids() pos = station_pos + detector.get_relative_position(station.get_id(), ids[0]) pos_vB = cs.transform_to_vxB_vxvxB(pos, core=core)[0] pos_vvB = cs.transform_to_vxB_vxvxB(pos, core=core)[1] pulse_window_start, pulse_window_end = station.get_channel(ids[0]).get_parameter(channelParameters.signal_regions) pulse_window_len = pulse_window_end - pulse_window_start trace = field.get_trace()[:,pulse_window_start:pulse_window_end] efield_trace_vxB_vxvxB = cs.transform_to_vxB_vxvxB( cs.transform_from_onsky_to_ground(trace) ) #get stokes parameters stokes = get_stokes(*efield_trace_vxB_vxvxB[:2], window_samples=64) stokes_max = np.argmax(stokes[0]) I = stokes[0,stokes_max] Q = stokes[1,stokes_max] U = stokes[2,stokes_max] V = stokes[3,stokes_max] # get stokes uncertainties by picking a pure noise value I_sigma = stokes[0, stokes_max-pulse_window_len//4] Q_sigma = stokes[1, stokes_max-pulse_window_len//4] U_sigma = stokes[2, stokes_max-pulse_window_len//4] V_sigma = stokes[3, stokes_max-pulse_window_len//4] pol_angle = 0.5 * np.arctan2(U,Q) pol_angle_sigma= np.sqrt((U_sigma**2*(0.5*Q/(U**2+Q**2))**2 + Q_sigma**2*(0.5*U/(U**2+Q**2))**2)) # if the polarization deviates from the vxB direction by more than 80 degrees, # this could indicate something wrong with the antenna. Show a warning including # the channel ids if np.abs(0.5 * np.arctan2(U,Q)) > 80*np.pi/180: self.logger.warning("strange polarization direction in channel group %s" % ids) pol_degree= np.sqrt(U**2 + Q**2 + V**2) / I pol_degree *= 7 # scale for better visibility dx = pol_degree * np.cos(pol_angle) dy = pol_degree * np.sin(pol_angle) dx_sigma_plus = pol_degree * np.cos(pol_angle + pol_angle_sigma) dy_sigma_plus = pol_degree * np.sin(pol_angle + pol_angle_sigma) dx_sigma_minus = pol_degree * np.cos(pol_angle - pol_angle_sigma) dy_sigma_minus = pol_degree * np.sin(pol_angle - pol_angle_sigma) ax.arrow( pos_vB, pos_vvB, dx_sigma_plus, dy_sigma_plus, head_width=2, head_length=5, fc=cmap(norm(i)), ec = cmap(norm(i)), alpha=0.5 ) ax.arrow( pos_vB, pos_vvB, dx_sigma_minus, dy_sigma_minus, head_width=2, head_length=5, fc=cmap(norm(i)), ec = cmap(norm(i)), alpha=0.5 ) ax.arrow( pos_vB, pos_vvB, dx, dy, head_width=2, head_length=6, fc=cmap(norm(i)), ec = cmap(norm(i)) ) if np.any(core != lora_core): lora_vB = cs.transform_to_vxB_vxvxB(lora_core, core=core)[0] lora_vvB = cs.transform_to_vxB_vxvxB(lora_core, core=core)[1] ax.scatter(lora_vB, lora_vvB, color='tab:red', s=50, label='LORA core', marker = 'x') label = 'radio core' else: label = 'LORA core' ax.scatter([0], [0], color='black', s=50, label=label, marker = 'x') ax.legend() ax.set_xlabel('Direction along $v \\times B$ [m]') ax.set_ylabel('Direction along $v \\times (v \\times B)$ [m]') return fig_pol
[docs] def show_direction_plot(self, event): """ Make a comparison of the reconstructed direction per station with the LORA direction. Author: Philipp Laub Parameters ---------- event : Event object The event for which to show the final plots. Returns ------- fig_dir : matplotlib Figure object The generated figure object containing the direction plot. """ # plot reconstructed directions of all stations and compare to LORA in polar plot: fig_dir, ax = plt.subplots(subplot_kw={'projection': 'polar'}) ax.set_theta_zero_location('E') ax.set_theta_direction(1) triggered_station_ids = [ station.get_id() for station in event.get_stations() if station.get_parameter(stationParameters.triggered) ] num_stations = len(triggered_station_ids) cmap = get_cmap('jet') norm = Normalize(vmin=0, vmax=num_stations-1) for i, station in enumerate(event.get_stations()): if station.get_parameter(stationParameters.triggered): try: zenith = station.get_parameter(stationParameters.cr_zenith) azimuth = station.get_parameter(stationParameters.cr_azimuth) except KeyError: self.logger.info( f"Station CS{station.get_id():03d} does not have a reconstructed direction, " f"so I am not plotting this one." ) continue ax.plot( azimuth, zenith, label=f'Station CS{station.get_id():03d}', marker='P', markersize=7, linestyle='', color=cmap(norm(i)) ) ax.plot( event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.azimuth), event.get_hybrid_information().get_hybrid_shower("LORA").get_parameter(showerParameters.zenith), label='LORA', marker="X", markersize=7, linestyle='', color='black' ) ax.legend() ax.set_title("Reconstructed arrival directions") return fig_dir
[docs] def show_time_fluence_plot(self, event, detector, min_number_good_antennas=4): """ Plot the antenna positions, marking arrival time by color and pseudofluence by markersize. The reconstructed arrival direction per station is indicated with an arrow. Author: Philipp Laub Parameters ---------- event : Event object The event for which to show the final plots. detector : Detector object The detector for which to show the final plots. min_number_good_antennas : int, default=4 The minimum number of good antennas that should be present in a station to consider it for the fit. Returns ------- fig_pol : matplotlib Figure object The generated figure object containing the polarization plot. """ time = detector.get_detector_time().utc if time.mjd < 56266: self.logger.warning("Event was before Dec 1, 2012. The non-core station clocks might be off.") good_antennas_dict = check_for_good_ant(event, detector) fig_time, ax = plt.subplots(dpi=150, figsize=(8, 5)) triggered_station_ids = [ station.get_id() for station in event.get_stations() if station.get_parameter(stationParameters.triggered) ] num_stations = len(triggered_station_ids) cmap = get_cmap('jet') norm = Normalize(vmin=0, vmax=num_stations-1) fluences = [] positions = [] SNRs = [] for station in event.get_stations(): if station.get_parameter(stationParameters.triggered): try: azimuth = station.get_parameter(stationParameters.cr_azimuth) except KeyError: self.logger.info( f"Station CS{station.get_id():03d} does not have a reconstructed direction, " f"so I am not plotting this one." ) continue good_antennas = good_antennas_dict[station.get_id()] if len(good_antennas) >= min_number_good_antennas: for antenna in good_antennas: positions.append( detector.get_relative_position(station.get_id(), antenna) + detector.get_absolute_position(station.get_id()) ) channel = station.get_channel(antenna) SNRs.append(channel.get_parameter(channelParameters.SNR)) fluences.append(np.sum(np.square(channel.get_trace()))) station_pos = detector.get_absolute_position(station.get_id()) ax.quiver(station_pos[0], station_pos[1], np.cos(azimuth), np.sin(azimuth), color='black', scale=0.02, scale_units='xy', angles='uv', width=0.005) timelags = [] for station in event.get_stations(): if station.get_parameter(stationParameters.triggered): good_antennas = good_antennas_dict[station.get_id()] if len(good_antennas) >= min_number_good_antennas: for channel_id in good_antennas: timelags.append(station.get_channel(channel_id).get_parameter(channelParameters.signal_time)) for i, station in enumerate(event.get_stations()): # plot absolute station positions if station.get_parameter(stationParameters.triggered): station_pos = detector.get_absolute_position(station.get_id()) ax.scatter(station_pos[0], station_pos[1], color=cmap(norm(i)), s=20, label=f'Station CS{station.get_id():03d}') timelags = np.array(timelags) timelags -= timelags[0] # get timelags wrt 1st antenna # plot all locations and use arrival time for color and fluence for marker size and add a colorbar positions = np.array(positions) fluences = np.array(fluences) SNRs = np.array(SNRs) fluence_norm = Normalize(vmin=np.min(fluences), vmax=np.max(fluences)) sc = ax.scatter( positions[:,0], positions[:,1], c=timelags, s=15 * fluence_norm(fluences), cmap='viridis', zorder=-1) ax.set_aspect('equal') plt.colorbar(sc, label='Relative arrival time [ns]', shrink=0.7) ax.set_xlabel('Meters east [m]') ax.set_ylabel('Meters north [m]') plt.legend() plt.title("Antenna positions and arrival time") return fig_time
[docs] @register_run() def run(self, event, detector, save_dir='.', polarization=False, direction=False): """ Produce pipeline plots for the given event. Parameters ---------- event : Event object The event for which to visualize the pipeline. detector : Detector object The detector for which to visualize the pipeline. save_dir : str, optional The directory to save the plots to. Default is the current directory. """ plots = [] if polarization: pol_plot = self.plot_polarization(event, detector) plots.append(pol_plot) pol_plot.savefig(f'{save_dir}/polarization_plot_{event.get_id()}.png') if direction: dir_plot = self.show_direction_plot(event) plots.append(dir_plot) dir_plot.savefig(f'{save_dir}/direction_plot_{event.get_id()}.png') time_fluence_plot = self.show_time_fluence_plot(event, detector) plots.append(time_fluence_plot) time_fluence_plot.savefig(f'{save_dir}/time_fluence_plot_{event.get_id()}.png') self.plots = [plot for plot in plots]
[docs] def end(self): pass