Source code for NuRadioReco.detector.RNO_G.db_mongo_read
import six
import os
import urllib.parse
import datetime
import numpy as np
from functools import wraps
import copy
import re
from pymongo import MongoClient
import NuRadioReco.utilities.metaclasses
import astropy.time
import logging
# Module-level logger used by every function and method in this file.
logger = logging.getLogger("NuRadioReco.MongoDBRead")
# Report messages of severity INFO and above.
logger.setLevel(logging.INFO)
def _convert_astro_time_to_datetime(time_astro):
    """ Convert a time object into a ``datetime.datetime``.

    Parameters
    ----------
    time_astro: object
        Any object offering a ``to_datetime()`` method (the callers in this
        file pass ``astropy.time.Time`` instances).

    Returns
    -------
    converted:
        Whatever ``time_astro.to_datetime()`` returns.
    """
    converted = time_astro.to_datetime()
    return converted
def _check_database_time(method):
    """ Decorator which refuses to run ``method`` while the database time is unset.

    The wrapped method is only executed if ``self.get_database_time()``
    returns something other than None.

    Parameters
    ----------
    method: callable
        Method of an object that provides ``get_database_time()``.

    Returns
    -------
    guarded: callable
        Wrapper with the metadata of ``method`` (via ``functools.wraps``).

    Raises
    ------
    ValueError
        (from the wrapper) if the database time is None.
    """

    @wraps(method)
    def _guarded(self, *method_args, **method_kwargs):
        if self.get_database_time() is None:
            logger.error('Database time is None.')
            raise ValueError('Database time is None.')

        return method(self, *method_args, **method_kwargs)

    return _guarded
[docs]@six.add_metaclass(NuRadioReco.utilities.metaclasses.Singleton)
class Database(object):
def __init__(self, database_connection="RNOG_public", database_name=None, mongo_kwargs=None):
    """
    Interface to the RNO-G hardware database. This class uses the python API pymongo for the
    RNO-G MongoDB.

    This class allows you to connect to preconfigured mongo clients or select your mongo client freely.
    The database is accessible with the `self.db` variable.

    Parameters
    ----------
    database_connection : str (Default: `"RNOG_public"`)
        Specify mongo client. You have 5 options:

            * `"env_pw_user"`: Connect to a server with the environmental variables
              `mongo_server`, `mongo_user`, and `mongo_password`
            * `"RNOG_public"`: Preconfigured connection to read-only RNO-G Hardware DB
            * `"RNOG_test_public"`: Preconfigured connection to read-only RNO-G Test Hardware DB
            * `"connection_string"`: Use environmental variable `db_mongo_connection_string` to
              connect to mongo server
            * `"mongodb*"`: Every string which starts with `"mongodb"` will be used to connect to
              a mongo server

    database_name : str (Default: None -> `"RNOG_live"`)
        Select the database by name. If None (default) is passed, set to `"RNOG_live"`

    mongo_kwargs : dict (Default: None -> `{}`)
        Additional arguments to pass to MongoClient. The dictionary is copied and
        never modified.

    Raises
    ------
    ValueError
        If `database_connection` is not one of the supported options, or if
        `"env_pw_user"` is selected while `mongo_user` or `mongo_password` is not set.
    KeyError
        If the requested database does not exist on the server.
    """
    # Work on a private copy: the "env_pw_user" branch adds an entry, which must neither
    # leak into later instantiations (shared default) nor into the caller's dictionary.
    mongo_kwargs = {} if mongo_kwargs is None else dict(mongo_kwargs)

    if database_connection == "env_pw_user":
        # use db connection from environment, pw and user need to be percent escaped
        mongo_server = os.environ.get('mongo_server')
        if mongo_server is None:
            logger.warning('variable "mongo_server" not set')

        mongo_user = os.environ.get('mongo_user')
        mongo_password = os.environ.get('mongo_password')
        # Check before escaping: quote_plus() does not accept None.
        if None in [mongo_user, mongo_password]:
            err = '"mongo_user" or "mongo_password" not set'
            logger.error(err)
            raise ValueError(err)

        mongo_user = urllib.parse.quote_plus(mongo_user)
        mongo_password = urllib.parse.quote_plus(mongo_password)

        # start client
        connection_string = f"mongodb://{mongo_user}:{mongo_password}@{mongo_server}"
        mongo_kwargs["tls"] = True

    elif database_connection == "RNOG_public":
        # use read-only access to the RNO-G database
        connection_string = (
            "mongodb://read:EseNbGVaCV4pBBrt@radio.zeuthen.desy.de:27017/admin?authSource=admin&"
            "readPreference=primary&appname=MongoDB%20Compass&directConnection=true&ssl=true")

    elif database_connection == "RNOG_test_public":
        # use readonly access to the RNO-G test database
        connection_string = (
            "mongodb://RNOG_test_public:jrE5xO38D7wQweVR5doa@radio-test.zeuthen.desy.de:27017/admin?authSource=admin&"
            "readPreference=primary&appname=MongoDB%20Compass&directConnection=true&ssl=true")

    elif database_connection == "connection_string":
        # use a connection string from the environment
        connection_string = os.environ.get('db_mongo_connection_string')
        if connection_string is None:
            logger.warning('variable "db_mongo_connection_string" not set')

    elif database_connection.startswith("mongodb"):
        connection_string = database_connection

    else:
        # Fail here with a clear message instead of hitting an unbound
        # "connection_string" a few lines further down.
        err = ('specify a defined database connection '
               '["env_pw_user", "connection_string", "RNOG_public", "RNOG_test_public", "mongodb..."]')
        logger.error(err)
        raise ValueError(err)

    self.__mongo_client = MongoClient(connection_string, **mongo_kwargs)

    if database_name is None:
        database_name = "RNOG_live"

    if database_name not in self.__mongo_client.list_database_names():
        err = f'Could not find database "{database_name}" in mongo client.'
        logger.error(err)
        raise KeyError(err)

    self.db = self.__mongo_client[database_name]

    logger.info("database connection to {} established".format(self.db.name))

    # Set timestamp of database. This is used to determine which primary measurement is used
    self.__database_time = datetime.datetime.utcnow()

    # This is used to get commissioned stations/channels
    self.__detector_time = None

    # Cache for the collection names, filled on first request
    self.__get_collection_names = None

    # Name of the main collection holding the station descriptions
    self.__station_collection = "station_rnog"
