"""
Interface to the MongoDB that contains RNO-G hardware and calibration information
The :mod:`NuRadioReco.detector.RNO_G.db_mongo_read` module and the `Database` class herein mostly serve as the
backend of the `NuRadioReco.detector.RNO_G.rnog_detector.Detector` class. Most users
will want to use that class to obtain information about deployed RNO-G stations and hardware.
`NuRadioReco.detector.RNO_G.rnog_detector.Detector` class has an interface similar to that of
other detector descriptions in NuRadioMC, and is documented there.
However, for some specific use cases (e.g. finding measurements for individual hardware components
that have not been deployed to the field), one can use the `Database` class directly, using the
`Database.get_component_data` method.
"""
import six
import os
import urllib.parse
import datetime
import numpy as np
from functools import wraps
import copy
import re
from pymongo import MongoClient
import NuRadioReco.utilities.metaclasses
import astropy.time
import logging
# Module-level logger used by the functions and methods defined in this file.
logger = logging.getLogger("NuRadioReco.MongoDBRead")
# Emit messages of level INFO (e.g. the connection status logged by `Database`) and above.
logger.setLevel(logging.INFO)
def _convert_astro_time_to_datetime(time_astro):
    """
    Convert a time object into its datetime representation.

    Parameters
    ----------
    time_astro: object providing a ``to_datetime()`` method
        Within this module the argument is an ``astropy.time.Time`` instance.

    Returns
    -------
    converted:
        The value returned by ``time_astro.to_datetime()``.
    """
    converted = time_astro.to_datetime()
    return converted
def _check_database_time(method):
    """
    Decorator for methods which require that the database time is set.

    The decorated method is only executed if ``self.get_database_time()`` does not
    return None. Otherwise an error is logged and a ``ValueError`` is raised.

    Parameters
    ----------
    method: callable
        Method taking ``self`` as first argument. ``self`` has to provide ``get_database_time()``.

    Returns
    -------
    guarded: callable
        Wrapper around ``method`` (carrying its metadata via ``functools.wraps``).
    """
    @wraps(method)
    def _guarded(self, *args, **kwargs):
        if self.get_database_time() is not None:
            return method(self, *args, **kwargs)

        logger.error('Database time is None.')
        raise ValueError('Database time is None.')

    return _guarded
