Source code for NuRadioReco.utilities.interferometry

import numpy as np
import sys
from scipy import signal, constants
from radiotools import helper as hp
from NuRadioReco.utilities import units

# to convert V**2/m**2 * ns -> V**2/m**2 * s -> J/m**2 -> eV/m**2
conversion_factor_integrated_signal = 1 / units.s * \
    constants.c * constants.epsilon_0 / units.eSI


[docs]def get_signal(sum_trace, tstep, window_width=100 * units.ns, kind="power"): """ Calculates signal quantity from beam-formed waveform Parameters ---------- sum_trace : np.array(m,) beam-formed waveform with m samples tstep : double Sampling bin size window_width : double Time window size to calculate power kind : str Key-word what to do: "amplitude", "power", or "hilbert_sum" Returns ------- signal : double Signal calculated according to the specified metric """ # find signal peak hilbenv = np.abs(signal.hilbert(sum_trace)) peak_idx = np.argmax(hilbenv) if kind == "amplitude": return hilbenv[peak_idx] elif kind == "power" or kind == "hilbert_sum": trace_length = len(sum_trace) # shift peak in middle of trace sum_trace = np.roll(sum_trace, trace_length // 2 - peak_idx) peak_idx += trace_length // 2 - peak_idx # define signal window. If trace is to small -> pad idx_width = int(window_width / 2 // tstep) if trace_length < 2 * idx_width: sum_trace = np.hstack( [np.zeros(idx_width), sum_trace, np.zeros(idx_width)]) peak_idx += idx_width sum_trace *= conversion_factor_integrated_signal * tstep if kind == "power": # return sum of squares within signal window return np.sum(sum_trace[peak_idx-idx_width:peak_idx+idx_width] ** 2) elif kind == "hilbert_sum": hilbenv = np.abs(signal.hilbert(sum_trace)) return np.sum(hilbenv[peak_idx-idx_width:peak_idx+idx_width]) else: sys.exit("get_signal(), kind = '{}' not supported".format(kind))
[docs]def interfere_traces_interpolation(target_pos, positions, traces, times, tab): """ Calculate sum of time shifted waveforms. Performs a linear interpolation between samples. Parameters ---------- target_pos : np.array(3,) source/traget location positions : np.array(n, 3) observer positions traces : np.array(n, m) waveforms of n observers with m samples times : np.array(n, m) time stampes of the waveforms of each observer tab : radiotools.atmosphere.refractivity.RefractivityTable Tabulated table of the avg. refractive index between two points Returns ------- sum_trace : np.array(n, m) Summed trace """ # positions all have to be in sea level plane coordinates! times = times tstep = times[0, 1] - times[0, 0] tshifts = get_time_shifts(target_pos, positions, tab) times_new = times - tshifts[:, None] first_time = np.amin(times_new) last_time = np.amax(times_new) time_sum = np.arange(first_time, last_time + tstep, tstep) sum_trace = np.zeros(len(time_sum)) for trace, time in zip(traces, times_new): fidx = np.around((time[1:] - time_sum[0]) / tstep, 4) # TODO: check if that makes sense idx = np.array(fidx, dtype=int) if not np.unique(idx).size == len(idx): sys.exit( "Index array has not unique entries. That is most probably a rounding issue!") f = (fidx - idx)[0] # are all the same """ Linear interplation to match the binning of time_sum. """ trace_new = (1 - f) * trace[1:] + f * trace[:-1] sum_trace[idx] += trace_new return sum_trace
[docs]def get_time_shifts(target_pos, positions, tab): """ Calculates the time delay of an electromagnetic wave along a straight trajectories between a source/traget location and several observers. Parameters ---------- target_pos : np.array(3,) source/traget location positions : np.array(n, 3) observer positions (n observers) tab : radiotools.atmosphere.refractivity.RefractivityTable Tabulated table of the avg. refractive index between two points Returns ------- tshifts : np.array(n,) Time delay in sec """ tshifts = np.zeros(len(positions)) for idx, pos in enumerate(positions): effective_refractivity = tab.get_refractivity_between_two_points_tabulated( target_pos, pos) dt = np.linalg.norm(target_pos - pos) * \ (effective_refractivity + 1) / constants.c tshifts[idx] = dt return tshifts * units.s
[docs]def fit_axis(z, theta, phi, coreX, coreY): """ Predicts the intersetction of an axis/line with horizontal layers at different heights. Line is described by an anchor on a horizontal plane (coreX, coreY) and a direction in spherical coordinates (theta, phi). Returns the position/intersection of the line with flat horizontal layers at given height(s) z. Resulting array (positions) is flatten. Parameters ---------- z : array The height(s) for which the position on the defined axis should be evaluated. theta : double Zenith angle of the axis phi : double Azimuth angle of the axis coreX : Double x-coordinate of the intersection of the axis with a horizontal plane with z = 0. coreY : Double y-coordinate of the intersection of the axis with a horizontal plane with z = 0. Returns ------- points : array The flatten array of the positions on along the defined axis at heights given by "z" """ axis = hp.spherical_to_cartesian(theta, phi) norm = z / axis[-1] norm = np.asarray(norm) # when z is a float points = axis.reshape(1, 3) * norm[:, None] + \ np.array([coreX, coreY, 0])[None, :] return points.flatten()
[docs]def get_intersection_between_line_and_plane(plane_normal, plane_anchor, line_direction, line_anchor, epsilon=1e-6): """ Find intersection betweem a line and a plane. From https://rosettacode.org/wiki/Find_the_intersection_of_a_line_with_a_plane#Python Parameters ---------- plane_normal : np.array(3,) Normal vector of a plane plane_anchor : np.array(3,) Anchor of this plane line_direction : np.array(3,) Direction of a line line_anchor : np.array(3,) Anchor of this line epsilon : double Numerical precision Returns ------- psi : array(3,) Position of the intersection between plane and line """ ndotu = plane_normal.dot(line_direction) if abs(ndotu) < epsilon: raise RuntimeError("no intersection or line is within plane") w = line_anchor - plane_anchor si = -plane_normal.dot(w) / ndotu psi = w + si * line_direction + plane_anchor return psi