def set_database_time(self, time):
    """ Set time for database. This affects which primary measurement is used.

    Parameters
    ----------
    time: `datetime.datetime` or ``astropy.time.Time``
        UTC time. An ``astropy.time.Time`` is converted to `datetime.datetime`
        before it is stored.

    Raises
    ------
    TypeError
        If `time` is (after the conversion) not a `datetime.datetime`.
    """
    database_time = time
    if isinstance(database_time, astropy.time.Time):
        database_time = _convert_astro_time_to_datetime(database_time)

    if isinstance(database_time, datetime.datetime):
        self.__database_time = database_time
        return

    logger.error("Set invalid time for database. Time has to be of type datetime.datetime")
    raise TypeError("Set invalid time for database. Time has to be of type datetime.datetime")
def set_detector_time(self, time):
    """ Set time of detector. This controls which stations/channels are commissioned.

    Parameters
    ----------
    time: `datetime.datetime` or ``astropy.time.Time``
        UTC time. An ``astropy.time.Time`` is converted to `datetime.datetime`
        before it is stored.

    Raises
    ------
    TypeError
        If `time` is (after the conversion) not a `datetime.datetime`.
    """
    is_astropy_time = isinstance(time, astropy.time.Time)
    detector_time = _convert_astro_time_to_datetime(time) if is_astropy_time else time

    # guard clause: reject everything that is not a datetime
    if not isinstance(detector_time, datetime.datetime):
        logger.error("Set invalid time for detector. Time has to be of type datetime.datetime")
        raise TypeError("Set invalid time for detector. Time has to be of type datetime.datetime")

    self.__detector_time = detector_time
def find_primary_measurement(self, collection_name, name, primary_time, identification_label, data_dict):
    """
    Look up the entry identified by 'name' and return its object id together
    with the id of the measurement which is primary at 'primary_time'.

    Parameters
    ----------
    collection_name: string
        name of the collection that is searched (surface_board, iglu_board, ...)

    name: string
        the unique identifier of the input component

    primary_time: datetime.datetime
        timestamp for the primary measurement

    identification_label: string
        specify what kind of label is used for the identification ("name" or "id")

    data_dict: dict
        dictionary containing additional information that are used to search the database
        (e.g., channel_id, S_parameter). "breakout" and "breakout_channel" are only used
        if both are present.

    Returns
    -------
    object_id:
        '_id' of the matching entry, None if no or more than one entry matched

    measurement_ids: list
        `[id_measurement]` for exactly one match, `[0]` if nothing matched,
        `[None]` if the match was ambiguous
    """
    # conditions on the (unwound) measurement: the time has to lie within the primary interval
    measurement_conditions = {
        'measurements.primary_measurement.start': {'$lte': primary_time},
        'measurements.primary_measurement.end': {'$gte': primary_time}}

    if 'breakout_channel' in data_dict and 'breakout' in data_dict:
        measurement_conditions['measurements.breakout'] = data_dict['breakout']
        measurement_conditions['measurements.breakout_channel'] = data_dict['breakout_channel']

    if 'channel_id' in data_dict:
        measurement_conditions['measurements.channel_id'] = data_dict['channel_id']

    if 'S_parameter' in data_dict:
        measurement_conditions['measurements.S_parameter'] = data_dict['S_parameter']

    pipeline = [
        {'$match': {identification_label: name}},
        {'$unwind': '$measurements'},
        {'$unwind': '$measurements.primary_measurement'},
        {'$match': measurement_conditions}]

    # get all entries matching the search filter
    matching_entries = list(self.db[collection_name].aggregate(pipeline))
    n_matches = len(matching_entries)

    if n_matches == 1:
        entry = matching_entries[0]
        return entry['_id'], [entry['measurements']['id_measurement']]

    if n_matches == 0:
        logger.warning('No primary measurement found.')
        # the zero carries the information that no primary measurement was found
        return None, [0]

    logger.error('More than one primary measurement found.')
    return None, [None]
def get_collection_names(self):
    """ Return the names of all collections in the database.

    The database is only queried on the first call, the result is cached
    on the instance and reused afterwards.
    """
    names = self.__get_collection_names
    if names is None:
        names = self.db.list_collection_names()
        self.__get_collection_names = names

    return names