[docs]
@six.add_metaclass(NuRadioReco.utilities.metaclasses.Singleton)
class Database(object):
def __init__(self, database_connection="RNOG_public", database_name=None, mongo_kwargs=None):
    """
    Interface to the RNO-G hardware database. This class uses the python API pymongo for the
    RNO-G MongoDB.

    This class allows you to connect to preconfigured mongo clients or select your mongo client freely.
    The database is accessible with the `self.db` variable.

    Parameters
    ----------
    database_connection : str (Default: `"RNOG_public"`)
        Specify mongo client. You have 5 options:

            * `"env_pw_user"`: Connect to a server with the environmental variables
              `mongo_server`, `mongo_user`, and `mongo_password`
            * `"RNOG_public"`: Preconfigured connection to read-only RNO-G Hardware DB
            * `"RNOG_test_public"`: Preconfigured connection to read-only RNO-G Test Hardware DB
            * `"connection_string"`: Use environmental variable `db_mongo_connection_string` to
              connect to mongo server
            * `"mongodb*"`: Every string which starts with `"mongodb"` will be used to connect to
              a mongo server

    database_name : str (Default: None -> `"RNOG_live"`)
        Select the database by name. If None (default) is passed, set to `"RNOG_live"`

    mongo_kwargs : dict (Default: None -> `{}`)
        Additional arguments to pass to MongoClient. The dictionary is copied, the passed
        object is never modified.

    Raises
    ------
    ValueError
        If `database_connection` is none of the options listed above or if one of the
        environmental variables needed for `"env_pw_user"` is not set.
    KeyError
        If `database_name` is not among the databases listed by the mongo client.
    """
    # Work on a private copy: the "env_pw_user" branch adds the "tls" entry, which must
    # neither end up in the dictionary of the caller nor in a shared default argument.
    mongo_kwargs = dict(mongo_kwargs) if mongo_kwargs else {}

    if database_connection == "env_pw_user":
        # use db connection from environment
        env_values = {key: os.environ.get(key) for key in ('mongo_server', 'mongo_user', 'mongo_password')}
        missing = [key for key, value in env_values.items() if value is None]
        if missing:
            # fail early with a clear message; quoting None would raise an opaque TypeError
            err = 'Environmental variable(s) not set: ' + ', '.join(missing)
            logger.error(err)
            raise ValueError(err)

        # pw and user need to be percent escaped
        mongo_user = urllib.parse.quote_plus(env_values['mongo_user'])
        mongo_password = urllib.parse.quote_plus(env_values['mongo_password'])
        mongo_server = env_values['mongo_server']

        connection_string = f"mongodb://{mongo_user}:{mongo_password}@{mongo_server}"
        mongo_kwargs["tls"] = True

    elif database_connection == "RNOG_public":
        # use read-only access to the RNO-G database
        connection_string = (
            "mongodb://read:EseNbGVaCV4pBBrt@radio.zeuthen.desy.de:27017/admin?authSource=admin&"
            "readPreference=primary&appname=MongoDB%20Compass&directConnection=true&ssl=true")

    elif database_connection == "RNOG_test_public":
        # use readonly access to the RNO-G test database
        connection_string = (
            "mongodb://RNOG_test_public:jrE5xO38D7wQweVR5doa@radio-test.zeuthen.desy.de:27017/admin?authSource=admin&"
            "readPreference=primary&appname=MongoDB%20Compass&directConnection=true&ssl=true")

    elif database_connection == "connection_string":
        # use a connection string from the environment
        connection_string = os.environ.get('db_mongo_connection_string')
        if connection_string is None:
            logger.warning('variable "db_mongo_connection_string" not set')

    elif database_connection.startswith("mongodb"):
        connection_string = database_connection

    else:
        # without a connection string there is nothing to connect to -> stop here
        err = ('specify a defined database connection '
               '["env_pw_user", "connection_string", "RNOG_public", "RNOG_test_public", "mongodb..."]')
        logger.error(err)
        raise ValueError(err)

    logger.info("Attempting to connect to the database ...")
    self.__mongo_client = MongoClient(connection_string, **mongo_kwargs)

    if database_name is None:
        database_name = "RNOG_live"

    if database_name not in self.__mongo_client.list_database_names():
        err = f'Could not find database "{database_name}" in mongo client.'
        logger.error(err)
        raise KeyError(err)

    self.db = self.__mongo_client[database_name]
    logger.info(f"... connection to {self.db.name} established")

    # Set timestamp of database. This is used to determine which primary measurement is used.
    # Naive UTC timestamp (same value as the deprecated datetime.utcnow()).
    self.__database_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # This is used to get commissioned stations/channels
    self.__detector_time = None

    # cache for the collection names, filled by get_collection_names()
    self.__get_collection_names = None

    self.__station_collection = "station_rnog"
    self.__digitizer_collection = "digitizer_configuration"
[docs]
def set_database_time(self, time):
    """
    Set time for database. This affects which primary measurement is used.

    Parameters
    ----------
    time: `datetime.datetime` or ``astropy.time.Time``
        UTC time. An ``astropy.time.Time`` is converted to `datetime.datetime` before it is stored.

    Raises
    ------
    TypeError
        If `time` is (after the conversion) not a `datetime.datetime`.
    """
    timestamp = _convert_astro_time_to_datetime(time) if isinstance(time, astropy.time.Time) else time

    if isinstance(timestamp, datetime.datetime):
        self.__database_time = timestamp
        return

    logger.error("Set invalid time for database. Time has to be of type datetime.datetime")
    raise TypeError("Set invalid time for database. Time has to be of type datetime.datetime")
