Source code for NuRadioReco.detector.generic_detector

import logging
import json
from tinydb import Query
from tinydb_serialization import SerializationMiddleware
import NuRadioReco.detector.detector_base
from NuRadioReco.detector.detector_base import DateTimeSerializer
import copy

logger = logging.getLogger('NuRadioReco.genericDetector')

serialization = SerializationMiddleware()
serialization.register_serializer(DateTimeSerializer(), 'TinyDate')


[docs]class GenericDetector(NuRadioReco.detector.detector_base.DetectorBase): """ Used the same way as the main detector class, but works with incomplete detector descriptions. The user can define a default station. If any property is missing from one of the other station, the value from the default station will be used instead. If no channels are specified for a station, the channels from the default station will be used instead. For cases when the station design has a lot of channels, a default channel can also be defined. The default channel has to be part of the default station. It works the same way as the default station: Any property missing from one of the other channels will be taken from the default channel. The GenericDetector also ignores commission and decommission times and should therefore not be used for real data, but only for simulation studies. This detector only accepts json detector descriptions or dictionary. """ def __init__(self, json_filename, default_station=None, default_channel=None, default_device=None, source='json', dictionary=None, log_level=logging.WARNING, assume_inf=True, antenna_by_depth=False): """ Initialize the stations detector properties. Parameters ---------- json_filename : str the path to the json detector description file (if first checks a path relative to this directory, then a path relative to the current working directory of the user) default value is 'ARIANNA/arianna_detector_db.json' default_station: This option is deprecated. ONLY in case no 'reference_station' is set in the station parameters of the detector description, the 'default_station' is used as reference for the time being. ID of the station that should be used as the default station. The default station needs to have a complete detector description. If a property is missing in any of the other stations, the value from the default station will be used instead. default_channel: This option is deprecated. ONLY in case no 'reference_channel' is set in the channel parameters of the detector description, the 'default_channel' is used as reference for the time being. ID of the channel that should be used as the default channel. This channel has to be part of the reference station and have a complete detector description. If a property is missing in any of the other channels, the value from the default channel will be used instead. default_device: This option is deprecated. ONLY in case no 'reference_device' is set in the device parameters of the detector description, the 'default_device' is used as reference for the time being. ID of the device that should be used as the default device. This channel has to be part of the reference station and have a complete detector description. If a property is missing in any of the other devices, the value from the default device will be used instead. source: str 'json' or 'dictionary' default value is 'json' If 'json' is passed, the JSON dictionary at the location specified by json_filename will be used If 'dictionary' is passed, the dictionary specified by the parameter 'dictionary' will be used dictionary: dict If 'dictionary' is passed to the parameter source, the dictionary passed to this parameter will be used for the detector description. assume_inf : Bool Default to True, if true forces antenna models to have infinite boundary conditions, otherwise the antenna madel will be determined by the station geometry. antenna_by_depth: bool (default False) if True the antenna model is determined automatically depending on the depth of the antenna. This is done by appending e.g. '_InfFirn' to the antenna model name. if False, the antenna model as specified in the database is used. """ self.__logger = logging.getLogger('NuRadioReco.genericDetector') self.__logger.setLevel(log_level) if (default_station is None) and (default_channel is None) and (default_device is None): # load detector super(GenericDetector, self).__init__(source=source, json_filename=json_filename, dictionary=dictionary, assume_inf=assume_inf, antenna_by_depth=antenna_by_depth) else: if source == "json": # load json as dictionary and pass that to the detector, this is needed in order to not overwrite the # json when updating the table to include reference_station/channel/device with open(json_filename, "r") as json_input: dictionary = json.load(json_input) super(GenericDetector, self).__init__(source="dictionary", json_filename=None, dictionary=dictionary, assume_inf=assume_inf, antenna_by_depth=antenna_by_depth) if default_station is not None: logger.warning( "DeprecationWarning: replace default_station by setting a 'reference_station' for each station in " "the detector description. This allows to define multiple default station types") # fill default station info into 'reference_station' field for all stations in detector for sta in self._stations: if 'reference_station' in sta.keys(): logger.warning( f"Station already has a reference station {sta['reference_station']}. Ignoring deprecated " f"'default_station'") else: logger.warning( f"Setting deprecated 'default_station' as reference station ({default_station}) " f"as requested") Station = Query() self._stations.update({'reference_station': default_station}, (Station.station_id == sta["station_id"])) if default_channel is not None: logger.warning( "DeprecationWarning: replace default_channel by setting a 'reference_channel' for each channel in " "the detector description. This allows to define multiple default channel types") # fill default channel info into 'reference_channel' field for all channels in detector for chan in self._channels: if 'reference_channel' in chan.keys(): logger.warning( f"Channel already has a reference channel {chan['reference_channel']}. Ignoring " f"deprecated 'default_channel'") else: logger.warning( f"Setting deprecated 'default_channel' as reference channel ({default_channel}) " f"as requested") Channel = Query() self._channels.update({'reference_channel': default_channel}, (Channel.station_id == chan["station_id"]) & ( Channel.channel_id == chan["channel_id"])) if default_device is not None: logger.warning( "DeprecationWarning: replace default_device by setting a 'reference_device' for each device in " "the detector description. This allows to define multiple default device types") # fill default device info into 'reference_device' field for all devices in detector for dev in self._buffered_devices: if 'reference_device' in dev.keys(): logger.warning( f"Device already has a reference device {dev['reference_device']}. Ignoring deprecated " f"'default_device'") else: logger.warning( f"Setting deprecated 'default_device' as reference device ({default_device}) as requested") Device = Query() self._devices.update({'reference_device': default_device}, (Device.station_id == dev["station_id"]) & ( Device.device_id == dev["device_id"])) self.__default_station = default_station # TODO maybe these dicts/lists can be omitted # a lookup with one reference station for each station in the detector description self.__lookuptable_reference_station = {} self.__reference_station_ids = [] self.__reference_device_ids = {} self.__reference_channel_ids = {} # TODO maybe these dicts.lists can be omitted # add all stations to the lookup for sta in self._stations.all(): self._update_reference_station_lookup(sta) self.__reference_stations = {} Station = Query() for reference_station_id in self.__reference_station_ids: self.__reference_stations[reference_station_id] = self._stations.get( (Station.station_id == reference_station_id)) self.__station_changes_for_event = [] self.__run_number = None self.__event_id = None # check if all reference stations, reference_channels, and reference_devices are present Station = Query() for sta in self._stations.all(): if "reference_station" in sta: ref = self._stations.get( (Station.station_id == sta["reference_station"])) if ref is None: raise ValueError( 'The reference station {} was not found in the detector description'.format( sta["reference_station"])) Channel = Query() for chan in self._channels.all(): if "reference_channel" in chan: ref = self._channels.get( (Channel.station_id == chan["station_id"]) & (Channel.channel_id == chan["reference_channel"])) if ref is None: # Check if reference channel sits in reference station reference_station_id = self.__lookuptable_reference_station[chan["station_id"]] ref = self._channels.get( (Channel.station_id == reference_station_id) & (Channel.channel_id == chan["reference_channel"]) ) if ref is None: raise ValueError( 'The reference channel {} of station {} was not found in the detector description'.format( chan["reference_channel"], chan["station_id"])) Device = Query() for dev in self._devices.all(): if "reference_device" in dev: ref = self._devices.get( (Device.station_id == dev["station_id"]) & (Device.channel_id == dev["reference_device"])) if ref is None: raise ValueError( 'The reference device {} of station {} was not found in the detector description'.format( dev["reference_device"], dev["station_id"])) def _get_station(self, station_id): if station_id not in self._buffered_stations.keys(): self._buffer(station_id) res = copy.copy(self._buffered_stations[station_id]) if self.__run_number is not None and self.__event_id is not None: for change in self.__station_changes_for_event: if change['station_id'] == station_id and \ change['run_number'] == self.__run_number and \ change['event_id'] == self.__event_id: for name, value in change['properties'].items(): res[name] = value return res def _query_station(self, station_id, raw=False): Station = Query() res = self._stations.get((Station.station_id == station_id)) if res is None and not raw: logger.error("query for station {} returned no results".format(station_id)) raise LookupError("query for station {} returned no results".format(station_id)) if not raw: if "reference_station" in res.keys(): ref = self._stations.get((Station.station_id == res["reference_station"])) for key in ref.keys(): if key not in res.keys(): # if a property is missing, we use the value from the reference station instead res[key] = ref[key] return res def _query_channels(self, station_id, raw=False): Station = Query() sta = self._stations.get((Station.station_id == station_id)) reference_station_id = station_id if "reference_station" in sta: reference_station_id = sta["reference_station"] Channel = Query() res = self._channels.search((Channel.station_id == station_id)) if not raw: # if there are NO channels in this station defined, look up devices from the reference station if len(res) == 0: reference_channels = self._channels.search((Channel.station_id == reference_station_id)) res = [] for channel in reference_channels: new_channel = copy.copy(channel) new_channel['station_id'] = station_id res.append(new_channel) # now we look if there are reference fields to fill. Will use reference_station_id, which is either the # station or the reference for channel in res: if 'reference_channel' in channel: # add to dictionary to keep track of reference channels TODO this is not really needed? self.__reference_channel_ids[(station_id, channel['channel_id'])] = channel['reference_channel'] # there is a reference, so we have to get it ref_chan = self._channels.get( (Channel.station_id == reference_station_id) & ( Channel.channel_id == channel['reference_channel'])) for key in ref_chan.keys(): if key not in channel.keys() and key != 'station_id' and key != 'channel_id': channel[key] = ref_chan[key] return res def _query_devices(self, station_id, raw=False): # if the station has a reference, take this one to take the devices from Station = Query() sta = self._stations.get((Station.station_id == station_id)) reference_station_id = station_id if "reference_station" in sta: reference_station_id = sta["reference_station"] Device = Query() res = self._devices.search((Device.station_id == station_id)) if not raw: # if there are NO devices in this station defined, look up devices from the reference station if len(res) == 0: reference_devices = self._devices.search((Device.station_id == reference_station_id)) res = [] for device in reference_devices: new_device = copy.copy(device) new_device['station_id'] = station_id res.append(new_device) # now we look if there are reference fields to fill. Will use reference_station_id, which is either the # station or the reference for device in res: if 'reference_device' in device: # add to dictionary to keep track of reference devices TODO this is not really needed? self.__reference_device_ids[(station_id, device['device_id'])] = device['reference_device'] # there is a reference, so we have to get it ref_dev = self._devices.get( (Device.station_id == reference_station_id) & (Device.device_id == device['reference_device'])) for key in ref_dev.keys(): if key not in device.keys() and key != 'station_id' and key != 'device_id': device[key] = ref_dev[key] return res def _buffer(self, station_id): self._buffered_stations[station_id] = self._query_station(station_id) channels = self._query_channels(station_id) self._buffered_channels[station_id] = {} for channel in channels: self._buffered_channels[station_id][channel['channel_id']] = channel devices = self._query_devices(station_id) self._buffered_devices[station_id] = {} for device in devices: self._buffered_devices[station_id][device['device_id']] = device def _update_reference_station_lookup(self, station_dict): """ add a station to the lookup of reference stations and update the list of default station ids """ default_ids = set(self.__reference_station_ids) if 'reference_station' in station_dict: self.__lookuptable_reference_station[station_dict['station_id']] = station_dict['reference_station'] default_ids.add(station_dict['reference_station']) else: self.__lookuptable_reference_station[station_dict['station_id']] = None self.__reference_station_ids = list(default_ids)
[docs] def add_generic_station(self, station_dict): """ Add a generic station to the detector. The station is treated like a generic station in the original detector description file, i.e. all missing properties and all channels will be taken from the reference station. If a station with the same ID already exists, this function does nothing. Parameters ---------- station_dict: dictionary dictionary containing the station properties. Needs to at least include a station_id, any other missing parameters will be taken from the reference station """ if "reference_station" not in station_dict and self.__default_station is not None: logger.warning( 'DeprecationWarning: Generating a station via `add_generic_station` that has no "reference_station" ' 'specified.') logger.warning( f'DeprecationWarning: Taking the deprecated "default_station" ({self.__default_station}) as ' f'"reference_station".') station_dict["reference_station"] = self.__default_station if station_dict['station_id'] in self._buffered_stations.keys(): logger.warning('Station with ID {} already exists in buffer. Cannot add station with same ID'.format( station_dict['station_id'])) return if station_dict['station_id'] not in self.__lookuptable_reference_station: self._update_reference_station_lookup(station_dict) reference_station_id = self.__lookuptable_reference_station[station_dict['station_id']] for key in self.__reference_stations[reference_station_id].keys(): if key not in station_dict.keys(): station_dict[key] = self.__reference_stations[reference_station_id][key] self._buffered_stations[station_dict['station_id']] = station_dict if reference_station_id not in self._buffered_channels.keys(): self._buffer(reference_station_id) self._buffered_channels[station_dict['station_id']] = {} for i_channel, channel in self._buffered_channels[reference_station_id].items(): new_channel = copy.copy(channel) new_channel['station_id'] = station_dict['station_id'] self._buffered_channels[station_dict['station_id']][channel['channel_id']] = new_channel self._buffered_devices[station_dict['station_id']] = {} for i_device, device in self._buffered_devices[reference_station_id].items(): new_device = copy.copy(device) new_device['station_id'] = station_dict['station_id'] self._buffered_devices[station_dict['station_id']][device['device_id']] = new_device
[docs] def add_station_properties_for_event(self, properties, station_id, run_number, event_id): """ Adds an entry to the list of event-specific changes to the detector description. Parameters ---------- properties: dictionary Dictionary of the properties that should be changed, with keys being any of the property names in the detector description and values the values that these properties should be changed to station_id: integer ID of the station whose properties should be changed run_number: integer Run number of the event for which the changes are valid event_id: integer Event ID of the event for which the changes are valid """ self.__station_changes_for_event.append({ 'run_number': run_number, 'event_id': event_id, 'station_id': station_id, 'properties': properties })
[docs] def get_station_properties_for_event(self, run_number, event_id, station_id=None): """ Returns all event-specific changes that have been stored in the detector description for a given station and event Parameters ---------- run_number: integer Run number of the event for which the changes should be returned event_id: integer Event ID of the event for which the changes should be returned station_id: integer or None ID of the station for which the changes should be returned If station_id is None, changes for all stations are returned """ changes = [] for change in self.__station_changes_for_event: if change['run_number'] == run_number and change['event_id'] == event_id: if station_id is None or change['station_id'] == station_id: changes.append(change) return changes
[docs] def set_event(self, run_number, event_id): """ Sets the run number and event ID for which the detector description should be returned. This is needed if event-specific changes to the detector description have been stored. If run_number or event_id are not set (or are set to None), event-specific changes to the detector will be ignored Parameters ---------- run_number: integer Run number of the event the detector should be set to event_id: integer ID of the event the detector should be set to """ self.__run_number = run_number self.__event_id = event_id
[docs] def get_reference_station(self, station_id): """ Get the properties of the reference station """ return self.__reference_stations[station_id]
[docs] def get_reference_station_id(self, station_id): """ Get the properties of the reference station """ return self.__reference_station_ids[station_id]
[docs] def get_reference_stations(self): """ Get the properties of the reference stations """ return self.__reference_stations
[docs] def get_default_station(self): """ Get the properties of the default station """ return self.__reference_stations[self.get_default_station_id()]
[docs] def get_reference_station_ids(self): """ Get the whole dictionary of reference stations """ return self.__reference_station_ids
[docs] def get_default_station_id(self): """ Get the ID of the default station """ if len(self.__reference_station_ids) == 1: # only one default station, either passed as "reference_station" in detector description or in the ini return self.__reference_station_ids[0] else: # more than one "reference_station" passed in the detector description, return the first one logger.warning( f'more than one station id set as "reference station": {self.__reference_station_ids},\ continue with first entry: {self.__reference_station_ids[0]}') return self.__reference_station_ids[0]
[docs] def get_default_channel(self): """ Get the properties of the default channel """ logger.warning("The use of 'default_channel' is deprecated. returning None") return None # self.__default_channel
[docs] def get_default_channel_id(self): """ Get the ID of the default channel """ logger.warning("The use of 'default_channel' is deprecated. returning None") return None # self.__default_channel_id
[docs] def get_raw_station(self, station_id): """ Get the properties of a station as they are in the original detector description, i.e. without missing properties being replaced by those from the default station. Event-specific changes are also ignored. Parameters ---------- station_id: integer ID of the requested station """ if station_id in self._buffered_stations.keys(): return self._buffered_stations[station_id] station = self._query_station(station_id, True) if station is None: return { "station_id": station_id } else: return station
[docs] def get_raw_channel(self, station_id, channel_id): """ Get the properties of a channel as they are in the original detector description, i.e. without missing properties being replaced by those from the default channel. Parameters ---------- station_id: integer ID of the requested channel's station channel_id: integer ID of the requested channel """ if station_id in self._buffered_channels.keys(): if channel_id in self._buffered_channels[station_id].keys(): return self._buffered_channels[station_id][channel_id] channels = self._query_channels(station_id, True) for channel in channels: if channel['channel_id'] == channel_id: return channel return None
[docs] def has_station(self, station_id): if station_id in self._buffered_stations.keys(): return True Station = Query() res = self._stations.get(Station.station_id == station_id) return res is not None
# overwrite update function to do nothing
[docs] def update(self, time): return