def load_board_information(self, type, board_name, info_names):
    """ Load the information for a single component from the database (can be used for IGLU / DRAB)

    Parameters
    ----------
    type: string
        Name of the collection in which the board is stored

    board_name: string
        Value of the 'name' field of the board

    info_names: list of strings
        Keys which are read from the measurement

    Returns
    -------
    infos: list
        Values of `info_names` taken from the first measurement with a truthy
        'function_test' entry. Empty list if there is no such measurement.
    """
    board_data = self.db[type].find_one({'name': board_name})

    # only the first measurement flagged with 'function_test' is evaluated
    for measurement in board_data['measurements']:
        if measurement['function_test']:
            return [measurement[name] for name in info_names]

    return []
@_check_database_time
def get_general_station_information(self, station_id):
    """ Get information from one station. Access information in the main collection.

    Only the station entry, channels and devices which are commissioned at the
    detector time are returned. If no detector time is set, the database time
    is used instead.

    Parameters
    ----------
    station_id: int
        Station id

    Returns
    -------
    info: dict
        Dictionary keyed by the station id. The entries 'channels' and 'devices' are
        dictionaries as produced by `dictionarize_nested_lists`. Empty dict if the
        station does not exist or is not commissioned at the detector time.

    Raises
    ------
    ValueError
        If more than one station entry matches the station id and time.
    """
    # if the collection is empty, return an empty dict
    if self.db[self.__station_collection].count_documents({'id': station_id}) == 0:
        return {}

    if self.__detector_time is None:
        detector_time = self.__database_time
        logger.error("Detector time is None, use database time.")
    else:
        detector_time = self.__detector_time

    # filter to get all information from one station with station_id and with active commission time
    time_filter = [{"$match": {
        'commission_time': {"$lte": detector_time},
        'decommission_time': {"$gte": detector_time},
        'id': station_id}}]

    # get all stations which fit the filter (should only be one)
    stations_for_buffer = list(self.db[self.__station_collection].aggregate(time_filter))

    if len(stations_for_buffer) == 0:
        logger.warning('No corresponding station found!')
        return {}
    elif len(stations_for_buffer) > 1:
        err = (f"Found too many stations ({len(stations_for_buffer)}) for: "
               f"station_id = {station_id}, and time = {detector_time}")
        logger.error(err)
        raise ValueError(err)

    # Work on a copy and keep only the commissioned channels and devices.
    commissioned_info = copy.deepcopy(stations_for_buffer)
    for key in ['channels', 'devices']:
        commissioned_info[0][key] = [
            entry for entry in commissioned_info[0][key]
            if entry['commission_time'] <= detector_time <= entry['decommission_time']]

    # transform the output of db.aggregate to a dict
    # dictionarize the channel information
    station_info = dictionarize_nested_lists(
        commissioned_info, parent_key="id", nested_field="channels", nested_key="id")

    # dictionarize the device information
    station_info_help = dictionarize_nested_lists(
        commissioned_info, parent_key="id", nested_field="devices", nested_key="id")

    station_info[station_id]['devices'] = station_info_help[station_id]['devices']

    # Add empty dicts if necessary
    for key in ['channels', 'devices']:
        station_info[station_id].setdefault(key, {})

    return station_info
@_check_database_time
def get_general_channel_information(self, station_id, channel_id):
    """ Get information from one channel. Access information in the main collection.

    Station and channel both have to be commissioned at the detector time.
    If no detector time is set, the database time is used instead.

    Parameters
    ----------
    station_id: int
        specify the station id from which the channel information is taken

    channel_id: int
        specify the channel id

    Returns
    -------
    info: dict
        The 'channels' entry of the matching document. Empty dict if the station
        does not exist or no commissioned channel was found.

    Raises
    ------
    ValueError
        If more than one channel matches.
    """
    station_collection = self.db[self.__station_collection]

    # unknown station -> nothing to return
    if station_collection.count_documents({'id': station_id}) == 0:
        return {}

    detector_time = self.__detector_time
    if detector_time is None:
        detector_time = self.__database_time
        logger.error("Detector time is None, use database time.")

    # stage 1: commissioned station, stage 2: one document per channel,
    # stage 3: commissioned channel with the requested id
    station_stage = {"$match": {
        'commission_time': {"$lte": detector_time},
        'decommission_time': {"$gte": detector_time},
        'id': station_id}}
    channel_stage = {"$match": {
        'channels.commission_time': {"$lte": detector_time},
        'channels.decommission_time': {"$gte": detector_time},
        'channels.id': channel_id}}
    pipeline = [station_stage, {'$unwind': '$channels'}, channel_stage]

    matches = list(station_collection.aggregate(pipeline))
    n_matches = len(matches)

    if n_matches == 0:
        logger.warning('No corresponding channel found!')
        return {}

    if n_matches > 1:
        err = (f"Found to many channels ({n_matches}) for: station_id = {station_id}, "
               f"channel_id = {channel_id}, and time = {detector_time}")
        logger.error(err)
        raise ValueError(err)

    # only return the channel information
    return matches[0]['channels']