[docs]
def set_detector_time(self, time):
    """
    Set time of detector. This controls which stations/channels are commissioned.

    Parameters
    ----------
    time: `datetime.datetime` or ``astropy.time.Time``
        UTC time. An ``astropy.time.Time`` is converted to `datetime.datetime` before it is stored.

    Raises
    ------
    TypeError
        If `time` is (after the conversion) not a `datetime.datetime`.
    """
    timestamp = time
    if isinstance(timestamp, astropy.time.Time):
        timestamp = _convert_astro_time_to_datetime(timestamp)

    is_valid = isinstance(timestamp, datetime.datetime)
    if not is_valid:
        logger.error("Set invalid time for detector. Time has to be of type datetime.datetime")
        raise TypeError("Set invalid time for detector. Time has to be of type datetime.datetime")

    self.__detector_time = timestamp
[docs]
def get_database_time(self):
    """
    Return the time of the database (see `set_database_time`).

    Returns
    -------
    database_time:
        The currently stored database time.
    """
    database_time = self.__database_time
    return database_time
[docs]
def get_detector_time(self):
    """
    Return the time of the detector (see `set_detector_time`).

    Returns
    -------
    detector_time:
        The currently stored detector time.
    """
    detector_time = self.__detector_time
    return detector_time
[docs]
def find_primary_measurement(self, collection_name, name, primary_time, identification_label, data_dict):
    """
    Find the object_id of the entry with name 'name' and the measurement_id of its primary measurement.

    Parameters
    ----------
    collection_name: string
        name of the collection that is searched (surface_board, iglu_board, ...)
    name: string
        the unique identifier of the input component
    primary_time: datetime.datetime
        timestamp for the primary measurement
    identification_label: string
        specify what kind of label is used for the identification ("name" or "id")
    data_dict: dict
        dictionary containing additional information that are used to search the database.
        The keys 'breakout' + 'breakout_channel' (only if both are present), 'channel_id'
        and 'S_parameter' are taken into account.

    Returns
    -------
    object_id:
        `_id` of the matching entry, None if no or more than one entry matches
    measurement_ids: list
        `[id_measurement]` of the matching entry, `[0]` if no primary measurement was found,
        `[None]` if more than one was found
    """
    # only measurements for which `primary_time` lies within a primary-measurement interval
    constraints = {
        'measurements.primary_measurement.start': {'$lte': primary_time},
        'measurements.primary_measurement.end': {'$gte': primary_time},
    }

    # the breakout information is only used if it is complete
    if 'breakout_channel' in data_dict and 'breakout' in data_dict:
        constraints['measurements.breakout'] = data_dict['breakout']
        constraints['measurements.breakout_channel'] = data_dict['breakout_channel']

    for optional_key in ('channel_id', 'S_parameter'):
        if optional_key in data_dict:
            constraints[f'measurements.{optional_key}'] = data_dict[optional_key]

    pipeline = [
        {'$match': {identification_label: name}},
        {'$unwind': '$measurements'},
        {'$unwind': '$measurements.primary_measurement'},
        {'$match': constraints},
    ]

    # get all entries matching the search filter
    hits = list(self.db[collection_name].aggregate(pipeline))

    if len(hits) == 1:
        only_hit = hits[0]
        object_id = only_hit['_id']
        return object_id, [only_hit['measurements']['id_measurement']]

    if not hits:
        logger.warning('No primary measurement found.')
        # the zero is the information that no primary measurement was found
        return None, [0]

    logger.error('More than one primary measurement found.')
    return None, [None]
[docs]
def get_object_names(self, object_type):
    """
    Return the distinct values of the field 'name' in a collection.

    Parameters
    ----------
    object_type: string
        name of the collection that is queried

    Returns
    -------
    names:
        Result of the `distinct('name')` query on the collection
    """
    collection = self.db[object_type]
    return collection.distinct('name')
[docs]
def get_collection_names(self):
    """
    Return the names of all collections in the database.

    The database is only queried as long as no result is cached, i.e., after the
    first call the stored list is returned.

    Returns
    -------
    collection_names:
        Result of `self.db.list_collection_names()`
    """
    cached_names = self.__get_collection_names
    if cached_names is None:
        cached_names = self.db.list_collection_names()
        self.__get_collection_names = cached_names
    return cached_names
