import NuRadioReco.detector.RNO_G.db_mongo_read
import NuRadioReco.utilities.metaclasses
import six
import datetime
from bson import ObjectId
import logging
# Module-level logger used by the database write operations in this file.
logger = logging.getLogger("NuRadioReco.MongoDBWrite")
# NOTE(review): the level is set to DEBUG at import time -- confirm that a library module should configure this itself.
logger.setLevel(logging.DEBUG)
[docs]@six.add_metaclass(NuRadioReco.utilities.metaclasses.Singleton)
class Database(NuRadioReco.detector.RNO_G.db_mongo_read.Database):
# operations that change collections
def rename_database_collection(self, old_name, new_name):
    """
    Rename a collection of the database.

    The rename is delegated to the ``rename`` method of the collection object.
    If the new name already exists, the operation fails.

    Parameters
    ----------
    old_name: string
        current name of the collection
    new_name: string
        name the collection should have afterwards
    """
    collection = self.db[old_name]
    collection.rename(new_name)
def create_empty_collection(self, collection_name):
    """
    Create a new collection in the database.

    Parameters
    ----------
    collection_name: string
        name of the collection that is created
    """
    database = self.db
    database.create_collection(collection_name)
def clone_collection_to_collection(self, old_collection, new_collection):
    """
    Copy the documents of one collection into another collection.

    Runs an aggregation on the source collection that selects every document and
    hands the result to an ``$out`` stage naming the target collection.

    Parameters
    ----------
    old_collection: string
        name of the collection that is read
    new_collection: string
        name of the collection given to the ``$out`` stage
    """
    copy_pipeline = [
        {'$match': {}},  # empty filter: every document is selected
        {'$out': new_collection},
    ]
    self.db[old_collection].aggregate(copy_pipeline)
# operations that add documents to a collection
def set_not_working(self, collection_name, name, primary_measurement, channel_id=None, breakout_id=None, breakout_channel_id=None):
    """
    Insert a measurement entry that flags the input unit as broken (``function_test`` is False).

    If the input unit doesn't exist yet, it will be created (the update is an upsert).

    Parameters
    ----------
    collection_name: string
        name of the collection that is searched (surface_board, iglu_board, ...)
    name: string
        the unique identifier of the input unit
    primary_measurement: boolean
        specifies if this measurement is used as the primary measurement. If True, the period of the
        current primary measurement is closed and a new primary period is started; if False, the
        entry is stored without a primary period (same convention as in add_entry_to_database)
    channel_id: int
        channel-id of the measured object (default: None)
    breakout_id: int
        number describing the breakout of the fiber (default: None)
    breakout_channel_id: string
        string describing the breakout channel of the fiber (default: None)
    """
    # Keys identifying the measured sub-unit. A channel id takes precedence over the breakout
    # pair, and the breakout keys are only stored when both of them are given.
    if channel_id is not None:
        identifiers = {'channel_id': channel_id}
    elif breakout_id is not None and breakout_channel_id is not None:
        identifiers = {'breakout': breakout_id, 'breakout_channel': breakout_channel_id}
    else:
        identifiers = {}

    # update_current_primary closes the old period at the database time, so refresh it first
    # (as add_entry_to_database does)
    self.set_database_time(datetime.datetime.utcnow())

    if primary_measurement:
        # close the time period of the old primary measurement; the identifiers are passed as
        # data_dict because update_current_primary accepts no channel/breakout keywords
        if name in self.get_object_names(collection_name):
            self.update_current_primary(
                collection_name, name, identification_label='name', data_dict=identifiers)
        # define the new primary measurement times
        primary_measurement_times = [{'start': datetime.datetime.utcnow(),
                                      'end': datetime.datetime(2100, 1, 1, 0, 0, 0)}]
    else:
        primary_measurement_times = []

    measurement = {
        'id_measurement': ObjectId(),
        'last_updated': datetime.datetime.utcnow(),
        'function_test': False,
        'primary_measurement': primary_measurement_times,
    }
    measurement.update(identifiers)

    self.db[collection_name].update_one({'name': name},
                                        {'$push': {'measurements': measurement}}, upsert=True)