@_check_database_time
def get_general_device_information(self, station_id, device_id):
    """ Get information from one device. Access information in the main collection.

    Station and device both have to be commissioned at the detector time.
    If no detector time is set, the database time is used instead.

    Parameters
    ----------
    station_id: int
        specify the station id from which the device information is taken

    device_id: int
        specify the device id

    Returns
    -------
    info: dict
        The 'devices' entry of the matching document. Empty dict if the station
        does not exist or no commissioned device was found.

    Raises
    ------
    ValueError
        If more than one device matches.
    """
    # if the collection is empty, return an empty dict
    if self.db[self.__station_collection].count_documents({'id': station_id}) == 0:
        return {}

    if self.__detector_time is None:
        detector_time = self.__database_time
        logger.error("Detector time is None, use database time.")
    else:
        detector_time = self.__detector_time

    # filter to get all information from one device with station_id and device_id and active commission time
    time_filter = [{"$match": {
        'commission_time': {"$lte": detector_time},
        'decommission_time': {"$gte": detector_time},
        'id': station_id}},
        {'$unwind': '$devices'},
        {"$match": {'devices.commission_time': {"$lte": detector_time},
                    'devices.decommission_time': {"$gte": detector_time},
                    'devices.id': device_id}}]

    # get all devices which fit the filter (should only be one)
    device_info = list(self.db[self.__station_collection].aggregate(time_filter))

    if len(device_info) == 0:
        logger.warning('No corresponding device found!')
        return {}
    elif len(device_info) > 1:
        err = (f"Found too many devices ({len(device_info)}) for: station_id = {station_id}, "
               f"device_id = {device_id}, and time = {detector_time}")
        logger.error(err)
        raise ValueError(err)

    # only return the device information
    return device_info[0]['devices']
@_check_database_time
def get_collection_information(self, collection_name, search_by, obj_id, measurement_name=None, channel_id=None, use_primary_time_with_measurement=False):
    """
    Get the information for a specified collection (will only work for 'station_position', 'channel_position'
    and 'signal_chain'). Returns the primary measurement unless measurement_name is specified.

    Parameters
    ----------
    collection_name: string
        Specify the collection, from which the information should be extracted (will only work for
        'station_position', 'channel_position' and 'signal_chain')

    search_by: string
        Specify if the collection is searched by 'station_id' or 'id'. The latter is a position or
        signal chain identifier

    obj_id: string or int
        station id or position/signal_chain identifier

    measurement_name: string
        Use the measurement name to select the requested data (not database time / primary time).
        If "use_primary_time_with_measurement" is True, use measurement_name and primary time to
        find matching objects. (Default: None -> return measurement based on primary time)

    channel_id: int
        Unique identifier of the channel. Only allowed if searched by 'station_id'

    use_primary_time_with_measurement: bool
        If True (and measurement_name is not None), use measurement name and primary time to select objects.
        (Default: False)

    Returns
    -------
    info: list(dict)
        One entry per selected measurement. An empty dict is returned if no document matches the
        identifier at all, an empty list if no measurement passes the selection.

    Raises
    ------
    ValueError
        If search_by is neither "station_id" nor "id", or if a channel_id is combined with search_by="id".
    """
    if search_by == 'station_id':
        id_dict = {'id': {'$regex': f'_stn{obj_id}_'}}
    elif search_by == 'id':
        id_dict = {'id': obj_id}
    else:
        raise ValueError('Only "station_id" and "id" are valid options for the "search_by" argument.')

    collection = self.db[collection_name]

    # nothing stored for this identifier
    if collection.count_documents(id_dict) == 0:
        return {}

    # conditions which apply to the individual (unwound) measurements
    measurement_conditions = {}
    if measurement_name is not None:
        measurement_conditions['measurements.measurement_name'] = measurement_name

    if channel_id is not None:
        if search_by == 'id':
            raise ValueError('channel_id can only be used if the collection is searched by "station_id"')
        measurement_conditions['measurements.channel_id'] = channel_id

    search_filter = [{'$match': id_dict}, {'$unwind': '$measurements'}]
    if measurement_name is not None or channel_id is not None:
        search_filter.append({'$match': measurement_conditions})

    # Without a measurement name (or if explicitly requested) the primary time selects the
    # measurement, otherwise the measurement is identified solely by its name.
    select_by_primary_time = measurement_name is None or use_primary_time_with_measurement
    if select_by_primary_time:
        search_filter.append({'$unwind': '$measurements.primary_measurement'})
        search_filter.append(
            {'$match': {'measurements.primary_measurement.start': {'$lte': self.__database_time},
                        'measurements.primary_measurement.end': {'$gte': self.__database_time}}})

    search_result = list(collection.aggregate(search_filter))
    if search_result == []:
        return search_result

    # Second query: the "primary_measurement" may have several entries, hence the measurements
    # are collected again via their object and measurement ids.
    object_ids = [dic['_id'] for dic in search_result]
    measurement_ids = [dic['measurements']['id_measurement'] for dic in search_result]
    id_filter = [{'$match': {'_id': {'$in': object_ids}}},
                 {'$unwind': '$measurements'},
                 {'$match': {'measurements.id_measurement': {'$in': measurement_ids}}}]

    return list(collection.aggregate(id_filter))
def get_quantity_names(self, collection_name, wanted_quantity):
    """
    Return the distinct values which are stored for a field of a collection,
    e.g., all measurement names for wanted_quantity = 'measurements.measurement_name'.

    Parameters
    ----------
    collection_name: string
        Name of the collection

    wanted_quantity: string
        Field (key) whose distinct values are requested

    Returns
    -------
    Result of the ``distinct`` query on the collection
    """
    collection = self.db[collection_name]
    return collection.distinct(wanted_quantity)