[docs]
def get_station_ids(self):
    """
    Return the distinct values of the field 'id' in the station collection.

    Returns
    -------
    station_ids:
        Result of the `distinct('id')` query on the station collection
    """
    station_collection = self.db[self.__station_collection]
    return station_collection.distinct('id')
[docs]
def get_digitizer_configuration(self, config_id):
    """
    Get digitizer configuration from the database. Access information in the digitizer collection.

    Parameters
    ----------
    config_id: int
        Identifier to get the correct configuration

    Returns
    -------
    config: dict
        The single entry of the digitizer collection with `id == config_id`

    Raises
    ------
    ValueError
        If no or more than one configuration with this id is found.
    """
    # filter to get the correct configuration
    pipeline = [{"$match": {'id': config_id}}]

    # query the configuration from the database
    configs = list(self.db[self.__digitizer_collection].aggregate(pipeline))

    if len(configs) == 1:
        return configs[0]

    if configs:
        err = f"Found too many digitizer configurations ({len(configs)}) for: config_id = {config_id}"
    else:
        err = f"Found no digitizer configuration for: config_id = {config_id}"

    logger.error(err)
    raise ValueError(err)
[docs]
def get_quantity_names(self, collection_name, wanted_quantity):
    """
    Return the distinct values of a field in a collection (measurement names, ids, ...).

    Parameters
    ----------
    collection_name: string
        name of the collection that is queried
    wanted_quantity: string
        field for which the distinct values are returned (example: "measurements.measurement_name")

    Returns
    -------
    values:
        Result of the `distinct(wanted_quantity)` query on the collection
    """
    collection = self.db[collection_name]
    return collection.distinct(wanted_quantity)
[docs]
def get_all_available_signal_chain_configs(self, collection, object_name, input_dic):
    """
    Depending on the inputs, all possible configurations in the database are returned.

    Input example: 'iglu_boards', 'Golden_IGLU' {'measurement_temp': 20, 'DRAB_id': 'Golden_DRAB'}

    Parameters
    ----------
    collection: string
        name of the collection that is searched
    object_name: string or None
        name of the object. If None, the distinct values of `measurements.<key>` within the
        whole collection are returned for each key of `input_dic`.
    input_dic: dict
        The keys define which quantities are returned. Values which are not None are used
        to restrict the measurements of the object (ignored if `object_name` is None).

    Returns
    -------
    return_dic: dict
        For each key of `input_dic` a list with the available values
    """
    if object_name is None:
        return {key: self.get_quantity_names(collection, f'measurements.{key}') for key in input_dic}

    # define a search filter
    pipeline = [{'$match': {'name': object_name}}, {'$unwind': '$measurements'}]

    # only entries with a value restrict the search
    constraints = {f'measurements.{key}': value for key, value in input_dic.items() if value is not None}
    if constraints:
        pipeline.append({'$match': constraints})

    entries = list(self.db[collection].aggregate(pipeline))

    # collect the unique values per key
    return {key: list(set(entry['measurements'][key] for entry in entries)) for key in input_dic}
