from NuRadioReco.utilities import units
import NuRadioReco.framework.base_trace
from scipy import interpolate
import numpy as np
import logging
import datetime
import copy
class Response:
This class provides an interface to read-in and apply the complex response functions of the
various components of the signal chain of a RNO-G channel. The class allows to combine the
response of several components into one response by multiplying them.
.. code-block::
response = det.get_signal_chain_response(station_id=24, channel_id=0)
# Multipy the response to a trace. The multiply operator takes care of everything
trace_at_readout = trace_at_antenna * response
# getting the complex response as array
freq = np.arange(50, 1000) * units.MHz
complex_resp = response(freq)
def __init__(
self, frequency, y, y_unit, time_delay=0, weight=1,
name="default", station_id=None, channel_id=None,
remove_time_delay=True, debug_plot=False,
log_level=logging.NOTSET, attenuator_in_dB=0):
frequency : list(float)
The frequency vector at which the response is measured. Unit has to be GHz.
y : [list(float), list(float)]
The measured response. First entry is the vector of the measured amplitude, the second entry is the measured phase.
The unit of both entries is specified with the next argument.
y_unit : [str, str]
The first entry specifies the unit of the measured amplitude. Options are "dB", "MAG" and "mag".
The second entry specifies the unit of the measured phase. Options are "rad" and "deg".
time_delay : float (Default: 0)
"Average" group delay. Read from database. Will be used to normalize the phase and stored alongside response
weight : float (Default: 1)
Specifies the weight with which this component reponse "adds" to the total signal-chain response or data.
Its the exponent of the complex multiplicitive gain. That means that a value of 1 means to linear multiply this
reponse while a value of -1 means to divide by this reponse.
name : str (Default: "default")
Give the response a name. This is only use for printing purposes.
station_id : int (Default: None)
Associated station id.
channel_id : int (Default: None)
Associated channel id.
remove_time_delay : bool (Default: True)
If True, remove `time_delay` from response.
debug_plot : bool (Default: False)
If True, produce a debug plot
attenuator_in_dB : float (Default: 0)
Allows to add an additional attenuation/gain to the response. This is useful to simulate or correct of the
the effect of an attenuator. The value is in dB. A value of 10dB will increase the response by 10 dB.
(Default: 0 -> no attenuation)
log_level : `logging.LOG_LEVEL` (Default: logging.NOTSET)
Overrides verbosity level of logger. Other options are: `logging.WARNING`, `logging.DEBUG`, ...
self.logger = logging.getLogger("NuRadioReco.Response")
self.__names = [name]
self._station_id = station_id
self._channel_id = channel_id
self._sanity_check = True # Tmp
if self._station_id is None or self._channel_id is None and self._station_id != -1:
self.logger.error(f"Station and channel id were not defined for response {name}. Please do that.")
self.__frequency = np.array(frequency) * units.GHz
if y[0] is None or y[1] is None:
raise ValueError("Data for response incomplete, detected \"None\"")
y_ampl, y_phase = np.array(y)
if y_unit[0] == "dB":
gain = 10 ** (y_ampl / 20)
elif y_unit[0].lower() == "mag":
gain = y_ampl
raise KeyError
if y_unit[1].lower() == "deg":
if np.max(np.abs(y_phase)) < 2 * np.pi:
self.logger.warning("Is the phase really in deg? Does not look like it... "
f"Do not convert {name} to rad: {y_phase}")
y_phase = np.deg2rad(y_phase)
elif y_unit[1].lower() == "rad":
# We can not make this test because the phase might be already unwrapped
# if np.amax(y_phase) - np.amin(y_phase) > 2 * np.pi:
# self.logger.warning("Is the phase really in rad? Does not look like it... "
# f"Do convert {name} to rad: {y_phase}")
y_phase = y_phase
raise KeyError
if time_delay:
if abs(2 * time_delay) > 1 / np.diff(self.__frequency)[0]:
f"The frequency binning (resolution) of {np.diff(self.__frequency)[0] * 1e3:.2f} MHz "
f"of the response function is too large/coarse to correctly remove the time delay of {time_delay} ns. "
f"This is a sign of potential aliasing. You need to upsample the response function "
"(zero padding in the time domain).")
raise ValueError("Time delay too large for frequency resolution. Upsample the response function.")
# Remove the average group delay from response
if remove_time_delay and time_delay:
self.logger.debug(f"Remove a time delay of {time_delay:.2f} ns from {name}")
y_phase_orig = np.copy(np.unwrap(y_phase))
_response = subtract_time_delay_from_response(self.__frequency, gain, y_phase, time_delay)
y_phase = np.angle(_response)
time_delay = 0 # set time_delay to 0 if group delay is not removed
y_phase = np.unwrap(y_phase)
if attenuator_in_dB:
gain = gain * 10 ** (attenuator_in_dB / 20)
self.__gains = [interpolate.interp1d(
self.__frequency, gain, kind="linear", bounds_error=False, fill_value=0)]
self.__phases = [interpolate.interp1d(
self.__frequency, y_phase, kind="linear", bounds_error=False, fill_value=0)]
if weight not in [-1, 1]:
err = f"Only a response weight of [-1, 1] is allowed (value is {weight})."
raise ValueError(err)
self.__weights = [weight]
self.__time_delays = [weight * time_delay]
# Debug plotting
if debug_plot:
from matplotlib import pyplot as plt
fig, axs = plt.subplots(3, 1, sharex=True)
frequency_interp = np.linspace(self.__frequency[0], self.__frequency[-1], 10000)
axs[0].plot(self.__frequency, gain, "C0o", label="data", markersize=2)
axs[0].plot(frequency_interp, self.__gains[0](frequency_interp), "C1--", lw=1, label="interpolation")
if remove_time_delay:
axs[1].plot(self.__frequency, y_phase_orig, "C0o", markersize=2, label="Original phase")
ax2 = axs[1].twinx()
ax2.plot(self.__frequency, y_phase, "C2o", markersize=2, label=f"Excl. time delay of {self.__time_delays[0]:.2f}ns")
ax2.plot(frequency_interp, self.__phases[0](frequency_interp), "C3--", lw=1, label="interpolation")
ax2.set_ylabel("norm. phase / rad")
axs[1].plot(self.__frequency, y_phase, "C0o", markersize=2)
axs[1].plot(frequency_interp, self.__phases[0](frequency_interp), "C1--", lw=1)
axs[1].set_ylabel("phase / rad")
group_delay = -np.diff(y_phase) / np.diff(self.__frequency)[0] / (2 * np.pi)
axs[2].plot(self.__frequency[:-1], group_delay, "C0o", markersize=2)
frequency_mask = np.all([50 * units.MHz < self.__frequency[:-1], self.__frequency[:-1] < 800 * units.MHz], axis=0)
axs[2].set_ylim(np.amin(group_delay[frequency_mask]), np.amax(group_delay[frequency_mask]))
axs[2].set_ylabel("group delay / ns")
axs[-1].set_xlabel("frequency / GHz")
f'{datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%S")}_debug.png', transparent=False)
def __call__(self, freq, component_names=None, blacklist=True):
Returns the complex response for a given frequency.
freq: list(float)
The frequencies for which to get the response.
component_names: list(str) or None (Default: None)
Only return the (combined) response of components selected by their names.
List of names to consider or not to consider (depends on `blacklist`).
`None` mean no selection.
blacklist: bool (Default: True)
If True (and `component_names is not None`), ignore components selected with `component_names`.
If False, only consider components selected with `component_names`.
response: np.array(np.complex128)
The complex response at the desired frequencies
response = np.ones_like(freq, dtype=np.complex128)
freq = np.asarray(freq)
if component_names is not None:
if isinstance(component_names, str):
component_names = [component_names]
for gain, phase, weight, name in zip(self.__gains, self.__phases, self.__weights, self.__names):
if component_names is not None:
if blacklist:
if name in component_names: # if name in blacklist skip
if name not in component_names: # if name *not* in whitelist skip
_gain = gain(freq / units.GHz)
# to avoid RunTime warning and NANs in total reponse
if weight == -1:
mask = _gain > 0
tmp_response = np.zeros_like(freq, dtype=np.complex128)
tmp_response[mask] = (_gain[mask] * np.exp(1j * phase(freq[mask] / units.GHz))) ** weight
response *= tmp_response
response *= (_gain * np.exp(1j * phase(freq / units.GHz))) ** weight
if np.allclose(response, np.ones_like(freq, dtype=np.complex128)):
if component_names is not None:
raise ValueError("Returned response is equal to 1. "
f"Did you requested a non-existing component ({component_names})? "
f"Options are: {self.__names}")
self.logger.warning("Returned response is equal to 1.")
return response
def get_names(self):
""" Get list of the names of all individual responses """
return self.__names
def remove(self, name):
Remove a component response from the response object.
name: str
Name of the component to remove
if name not in self.get_names():
raise ValueError(f"Component {name} not found in response. Options are: {self.get_names()}")
idx = self.__names.index(name)
def get(self, name):
Get a component response from the response object.
name: str
Name of the component to get
if name not in self.get_names():
raise ValueError(f"Component {name} not found in response. Options are: {self.get_names()}")
idx = self.__names.index(name)
single_response = copy.deepcopy(self)
single_response.__gains = [self.__names[idx]]
single_response.__names = [self.__gains[idx]]
single_response.__phases = [self.__phases[idx]]
single_response.__weights = [self.__weights[idx]]
single_response.__time_delays = [self.__time_delays[idx]]
return single_response
def __mul__(self, other):
Define multiplication operator for
- Other objects of the same class
- Objects of type NuRadioReco.framework.base_trace
if isinstance(other, Response):
self = copy.deepcopy(self)
# station_id == -1 is a special case to all non-station specific responses
if ((self._station_id != other._station_id or self._channel_id != other._channel_id) and
(other._station_id != -1 and self._station_id != -1)):
self.logger.error("It looks like you are combining responses from "
f"two different channels: {self._station_id}.{self._channel_id} "
f" vs {other._station_id}.{other._channel_id} (station_id.channel_id)")
# Store each response individually: append/concatenate lists of gains and phases.
# The multiplication happens in __call__.
self.__names += other.__names
self.__gains += other.__gains
self.__phases += other.__phases
self.__weights += other.__weights
self.__time_delays += other.__time_delays
return self
elif isinstance(other, NuRadioReco.framework.base_trace.BaseTrace):
other = copy.copy(other)
if self._sanity_check:
trace_length = other.get_number_of_samples() / other.get_sampling_rate()
time_delay = self._calculate_time_delay()
if time_delay > trace_length / 2:
self.logger.warning("The time shift appiled by the response is larger than half the trace length:\n\t"
f"{time_delay:.2f} vs {trace_length:.2f}")
spec = other.get_frequency_spectrum()
freqs = other.get_frequencies()
spec *= self(freqs) # __call__
other.set_frequency_spectrum(spec, sampling_rate="same")
return other
elif isinstance(other, np.ndarray):
raise TypeError("You try to multiply a `Response` object with a numpy array, "
"only `Response` or `BaseTrace` is allowed. "
"Did you call `get_trace()` or `get_frequency_spectrum()` on `BaseTrace`?")
raise TypeError(f"Response multiplied with unknown type: {type(other)}")
def __rmul__(self, other):
""" Same as mul """
return self.__mul__(other)
def __truediv__(self, other):
Define multiplication operator for
- Other objects of the same class
- Objects of type NuRadioReco.framework.base_trace
if isinstance(other, Response):
self = copy.deepcopy(self)
# station_id == -1 is a special case to all non-station specific responses
if ((self._station_id != other._station_id or self._channel_id != other._channel_id) and
(other._station_id != -1 and self._station_id != -1)):
"It looks like you are combining responses from "
f"two different channels: {self._station_id}.{self._channel_id} "
f" vs {other._station_id}.{other._channel_id} (station_id.channel_id)")
# Store each response individually: append/concatenate lists of gains and phases.
# The multiplication happens in __call__.
self.__names += other.__names
self.__gains += other.__gains
self.__phases += other.__phases
self.__weights += [-1 * ele for ele in other.__weights]
self.__time_delays += [-1 * ele for ele in other.__time_delays]
return self
elif isinstance(other, NuRadioReco.framework.base_trace.BaseTrace):
other = copy.copy(other)
if self._sanity_check:
trace_length = other.get_number_of_samples() / other.get_sampling_rate()
time_delay = self._calculate_time_delay()
if time_delay > trace_length / 2:
self.logger.warning("The time shift appiled by the response is larger than half the trace length:\n\t"
f"{time_delay:.2f} vs {trace_length:.2f}")
spec = other.get_frequency_spectrum()
freqs = other.get_frequencies()
spec *= 1 / self(freqs) # __call__
other.set_frequency_spectrum(spec, sampling_rate="same")
return other
elif isinstance(other, np.ndarray):
raise TypeError("You try to multiply a `Response` object with a numpy array, "
"only `Response` or `BaseTrace` is allowed. "
"Did you call `get_trace()` or `get_frequency_spectrum()` on `BaseTrace`?")
raise TypeError(f"Response multiplied with unknown type: {type(other)}")
def __str__(self):
ampl = 20 * np.log10(np.abs(self(np.array([0.15, 0.5]) * units.GHz)))
return "Response of " + ", ".join([f"{name} ({weight})" for name, weight in zip(self.get_names(), self.__weights)]) \
+ f": |R([0.15, 0.5] GHz)| = [{ampl[0]:.2f}, {ampl[1]:.2f}] dB (amplitude) ({np.sum(self.__time_delays):.2f} ns)"
def plot(self, ax1=None, show=False, in_dB=True, plt_kwargs={}):
import matplotlib.pyplot as plt
freqs = np.linspace(0, 1.4) * units.GHz
if ax1 is None:
fig, ax = plt.subplots()
ax = ax1
for gain, weight, name, td in zip(self.__gains, self.__weights, self.__names, self.__time_delays):
_gain = gain(freqs)
name = name.replace("_", " ")
ls = "-" if weight == 1 else "--"
if name.startswith("golden"):
name = name.replace("golden downhole components", "ref. comp.")
if name.endswith(" "):
name = name[:-1]
label = f"{name:<25} : {weight:<3} | {td:.1f}ns"
if in_dB:
mask = _gain > 0 # to avoid RunTime warning
ax.plot(freqs[mask] / units.MHz, 20 * np.log10(_gain[mask]), lw=1, ls=ls, label=label, **plt_kwargs)
ax.plot(freqs / units.MHz, _gain, label=label, lw=1, ls=ls, **plt_kwargs)
_gain = np.abs(self(freqs))
label = f"total: {np.sum(self.__time_delays):.1f}ns"
if in_dB:
mask = _gain > 0 # to avoid RunTime warning
ax.plot(freqs[mask] / units.MHz, 20 * np.log10(_gain[mask]), color="k", label=label, **plt_kwargs)
ax.plot(freqs / units.MHz, _gain, color="k", label=label, **plt_kwargs)
ax.set_xlabel("frequency / MHz")
if in_dB:
ax.set_ylabel("gain / dB")
if show:
elif ax1 is not None:
return fig, ax
def get_time_delay(self):
""" Get time delay from DB """
return np.sum(self.__time_delays)
def get_time_delays(self):
""" Get time delay from DB """
return self.__time_delays
def _calculate_time_delay(self):
Calculate time delay from phase of the stored complex response function.
This is not the time delay which is stored in the DB and which is used in
the `__init__()` to normalize the response function. Rather, its the remaining
group delay.
The time delay is calculated as the mean between 195 and 205 MHz.
time_delay1 : float
The time delay at ~ 200 MHz
freqs = np.arange(50, 1200, 0.5) * units.MHz
response = self(freqs)
delta_freq = np.diff(freqs)
phase = np.angle(response)
time_delay = -np.diff(np.unwrap(phase)) / delta_freq / 2 / np.pi
mask = np.all([195 * units.MHz < freqs, freqs < 250 * units.MHz], axis=0)[:-1]
time_delay1 = np.mean(time_delay[mask])
# This alternative calculation is only meaningful if group delay is ~ constant over the whole frequency range (which is the case for most cables)
# # fit the unwrapped phase with a linear function
# popt = np.polyfit(freqs, np.unwrap(phase), 1)
# time_delay2 = -popt[0] / (2 * np.pi)
# if np.abs(time_delay1 - time_delay2) > 0.1 * units.ns:
# self.logger.warning("Calculation of time delay. The two methods yield different results: "
# f"{time_delay1:.1f} ns / {time_delay2:.1f} ns for {self.get_names()}. Return the former...")
return time_delay1
def subtract_time_delay_from_response(frequencies, resp, phase=None, time_delay=None):
    """
    Remove a constant time delay from a complex response function

    Parameters
    ----------
    frequencies : array of floats
        Corresponding frequencies

    resp : array of (complex) floats
        Complex response function (if `phase is None`), real gain of a complex response
        function (if `phase is not None`).

    phase : array of floats
        Phase of the complex response function (optional). (Default: None)

    time_delay : float
        Time delay to be removed

    Returns
    -------
    resp: array of complex floats
        Corrected response function. The inputs are not modified.

    Raises
    ------
    ValueError
        If `time_delay` is None, or if the frequency binning is too coarse to remove
        the time delay without phase ambiguity.
    """
    if time_delay is None:
        raise ValueError("You have to specify a time delay")

    frequencies = np.asarray(frequencies)

    if phase is not None:
        # `resp` is the real gain, `phase` is given separately
        gain = np.asarray(resp)
        phase = np.unwrap(phase)
    else:
        # `resp` is complex: split into gain and (unwrapped) phase
        resp = np.asarray(resp)
        gain = np.abs(resp)
        phase = np.unwrap(np.angle(resp))

    # The phase ramp 2*pi*time_delay*f must change by less than pi between
    # neighbouring bins, otherwise the correction is ambiguous.
    if np.any(np.abs(2 * time_delay * np.diff(frequencies)) > 1):
        raise ValueError("The frequency binning (resolution) of the response function "
                         f"is to large/corse to correctly remove the time delay of {time_delay} ns. "
                         "You need to upsample the response function.")

    return gain * np.exp(1j * (phase + 2 * np.pi * time_delay * frequencies))