def get_all_available_signal_chain_configs(self, collection, object_name, input_dic):
    """
    Depending on the inputs, all possible configurations in the database are returned;
    Input example: 'iglu_boards', 'Golden_IGLU' {'measurement_temp': 20, 'DRAB_id': 'Golden_DRAB'}

    Parameters
    ----------
    collection: string
        Name of the collection

    object_name: string or None
        Name of the object. If None, the distinct values of each key are
        collected over the whole collection.

    input_dic: dict
        Keys for which the available values are requested. Values which are not None
        restrict the measurements which are taken into account (only if object_name is given).

    Returns
    -------
    return_dic: dict
        For each key of input_dic the available values
    """
    if object_name is None:
        return {key: self.get_quantity_names(collection, f'measurements.{key}') for key in input_dic}

    # select the object and split it into its measurements
    search_filter = [{'$match': {'name': object_name}},
                     {'$unwind': '$measurements'}]

    # only keys with an actual value constrain the measurements
    constraints = {f'measurements.{key}': value for key, value in input_dic.items() if value is not None}
    if constraints != {}:
        search_filter.append({'$match': constraints})

    search_result = list(self.db[collection].aggregate(search_filter))

    return_dic = {}
    for key in input_dic:
        values = [entry['measurements'][key] for entry in search_result]
        return_dic[key] = list(set(values))

    return return_dic
def get_identifier(self, station_id, channel_device_id=None, component="station", what="signal"):
    """
    Get the identifier for a station/channel/device measurement.

    For station and device the position identifier is returned. For a channel the
    identifier selected with `what` is returned. Access information in the main collection.

    Parameters
    ----------
    station_id: int
        Specify the station for which the measurement identifier is returned

    channel_device_id: int
        Specify the channel/device id. Only necessary if component="channel" or "device".
        (Default: None)

    component: str
        Specify for what you want to have the identifier(s):
        "station" (default), "channel", or "device"

    what: str
        Only used for component="channel": the entry `id_<what>` of the channel is
        returned, i.e., "signal" (default) selects the signal chain identifier and
        "position" the position identifier.

    Returns
    -------
    identifier: str
        Unique identifier to find the measurement in a different collection.
        None if the station id does not exist in the main collection.

    Raises
    ------
    ValueError
        If the component is unknown, if the channel/device id is missing, or if
        not exactly one matching station/channel/device is found.
    """
    # if the collection is empty, return None
    if self.db[self.__station_collection].count_documents({'id': station_id}) == 0:
        return None

    detector_time = self.get_detector_time()

    # filter to get all information from one station with station_id and with active commission time
    time_filter = [{"$match": {
        'commission_time': {"$lte": detector_time},
        'decommission_time': {"$gte": detector_time},
        'id': station_id}}]

    if component == "channel" or component == "device":
        if channel_device_id is None:
            raise ValueError("Please provide a channel id.")

        # the nested lists are called "channels" / "devices"
        comp_str = component + "s"
        time_filter += [{'$unwind': f'${comp_str}'},
                        {"$match": {f'{comp_str}.commission_time': {"$lte": detector_time},
                                    f'{comp_str}.decommission_time': {"$gte": detector_time},
                                    f'{comp_str}.id': channel_device_id}}]
    elif component == "station":
        pass  # do nothing here
    else:
        err = (f"Requested identifer for unknown component: {component}. "
               "Only valid components are \"station\", \"channel\", \"device\".")
        logger.warning(err)
        raise ValueError(err)

    # get all stations which fit the filter (should only be one)
    info = list(self.db[self.__station_collection].aggregate(time_filter))

    if len(info) == 0:
        err = (f"Could not find corresponding station/channel/device "
               f"({component}: station_id = {station_id}, channel/device id "
               f"= {channel_device_id}, time = {detector_time})")
        logger.warning(err)
        raise ValueError(err)
    elif len(info) > 1:
        err = (f"Found too many stations/channels/devices ({len(info)}) "
               f"({component}: station_id = {station_id}, channel/device id "
               f"= {channel_device_id}, time = {detector_time})")
        logger.error(err)
        raise ValueError(err)

    if component == "station":
        return info[0]['id_position']
    elif component == "channel":
        return info[0]['channels'][f'id_{what}']
    elif component == "device":
        # only return the device position id
        return info[0]['devices']['id_position']
def get_position(self, station_id=None, channel_device_id=None, position_id=None,
                 measurement_name=None, use_primary_time_with_measurement=False,
                 component="station", verbose=False):
    """
    Return the position measurement of a station, channel or device.
    Returns the primary measurement unless measurement_name is not None.

    Parameters
    ----------
    station_id: int
        Station id. Only needed if position_id is None.

    channel_device_id: int
        Channel or device id. Only used if position_id is None.

    position_id: str
        Position identifier. If given, the position is read directly from the
        collection `<component>_position` (no look-up in the main collection).

    measurement_name: str
        If not None, select the measurement by name instead of by primary time.

    use_primary_time_with_measurement: bool
        Passed on to `get_collection_information`. (Default: False)

    component: str
        "station" (default), "channel" or "device". Determines the collection
        `<component>_position` which is read.

    verbose: bool
        If True, return the complete measurement. If False (default), only return
        the entries 'position', 'rotation' and 'orientation' (if present).

    Returns
    -------
    position: dict
        Empty dict if no measurement was found.

    Raises
    ------
    ValueError
        If neither position_id nor station_id is given, or if more than one
        measurement is found.
    """
    # If the position_id is given, the position is directly collected from the position
    # collection (no need to look into the main collection again)
    if position_id is None:
        if station_id is None:
            raise ValueError('Either the position_id or station_id (+ channel_id/device_id) needes to be given!')

        position_id = self.get_identifier(
            station_id, channel_device_id, component=component, what='position')
        # report via the logger instead of printing to stdout
        logger.debug("Use position identifier: %s", position_id)

    # if measurement name is None, the primary measurement is returned
    collection_info = self.get_collection_information(
        f'{component}_position', search_by='id', obj_id=position_id, measurement_name=measurement_name,
        use_primary_time_with_measurement=use_primary_time_with_measurement)

    # raise an error if more than one value is returned
    if len(collection_info) > 1:
        raise ValueError(
            f"Found more than one position measurement ({len(collection_info)}) for "
            f"id = {position_id} in collection \"{component}_position\".")

    # return empty dict if no measurement is found
    if len(collection_info) == 0:
        return {}

    measurement = collection_info[0]['measurements']
    if verbose:
        return measurement

    return {k: measurement[k] for k in ['position', 'rotation', 'orientation'] if k in measurement}