def add_entry_to_database(self, collection, identification_key, identification_value, primary_measurement, data_dict, primary_measurement_start=None):
    """
    Insert an entry (measurement) into the database.

    If the object doesn't exist yet, it will be created (the update is an upsert).
    Only works for the following collections: hpol, vpol, surface, ...

    Parameters
    ----------
    collection: string
        specify the collection in which the entry will be added
    identification_key: string
        specify the key used for identification (must be 'name' or 'id')
    identification_value: string
        specify the name of the entry (e.g. the name of the measurement or the channel position identifier)
    primary_measurement: bool
        indicates the primary measurement to be used for analysis
    data_dict: dict
        dictionary with all the information that should be saved for this entry.
        The dictionary is not modified; the stored entry is built from a copy.
    primary_measurement_start: datetime.datetime
        If this quantity is given, the start time of the primary measurement is set to this value.
        Otherwise, the primary start time will be set to the current time
    """
    self.set_database_time(datetime.datetime.utcnow())

    # close the time period of the old primary measurement
    if primary_measurement and identification_value in self.db[collection].distinct(identification_key):
        self.update_current_primary(
            collection, identification_value, identification_label=identification_key, data_dict=data_dict)

    # define the new primary measurement times
    if primary_measurement:
        if primary_measurement_start is None:
            primary_measurement_start = datetime.datetime.utcnow()
        primary_measurement_times = [{'start': primary_measurement_start,
                                      'end': datetime.datetime(2100, 1, 1, 0, 0, 0)}]
    else:
        primary_measurement_times = []

    # Work on a shallow copy so that the bookkeeping keys do not leak into the caller's dictionary.
    measurement = dict(data_dict)
    measurement.update({
        'id_measurement': ObjectId(),
        'primary_measurement': primary_measurement_times,
        'last_updated': datetime.datetime.utcnow(),
    })

    # update the entry with the measurement (if the entry doesn't exist it will be created)
    self.db[collection].update_one({identification_key: identification_value},
                                   {'$push': {'measurements': measurement}}, upsert=True)
def add_general_station_info(self, station_id, station_name, station_comment, number_of_samples, sampling_rate, commission_time, decommission_time=datetime.datetime(2080, 1, 1)):
    """
    Insert a new station document into the station collection.

    If a station with the same id is active at the detector time, that station is
    decommissioned (with ``commission_time`` as its decommission time) before the new one is inserted.

    Parameters
    ----------
    station_id: int
        id of the station
    station_name: string
        name of the station
    station_comment: string
        comment stored with the station
    number_of_samples: int
        stored under 'number_of_samples'
    sampling_rate: float
        stored under 'sampling_rate'
    commission_time: datetime.datetime
        commission time of the new station (also used to build the position identifier)
    decommission_time: datetime.datetime
        decommission time of the new station (default: 2080-01-01)
    """
    detector_time = self.__detector_time
    station_collection = self.db[self.__station_collection]

    # stations with this id that are commissioned at the detector time (should be at most one)
    active_filter = [{"$match": {
        'commission_time': {"$lte": detector_time},
        'decommission_time': {"$gte": detector_time},
        'id': station_id}}]
    active_stations = list(station_collection.aggregate(active_filter))

    # an already active station is decommissioned first
    if len(active_stations) > 0:
        self.decommission_a_station(station_id, commission_time)

    # unique position identifier built from the station id and the commission month/year
    position_identifier = f'position_stn{station_id}_{commission_time.month}{commission_time.year}'

    station_document = {
        'id': station_id,
        'name': station_name,
        'commission_time': commission_time,
        'decommission_time': decommission_time,
        'number_of_samples': number_of_samples,
        'sampling_rate': sampling_rate,
        'station_comment': station_comment,
        'id_position': position_identifier,
        'channels': [],
        'devices': [],
    }
    station_collection.insert_one(station_document)
