import numpy as np
import logging
import os
import time
import astropy.time
import math
from functools import lru_cache
from inspect import signature
from NuRadioReco.modules.base.module import register_run
from NuRadioReco.modules.RNO_G.channelBlockOffsetFitter import channelBlockOffsets, fit_block_offsets
import NuRadioReco.framework.event
import NuRadioReco.framework.station
import NuRadioReco.framework.trigger
import NuRadioReco.framework.parameters
from NuRadioReco.utilities import units
import mattak.Dataset
def _baseline_correction(wfs, n_bins=128, func=np.median, return_offsets=False):
Simple baseline correction function.
Determines baseline in discrete chuncks of "n_bins" with
the function specified (i.e., mean or median).
wfs: np.array(n_events, n_channels, n_samples)
Waveforms of several events/channels.
n_bins: int
Number of samples/bins in one "chunck". If None, calculate median/mean over entire trace. (Default: 128)
func: np.mean or np.median
Function to calculate pedestal
return_offsets: bool, default False
if True, additionally return the baseline offsets
wfs_corrected: np.array(n_events, n_channels, n_samples)
Baseline/pedestal corrected waveforms
baseline_values: np.array of shape (n_samples // n_bins, n_events, n_channels)
(Only if return_offsets==True) The baseline offsets
# Example: Get baselines in chunks of 128 bins
# wfs in (n_events, n_channels, 2048)
# np.split -> (16, n_events, n_channels, 128) each waveform split in 16 chuncks
# func -> (16, n_events, n_channels) pedestal for each chunck
if n_bins is not None:
baseline_values = func(np.split(wfs, 2048 // n_bins, axis=-1), axis=-1)
# np.repeat -> (2048, n_events, n_channels) concatenate the 16 chuncks to one baseline
baseline_traces = np.repeat(baseline_values, n_bins % 2048, axis=0)
baseline_values = func(wfs, axis=-1)
# np.repeat -> (2048, n_events, n_channels) concatenate the 16 chuncks to one baseline
baseline_traces = np.repeat(baseline_values, 2048, axis=0)
# np.moveaxis -> (n_events, n_channels, 2048)
baseline_traces = np.moveaxis(baseline_traces, 0, -1)
if return_offsets:
return wfs - baseline_traces, baseline_values
return wfs - baseline_traces
def get_time_offset(trigger_type):
Mapping the offset between trace start time and trigger time (~ signal time).
Temporary use hard-coded values for each trigger type. In the future this
information might be time, station, and channel dependent and should come
from a database (or is already calibrated in mattak)
Current values motivated by figures posted in PR
trigger_type: str
Trigger type encoded as string from Mattak
time_offset: float
trace_start_time = trigger_time - time_offset
time_offsets = {
"FORCE": 0,
"LT": 250 * units.ns,
"RADIANT": 475 * units.ns,
"UNKNOWN": 0 # Due to a firmware issue at the beginning of data taking the trigger types were not properly set.
# Should have the same time offset ?!
if trigger_type.startswith("RADIANT"):
trigger_type = "RADIANT"
if trigger_type in time_offsets:
return time_offsets[trigger_type]
known_trigger_types = ", ".join(time_offsets.keys())
raise KeyError(f"Unknown trigger type: {trigger_type}. Known are: {known_trigger_types}. Abort ....")
def _all_files_in_directory(mattak_dir):
Checks if all Mattak root files are in a directory.
Ignoring runinfo.root because (asaik) not all runs have those and information is currently not read by Mattak.
There are mattak directories which produce a ReferenceError when reading. They have a "combined.root" which is
apparently empty but are missing the daqstatus, pedestal, and header file.
mattak_dir: str
Path to a mattak directory
all_there: bool
True, if all "req_files" are there and waveforms.root or combined.root. Otherwise returns False.
# one or the other has to be present
if not os.path.exists(os.path.join(mattak_dir, "waveforms.root")) and \
not os.path.exists(os.path.join(mattak_dir, "combined.root")):
return False
req_files = ["daqstatus.root", "headers.root", "pedestal.root"]
for file in req_files:
if not os.path.exists(os.path.join(mattak_dir, file)):
logging.error(f"File {file} could not be found in {mattak_dir}")
return False
return True
def _convert_to_astropy_time(t):
""" Convert to astropy.time.Time """
return None if t is None else astropy.time.Time(t)
class readRNOGData:
def __init__(self, run_table_path=None, load_run_table=True, log_level=logging.NOTSET):
Reader for RNO-G ``.root`` files
This class provides read access to RNO-G ``.root`` files and converts them
to NuRadioMC :class:`Events <NuRadioReco.framework.event.Event>`. Requires ``mattak``
( to be installed.
run_table_path: str | None
Path to a run_table.csv file. If None, the run table is queried from the DB. (Default: None)
load_run_table: bool
If True, try to load the run_table from run_table_path. Otherwise, skip this.
log_level: enum
Set verbosity level of logger. If logging.DEBUG, set mattak to verbose (unless specified in mattak_kwargs).
(Default: logging.NOTSET, ie adhere to general log level)
.. code-block::
reader = readRNOGDataMattak.readRNOGData() # initialize reader
evt = reader.get_event_by_index(0) # returns the first event in the file
# OR
evt = reader.get_event(run_nr=1100, event_id=679) # returns the event with run_number 1100 and event_id 679
# OR
for evt in # loop over all events in file
# perform some analysis
self.logger = logging.getLogger('NuRadioReco.RNOG.readRNOGData')
self._blockoffsetfitter = channelBlockOffsets()
# Initialize run table for run selection
self.__run_table = None
if load_run_table:
if run_table_path is None:
from rnog_data.runtable import RunTable
self.logger.debug("Access RunTable database ...")
self.__run_table = RunTable().get_table()
self.logger.warn("No connect to RunTable database could be established. "
"Runs can not be filtered.")
except ImportError:
"import run_table failed. You can still use readRNOGData, but runs can not be filtered. "
"To install the run table, run\n\n"
"\tpip install git+ssh://\n"
# some users may mistakenly try to pass the .root files to __init__
# we check for this and raise a (hopefully) helpful error message
user_passed_root_file_msg = (
"The optional argument run_table_path expects a csv file, "
"but you passed a list of files or a .root file. Note that "
"the .root files to read in should be passed to the `begin` method of this class"
if isinstance(run_table_path, (list, np.ndarray)):
raise TypeError(user_passed_root_file_msg)
elif os.path.isdir(run_table_path) or run_table_path.endswith('.root'):
raise ValueError(user_passed_root_file_msg)
import pandas
self.__run_table = pandas.read_csv(run_table_path)
def begin(self,
max_trigger_rate=0 * units.Hz,
dirs_files: str, list(str)
Path to run directories (i.e. ".../stationXX/runXXX/") or path to root files (have to be "combined" mattak files).
read_calibrated_data: bool
If True, read calibrated waveforms from Mattak.Dataset. If False, read "raw" ADC traces.
(temp. Default: False, this can/should be switched once the calibration in incorp. into Mattak)
select_triggers: str or list(str)
Names of triggers which should be selected. Convinence interface instead of passing a selector
(see "selectors" below). (Default: None)
select_runs: bool
If True, use information in run_table to select runs (based on run_type, run_time, trigger_rate, ...).
If the run_table is not available no selection is performed (and the programm is not interrupted,
only an error message is raised). See parameters to configure run selection. (Default: False)
Other Parameters
apply_baseline_correction: str {'auto', 'fit', 'approximate', 'median', 'none'}, optional
Removes the DC (baseline) block offsets (pedestals).
Options are, in order of decreasing precision and increasing performance:
* 'fit' : do a full out-of-band fit to determine the block offsets; for more details,
see :mod:`NuRadioReco.modules.RNO_G.channelBlockOffsetFitter` (slow)
* 'approximate' : estimate block offsets by looking at the low-pass filtered trace
* 'median' : subtract the median of each block (faster)
* 'none' : do not apply a baseline correction (fastest)
The default ('auto') first performs the 'approximate' block offset removal, then
automatically decides whether to continue with the full 'fit' depending on the estimated
block offset size.
convert_to_voltage: bool
Only applies when non-calibrated data are read. If true, convert ADC to voltage.
(Default: True)
selectors: list of lambdas
List of lambda(eventInfo) -> bool to pass to mattak.Dataset.iterate to select events.
Example: trigger_selector = lambda eventInfo: eventInfo.triggerType == "FORCE"
(Default: [])
run_types: list
Used to select/reject runs from information in the RNO-G RunTable. List of run_types to be used. (Default: ['physics'])
run_time_range: tuple
Specify a time range to select runs (it is sufficient that runs cover the time range partially).
Each value of the tuple has to be in a format which astropy.time.Time understands. A value can be None
which means that the lower or upper bound is unconstrained. If run_time_range is None no time selection is
applied. (Default: None)
max_trigger_rate: float
Used to select/reject runs from information in the RNO-G RunTable. Maximum allowed trigger rate (per run) in Hz.
If 0, no cut is applied. (Default: 1 Hz)
mattak_kwargs: dict
Dictionary of arguments for mattak.Dataset.Dataset. (Default: {})
Example: Select a mattak "backend". Options are "auto", "pyroot", "uproot". If "auto" is selected,
pyroot is used if available otherwise a "fallback" to uproot is used. (Default: "auto")
overwrite_sampling_rate: float
Set sampling rate of the imported waveforms. This overwrites what is read out from runinfo
(i.e., stored in the mattak files) only when the stored sampling rate is invalid (i.e. 0 or None).
If None, nothing is overwritten and the sampling rate from the mattak file is used. (Default: None)
NOTE: This option might be necessary when old mattak files are read which have this not set.
max_in_mem: int
Set the maximum number of events that can be stored in memory. The datareader will divide
the data in batches based on this number.
NOTE: This is only relevant for the mattak uproot backend
t0 = time.time()
self._read_calibrated_data = read_calibrated_data
baseline_correction_valid_options = ['auto', 'approximate', 'fit', 'median', 'none']
if apply_baseline_correction is None:
apply_baseline_correction = 'none'
if apply_baseline_correction.lower() not in baseline_correction_valid_options:
raise ValueError(
f"Value for apply_baseline_correction ({apply_baseline_correction}) not recognized. "
f"Valid options are {baseline_correction_valid_options}"
self._apply_baseline_correction = apply_baseline_correction.lower()
self._convert_to_voltage = convert_to_voltage
# Temporary solution hard-coded values from Cosmin. Only used when uncalibrated data
# is read and convert_to_voltage is True.
self._adc_ref_voltage_range = 2.5 * units.volt
self._adc_n_bits = 12
self._overwrite_sampling_rate = overwrite_sampling_rate
# set max wavform array size that can be loaded in memory
self._max_in_mem = max_in_mem
# Set parameter for run selection
self.__max_trigger_rate = max_trigger_rate
self.__run_types = run_types
if run_time_range is not None:
self._time_low = _convert_to_astropy_time(run_time_range[0])
self._time_high = _convert_to_astropy_time(run_time_range[1])
self._time_low = None
self._time_high = None
if select_runs and self.__run_table is not None:"\n\tSelect runs with type: {}".format(", ".join(run_types)) +
f"\n\tSelect runs with max. trigger rate of {max_trigger_rate / units.Hz} Hz"
f"\n\tSelect runs which are between {self._time_low} - {self._time_high}")
self._selectors = []
self.add_selectors(selectors, select_triggers)
# Read data
self._time_begin = 0
self._time_run = 0
self._event_idx = -1 # only for logging
self.__counter = 0
self.__skipped = 0
self.__invalid = 0
self._events_information = None
self._datasets = []
self.__n_events_per_dataset = []
if not isinstance(dirs_files, (list, np.ndarray)):
dirs_files = [dirs_files]"Parse through / read-in {len(dirs_files)} directory(ies) / file(s).")
self.__skipped_runs = 0
self.__n_runs = 0
# Set verbose for mattak
verbose = mattak_kwargs.pop("verbose", self.logger.level >= logging.DEBUG)
for dir_file in dirs_files:
if not os.path.exists(dir_file):
self.logger.error(f"The directory/file {dir_file} does not exist")
if os.path.isdir(dir_file):
if not _all_files_in_directory(dir_file):
self.logger.error(f"Incomplete directory: {dir_file}. Skip ...")
dataset = mattak.Dataset.Dataset(station=0, run=0, data_path=dir_file, verbose=verbose, **mattak_kwargs)
except (ReferenceError, KeyError) as e:
self.logger.error(f"The following exeption was raised reading in the run: {dir_file}. Skip that run ...:\n", exc_info=e)
# filter runs/datasets based on
if select_runs and self.__run_table is not None and not self.__select_run(dataset):
self.__skipped_runs += 1
self.__n_runs += 1
if not len(self._datasets):
err = "Found no valid datasets. Stop!"
raise FileNotFoundError(err)
# keeps track which event index is in which dataset
self._event_idxs_datasets = np.cumsum(self.__n_events_per_dataset)
self._n_events_total = np.sum(self.__n_events_per_dataset)
self._time_begin = time.time() - t0"{self._n_events_total} events in {len(self._datasets)} runs/datasets "
f"have been found using the {self._datasets[0].backend} Mattak backend.")
if not self._n_events_total:
err = "No runs have been selected. Abort ..."
raise ValueError(err)
def add_selectors(self, selectors, select_triggers=None):
Add selectors (Callable(eventInfo) -> bool) to an internal list of selectors.
They are used for event filtering.
selectors: list of Callables
List of Callable(eventInfo) -> bool to pass to mattak.Dataset.iterate to select events.
Example: trigger_selector = lambda eventInfo: eventInfo.triggerType == "FORCE"
select_triggers: str or list(str)
Names of triggers which should be selected. Convenience interface instead of passing a selector. (Default: None)
# Initialize selectors for event filtering
if selectors is None:
selectors = []
if not isinstance(selectors, (list, np.ndarray)):
selectors = [selectors]
if select_triggers is not None:
if isinstance(select_triggers, str):
selectors.append(lambda eventInfo: eventInfo.triggerType == select_triggers)
for select_trigger in select_triggers:
selectors.append(lambda eventInfo: eventInfo.triggerType == select_trigger)"Add {len(selectors)} selector(s)")
self._selectors += selectors
self.get_waveforms.cache_clear() # reset cached waveforms
def __select_run(self, dataset):
""" Filter/select runs/datasets.
dataset: mattak.Dataset.Dataset
select: bool
Return True to select an dataset, return False to reject/skip it.
# get first eventInfo
event_info = dataset.eventInfo()
run_id =
station_id = event_info.station
run_info = self.__run_table.query(f"station == {station_id:d} & run == {run_id:d}")
if not len(run_info):
self.logger.error(f"Run {run_id:d} (station {station_id:d}) not in run table. Reject...")
return False
# "time_start/end" is stored in the isot format. datetime is much faster than astropy (~85ns vs 55 mus).
# But using datetime would mean to stip decimals because datetime can only handle mu sec precision and can not cope
# with the additional decimals for ns.
if self._time_low is not None:
time_end = astropy.time.Time(run_info["time_end"].values[0])
if time_end < self._time_low:"Reject station {station_id} run {run_id} because run ended before {self._time_low}")
return False
if self._time_high is not None:
time_start = astropy.time.Time(run_info["time_start"].values[0])
if time_start > self._time_high:"Reject station {station_id} run {run_id} because run started time after {self._time_high}")
return False
run_type = run_info["run_type"].values[0]
if run_type not in self.__run_types:"Reject station {station_id} run {run_id} because of run type {run_type}")
return False
trigger_rate = run_info["trigger_rate"].values[0] * units.Hz
if self.__max_trigger_rate and trigger_rate > self.__max_trigger_rate:"Reject station {station_id} run {run_id} because trigger rate is to high ({trigger_rate / units.Hz:.2f} Hz)")
return False
return True
def __get_n_events_of_prev_datasets(self, dataset_idx):
""" Get accumulated number of events from previous datasets """
dataset_idx_prev = dataset_idx - 1
return int(self._event_idxs_datasets[dataset_idx_prev]) if dataset_idx_prev >= 0 else 0
def __get_dataset_for_event(self, event_idx):
""" Get correct dataset and set entry accordingly to event index
event_index: int
Same as in read_event().
dataset: mattak.Dataset.Dataset
# find correct dataset
dataset_idx = np.digitize(event_idx, self._event_idxs_datasets)
dataset = self._datasets[dataset_idx]
event_idx_in_dataset = event_idx - self.__get_n_events_of_prev_datasets(dataset_idx)
dataset.setEntries(event_idx_in_dataset) # increment iterator -> point to new event
return dataset
def _select_events(self, evtinfo):
""" Filter an event base on its EventInfo and the configured selectors.
event_info: mattak.Dataset.EventInfo
The event info object for one event.
event_index: int
Same as in read_event(). Only use for (Default: None)
skip: bool
Returns False to skip/reject event, return True to keep/read event
f"(_select_events) Processing event number {self.__counter} out of total {self._n_events_total}")
self.__counter += 1 # for logging
if self._selectors is not None:
for selector in self._selectors:
if not selector(evtinfo):
self.logger.debug(f"Event {self.__counter - 1} (station {evtinfo.station}, run {}, "
f"event number {evtinfo.eventNumber}) did not pass a filter. Skip it ...")
self.__skipped += 1
return False
return True
def _check_for_valid_information_in_event_info(self, event_info):
Checks if certain information (sampling rate, trigger time) in mattak.Dataset.EventInfo are valid
event_info: mattak.Dataset.EventInfo
is_valid: bool
Returns True if all information valid, false otherwise
if math.isinf(event_info.triggerTime):
self.logger.error(f"Event {event_info.eventNumber} (st {event_info.station}, run {}) "
"has inf trigger time. Skip event...")
self.__invalid += 1
return False
if (event_info.sampleRate == 0 or event_info.sampleRate is None) and self._overwrite_sampling_rate is None:
self.logger.error(f"Event {event_info.eventNumber} (st {event_info.station}, run {}) "
f"has a sampling rate of {event_info.sampleRate} GHz. Event is skipped ... "
f"You can avoid this by setting 'overwrite_sampling_rate' in the begin() method.")
self.__invalid += 1
return False
return True
def _get_event(self, event_info, waveforms):
""" Return a NuRadioReco event
event_info: mattak.Dataset.EventInfo
The event info object for one event.
waveforms: np.array(n_channel, n_samples)
Typically what dataset.wfs() returns (for one event!)
evt: NuRadioReco.framework.event
trigger_time = event_info.triggerTime
# only overwrite sampling rate if the stored value is invalid
if self._overwrite_sampling_rate is not None and event_info.sampleRate in [0, None]:
sampling_rate = self._overwrite_sampling_rate
sampling_rate = event_info.sampleRate
evt = NuRadioReco.framework.event.Event(, event_info.eventNumber)
station = NuRadioReco.framework.station.Station(event_info.station)
station.set_station_time(astropy.time.Time(trigger_time, format='unix'))
trigger = NuRadioReco.framework.trigger.Trigger(event_info.triggerType)
trigger.set_trigger_time(0) # The trigger time is relative to the event/station time
block_offsets = None
if self._apply_baseline_correction == 'median':
waveforms, block_offsets = _baseline_correction(waveforms, return_offsets=True)
readout_delays = event_info.readoutDelay
for channel_id, wf in enumerate(waveforms):
channel =
if self._read_calibrated_data:
channel.set_trace(wf * units.V, sampling_rate * units.GHz)
# wf stores ADC counts
if self._convert_to_voltage:
# convert adc to voltage
wf = wf * (self._adc_ref_voltage_range / (2 ** (self._adc_n_bits) - 1))
channel.set_trace(wf, sampling_rate * units.GHz)
time_offset = get_time_offset(event_info.triggerType) + readout_delays[channel_id]
channel.set_trace_start_time(-time_offset) # relative to event/trigger time
if block_offsets is not None:
channel.set_parameter(NuRadioReco.framework.parameters.channelParameters.block_offsets, block_offsets.T[channel_id])
if self._apply_baseline_correction in ['auto', 'fit', 'approximate']:
self._blockoffsetfitter.remove_offsets(evt, station, mode=self._apply_baseline_correction)
return evt
def run(self):
Loop over all events.
evt: `NuRadioReco.framework.event.Event`
for dataset in self._datasets:
dataset.setEntries((0, dataset.N()))
# read all event infos of the entire dataset (= run)
for evtinfo, wf in dataset.iterate(calibrated=self._read_calibrated_data,
evt = self._get_event(evtinfo, wf)
yield evt
def get_event_by_index(self, event_index):
""" Allows to read a specific event identifed by its index
event_index: int
The index of a particluar event. The index is the chronological number from 0 to
number of total events (across all datasets).
evt: `NuRadioReco.framework.event.Event`
self.logger.debug(f"Processing event number {event_index} out of total {self._n_events_total}")
t0 = time.time()
dataset = self.__get_dataset_for_event(event_index)
event_info = dataset.eventInfo() # returns a single eventInfo
if not self._select_events(event_info):
return None
# access data
waveforms = dataset.wfs()
evt = self._get_event(event_info, waveforms)
self._time_run += time.time() - t0
self.__counter += 1
return evt
def get_event(self, run_nr, event_id):
""" Allows to read a specific event identifed by run number and event id
run_nr: int
Run number
event_id: int
Event Id
evt: `NuRadioReco.framework.event.Event`
self.logger.debug(f"Getting event {event_id}")
t0 = time.time()
event_infos = self.get_events_information(keys=["eventNumber", "run"])
event_idx_ids = np.array([[index, ele["eventNumber"], ele["run"]] for index, ele in event_infos.items()])
mask = np.all([event_idx_ids[:, 1] == event_id, event_idx_ids[:, 2] == run_nr], axis=0)
if not np.any(mask):"Could not find event with id: {event_id}.")
return None
elif np.sum(mask) > 1:
self.logger.error(f"Found several events with the same id: {event_id}.")
raise ValueError(f"Found several events with the same id: {event_id}.")
# int(...) necessary to pass it to mattak
event_index = int(event_idx_ids[mask, 0][0])
dataset = self.__get_dataset_for_event(event_index)
event_info = dataset.eventInfo() # returns a single eventInfo
self._event_idx = event_index
if not self._select_events(event_info):
return None
# access data
waveforms = dataset.wfs()
evt = self._get_event(event_info, waveforms)
self._time_run += time.time() - t0
self.__counter += 1
return evt
def end(self):
if self.__counter:
f"\n\tRead {self.__counter} events ({self.__skipped} events are skipped (filtered), {self.__invalid} invalid events)"
f"\n\tTime to initialize data sets : {self._time_begin:.2f}s"
f"\n\tTime to read all events : {self._time_run:.2f}s"
f"\n\tTime to per event : {self._time_run / self.__counter:.2f}s"
f"\n\tRead {self.__n_runs} runs, skipped {self.__skipped_runs} runs.")
f"\n\tRead {self.__counter} events (skipped {self.__skipped} events, {self.__invalid} invalid events)"
f"\n\tTime to initialize data sets : {self._time_begin:.2f}s"
f"\n\tTime to read all events : {self._time_run:.2f}s")
### we create a wrapper for readRNOGData to mirror the interface of the .nur reader
class _readRNOGData_eventbrowser(readRNOGData):
Wrapper for readRNOGData for use in the eventbrowser
This wrapper mirrors the interface of the .nur reader for use in the eventbrowser,
and uses the lru_cache to speed up IO for parallel plots of the same event.
It should probably not be used outside of the eventbrowser.
def begin(self, *args, **kwargs):
# We reduce the required amount of IO by caching individual events.
# However, we need to clear the cache every time we change the file
# we're reading. This is implemented here.
return super().begin(*args, **kwargs)
def get_event_ids(self):
event_infos = self.get_events_information()
return np.array([(i['run'], i['eventNumber']) for i in event_infos.values()])
def get_event_i(self, i):
return self.get_event_by_index(i)
def get_event(self, event_id):
return super().get_event(*event_id)
def get_n_events(self):
return self._n_events_total
def get_detector(self):
"""Not implemented in mattak reader"""
return None
def get_header(self):
"""Not implemented in mattak reader"""
return None