def get_channel_signal_chain_measurement(self, station_id=None, channel_id=None, channel_signal_id=None,
                                         measurement_name=None, verbose=False):
    """
    Return the signal chain measurement of a channel from the 'signal_chain' collection.
    Returns the primary measurement unless measurement_name is not None.

    Parameters
    ----------
    station_id: int
        Station id. Only needed if channel_signal_id is None.

    channel_id: int
        Channel id. Only used if channel_signal_id is None.

    channel_signal_id: str
        Signal chain identifier. If given, no look-up in the main collection is necessary.

    measurement_name: str
        If not None, select the measurement by name instead of by primary time.

    verbose: bool
        If True, return the complete measurement. If False (default), only return
        the entries 'VEL', 'response_chain' and 'primary_components'.

    Returns
    -------
    measurement: dict
        Empty dict if no measurement was found.

    Raises
    ------
    ValueError
        If neither channel_signal_id nor station_id is given, or if more than one
        measurement is found.
    """
    if channel_signal_id is None:
        # the identifier has to be looked up in the main collection
        if station_id is None:
            raise ValueError('Either the channel_signal_id or station_id + channel_id needes to be given!')

        channel_signal_id = self.get_identifier(
            station_id, channel_id, component="channel", what="signal")

    # if measurement name is None, the primary measurement is returned
    collection_info = self.get_collection_information(
        'signal_chain', search_by='id', obj_id=channel_signal_id, measurement_name=measurement_name)

    n_measurements = len(collection_info)

    # the measurement has to be unique
    if n_measurements > 1:
        raise ValueError

    # nothing found
    if n_measurements == 0:
        return {}

    measurement = collection_info[0]['measurements']
    if verbose:
        return measurement

    return {key: measurement[key] for key in ('VEL', 'response_chain', 'primary_components')}
def get_component_data(self, component_type, component_id, supplementary_info, primary_time, verbose=True, sparameter='S21'):
    """
    Return the measurement of a component which is primary at 'primary_time'.
    Reads from the collection of the component.

    Parameters
    ----------
    component_type: string
        Name of the collection of the component

    component_id: string
        Value of the 'name' field of the component

    supplementary_info: dict
        Additional fields of the measurement which have to match (like channel id, etc ...)

    primary_time: datetime.datetime
        Time at which the measurement has to be primary

    verbose: bool
        If True (default), return the complete measurement (without 'id_measurement'). If False,
        only return 'name', 'channel_id', 'frequencies', 'mag' and 'phase' (if present).

    sparameter: string
        S parameter which is selected (Default: 'S21')

    Returns
    -------
    measurement: dict

    Raises
    ------
    ValueError
        If not exactly one measurement is found.
    """
    # requirements on the measurement: supplementary information plus a single S parameter
    measurement_match = {f'measurements.{key}': value for key, value in supplementary_info.items()}
    measurement_match['measurements.S_parameter'] = sparameter

    search_filter = [
        {'$match': {'name': component_id}},
        {'$unwind': '$measurements'},
        {'$match': measurement_match},
        {'$unwind': '$measurements.primary_measurement'},
        {'$match': {'measurements.primary_measurement.start': {'$lte': primary_time},
                    'measurements.primary_measurement.end': {'$gte': primary_time}}}]

    search_result = list(self.db[component_type].aggregate(search_filter))

    if len(search_result) != 1:
        raise ValueError(f'No or more than one measurement found: {search_result}. Search filter: {search_filter}')

    measurement = search_result[0]['measurements']

    # the measurement id is of no use for the caller
    measurement.pop('id_measurement', None)

    if not verbose:
        wanted_keys = ('name', 'channel_id', 'frequencies', 'mag', 'phase')
        return {key: measurement[key] for key in wanted_keys if key in measurement}

    return measurement
def get_complete_station_information(
        self, station_id, measurement_station_position=None, measurement_channel_position=None,
        measurement_signal_chain=None, measurement_device_position=None, verbose=True):
    """
    Collects all available information about the station

    Parameters
    ----------
    station_id: int
        The unique identifier of the station

    measurement_station_position: string
        If not None, this measurement will be collected (even though it is not the primary measurement)

    measurement_channel_position: string
        If not None, this measurement will be collected (even though it is not the primary measurement)

    measurement_signal_chain: string
        If not None, this measurement will be collected (even though it is not the primary measurement)

    measurement_device_position: string
        If not None, this measurement will be collected (even though it is not the primary measurement)

    verbose: bool
        Passed on to the functions which collect the position and signal chain information

    Returns
    -------
    complete_info: dict
        General station information (keyed by station id), extended by 'station_position'
        and the complete information of each channel and device
    """
    # load general station information (dump of the main collection)
    general_info = self.get_general_station_information(station_id)
    station_entry = general_info[station_id]

    station_position_id = station_entry['id_position']
    general_channel_info = station_entry['channels']
    general_device_info = station_entry['devices']

    # the database object id is not part of the output
    station_entry.pop('_id')

    # station position (without the measurement id)
    station_position = self.get_position(
        position_id=station_position_id, measurement_name=measurement_station_position, verbose=verbose)
    station_position.pop('id_measurement', None)
    station_entry['station_position'] = station_position

    # collect the complete information of each channel (in ascending order of the ids) ...
    channel_info = {
        cha_id: self.get_complete_channel_information(
            station_id, cha_id, measurement_position=measurement_channel_position,
            measurement_signal_chain=measurement_signal_chain, verbose=verbose)
        for cha_id in sorted(general_channel_info.keys())}

    # ... and merge it into the general channel information
    for cha_id, channel_entry in general_channel_info.items():
        channel_entry.update(channel_info[cha_id])

    # same procedure for the devices
    device_info = {
        dev_id: self.get_complete_device_information(
            station_id, dev_id, measurement_position=measurement_device_position, verbose=verbose)
        for dev_id in sorted(general_device_info.keys())}

    for dev_id, device_entry in general_device_info.items():
        device_entry.update(device_info[dev_id])

    return general_info