def add_general_channel_info_to_station(self, station_id, channel_id, signal_chain, ant_type, channel_comment, commission_time, decommission_time=datetime.datetime(2080, 1, 1)):
    """
    Append a channel entry to the station that is active at the detector time.

    If the station already has an entry for this channel id, the active channel is
    decommissioned (with ``commission_time`` as its decommission time) before the new entry is added.

    Parameters
    ----------
    station_id: int
        id of the station
    channel_id: int
        id of the channel
    signal_chain:
        stored under 'installed_components'
    ant_type: string
        stored under 'ant_type'
    channel_comment: string
        comment stored with the channel
    commission_time: datetime.datetime
        commission time of the channel (also used to build the identifiers)
    decommission_time: datetime.datetime
        decommission time of the channel (default: 2080-01-01)

    Returns
    -------
    1 if there is not exactly one active station, otherwise None
    """
    detector_time = self.__detector_time
    station_collection = self.db[self.__station_collection]

    # stations with this id that are commissioned at the detector time (should be exactly one)
    active_filter = [{"$match": {
        'commission_time': {"$lte": detector_time},
        'decommission_time': {"$gte": detector_time},
        'id': station_id}}]
    active_stations = list(station_collection.aggregate(active_filter))
    if len(active_stations) != 1:
        logger.error('More than one or no active stations in the database')
        return 1

    unique_station_id = active_stations[0]['_id']

    # look for an existing entry of this channel in the selected station
    channel_filter = [{'$match': {'_id': unique_station_id}},
                      {'$unwind': '$channels'},
                      {'$match': {'channels.id': channel_id}}]
    existing_entries = list(station_collection.aggregate(channel_filter))

    # an already existing channel is decommissioned first
    if existing_entries:
        self.decommission_a_channel(station_id, channel_id, commission_time)

    # unique position and signal chain identifiers
    date_tag = f'{commission_time.month}{commission_time.year}'
    position_identifier = f'position_stn{station_id}_cha{channel_id}_{date_tag}'
    signal_identifier = f'signal_stn{station_id}_cha{channel_id}_{date_tag}'

    channel_entry = {
        'id': channel_id,
        'id_position': position_identifier,
        'id_signal': signal_identifier,
        'ant_type': ant_type,
        'commission_time': commission_time,
        'decommission_time': decommission_time,
        'installed_components': signal_chain,
        'channel_comment': channel_comment,
    }
    station_collection.update_one({'_id': unique_station_id}, {"$push": {'channels': channel_entry}})
def add_general_device_info_to_station(self, station_id, device_id, device_name, device_comment, amp_name, commission_time, decommission_time=datetime.datetime(2080, 1, 1)):
    """
    Append a device entry to the station that is active at the detector time.

    If the station already has an entry for this device id, the active device is
    decommissioned (with ``commission_time`` as its decommission time) before the new entry is added.

    Parameters
    ----------
    station_id: int
        id of the station
    device_id: int
        id of the device
    device_name: string
        stored under 'device_name'
    device_comment: string
        comment stored with the device
    amp_name: string
        stored under 'amp_name'
    commission_time: datetime.datetime
        commission time of the device (also used to build the position identifier)
    decommission_time: datetime.datetime
        decommission time of the device (default: 2080-01-01)

    Returns
    -------
    1 if there is not exactly one active station, otherwise None
    """
    # get the current active station
    # filter to get all active stations with the correct id
    time = self.__detector_time
    time_filter = [{"$match": {
        'commission_time': {"$lte": time},
        'decommission_time': {"$gte": time},
        'id': station_id}}]

    # get all stations which fit the filter (should only be one)
    stations = list(self.db[self.__station_collection].aggregate(time_filter))
    if len(stations) != 1:
        logger.error('More than one or no active stations in the database')
        return 1

    unique_station_id = stations[0]['_id']

    # Check if an entry already exists for this device. After unwinding the 'devices' array the
    # elements are still addressed as 'devices.<field>' (the previous 'device.id' never matched).
    component_filter = [{'$match': {'_id': unique_station_id}},
                        {'$unwind': '$devices'},
                        {'$match': {'devices.id': device_id}}]
    entries = list(self.db[self.__station_collection].aggregate(component_filter))

    # if the device already exists, decommission the active device first
    if entries:
        self.decommission_a_device(station_id, device_id, commission_time)

    # create unique position identifier
    position_identifier = f'position_stn{station_id}_dev{device_id}_{commission_time.month}{commission_time.year}'

    # insert the device information
    self.db[self.__station_collection].update_one({'_id': unique_station_id},
                                                  {"$push": {'devices': {
                                                      'id': device_id,
                                                      'id_position': position_identifier,
                                                      'device_name': device_name,
                                                      'amp_name': amp_name,
                                                      'commission_time': commission_time,
                                                      'decommission_time': decommission_time,
                                                      'device_comment': device_comment
                                                  }}})