[docs]
def get_identifier(self, station_id, channel_device_id=None, component="station", what="signal"):
    """
    Get the identifier for a station/channel/device measurement.

    For station and device returns the position identifier. For channel returns
    the position or signal chain identifier (depending on `what`).
    Access information in the main collection.

    Parameters
    ----------
    station_id: int
        Specify the station for which the measurement identifier is returned

    channel_device_id: int
        Specify the channel/device id. Only necessary if component="channel" or "device".
        (Default: None)

    component: str
        Specify for what you want to have the identifier(s):
        "station" (default), "channel", or "device"

    what: str
        Only used for component="channel": the field `id_{what}` of the channel is returned,
        i.e., "signal" (default) selects `id_signal` and "position" selects `id_position`.

    Returns
    -------
    identifier: str
        Unique identifier to find measurement in different collection.
        None if the station collection has no entry for `station_id`.

    Raises
    ------
    ValueError
        If `component` is unknown, if `channel_device_id` is missing for a channel/device,
        or if not exactly one entry matches the request at the detector time.
    """
    station_collection = self.db[self.__station_collection]

    # if the collection has no entry for this station, return None
    if station_collection.count_documents({'id': station_id}) == 0:
        return None

    detector_time = self.get_detector_time()

    # filter to get all information from one station with station_id and with active commission time
    pipeline = [{"$match": {
        'commission_time': {"$lte": detector_time},
        'decommission_time': {"$gte": detector_time},
        'id': station_id}}]

    if component in ("channel", "device"):
        if channel_device_id is None:
            raise ValueError(f"Please provide a {component} id.")

        # channels/devices are stored as list within the station entry ("channels"/"devices")
        comp_str = component + "s"
        pipeline += [{'$unwind': f'${comp_str}'},
                     {"$match": {f'{comp_str}.commission_time': {"$lte": detector_time},
                                 f'{comp_str}.decommission_time': {"$gte": detector_time},
                                 f'{comp_str}.id': channel_device_id}}]
    elif component != "station":
        err = (f"Requested identifer for unknown component: {component}. "
               "Only valid components are \"station\", \"channel\", \"device\".")
        logger.warning(err)
        raise ValueError(err)

    # get all entries which fit the filter (should only be one)
    info = list(station_collection.aggregate(pipeline))

    request = (f"({component}: station_id = {station_id}, channel/device id "
               f"= {channel_device_id}, time = {detector_time})")

    if len(info) == 0:
        err = f"Could not find corresponding station/channel/device {request}"
        logger.warning(err)
        raise ValueError(err)

    if len(info) > 1:
        err = f"Found too many stations/channels/devices ({len(info)}) {request}"
        logger.error(err)
        raise ValueError(err)

    entry = info[0]
    if component == "station":
        return entry['id_position']

    if component == "channel":
        return entry['channels'][f'id_{what}']

    # component == "device": only return the device position id
    return entry['devices']['id_position']
[docs]
def get_position(self, station_id=None, channel_device_id=None, position_id=None,
                 measurement_name=None, use_primary_time_with_measurement=False,
                 component="station", verbose=False):
    """
    Function to return the station/channel/device position,
    returns primary unless measurement_name is not None

    Parameters
    ----------
    station_id: int
        Station id, only needed if `position_id` is None. (Default: None)
    channel_device_id: int
        Channel/device id, passed to `get_identifier` if `position_id` is None. (Default: None)
    position_id: str
        Identifier of the position measurement. If given, the position is directly collected
        from the position collection (no need to look into the main collection again).
        (Default: None)
    measurement_name: str
        Passed to `get_collection_information`. If None, the primary measurement is returned.
        (Default: None)
    use_primary_time_with_measurement: bool
        Passed to `get_collection_information`. (Default: False)
    component: str
        Selects the collection `{component}_position` and is passed to `get_identifier`.
        (Default: "station")
    verbose: bool
        If True, return the complete measurement. If False (default), only the entries
        'position', 'rotation', and 'orientation' (if available) are returned.

    Returns
    -------
    position: dict
        The position measurement, an empty dict if no measurement is found.

    Raises
    ------
    ValueError
        If neither `position_id` nor `station_id` is given or if more than one
        measurement is found.
    """
    if position_id is None:
        if station_id is None:
            raise ValueError('Either the position_id or station_id (+ channel_id/device_id) needes to be given!')

        position_id = self.get_identifier(
            station_id, channel_device_id, component=component, what='position')

    # if measurement name is None, the primary measurement is returned
    collection_info = self.get_collection_information(
        f'{component}_position', search_by='id', obj_id=position_id, measurement_name=measurement_name,
        use_primary_time_with_measurement=use_primary_time_with_measurement)

    # raise an error if more than one value is returned
    if len(collection_info) > 1:
        raise ValueError(
            f"Found more than one ({len(collection_info)}) position measurement for "
            f"{component} position id = {position_id}")

    # return empty dict if no measurement is found
    if len(collection_info) == 0:
        return {}

    measurement = collection_info[0]['measurements']
    if verbose:
        return measurement

    return {key: measurement[key] for key in ('position', 'rotation', 'orientation') if key in measurement}