def get_channel_signal_chain(self, channel_signal_id, measurement_name=None, verbose=True):
    """
    Returns the response data for a given signal chain.

    Parameters
    ----------
    channel_signal_id: str
        Identifier of the signal chain

    measurement_name: str
        If not None, select the signal chain measurement by name instead of by primary time.

    verbose: bool
        Passed on to `get_channel_signal_chain_measurement` and `get_component_data`.
        (Default: True)

    Returns
    -------
    signal_chain: dict
        A dictionary which among other things contains the "response_chain" which carries the
        measured response for the different components in the signal chain.
    """
    # load the channel signal chain information (which components are used in the signal chain per channel):
    channel_sig_info = self.get_channel_signal_chain_measurement(
        channel_signal_id=channel_signal_id, measurement_name=measurement_name, verbose=verbose)

    # extract the information about the used components
    component_dict = channel_sig_info.pop('response_chain')

    # Certain keys in the response chain only carry additional information of other components
    # and do not describe own components on their own ("channel", "breakout", "weight").
    # Split the response chain into components, additional information and weights.
    filtered_component_dict = {}
    additional_information = {}
    weight_dict = {}
    for key, ele in component_dict.items():
        if re.search("(channel|breakout|weight)", key) is None:
            filtered_component_dict[key] = ele
        elif "weight" in key:
            weight_dict[key.replace("_weight", "")] = ele
        else:
            additional_information[key] = ele

    # go through all components and load the s parameter measurements for each used component
    components_data = {}
    for component, component_id in filtered_component_dict.items():
        # Add the additional information which was filtered out above to the correct component.
        # NOTE(review): the component name is used as a regular expression and matches anywhere
        # in the key - confirm that no component name is contained in the name of another one.
        supp_info = {k.replace(component + "_", ""): additional_information[k]
                     for k in additional_information if re.search(component, k)}

        # A numeric suffix ("_<number>") only distinguishes several components of the same
        # type, the collection is named without it.
        suffix_match = re.search(r"_\d+", component)
        if suffix_match:
            collection_component = component.replace(suffix_match.group(0), "")
        else:
            collection_component = component

        # load the s21 parameter measurement
        component_data = self.get_component_data(
            collection_component, component_id, supp_info, primary_time=self.__database_time, verbose=verbose)

        # combine the component name, the weight of the s21 measurement and the actual
        # s21 measurement (component_data) in one dictionary
        components_data[component] = {'name': component_id}
        if component in weight_dict:
            components_data[component].update({'weight': weight_dict[component]})

        components_data[component].update(component_data)

    # add/update the signal chain to the channel data
    channel_sig_info['response_chain'] = components_data

    return channel_sig_info
def get_complete_channel_information(
        self, station_id, channel_id, measurement_position=None, measurement_signal_chain=None, verbose=True):
    """
    Collects all available information about the input channel

    Parameters
    ----------
    station_id: int
        The unique identifier of the station the channel belongs to

    channel_id: int
        Channel id for which all information will be returned

    measurement_position: string
        If not None, this measurement will be collected (even though it is not the primary measurement)

    measurement_signal_chain: string
        If not None, this measurement will be collected (even though it is not the primary measurement)

    verbose: bool
        Passed on to the functions which collect the position and signal chain information

    Returns
    -------
    complete_info: dict
        General channel information, extended by the entries 'channel_position' and 'signal_chain'
    """
    # general channel information from the main collection, including the identifiers
    # of the position and the signal chain
    general_info = self.get_general_channel_information(station_id, channel_id)
    position_id, signal_id = general_info['id_position'], general_info['id_signal']

    # channel position
    general_info['channel_position'] = self.get_position(
        position_id=position_id, measurement_name=measurement_position, verbose=verbose, component="channel")

    # signal chain (without the 'channel_id' entry)
    signal_chain = self.get_channel_signal_chain(signal_id, measurement_signal_chain, verbose)
    signal_chain.pop('channel_id', None)
    general_info['signal_chain'] = signal_chain

    return general_info