def add_measurement_protocol(self, protocol_name):
    """
    Store a new measurement protocol in the 'measurement_protocol' collection.

    Parameters
    ----------
    protocol_name: string
        name of the protocol; it is stored together with the current UTC time
    """
    protocol_entry = {
        'protocol': protocol_name,
        'inserted': datetime.datetime.utcnow(),
    }
    self.db['measurement_protocol'].insert_one(protocol_entry)
# operations that change the primary status of a measurement
def update_current_primary(self, collection_name, name, identification_label, data_dict):
    """
    Close the period of the current primary measurement(s) of a component.

    The last entry of 'primary_measurement' of every current primary measurement gets the
    database time as its new 'end' value.

    Parameters
    ----------
    collection_name: string
        name of the collection that is searched (surface_board, iglu_board, ...)
    name: string
        the unique identifier of the input component
    identification_label: string
        specify what kind of label is used for the identification ("name" or "id")
    data_dict: dict
        dictionary containing additional information that are used to search the database (e.g., channel_id, S_parameter)

    Raises
    ------
    ValueError
        if the search reports more than one primary measurement
    """
    present_time = self.__database_time

    # find the current primary measurement
    obj_id, measurement_id = self.find_primary_measurement(
        collection_name, name, present_time, identification_label=identification_label, data_dict=data_dict)

    if obj_id is None:
        if measurement_id[0] == 0:
            # no primary measurement was found and thus there is no measurement to update
            return
        if measurement_id == [None]:
            raise ValueError(
                'More than one primary measurements are found. Please contact the database support.')

    collection = self.db[collection_name]
    for current_id in measurement_id:
        # read the stored primary periods of this measurement
        lookup = [{'$match': {'_id': obj_id}},
                  {'$unwind': '$measurements'},
                  {'$match': {'measurements.id_measurement': current_id}}]
        matches = list(collection.aggregate(lookup))
        periods = matches[0]['measurements']['primary_measurement']

        # the latest period ends at the present time
        periods[-1]['end'] = present_time

        collection.update_one(
            {'_id': obj_id},
            {"$set": {"measurements.$[updateIndex].primary_measurement": periods}},
            array_filters=[{"updateIndex.id_measurement": current_id}])