[docs]
def get_channel_signal_chain_measurement(self, station_id=None, channel_id=None, channel_signal_id=None,
                                         measurement_name=None, verbose=False):
    """
    Function to return the channels signal chain information,
    returns primary unless measurement_name is not None

    Parameters
    ----------
    station_id: int
        Station id, only needed if `channel_signal_id` is None. (Default: None)
    channel_id: int
        Channel id, passed to `get_identifier` if `channel_signal_id` is None. (Default: None)
    channel_signal_id: str
        Identifier of the signal chain. If given, the signal chain is directly collected from
        the signal chain collection (no need to look into the main collection again).
        (Default: None)
    measurement_name: str
        Passed to `get_collection_information`. If None, the primary measurement is returned.
        (Default: None)
    verbose: bool
        If True, return the complete measurement. If False (default), only the entries
        'VEL', 'response_chain', and 'primary_components' are returned.

    Returns
    -------
    signal_chain: dict
        The signal chain measurement, an empty dict if no measurement is found.

    Raises
    ------
    ValueError
        If neither `channel_signal_id` nor `station_id` is given or if more than one
        measurement is found.
    """
    if channel_signal_id is None:
        if station_id is None:
            raise ValueError('Either the channel_signal_id or station_id + channel_id needes to be given!')

        channel_signal_id = self.get_identifier(
            station_id, channel_id, component="channel", what="signal")

    # if measurement name is None, the primary measurement is returned
    collection_info = self.get_collection_information(
        'signal_chain', search_by='id', obj_id=channel_signal_id, measurement_name=measurement_name)

    # raise an error if more than one value is returned
    if len(collection_info) > 1:
        raise ValueError(
            f"Found more than one ({len(collection_info)}) signal chain measurement for "
            f"channel_signal_id = {channel_signal_id}")

    # return empty dict if no measurement is found
    if len(collection_info) == 0:
        return {}

    measurement = collection_info[0]['measurements']
    if verbose:
        return measurement

    return {key: measurement[key] for key in ('VEL', 'response_chain', 'primary_components')}
[docs]
def get_component_data(self, component_type, component_id, supplementary_info={}, primary_time=None, verbose=True, sparameter='S21'):
    """
    Returns the current primary measurement of the component, reads in the component collection.

    Returns a single measurement (e.g. gain of an IGLU)

    Parameters
    ----------
    component_type: str
        name of the collection of the component (e.g. 'iglu_board')
    component_id: str
        name of the component (e.g. 'C0069')
    supplementary_info: dict
        additional `measurements.<key>: value` requirements (e.g. {'channel_id': 0}). (Default: {})
    primary_time: datetime.datetime
        time for which the primary measurement is searched. If None (default),
        the database time is used.
    verbose: bool
        If True (default), return the complete measurement (without 'id_measurement'). If False,
        only the entries 'name', 'channel_id', 'frequencies', 'mag', and 'phase' (if available)
        are returned.
    sparameter: str
        S parameter of the measurement. (Default: 'S21')

    Returns
    -------
    measurement: dict

    Raises
    ------
    ValueError
        If not exactly one measurement is found.

    Examples
    --------

    .. code-block::

        import NuRadioReco.detector.RNO_G.db_mongo_read
        import datetime

        db = NuRadioReco.detector.RNO_G.db_mongo_read.Database()

        # gives you the entry in the database
        database_entry = db.get_component_data(
            component_type='iglu_board',
            component_id='C0069',
            supplementary_info={}, # if you want a DRAB you have to specify the channel: {'channel_id':0}
            verbose=True,
            sparameter='S21', # you can also read the other S parameters
            primary_time=datetime.datetime.now())

        # extract the gain + phase data
        y_axis_units = database_entry['y-axis_units']
        frequencies = database_entry['frequencies']
        gain_data = database_entry['mag']
        phase_data = database_entry['phase']
    """
    if primary_time is None:
        primary_time = self.get_database_time()

    # requirements on the measurement: the supplementary information (like channel id, etc ...)
    # and the S parameter (only a single S parameter is collected)
    requirements = {f'measurements.{key}': value for key, value in supplementary_info.items()}
    requirements['measurements.S_parameter'] = sparameter

    search_filter = [
        {'$match': {'name': component_id}},
        {'$unwind': '$measurements'},
        {'$match': requirements},
        {'$unwind': '$measurements.primary_measurement'},
        {'$match': {'measurements.primary_measurement.start': {'$lte': primary_time},
                    'measurements.primary_measurement.end': {'$gte': primary_time}}},
    ]

    search_result = list(self.db[component_type].aggregate(search_filter))

    if len(search_result) != 1:
        raise ValueError(f'No or more than one measurement found: {search_result}. Search filter: {search_filter}')

    measurement = search_result[0]['measurements']

    # remove 'id_measurement' object
    measurement.pop('id_measurement', None)

    if not verbose:
        reduced_keys = ('name', 'channel_id', 'frequencies', 'mag', 'phase')
        return {key: measurement[key] for key in reduced_keys if key in measurement.keys()}

    return measurement