def get_complete_device_information(self, station_id, device_id, measurement_position=None, verbose=True):
    """
    Gather the general information and the position of one device.

    Parameters
    ----------
    station_id: int
        The unique identifier of the station the device belongs to

    device_id: int
        The device id for which the information will be written out

    measurement_position: string
        Passed to `get_position` as `measurement_name`. If not None, this measurement
        will be collected (even though it is not the primary measurement)

    verbose: bool
        Passed on to `get_position`. (Default: True)

    Returns
    -------
    complete_info: dict
        The general device information without its "id_position" entry, extended by
        the entry "device_position"
    """
    general_info = self.get_general_device_information(station_id, device_id)

    # the position identifier is taken out of the general information (raises KeyError if absent)
    position_id = general_info.pop('id_position')

    # everything that is left of the general information goes into the result
    complete_info = dict(general_info)

    device_position = self.get_position(
        position_id=position_id, measurement_name=measurement_position, verbose=verbose, component="device")

    # strip the bookkeeping entries from the position (no error if they are missing)
    for obsolete_key in ('id_measurement', 'device_id'):
        device_position.pop(obsolete_key, None)

    complete_info['device_position'] = device_position
    return complete_info
def query_modification_timestamps_per_station(self):
    """
    Collects all the timestamps for station and channel (de)commissioning from the database.

    Combines those to get a list of timestamps when modifications happened which requires
    an update of the buffer.

    Returns
    -------
    modification_timestamp_dict: dict(dict(list))
        Returns for each station (key = station id) a dictionary with three entries:
        "modification_timestamps", "station_commission_timestamps", "station_decommission_timestamps"
        each containing the sorted timestamps. The former combines the latter two + channel
        (de)commission timestamps, without duplicates, and is a numpy array.
    """
    # the collection lookup does not depend on the station, so do it once
    station_collection = self.db[self.__station_collection]

    modification_timestamp_dict = {}

    # loop over the distinct set of stations
    for station_id in station_collection.distinct("id"):

        # get set of (de)commission times for stations
        station_times_comm = station_collection.distinct("commission_time", {"id": station_id})
        station_times_decomm = station_collection.distinct("decommission_time", {"id": station_id})

        # get set of (de)commission times for channels
        channel_times_comm = station_collection.distinct("channels.commission_time", {"id": station_id})
        channel_times_decomm = station_collection.distinct("channels.decommission_time", {"id": station_id})

        # np.unique returns the unique elements already sorted, no further sorting is needed
        mod_set = np.unique(
            station_times_comm + station_times_decomm + channel_times_comm + channel_times_decomm)

        station_times_comm.sort()
        station_times_decomm.sort()

        # store timestamps, which can be used with np.digitize
        modification_timestamp_dict[station_id] = {
            "modification_timestamps": mod_set,
            "station_commission_timestamps": station_times_comm,
            "station_decommission_timestamps": station_times_decomm,
        }

    return modification_timestamp_dict
def dictionarize_nested_lists(nested_lists, parent_key="id", nested_field="channels", nested_key="id"):
    """
    Turn a list of dicts with a nested list of dicts into a dict of dicts.

    mongodb aggregate returns lists of dicts, which can be converted to dicts of dicts.
    The entries of `nested_lists` are not copied: their `nested_field` entry is replaced
    in place and the very same objects are the values of the returned dict.

    Parameters
    ----------
    nested_lists: iterable of dict
        The parent entries

    parent_key: str (Default: "id")
        Key of each parent entry whose value indexes the returned dict

    nested_field: str or None (Default: "channels")
        Key under which a parent holds its list of nested entries. Parents without this
        key are stored unchanged. If None, no nested list is converted.

    nested_key: str (Default: "id")
        Key of each nested entry whose value indexes the nested dict. Nested entries
        without this key are left out.

    Returns
    -------
    res: dict
        The parent entries indexed by their `parent_key` value
    """
    parents_by_key = {}

    for entry in nested_lists:
        parents_by_key[entry[parent_key]] = entry

        # nothing to convert for this parent
        if nested_field is None or nested_field not in entry:
            continue

        # replace the list with a dict; for duplicated keys the last nested entry wins
        entry[nested_field] = {
            child[nested_key]: child for child in entry[nested_field] if nested_key in child}

    return parents_by_key
def dictionarize_nested_lists_as_tuples(nested_lists, parent_key="name", nested_field="measurements", nested_keys=("channel_id", "S_parameter")):
    """
    Turn a list of dicts with a nested list of dicts into a dict of dicts indexed by tuples.

    mongodb aggregate returns lists of dicts, which can be converted to dicts of dicts.
    The entries of `nested_lists` are not copied: their `nested_field` entry is replaced
    in place and the very same objects are the values of the returned dict.

    Parameters
    ----------
    nested_lists: iterable of dict
        The parent entries

    parent_key: str (Default: "name")
        Key of each parent entry whose value indexes the returned dict

    nested_field: str or None (Default: "measurements")
        Key under which a parent holds its list of nested entries. Parents without this
        key are stored unchanged. If None, no nested list is converted.

    nested_keys: tuple of str (Default: ("channel_id", "S_parameter"))
        Keys of each nested entry whose values are combined into the tuple that indexes
        the nested dict. A key missing in a nested entry contributes None to the tuple.

    Returns
    -------
    res: dict
        The parent entries indexed by their `parent_key` value
    """
    parents_by_key = {}

    for entry in nested_lists:
        parents_by_key[entry[parent_key]] = entry

        # nothing to convert for this parent
        if nested_field is None or nested_field not in entry:
            continue

        # measurements do not have a unique column which can be used as key for the dictionary,
        # so a tuple is used for indexing; for duplicated tuples the last nested entry wins
        entry[nested_field] = {
            tuple(child[key] if key in child else None for key in nested_keys): child
            for child in entry[nested_field]}

    return parents_by_key