def __change_primary_object_measurement(self, object_type, object_name, search_filter, channel_id=None, breakout_id=None, breakout_channel_id=None):
"""
helper function to change the current active primary measurement of an object to the measurement selected by the search filter

NOTE(review): the indentation of this method is not preserved in this listing, so the nesting of the
statements inside the loop (in particular of the final "update the primary measurements" part) cannot
be verified here -- confirm against the repository version before changing the logic.

Parameters
----------
object_type: string
type of the input component (iglu_board, drab_board, ...); used as the collection name
object_name: string
the unique identifier of the object
search_filter:
aggregation pipeline used to find the measurement; it is expected to return exactly one unwound measurement
channel_id: int
If a channel id is given (e.g. for a drab_board) the value is used to search for the current primary measurement (default: None).
breakout_id: int
If a breakout id is given (for a downhole_chain) the value is used to search for the current primary measurement (default: None).
breakout_channel_id: int
If a breakout channel id is given (for a downhole_chain) the value is used to search for the current primary measurement (default: None).
"""
present_time = datetime.datetime.utcnow()
# get the information about the measurement specified in the search filter
search_results = list(self.db[object_type].aggregate(search_filter))
# extract the object and measurement id and current primary time array (only gives one measurement id)
# if the selection is not unique (or empty), both ids are set to None and only an error is logged
if len(search_results) > 1:
logger.error('More than one measurement found.')
object_id = None
measurement_id = None
elif len(search_results) == 0:
logger.error('No measurement found.')
object_id = None
measurement_id = None
else:
object_id = search_results[0]['_id']
measurement_id = search_results[0]['measurements']['id_measurement']
primary_times = search_results[0]['measurements']['primary_measurement']
# check if specified measurement is already the primary measurement (could be up to 4 measurement ids)
# NOTE(review): update_current_primary in this file calls find_primary_measurement with
# identification_label= and data_dict=, while this call passes channel_id/breakout_id/breakout_channel_id --
# confirm which keywords the base-class method accepts.
current_obj_id, current_measurement_id = self.find_primary_measurement(
object_type, object_name, present_time, channel_id=channel_id, breakout_id=breakout_id, breakout_channel_id=breakout_channel_id)
for c_m_id in current_measurement_id:
# find the current_measurement_id for the fitting S parameter
filter_primary_times = [{'$match': {'_id': current_obj_id}},
{'$unwind': '$measurements'},
{'$match': {'measurements.id_measurement': c_m_id}}]
info = list(self.db[object_type].aggregate(filter_primary_times))
# NOTE(review): search_results[0] is read here even if the search above returned no measurement,
# which would raise an IndexError -- confirm and guard if necessary.
if info[0]['measurements']['S_parameter'] == search_results[0]['measurements']['S_parameter']:
# the measurement id is fitting the S parameter
if c_m_id == measurement_id and current_obj_id == object_id and measurement_id is not None:
logger.info(
'The specified measurement is already the primary measurement.')
elif measurement_id is None or current_measurement_id is None:
# nothing to update if no unique measurement was selected
pass
else:
# update the old primary time (not using the 'update current primary measurement' function, so that only the entry of a single S parameter is updated)
primary_times_old = info[0]['measurements']['primary_measurement']
# update the 'end' time to the present time
primary_times_old[-1]['end'] = present_time
self.db[object_type].update_one({'_id': object_id}, {"$set": {
"measurements.$[updateIndex].primary_measurement": primary_times_old}}, array_filters=[{"updateIndex.id_measurement": c_m_id}])
# update the primary measurements of the specified measurements: a new period starting now is appended
if object_id is not None:
primary_times.append(
{'start': present_time, 'end': datetime.datetime(2100, 1, 1, 0, 0, 0)})
self.db[object_type].update_one({'_id': object_id}, {"$set": {
"measurements.$[updateIndex].primary_measurement": primary_times}}, array_filters=[{"updateIndex.id_measurement": measurement_id}])
else:
logger.error('S parameter not selected to be changed.')
def change_primary_antenna_measurement(self, antenna_type, antenna_name, S_parameter, protocol, units_arr, function_test):
    """
    Make a single antenna measurement the active primary measurement.

    A search filter is built from the arguments and handed to the private helper
    that switches the primary measurement.

    Parameters
    ----------
    antenna_type: string
        specify if it is a VPol or HPol antenna
    antenna_name: string
        the unique identifier of the antenna
    S_parameter: list of strings
        specify which S_parameter is used (S11, ...)
    protocol: string
        details of the testing environment
    units_arr: list of strings
        list of the input units (matched against 'y-axis_units')
    function_test: boolean
        describes if the channel is working or not
    """
    # properties the measurement must have to be selected
    measurement_criteria = {
        'measurements.function_test': function_test,
        'measurements.measurement_protocol': protocol,
        'measurements.S_parameter': S_parameter,
        'measurements.y-axis_units': units_arr,
    }
    search_filter = [
        {'$match': {'name': antenna_name}},
        {'$unwind': '$measurements'},
        {'$match': measurement_criteria},
    ]
    self.__change_primary_object_measurement(antenna_type, antenna_name, search_filter)