[docs]
def get_channel_signal_chain(self, channel_signal_id, measurement_name=None, verbose=True):
    """
    Returns the response data for a given signal chain.

    Parameters
    ----------
    channel_signal_id: str
        Identifier of the signal chain
    measurement_name: str
        Passed to `get_channel_signal_chain_measurement`. (Default: None)
    verbose: bool
        Passed to `get_channel_signal_chain_measurement` and `get_component_data`. (Default: True)

    Returns
    -------
    signal_chain: dict
        A dictionary which among other things contains the "response_chain" which carries the
        measured response for the different components in the signal chain.
    """
    # load the channel signal chain information (which components are used in the signal chain per channel)
    signal_chain = self.get_channel_signal_chain_measurement(
        channel_signal_id=channel_signal_id, measurement_name=measurement_name, verbose=verbose)

    for chain_key in ('response_chain', 'trigger_response_chain'):
        # Not every channel has a trigger response chain
        if chain_key not in signal_chain:
            continue

        raw_chain = signal_chain.pop(chain_key)

        # Certain keys in the response chain only carry additional information of other components
        # and do not describe components on their own ("channel", "breakout", "weight").
        # Sort the entries into components, weights and additional information.
        components = {}
        extras = {}
        weights = {}
        for key, value in raw_chain.items():
            if re.search("weight", key) is not None:
                weights[key.replace("_weight", "")] = value
            elif re.search("(channel|breakout|weight)", key) is not None:
                extras[key] = value
            else:
                components[key] = value

        # go through all components and load the s parameter measurement for each used component
        chain_data = {}
        for component, component_id in components.items():
            # additional information which belongs to this component (key without component prefix)
            supp_info = {key.replace(component + "_", ""): value
                         for key, value in extras.items() if re.search(component, key)}

            # a numeric suffix ("_<number>") is not part of the collection name
            numbered = re.search("_[0-9]+", component, re.IGNORECASE)
            if numbered:
                collection_component = component.replace(numbered.group(0), "")
            else:
                collection_component = component

            measurement = self.get_component_data(
                collection_component, component_id, supp_info,
                primary_time=self.__database_time, verbose=verbose)

            # combine the component name, the weight (if any) and the actual measurement
            entry = {'name': component_id}
            if component in weights:
                entry['weight'] = weights[component]
            entry.update(measurement)
            chain_data[component] = entry

        # add/update the signal chain to the channel data
        signal_chain[chain_key] = chain_data

    return signal_chain
