from NuRadioReco.modules.base.module import register_run
from NuRadioReco.utilities import units, interferometry
from NuRadioReco.framework.parameters import showerParameters as shp
from radiotools import helper as hp, coordinatesystems
from radiotools.atmosphere import models, refractivity
from collections import defaultdict
import numpy as np
import sys
import copy
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
import logging
logger = logging.getLogger('NuRadioReco.efieldRadioInterferometricReconstruction')
"""
This module hosts to classes
- efieldInterferometricDepthReco
- efieldInterferometricAxisReco (TODO: Not yet implemented)
The radio-interferometric reconstruction (RIT) was proposed in [1].
The implementation here is based on work published in [2].
[1]: H. Schoorlemmer, W. R. Carvalho Jr., arXiv:2006.10348
[2]: F. Schlueter, T. Huege, doi:10.1088/1748-0221/16/07/P07048
"""
[docs]class efieldInterferometricDepthReco:
"""
This class reconstructs the depth of the maximum of the longitudinal profile of the
beam-formed radio emission: X_RIT along a given axis. X_RIT is found to correlate with X_max.
This correlation may depend on the zenith angle, the frequency band, and detector layout
(if the detector is very irregular).
For the reconstruction, a shower axis is needed. The radio emission in the vxB polarisation is used
(and thus the arrival direction is needed).
"""
def __init__(self):
self._debug = None
self._at = None
self._tab = None
self._interpolation = None
self._signal_kind = None
pass
[docs] def begin(self, interpolation=True, signal_kind="power", debug=False):
"""
Set module config.
Parameters
----------
interpolation : bool
If true, use a linear interpolation to match sampling of the beamformed signal trace and the individual time-shifted antenna traces.
Default is True, False is not yet implemented
signal_kind : str
Define which signal "metric" is used on the beamformed traces. Default "power" : sum over the squared amplitudes in a 100 ns window around the peak.
Other options are "amplitude" or "hilbert_sum"
debug : bool
If true, show some debug plots (Default: False).
"""
self._debug = debug
self._interpolation = interpolation
self._signal_kind = signal_kind
self._data = defaultdict(list)
pass
[docs] def sample_longitudinal_profile(
self, traces, times, station_positions,
shower_axis, core, depths=None, distances=None):
"""
Returns the longitudinal profile of the interferometic signal sampled along the shower axis.
Parameters
----------
traces : array(number_of_antennas, samples)
Electric field traces (one polarisation of it, usually vxB) for all antennas/stations.
times : array(number_of_antennas, samples)
Time vectors corresponding to the electric field traces.
station_positions : array(number_of_antennas, 3)
Position of each antenna.
shower_axis : array(3,)
Axis/direction along which the interferometric signal is sampled. Anchor is "core".
core : array(3,)
Shower core. Keep in mind that the altitudes (z-coordinate) matters.
depths : array (optinal)
Define the positions (slant depth along the axis) at which the interferometric signal is sampled.
Instead of "depths" you can provide "distances".
distances : array (optinal)
Define the positions (geometrical distance from core along the axis) at which the interferometric signal is sampled.
Instead of "distances" you can provide "depths".
Returns
-------
signals : array
Interferometric singals sampled along the given axis
"""
zenith = hp.get_angle(np.array([0, 0, 1]), shower_axis)
tstep = times[0, 1] - times[0, 0]
if depths is not None:
signals = np.zeros(len(depths))
depths_or_distances = depths
else:
if distances is None:
sys.exit("depths or distances has to be not None!")
signals = np.zeros(len(distances))
depths_or_distances = distances
for idx, dod in enumerate(depths_or_distances):
if depths is not None:
try:
# here z coordinate of core has to be the altitude of the observation_level
dist = self._at.get_distance_xmax_geometric(
zenith, dod, observation_level=core[-1])
except ValueError:
signals[idx] = 0
continue
else:
dist = dod
if dist < 0:
signals[idx] = 0
continue
point_on_axis = shower_axis * dist + core
if self._interpolation:
sum_trace = interferometry.interfere_traces_interpolation(
point_on_axis, station_positions, traces, times, tab=self._tab)
else:
# sum_trace = interferometry.interfere_traces_padding(
# point_on_axis, station_positions, core, traces, times, tab=self._tab)
sys.exit("Not implemented")
# plt.title(dod)
# plt.plot(sum_trace)
# plt.show()
signal = interferometry.get_signal(sum_trace, tstep, kind=self._signal_kind)
signals[idx] = signal
return signals
[docs] def reconstruct_interferometric_depth(
self, traces, times, station_positions, shower_axis, core,
lower_depth=400, upper_depth=800, bin_size=100, return_profile=False):
"""
Returns Gauss-parameters fitted to the "peak" of the interferometic
longitudinal profile along the shower axis.
A initial samping range and size in defined by "lower_depth", "upper_depth", "bin_size".
However if the "peak", i.e., maximum signal is found at an edge the sampling range in
continually increased (with a min/max depth of 0/2000 g/cm^2). The Gauss is fitted around the
found peak with a refined sampling (use 20 samples in this narrow range).
Parameters
----------
traces : array(number_of_antennas, samples)
Electric field traces (one polarisation of it, usually vxB) for all antennas/stations.
times : array(number_of_antennas, samples)
Time vectors corresponding to the electric field traces.
station_positions : array(number_of_antennas, 3)
Position of each antenna.
shower_axis : array(3,)
Axis/direction along which the interferometric signal is sampled. Anchor is "core".
core : array(3,)
Shower core. Keep in mind that the altitudes (z-coordinate) matters.
lower_depth : float
Define the lower edge for the inital sampling (default: 400 g/cm2).
upper_depth : float
Define the upper edge for the inital sampling (default: 800 g/cm2).
bin_size : float
Define the step size pf the inital sampling (default: 100 g/cm2).
The refined sampling around the peak region is / 10 this value.
return_profile : bool
If true return the sampled profile in addition to the Gauss parameter (default: False).
Returns
-------
If return_profile is True
depths_corse : np.array
Depths along shower axis coarsely sampled
depths_fine : np.array
Depths along shower axis finely sampled (used in fitting)
signals_corese : np.array
Beamformed signals along shower axis coarsely sampled
signals_fine : np.array
Beamformed signals along shower axis finely sampled (used in fitting)
popt : list
List of fitted Gauss parameters (amplitude, position, width)
If return_profile is False:
popt : list
List of fitted Gauss parameters (amplitude, position, width)
"""
depths = np.arange(lower_depth, upper_depth, bin_size)
signals_tmp = self.sample_longitudinal_profile(
traces, times, station_positions, shower_axis, core, depths=depths)
# if max signal is at the upper edge add points there
if np.argmax(signals_tmp) == len(depths) - 1:
while True:
depth_add = np.amax(depths) + bin_size
signal_add = self.sample_longitudinal_profile(
traces, times, station_positions, shower_axis, core, depths=[depth_add])
depths = np.append(depths, depth_add)
signals_tmp = np.append(signals_tmp, signal_add)
if not np.argmax(signals_tmp) == len(depths) - 1 or depth_add > 2000:
break
# if max signal is at the lower edge add points there
elif np.argmax(signals_tmp) == 0:
while True:
depth_add = np.amin(depths) - bin_size
signal_add = self.sample_longitudinal_profile(
traces, times, station_positions, shower_axis, core, depths=[depth_add])
depths = np.append(depth_add, depths)
signals_tmp = np.append(signal_add, signals_tmp)
if not np.argmax(signals_tmp) == 0 or depth_add <= 0:
break
idx_max = np.argmax(signals_tmp)
depths_final = np.linspace(
depths[idx_max - 1], depths[idx_max + 1], 20) # 10 g/cm2 bins
signals_final = self.sample_longitudinal_profile(
traces, times, station_positions, shower_axis, core, depths=depths_final)
def normal(x, A, x0, sigma):
""" Gauss curve """
return A / np.sqrt(2 * np.pi * sigma ** 2) \
* np.exp(-1 / 2 * ((x - x0) / sigma) ** 2)
popt, pkov = curve_fit(normal, depths_final, signals_final, p0=[np.amax(
signals_final), depths_final[np.argmax(signals_final)], 100], maxfev=1000)
if return_profile:
return depths, depths_final, signals_tmp, signals_final, popt
return popt
[docs] def update_atmospheric_model_and_refractivity_table(self, shower):
"""
Updates model of the atmosphere and tabulated, integrated refractive index according to shower properties.
Parameters
----------
shower : BaseShower
"""
if self._at is None:
self._at = models.Atmosphere(shower[shp.atmospheric_model])
self._tab = refractivity.RefractivityTable(
self._at.model, refractivity_at_sea_level=shower[shp.refractive_index_at_ground] - 1, curved=True)
elif self._at.model != shower[shp.atmospheric_model]:
self._at = models.Atmosphere(shower[shp.atmospheric_model])
self._tab = refractivity.RefractivityTable(
self._at.model, refractivity_at_sea_level=shower[shp.refractive_index_at_ground] - 1, curved=True)
elif self._tab._refractivity_at_sea_level != shower[shp.refractive_index_at_ground] - 1:
self._tab = refractivity.RefractivityTable(
self._at.model, refractivity_at_sea_level=shower[shp.refractive_index_at_ground] - 1, curved=True)
else:
pass
[docs] @register_run()
def run(self, evt, det, use_MC_geometry=True, use_MC_pulses=True):
"""
Run interferometric reconstruction of depth of coherent signal.
Parameters
----------
evt : Event
Event to run the module on.
det : Detector
Detector description
use_MC_geometry : bool
if true, take geometry from sim_shower. Results will than also be stored in sim_shower
use_MC_pulses : bool
if true, take electric field trace from sim_station
"""
# TODO: Mimic imperfect time syncronasation by adding a time jitter here?
# TODO: Make it more flexible. Choose shower from which the geometry and atmospheric properties are taken.
# Also store xrit in this shower.
if use_MC_geometry:
shower = evt.get_first_sim_shower()
else:
shower = evt.get_first_shower()
self.update_atmospheric_model_and_refractivity_table(shower)
core, shower_axis, cs = get_geometry_and_transformation(shower)
traces_vxB, times, pos = get_station_data(
evt, det, cs, use_MC_pulses, n_sampling=256)
if self._debug:
depths, depths_final, signals_tmp, signals_final, rit_parameters = \
self.reconstruct_interferometric_depth(
traces_vxB, times, pos, shower_axis, core, return_profile=True)
fig, ax = plt.subplots(1)
ax.plot(depths, signals_tmp, "o")
ax.plot(depths_final, signals_final, "o")
ax.plot(depths_final, interferometry.gaus(
depths_final, *rit_parameters[:-1]))
ax.axvline(rit_parameters[1])
plt.show()
else:
rit_parameters = self.reconstruct_interferometric_depth(
traces_vxB, times, pos, shower_axis, core)
xrit = rit_parameters[1]
shower.set_parameter(shp.interferometric_shower_maximum, xrit * units.g / units.cm2)
#TODO: Add calibration Xmax(Xrit, theta, ...)?
# for plotting
self._data["xrit"].append(xrit)
self._data["xmax"].append(shower[shp.shower_maximum] / (units.g / units.cm2))
self._data["zenith"].append(shower[shp.zenith])
[docs] def end(self):
"""
Plot reconstructed depth vs true depth of shower maximum (Xmax).
"""
fig, ax = plt.subplots(1)
sct = ax.scatter(
self._data["xmax"], self._data["xrit"], s=200, c=np.rad2deg(self._data["zenith"]))
cbi = plt.colorbar(sct, pad=0.02)
cbi.set_label("zenith angle / deg")
ax.set_xlabel(r"$X_\mathrm{max}$ / g$\,$cm$^{-2}$")
ax.set_ylabel(r"$X_\mathrm{RIT}$ / g$\,$cm$^{-2}$")
fig.tight_layout()
plt.show()
pass
[docs]class efieldInterferometricAxisReco(efieldInterferometricDepthReco):
"""
Class to reconstruct the shower axis with beamforming.
"""
def __init__(self):
super().__init__()
[docs] def find_maximum_in_plane(self, xs, ys, p_axis, station_positions, traces, times, cs):
"""
Sample interferometric signals in 2-d plane (vxB-vxvxB) perpendicular to a given axis on a rectangular/quadratic grid.
The orientation of the plane is defined by the radiotools.coordinatesytem.cstrafo argument.
Parameters
----------
xs : array
x-coordinates defining the sampling positions.
ys : array
y-coordinates defining the sampling positions.
p_axis : array(3,)
Origin of the 2-d plane along the axis.
station_positions : array(number_of_antennas, 3)
Position of each antenna.
traces : array(number_of_antennas, samples)
Electric field traces (one polarisation of it, usually vxB) for all antennas/stations.
times : array(number_of_antennas, samples)
Time vectors corresponding to the electric field traces.
cs : radiotools.coordinatesytem.cstrafo
Returns
-------
idx : int
Index of the entry with the largest signal (np.argmax(signals))
signals : array(len(xs), len(ys))
Interferometric signal
"""
signals = np.zeros((len(xs), len(ys)))
tstep = times[0, 1] - times[0, 0]
for xdx, x in enumerate(xs):
for ydx, y in enumerate(ys):
p = p_axis + cs.transform_from_vxB_vxvxB(np.array([x, y, 0]))
sum_trace = interferometry.interfere_traces_interpolation(
p, station_positions, traces, times, tab=self._tab)
signal = interferometry.get_signal(
sum_trace, tstep, kind=self._signal_kind)
signals[xdx, ydx] = signal
idx = np.argmax(signals)
return idx, signals
[docs] def sample_lateral_cross_section(
self,
traces, times, station_positions,
shower_axis_inital, core, depth, cs,
shower_axis_mc, core_mc,
relative=False, initial_grid_spacing=60, centered_around_truth=True,
cross_section_size=1000, deg_resolution=np.deg2rad(0.005)):
"""
Sampling the "cross section", i.e., 2d-lateral distribution of the beam formed signal for a slice in the atmosphere.
It is looking for the maximum in the lateral distribution with an (stupid) iterative grid search.
Returns the position and the strenght of the maximum signal.
Parameters
----------
traces : array(number_of_antennas, samples)
Electric field traces (one polarisation of it, usually vxB) for all antennas/stations.
times : array(number_of_antennas, samples)
Time vectors corresponding to the electric field traces.
station_positions : array(number_of_antennas, 3)
Position of each antenna.
shower_axis_inital : array(3,)
Axis/direction which is used as initial guess for the true shower axis.
Around this axis the interferometric signals are sample on 2-d planes.
core : array(3,)
Shower core which is used as initial guess. Keep in mind that the altitudes (z-coordinate) matters.
depth : np.array
cs : radiotools.coordinatesytem.cstrafo
shower_axis_mc : np.array(3,)
core_mc : np.array(3,)
relative : bool (Default: False)
If True, the size of the search grid is relative to the distance between the MC axis and the inital guess axis.
The search grid will by a 20 x 20 and just include the MC axis. It is made sure that the MC axis is not at a
grid point. If False, see `centered_around_truth`.
initial_grid_spacing : double
Initial spacing of your grid points in meters. (Default: 60m)
centered_around_truth : bool (Default: True)
Only used when `relative == False`. If True, the search grid will be constructed around the MC axis. The size
and spacing between grid points is determined by `cross_section_size` and `initial_grid_spacing`. If False,
the search grid is constructed around the inital guess axis. It ensured that the MC-axis is within the search grid.
that means the grid size might be abitrary large (which makes the reconstruction very slow) if the inital axis is far off
the MC axis.
cross_section_size : float
(Only used when `centered_around_truth == True`.) Side length on the 2-d planes (slice) along which the maximum around
the initial axis is sampled in meters. (Default: 1000m)
deg_resolution : float
Target spacing for the grid spacing in terms of opening angle. Unit is radiants.
Defines the stopping condition for the iterations. (Default: np.deg2rad(0.005))
Returns
-------
point_found : np.array(3,)
Position of the found maximum
weight : float
Amplitude/Strengt of the maximum
"""
zenith_inital, _ = hp.cartesian_to_spherical(
*np.split(shower_axis_inital, 3))
dist = self._at.get_distance_xmax_geometric(
zenith_inital, depth, observation_level=core[-1])
p_axis = shower_axis_inital * dist + core
# we use the true core to make sure that it is within the inital search gri
mc_at_plane = interferometry.get_intersection_between_line_and_plane(
shower_axis_inital, p_axis, shower_axis_mc, core_mc)
mc_vB = cs.transform_to_vxB_vxvxB(mc_at_plane, core=p_axis)
# mc_points.append(mc_at_plane + core)
dr_ref_target = np.tan(deg_resolution) * dist
if relative:
# ensure that true xmax is within the search grid but not on a grid point
xlims = (np.array([-1.2, 1.2]) + np.random.uniform(0, .1, 2)) * np.abs(mc_vB[0])
ylims = (np.array([-1.2, 1.2]) + np.random.uniform(0, .1, 2)) * np.abs(mc_vB[1])
xs = np.linspace(*xlims, 20)
ys = np.linspace(*ylims, 20)
else:
if centered_around_truth:
xs = np.arange(mc_vB[0] - cross_section_size / 2 -
np.random.uniform(0, initial_grid_spacing, 1),
mc_vB[0] + cross_section_size / 2, initial_grid_spacing)
ys = np.arange(mc_vB[1] - cross_section_size / 2 -
np.random.uniform(0, initial_grid_spacing, 1),
mc_vB[1] + cross_section_size / 2, initial_grid_spacing)
else:
# quadratic grid with true intersection contained
max_dist = np.amax(
[np.abs(mc_vB[0]), np.abs(mc_vB[1]), 3 * initial_grid_spacing]) + initial_grid_spacing
xlims = np.array([-max_dist, max_dist]) + \
np.random.uniform(-0.1 * initial_grid_spacing,
0.1 * initial_grid_spacing, 2)
ylims = np.array([-max_dist, max_dist]) + \
np.random.uniform(-0.1 * initial_grid_spacing,
0.1 * initial_grid_spacing, 2)
xs = np.arange(xlims[0], xlims[1] +
initial_grid_spacing, initial_grid_spacing)
ys = np.arange(ylims[0], ylims[1] +
initial_grid_spacing, initial_grid_spacing)
iloop = 0
while True:
idx, signals = self.find_maximum_in_plane(
xs, ys, p_axis, station_positions, traces, times, cs=cs)
if self._debug:
# from AERAutilities import pyplots, pyplots_utils # for mpl rc
plot_lateral_cross_section(
xs, ys, signals, mc_vB, title=r"%.1f$\,$g$\,$cm$^{-2}$" % depth)
iloop += 1
dr = np.sqrt((xs[1] - xs[0]) ** 2 + (ys[1] - ys[0]) ** 2)
if iloop == 10 or dr < dr_ref_target:
break
# maximum
x_max = xs[int(idx // len(ys))]
y_max = ys[int(idx % len(ys))]
# update range / grid
dx = xs[1] - xs[0]
dy = ys[1] - ys[0]
if iloop >= 2:
dx /= 2
dy /= 2
xs = np.linspace(x_max - dx, x_max + dx, 5)
ys = np.linspace(y_max - dy, y_max + dy, 5)
weight = np.amax(signals)
xfound = xs[int(idx // len(ys))]
yfound = ys[int(idx % len(ys))]
point_found = p_axis + \
cs.transform_from_vxB_vxvxB(np.array([xfound, yfound, 0]))
return point_found, weight
[docs] def reconstruct_shower_axis(
self,
traces, times, station_positions,
shower_axis, core,
magnetic_field_vector,
is_mc=True,
initial_grid_spacing=60,
cross_section_size=1000):
"""
Run interferometric reconstruction of the shower axis. Find the maxima of the interferometric signals
within 2-d plane (slices) along a given axis (initial guess). Through those maxima (their position in the
atmosphere) a straight line is fitted to reconstruct the shower axis.
traces : array(number_of_antennas, samples)
Electric field traces (one polarisation of it, usually vxB) for all antennas/stations.
times : array(number_of_antennas, samples)
Time vectors corresponding to the electric field traces.
station_positions : array(number_of_antennas, 3)
Position of each antenna.
shower_axis_inital : array(3,)
Axis/direction which is used as initial guess for the true shower axis.
Around this axis the interferometric signals are sample on 2-d planes.
core : array(3,)
Shower core which is used as initial guess. Keep in mind that the altitudes (z-coordinate) matters.
magnetic_field_vector : array(3,)
Magnetic field vector of the site you are using.
is_mc : bool
If true, interprete the provided shower axis as truth and add some gaussian smearing to optain an
inperfect initial guess for the shower axis (Default: True).
initial_grid_spacing : double
Spacing of your grid points in meters (Default: 60m)
cross_section_size : double
Side length on the 2-d planes (slice) along which the maximum around the initial axis is sampled in meters
(Default: 1000m).
"""
if is_mc:
zenith_mc, azimuth_mc = hp.cartesian_to_spherical(*shower_axis)
# smeare mc axis.
zenith_inital = zenith_mc + np.deg2rad(np.random.normal(0, 0.5))
azimuth_inital = azimuth_mc + np.deg2rad(np.random.normal(0, 0.5))
shower_axis_inital = hp.spherical_to_cartesian(
zenith=zenith_inital, azimuth=azimuth_inital)
else:
shower_axis_inital = shower_axis
core_inital = core
zenith_inital, azimuth_inital = hp.cartesian_to_spherical(
*shower_axis)
shower_axis, core = None, None
raise ValueError("is_mc=False is not yet properly implemented!")
cs = coordinatesystems.cstrafo(
zenith_inital, azimuth_inital, magnetic_field_vector=magnetic_field_vector)
if is_mc:
core_inital = cs.transform_from_vxB_vxvxB_2D(
np.array([np.random.normal(0, 100), np.random.normal(0, 100), 0]), core)
depths = [500, 600, 700, 800, 900, 1000]
deg_resolution = np.deg2rad(0.005)
found_points = []
weights = []
relative = False
centered_around_truth = True
def sample_lateral_cross_section_placeholder(dep):
"""
Run sample_lateral_cross_section for a particular depth.
Parameters
----------
dep : double
Depth along the axis at which the cross section is sampled in g/cm2.
"""
return self.sample_lateral_cross_section(
traces, times, station_positions,
shower_axis_inital, core_inital, dep, cs,
shower_axis, core,
relative=relative, initial_grid_spacing=initial_grid_spacing,
centered_around_truth=centered_around_truth,
cross_section_size=cross_section_size, deg_resolution=deg_resolution)
for depth in depths:
found_point, weight = sample_lateral_cross_section_placeholder(depth)
found_points.append(found_point)
weights.append(weight)
if 0:
while True:
if np.argmax(weights) != 0:
break
new_depth = depths[0] - (depths[1] - depths[0])
print("extend to", new_depth)
found_point, weight = sample_lateral_cross_section_placeholder(
new_depth)
depths = [new_depth] + depths
found_points = [found_point] + found_points
weights = [weight] + weights
while True:
if np.argmax(weights) != len(weights):
break
new_depth = depths[-1] + (depths[1] - depths[0])
print("extend to", new_depth)
found_point, weight = sample_lateral_cross_section_placeholder(
new_depth)
depths.append(new_depth)
found_points.append(found_point)
weights.append(weight)
found_points = np.array(found_points)
weights = np.array(weights)
popt, pcov = curve_fit(interferometry.fit_axis, found_points[:, -1], found_points.flatten(),
sigma=np.amax(weights) / np.repeat(weights, 3), p0=[zenith_inital, azimuth_inital, 0, 0])
direction_rec = hp.spherical_to_cartesian(*popt[:2])
core_rec = interferometry.fit_axis(np.array([core[-1]]), *popt)
return direction_rec, core_rec
[docs] @register_run()
def run(self, evt, det, use_MC_geometry=True, use_MC_pulses=True):
"""
Run interferometric reconstruction of depth of coherent signal.
Parameters
----------
evt : Event
Event to run the module on.
det : Detector
Detector description
use_MC_geometry : bool
if true, take geometry from sim_shower. Results will than also be stored in sim_shower
use_MC_pulses : bool
if true, take electric field trace from sim_station
"""
# TODO: Mimic imperfect time syncronasation by adding a time jitter here?
# TODO: Make it more flexible. Choose shower from which the geometry and atmospheric properties are taken.
# Also store xrit in this shower.
if use_MC_geometry:
shower = evt.get_first_sim_shower()
else:
shower = evt.get_first_shower()
self.update_atmospheric_model_and_refractivity_table(shower)
core, shower_axis, cs = get_geometry_and_transformation(shower)
traces_vxB, times, pos = get_station_data(
evt, det, cs, use_MC_pulses, n_sampling=256)
direction_rec, core_rec = self.reconstruct_shower_axis(
traces_vxB, times, pos, shower_axis, core, is_mc=True, magnetic_field_vector=shower[shp.magnetic_field_vector])
shower.set_parameter(shp.interferometric_shower_axis, direction_rec)
shower.set_parameter(shp.interferometric_core, core_rec)
[docs]def get_station_data(evt, det, cs, use_MC_pulses, n_sampling=None):
"""
Returns station data in a proper format
Parameters
----------
evt : Event
det : Detector
cs : radiotools.coordinatesystems.cstrafo
use_MC_pulses : bool
if true take electric field trace from sim_station
n_sampling : int
if not None clip trace with n_sampling // 2 around np.argmax(np.abs(trace))
Returns
-------
traces_vxB : np.array
The electric field traces in the vxB polarisation (takes first electric field stored in a station) for all stations/observers.
times : mp.array
The electric field traces time series for all stations/observers.
pos : np.array
Positions for all stations/observers.
"""
traces_vxB = []
times = []
pos = []
for station in evt.get_stations():
if use_MC_pulses:
station = station.get_sim_station()
for electric_field in station.get_electric_fields():
traces = cs.transform_to_vxB_vxvxB(
cs.transform_from_onsky_to_ground(electric_field.get_trace()))
trace_vxB = traces[0]
time = copy.copy(electric_field.get_times())
if n_sampling is not None:
hw = n_sampling // 2
m = np.argmax(np.abs(trace_vxB))
if m < hw:
m = hw
if m > len(trace_vxB) - hw:
m = len(trace_vxB) - hw
trace_vxB = trace_vxB[m-hw:m+hw]
time = time[m-hw:m+hw]
traces_vxB.append(trace_vxB)
times.append(time)
break # just take the first efield. TODO: Improve this
pos.append(det.get_absolute_position(station.get_id()))
traces_vxB = np.array(traces_vxB)
times = np.array(times)
pos = np.array(pos)
return traces_vxB, times, pos
[docs]def plot_lateral_cross_section(xs, ys, signals, mc_pos=None, fname=None, title=None):
"""
Plot the lateral distribution of the beamformed singal (in the vxB, vxvxB directions).
Parameters
----------
xs : np.array
Positions on x-axis (vxB) at which the signal is sampled (on a 2d grid)
ys : np.array
Positions on y-axis (vxvxB) at which the signal is sampled (on a 2d grid)
signals : np.array
Signals sampled on the 2d grid defined by xs and ys.
mc_pos : np.array(2,)
Intersection of the (MC-)axis with the "slice" of the lateral distribution plotted.
fname : str
Name of the figure. If given the figure is saved, if fname is None the fiture is shown.
title : str
Title of the figure (Default: None)
"""
yy, xx = np.meshgrid(ys, xs)
fig, ax = plt.subplots(1)
pcm = ax.pcolormesh(xx, yy, signals, shading='gouraud')
ax.plot(xx, yy, "ko", markersize=3)
cbi = plt.colorbar(pcm, pad=0.02)
cbi.set_label(r"$f_{B_{j}}$ / eV$\,$m$^{-2}$")
idx = np.argmax(signals)
xfound = xs[int(idx // len(ys))]
yfound = ys[int(idx % len(ys))]
ax.plot(xfound, yfound, ls=None, marker="o", color="C1",
markersize=10, label="found maximum")
if mc_pos is not None:
ax.plot(mc_pos[0], mc_pos[1], ls=None, marker="*", color="r",
markersize=10, label=r"intersection with $\hat{a}_\mathrm{MC}$")
ax.legend()
ax.set_ylabel(
r"$\vec{v} \times \vec{v} \times \vec{B}$ / m")
ax.set_xlabel(r"$\vec{v} \times \vec{B}$ / m")
if title is not None:
ax.set_title(title) # r"slant depth = %d g / cm$^2$" % depth)
plt.tight_layout()
if fname is not None:
# "rit_xy_%d_%d_%s.png" % (depth, iloop, args.label))
plt.savefig(fname)
else:
plt.show()