def change_primary_cable_measurement(self, cable_type, cable_name, S_parameter, protocol, units_arr, function_test):
    """
    Make a single cable measurement the active primary measurement.

    A search filter is built from the arguments and handed to the private helper
    that switches the primary measurement.

    Parameters
    ----------
    cable_type: string
        specify if it is a surface or downhole cable
    cable_name: string
        the unique identifier of the cable
    S_parameter: list of strings
        specify which S_parameter is used (S11, ...)
    protocol: string
        details of the testing environment
    units_arr: list of strings
        list of the input units (matched against 'y-axis_units')
    function_test: boolean
        describes if the channel is working or not
    """
    # properties the measurement must have to be selected
    measurement_criteria = {
        'measurements.function_test': function_test,
        'measurements.measurement_protocol': protocol,
        'measurements.S_parameter': S_parameter,
        'measurements.y-axis_units': units_arr,
    }
    search_filter = [
        {'$match': {'name': cable_name}},
        {'$unwind': '$measurements'},
        {'$match': measurement_criteria},
    ]
    self.__change_primary_object_measurement(cable_type, cable_name, search_filter)
def change_primary_iglu_measurement(self, board_type, board_name, S_parameter, protocol, units_arr, function_test, drab_id, laser_id, temperature):
    """
    Make a single iglu board measurement the active primary measurement.

    A search filter is built from the arguments and handed to the private helper
    that switches the primary measurement.

    Parameters
    ----------
    board_type: string
        specify the board type
    board_name: string
        the unique identifier of the board
    S_parameter: list of strings
        specify which S_parameter is used (S11, ...)
    protocol: string
        details of the testing environment
    units_arr: list of strings
        list of the input units (matched against 'y-axis_units')
    function_test: boolean
        describes if the channel is working or not
    drab_id: string
        unique identifier of the drab used in the iglu measurement
    laser_id: string
        id of the laser that is used in the iglu board
    temperature: int
        temperature at which the measurement was performed
    """
    # properties the measurement must have to be selected
    measurement_criteria = {
        'measurements.function_test': function_test,
        'measurements.measurement_protocol': protocol,
        'measurements.S_parameter': S_parameter,
        'measurements.DRAB_id': drab_id,
        'measurements.laser_id': laser_id,
        'measurements.measurement_temp': temperature,
        'measurements.y-axis_units': units_arr,
    }
    search_filter = [
        {'$match': {'name': board_name}},
        {'$unwind': '$measurements'},
        {'$match': measurement_criteria},
    ]
    self.__change_primary_object_measurement(board_type, board_name, search_filter)
def change_primary_drab_measurement(self, board_type, board_name, S_parameter, iglu_id, photodiode_id, channel_id, temp, protocol, units_arr, function_test):
    """
    Make a single drab board measurement the active primary measurement.

    A search filter is built from the arguments and handed, together with the channel id,
    to the private helper that switches the primary measurement.

    Parameters
    ----------
    board_type: string
        specify the board type
    board_name: string
        the unique identifier of the board
    S_parameter: list of strings
        specify which S_parameter is used (S11, ...)
    iglu_id: string
        unique identifier of the iglu used in the drab measurement
    photodiode_id: string
        matched against 'photodiode_serial' of the measurement
    channel_id: int
        channel of the drab that is measured
    temp: int
        temperature at which the measurement was performed
    protocol: string
        details of the testing environment
    units_arr: list of strings
        list of the input units (matched against 'y-axis_units')
    function_test: boolean
        describes if the channel is working or not
    """
    # properties the measurement must have to be selected
    measurement_criteria = {
        'measurements.function_test': function_test,
        'measurements.measurement_protocol': protocol,
        'measurements.S_parameter': S_parameter,
        'measurements.IGLU_id': iglu_id,
        'measurements.photodiode_serial': photodiode_id,
        'measurements.channel_id': channel_id,
        'measurements.measurement_temp': temp,
        'measurements.y-axis_units': units_arr,
    }
    search_filter = [
        {'$match': {'name': board_name}},
        {'$unwind': '$measurements'},
        {'$match': measurement_criteria},
    ]
    self.__change_primary_object_measurement(
        board_type, board_name, search_filter, channel_id=channel_id)