[docs]
def query_modification_timestamps_per_station(self, station_ids=None):
    """
    Collects all the timestamps for station and channel (de)commissioning from the database.
    Combines those to get a list of timestamps when modifications happened which requires
    to update the buffer.

    Parameters
    ----------
    station_ids: int or list of int
        Station(s) for which the timestamps are collected. If None (default), all
        station ids found in the station collection are used.

    Returns
    -------
    station_data: dict(dict(list))
        Returns for each station (key = station.id) a dictionary with three entries:
        "modification_timestamps", "station_commission_timestamps", "station_decommission_timestamps"
        each containing a sorted list/array of timestamps. The former combines the latter
        two + channel (de)commission timestamps (without duplicates).
    """
    stations = self.db[self.__station_collection]

    # get distinct set of stations
    if isinstance(station_ids, int):
        station_ids = [station_ids]
    elif station_ids is None:
        station_ids = stations.distinct("id")

    fields = ("commission_time", "decommission_time",
              "channels.commission_time", "channels.decommission_time")

    timestamps_per_station = {}
    for station_id in station_ids:
        selection = {"id": station_id}

        # (de)commission times of the station followed by those of its channels
        station_comm, station_decomm, channel_comm, channel_decomm = [
            stations.distinct(field, selection) for field in fields]

        # np.unique removes duplicates and returns the timestamps sorted
        all_modifications = np.unique(station_comm + station_decomm + channel_comm + channel_decomm)

        station_comm.sort()
        station_decomm.sort()

        # store timestamps, which can be used with np.digitize
        timestamps_per_station[station_id] = {
            "modification_timestamps": all_modifications,
            "station_commission_timestamps": station_comm,
            "station_decommission_timestamps": station_decomm,
        }

    return timestamps_per_station
[docs]
def dictionarize_nested_lists(nested_lists, parent_key="id", nested_field="channels", nested_key="id"):
    """
    Convert a list of dicts (as returned by a mongodb aggregate) into a dict of dicts.

    The entries of `nested_lists` are modified in place: the list stored in `nested_field`
    is replaced by a dictionary.

    Parameters
    ----------
    nested_lists: list of dict
        The "parent" entries
    parent_key: str
        The value of this field is used as key for a parent in the returned dict. (Default: "id")
    nested_field: str
        Field of a parent which holds a list of dicts ("daughters"). Parents without
        this field are left unchanged. (Default: "channels")
    nested_key: str
        The value of this field is used as key for a daughter. Daughters without this
        field are dropped. (Default: "id")

    Returns
    -------
    res: dict
        `{parent[parent_key]: parent}`, with `parent[nested_field]` being
        `{daughter[nested_key]: daughter}`
    """
    by_parent = {}
    for parent in nested_lists:
        by_parent[parent[parent_key]] = parent

        if nested_field not in parent or nested_field is None:
            continue

        # replace list with dict
        parent[nested_field] = {
            daughter[nested_key]: daughter
            for daughter in parent[nested_field] if nested_key in daughter}

    return by_parent
[docs]
def dictionarize_nested_lists_as_tuples(nested_lists, parent_key="name", nested_field="measurements", nested_keys=("channel_id", "S_parameter")):
    """
    Convert a list of dicts (as returned by a mongodb aggregate) into a dict of dicts,
    using tuples as keys for the nested entries.

    The entries of `nested_lists` are modified in place: the list stored in `nested_field`
    is replaced by a dictionary.

    Parameters
    ----------
    nested_lists: list of dict
        The "parent" entries
    parent_key: str
        The value of this field is used as key for a parent in the returned dict. (Default: "name")
    nested_field: str
        Field of a parent which holds a list of dicts ("daughters"). Parents without
        this field are left unchanged. (Default: "measurements")
    nested_keys: tuple of str
        The values of these fields are combined to a tuple which is used as key for a
        daughter. A missing field contributes None. (Default: ("channel_id", "S_parameter"))

    Returns
    -------
    res: dict
        `{parent[parent_key]: parent}`, with `parent[nested_field]` being
        `{(daughter[nested_keys[0]], ...): daughter}`
    """
    by_parent = {}
    for parent in nested_lists:
        by_parent[parent[parent_key]] = parent

        if nested_field not in parent or nested_field is None:
            continue

        # measurements do not have a unique column which can be used as key for the dictionary,
        # so use a tuple instead for indexing and replace list with dict
        parent[nested_field] = {
            tuple(daughter[key] if key in daughter else None for key in nested_keys): daughter
            for daughter in parent[nested_field]}

    return by_parent