def change_primary_surface_measurement(self, board_type, board_name, S_parameter, channel_id, temp, protocol, units_arr, function_test):
    """
    Make a single surface board measurement the active primary measurement.

    A search filter is built from the arguments and handed, together with the channel id,
    to the private helper that switches the primary measurement.

    Parameters
    ----------
    board_type: string
        specify the board type
    board_name: string
        the unique identifier of the board
    S_parameter: list of strings
        specify which S_parameter is used (S11, ...)
    channel_id: int
        channel of the surface board that is measured
    temp: int
        temperature at which the measurement was performed
    protocol: string
        details of the testing environment
    units_arr: list of strings
        list of the input units (matched against 'y-axis_units')
    function_test: boolean
        describes if the channel is working or not
    """
    # properties the measurement must have to be selected
    measurement_criteria = {
        'measurements.function_test': function_test,
        'measurements.measurement_protocol': protocol,
        'measurements.S_parameter': S_parameter,
        'measurements.channel_id': channel_id,
        'measurements.measurement_temp': temp,
        'measurements.y-axis_units': units_arr,
    }
    search_filter = [
        {'$match': {'name': board_name}},
        {'$unwind': '$measurements'},
        {'$match': measurement_criteria},
    ]
    self.__change_primary_object_measurement(
        board_type, board_name, search_filter, channel_id=channel_id)
def change_primary_downhole_measurement(self, board_type, board_name, S_parameter, breakout_id, breakout_cha_id, iglu_id, drab_id, temp, protocol, units_arr, function_test):
    """
    Make a single downhole chain measurement the active primary measurement.

    A search filter is built from the arguments and handed, together with the breakout
    identifiers, to the private helper that switches the primary measurement.

    Parameters
    ----------
    board_type: string
        specify the board type
    board_name: string
        the unique identifier of the board
    S_parameter: list of strings
        specify which S_parameter is used (S11, ...)
    breakout_id: int
        number describing the breakout of the fiber
    breakout_cha_id: string
        string describing the breakout channel of the fiber
    iglu_id: string
        unique identifier of the iglu used in the measurement
    drab_id: string
        unique identifier of the drab used in the measurement
    temp: int
        temperature at which the measurement was performed
    protocol: string
        details of the testing environment
    units_arr: list of strings
        list of the input units (matched against 'y-axis_units')
    function_test: boolean
        describes if the channel is working or not
    """
    # properties the measurement must have to be selected
    measurement_criteria = {
        'measurements.function_test': function_test,
        'measurements.measurement_protocol': protocol,
        'measurements.S_parameter': S_parameter,
        'measurements.IGLU_id': iglu_id,
        'measurements.DRAB_id': drab_id,
        'measurements.breakout': breakout_id,
        'measurements.breakout_channel': breakout_cha_id,
        'measurements.measurement_temp': temp,
        'measurements.y-axis_units': units_arr,
    }
    search_filter = [
        {'$match': {'name': board_name}},
        {'$unwind': '$measurements'},
        {'$match': measurement_criteria},
    ]
    self.__change_primary_object_measurement(
        board_type, board_name, search_filter, breakout_id=breakout_id, breakout_channel_id=breakout_cha_id)
# operations that decommission an object
def decommission_a_station(self, station_id, decomm_time):
    """
    Decommission an active station in the db.

    The station that is active at the current time gets ``decomm_time`` as its new
    decommission time. If no station, no active station or more than one active station
    is found, an error is logged and nothing is changed.

    Parameters
    ----------
    station_id: int
        the unique identifier of the station
    decomm_time: datetime
        time which should be used for updating the decommission time
    """
    station_collection = self.db[self.__station_collection]

    if station_collection.count_documents({'id': station_id}) == 0:
        logger.error(f'No active station {station_id} in the database')
        return

    # filter to get all active stations with the correct id
    reference_time = self.__current_time
    time_filter = [{"$match": {
        'commission_time': {"$lte": reference_time},
        'decommission_time': {"$gte": reference_time},
        'id': station_id}}]

    # get all stations which fit the filter (should only be one)
    stations = list(station_collection.aggregate(time_filter))
    if len(stations) > 1:
        logger.error('More than one active station was found.')
        return
    if not stations:
        # the station id exists, but no entry is active at the reference time
        # (indexing the empty list used to raise an IndexError here)
        logger.error(f'No active station {station_id} in the database')
        return

    # change the decommission time
    station_collection.update_one(
        {'_id': stations[0]['_id']}, {'$set': {'decommission_time': decomm_time}})
def decommission_a_channel(self, station_id, channel_id, decomm_time):
    """
    Decommission an active channel in the db.

    The channel entry of the station that is active at ``decomm_time`` gets ``decomm_time``
    as its new decommission time. If no station, no active station or more than one active
    station is found, an error is logged and nothing is changed.

    Parameters
    ----------
    station_id: int
        the unique identifier of the station
    channel_id: int
        the unique identifier of the channel
    decomm_time: datetime
        time which should be used for updating the decommission time
    """
    station_collection = self.db[self.__station_collection]

    if station_collection.count_documents({'id': station_id}) == 0:
        logger.error(f'No active station {station_id} in the database')
        return

    # filter to get all stations with the correct id that are active at the decommission time
    time_filter = [{"$match": {
        'commission_time': {"$lte": decomm_time},
        'decommission_time': {"$gte": decomm_time},
        'id': station_id}}]

    # get all stations which fit the filter (should only be one)
    stations = list(station_collection.aggregate(time_filter))
    if len(stations) > 1:
        logger.error('More than one active station was found.')
        return
    if not stations:
        # the station id exists, but no entry is active at the decommission time
        # (indexing the empty list used to raise an IndexError here)
        logger.error(f'No active station {station_id} in the database')
        return

    # change the decommission time of a specific channel
    station_collection.update_one(
        {'_id': stations[0]['_id']},
        {'$set': {'channels.$[updateIndex].decommission_time': decomm_time}},
        array_filters=[{"updateIndex.id": channel_id}])
def decommission_a_device(self, station_id, device_id, decomm_time):
    """
    Decommission an active device in the db.

    The device entry of the station that is active at the current time gets ``decomm_time``
    as its new decommission time. If no station, no active station or more than one active
    station is found, an error is logged and nothing is changed.

    Parameters
    ----------
    station_id: int
        the unique identifier of the station
    device_id: int
        the unique identifier of the device
    decomm_time: datetime
        time which should be used for updating the decommission time
    """
    station_collection = self.db[self.__station_collection]

    if station_collection.count_documents({'id': station_id}) == 0:
        logger.error(f'No active station {station_id} in the database')
        return

    # filter to get all active stations with the correct id
    reference_time = self.__current_time
    time_filter = [{"$match": {
        'commission_time': {"$lte": reference_time},
        'decommission_time': {"$gte": reference_time},
        'id': station_id}}]

    # get all stations which fit the filter (should only be one)
    stations = list(station_collection.aggregate(time_filter))
    if len(stations) > 1:
        logger.error('More than one active station was found.')
        return
    if not stations:
        # the station id exists, but no entry is active at the reference time
        # (indexing the empty list used to raise an IndexError here)
        logger.error(f'No active station {station_id} in the database')
        return

    # change the decommission time of a specific device
    station_collection.update_one(
        {'_id': stations[0]['_id']},
        {'$set': {'devices.$[updateIndex].decommission_time': decomm_time}},
        array_filters=[{"updateIndex.id": device_id}])