"""
This module implements an interface for reading LOFAR TBB data.
This module is strongly based on pyCRtools module tbb.py by Pim Schellart, Tobias Winchen, and others.
However, it has been completely re-written for use with LOFAR-LIM
Author: Brian Hare
Definitions:
LOFAR is split into a number of different stations. There are three main types:
#. Core Stations (CS)
#. Remote Stations (RS)
#. international stations
Each station contains 96 low band antennas (LBA) and 48 high band antennas (HBA). Each antenna is dual polarized.
Each station is referred to by its name (e.g. "CS001"), which is a string, or its ID (e.g. 1), which is an integer. In
general, these are different! The mapping, however, is unique and is given in `utilities.py`.
There are a few complications with reading the data.
#. The data from each station is often spread over multiple files
There is a class below that can combine multiple files (even from different stations)
#. It is entirely possible that one file could contain multiple stations
This feature is not used, so I assume that it isn't a problem (for now)
#. Each Station has unknown clock offsets. Technically the core stations are all on one clock, but there are some
unknown cable delays. This is a difficult problem, not handled here
#. Each Antenna doesn't necessarily start reading data at precisely the same time.
The code below picks the latest start time so this problem can be ignored by the end user
#. The software that inserts metadata (namely antenna positions and calibrations) sometimes "forgets" to do its job
The code below will automatically read the metadata from other files when necessary
#. LOFAR is constantly changing
So..keeping code up-to-date and still backwards compatible will be an interesting challenge
#. LOFAR only has 96 RCUs (receiver control units) per station (at the moment).
Each RCU is essentially one digitizer. Each antenna needs two RCS to record both polarizations. The result is only
1/3 of the antennas can be read out each time.
LOFAR keeps track of things with two ways. First, the data is all referred to by its RCUid. 0 is the 0th RCU, ect...
However, which antenna corresponds to which RCU depends on the antennaSet. For LOFAR-LIM the antenna set will
generally be "LBA_OUTER". This could change, and sometimes the antenna set is spelled wrong in the data files. (This
should be handled here though)
In the code below each RCU is referred to by ANTENNA_NAME or antennaID. These are the same thing (I think). They are
however, a misnomer, as they actually refer to the RCU, not antenna. The specific antenna depends on the antenna
set. For the same antenna set, however, the ANTENNA_NAME will always refer to the same antenna.
Each ANTENNA_NAME is a string of 9 digits. First three is the station ID (not name!), next three is the group (no
idea, don't ask), final 3 is the RCU id
For LBA_INNER data set, even RCU ids refer to X-polarized dipoles and odd RCU ids refer to Y-polarized dipoles. This
is flipped for LBA_OUTER antenna set. X-polarization is NE-SW, and Y-polarization is NW-SE. antenna_response.py,
which handles the antenna function, assumes the data is LBA_OUTER.
"""
import datetime
import os
import logging
import h5py
import numpy as np
import NuRadioReco.modules.io.LOFAR.rawTBBio_metadata as md
import NuRadioReco.modules.io.LOFAR.rawTBBio_utilities as util
logger = logging.getLogger('NuRadioReco.LOFAR.rawTBBio')
# nyquist_zone = {'LBA_10_90' : 1, 'LBA_30_90' : 1, 'HBA_110_190' : 2, 'HBA_170_230' : 3, 'HBA_210_250' : 3}
conversion_dict = {
"": 1.0,
"kHz": 1000.0,
"MHz": 10.0 ** 6,
"GHz": 10.0 ** 9,
"THz": 10.0 ** 12,
}
# Helper functions
# The following four functions read what I call "correction files" these are corrections made to improve the data
[docs]def read_antenna_pol_flips(fname):
antennas_to_flip = []
with open(fname) as fin:
for line in fin:
ant_name = line.split()[0]
antennas_to_flip.append(ant_name)
return antennas_to_flip
[docs]def read_bad_antennas(fname):
bad_antenna_data = []
def parse_line_v1(line):
ant_name, pol = line.split()[0:2]
bad_antenna_data.append((ant_name, int(pol)))
def parse_line_v2(line):
ant_name = line.split()[0]
pol = 0
if not util.antName_is_even(ant_name):
ant_name = util.even_antName_to_odd(ant_name)
pol = 1
bad_antenna_data.append((ant_name, pol))
version = 1
with open(fname) as fin:
is_line_0 = True
for file_line in fin:
if is_line_0 and file_line[:2] == "v2":
version = 2
else:
if version == 1:
parse_line_v1(file_line)
elif version == 2:
parse_line_v2(file_line)
if is_line_0:
is_line_0 = False
return bad_antenna_data
[docs]def read_antenna_delays(fname):
additional_ant_delays = {}
def parse_line_v1(line):
ant_name, pol_E_delay, pol_O_delay = line.split()[0:3]
additional_ant_delays[ant_name] = [float(pol_E_delay), float(pol_O_delay)]
def parse_line_v2(line):
ant_name, delay = line.split()[0:2]
pol = 0
if not util.antName_is_even(ant_name):
ant_name = util.even_antName_to_odd(ant_name)
pol = 1
if ant_name not in additional_ant_delays:
additional_ant_delays[ant_name] = [0.0, 0.0]
additional_ant_delays[ant_name][pol] = float(delay)
parse_function = parse_line_v1
with open(fname) as fin:
is_line_0 = True
for file_line in fin:
if is_line_0 and file_line[0] == "v":
if file_line[:2] == "v1":
pass
elif file_line[:2] == "v2":
parse_function = parse_line_v2
else:
parse_function(file_line)
if is_line_0:
is_line_0 = False
return additional_ant_delays
[docs]def read_station_delays(fname):
station_delays = {}
with open(fname) as fin:
for line in fin:
sname, delay = line.split()[0:2]
station_delays[sname] = float(delay)
return station_delays
[docs]def decode_if_needed(input_decode):
if not isinstance(input_decode, str):
return input_decode.decode()
return input_decode
[docs]class TBBData_Dal1:
"""
A class for reading one station from one file.
However, since one station is often spread between different files,
use filePaths_by_stationName combined with :class:`MultiFile_Dal1` below.
"""
def __init__(
self,
filename,
force_metadata_ant_pos=False,
forcemetadata_delays=True,
metadata_dir=None,
):
self.filename = filename
self.metadata_dir = metadata_dir
self.force_metadata_ant_pos = force_metadata_ant_pos
self.forcemetadata_delays = forcemetadata_delays
# open file and set some basic info
self.file = h5py.File(filename, "r")
stationKeys = [s for s in self.file.keys() if s.startswith("Station")]
# assume there is only one station in the file
if len(stationKeys) != 1:
logger.warning(f"File {self.filename} has more then one station")
self.stationKey = stationKeys[0]
self.antennaSet = decode_if_needed(self.file.attrs["ANTENNA_SET"][0])
self.dipoleNames = list(self.file[self.stationKey].keys())
self.StationID = self.file[self.stationKey][self.dipoleNames[0]].attrs[
"STATION_ID"
][0]
self.StationName = util.SId_to_Sname[self.StationID]
# assume all antennas have the same sample frequency
self.SampleFrequency = (
self.file[self.stationKey][self.dipoleNames[0]].attrs[
"SAMPLE_FREQUENCY_VALUE"
][0]
* conversion_dict[
decode_if_needed(self.file[self.stationKey][self.dipoleNames[0]].attrs[
"SAMPLE_FREQUENCY_UNIT"
][0])
]
)
# PyCRTools comparison (testing purposes)
# print(
# self.file.attrs["OBSERVATION_FREQUENCY_MIN"],
# self.file.attrs["OBSERVATION_FREQUENCY_CENTER"],
# self.file.attrs["OBSERVATION_FREQUENCY_MAX"],
# ) # all return 0. ???
# filter selection is typically "LBA_10_90"
self.FilterSelection = decode_if_needed(self.file.attrs["FILTER_SELECTION"][0])
# check that all antennas start in the same second, and record the same number of samples #
self.Time = None
self.DataLengths = np.zeros(len(self.dipoleNames), dtype=int)
self.SampleNumbers = np.zeros(len(self.dipoleNames), dtype=int)
for dipole_i, dipole in enumerate(self.dipoleNames):
if self.Time is None:
self.Time = self.file[self.stationKey][dipole].attrs["TIME"][0]
else:
if self.Time != self.file[self.stationKey][dipole].attrs["TIME"][0]:
raise IOError(
"antennas do not start at same time in " + self.filename
)
self.DataLengths[dipole_i] = self.file[self.stationKey][dipole].attrs[
"DATA_LENGTH"
][0]
self.SampleNumbers[dipole_i] = self.file[self.stationKey][dipole].attrs[
"SAMPLE_NUMBER"
][0]
# get position and delay metadata...maybe
self.have_metadata = (
"DIPOLE_CALIBRATION_DELAY_VALUE"
in self.file[self.stationKey][self.dipoleNames[0]].attrs
)
self.antenna_filter = md.make_antennaID_filter(self.dipoleNames)
# load antenna locations from metadata and from file. IF they are too far apart, then give warning,
# and use metadata
self.ITRF_dipole_positions = md.getItrfAntennaPosition(self.StationName, self.antennaSet, self.metadata_dir)[
self.antenna_filter
] # load positions from metadata file
if self.have_metadata and not self.force_metadata_ant_pos:
use_TBB_positions = True
TBB_ITRF_dipole_positions = np.empty(
(len(self.dipoleNames), 3), dtype=np.double
)
for i, dipole in enumerate(self.dipoleNames):
TBB_ITRF_dipole_positions[i] = self.file[self.stationKey][dipole].attrs[
"ANTENNA_POSITION_VALUE"
]
dif = np.linalg.norm(
TBB_ITRF_dipole_positions[i] - self.ITRF_dipole_positions[i]
)
if dif > 1:
logger.warning(
f"Station {self.StationName} has suspicious antenna locations. Using metadata instead",
)
use_TBB_positions = False
if use_TBB_positions:
self.ITRF_dipole_positions = TBB_ITRF_dipole_positions
self.calibrationDelays = np.zeros(
len(self.dipoleNames), dtype=np.double
) # defined as calibration values in file. Never from external metadata!
if self.have_metadata: # and not self.forcemetadata_delays:
for i, dipole in enumerate(self.dipoleNames):
self.calibrationDelays[i] = self.file[self.stationKey][dipole].attrs[
"DIPOLE_CALIBRATION_DELAY_VALUE"
]
# get the offset, in number of samples, needed so that each antenna starts at the same time #
self.nominal_sample_number = np.max(self.SampleNumbers)
self.sample_offsets = self.nominal_sample_number - self.SampleNumbers
self.nominal_DataLengths = self.DataLengths - self.sample_offsets
[docs] def close_file(self):
self.file.close()
return
# GETTERS #
[docs] def get_station_name(self):
"""returns the name of the station, as a string"""
return self.StationName
[docs] def get_station_ID(self):
"""
returns the ID of the station, as an integer.
This is not the same as StationName. Mapping is given in utilities
"""
return self.StationID
[docs] def get_antenna_names(self):
"""return name of antenna as a list of strings. This is really the RCU id, and the physical antenna depends
on the antennaSet """
return self.dipoleNames
[docs] def get_antenna_set(self):
"""return the antenna set as a string. Typically "LBA_OUTER" """
return self.antennaSet
[docs] def get_sample_frequency(self):
"""gets samples per second. Typically 200 MHz."""
return self.SampleFrequency
[docs] def get_filter_selection(self):
"""return a string that represents the frequency filter used. Typically "LBA_10_90"""
return self.FilterSelection
[docs] def get_timestamp(self):
"""return the POSIX timestamp of the first data point"""
return self.Time
[docs] def get_full_data_lengths(self):
"""get the number of samples stored for each antenna. Note that due to the fact that the antennas do not
start recording at the exact same instant (in general), this full data length is not all usable returns array
of ints """
return self.DataLengths
[docs] def get_all_sample_numbers(self):
"""return numpy array that contains the sample numbers of each antenna. Divide this by the sample frequency
to get time since the timestamp of the first data point. Note that since these are, in general, different,
they do NOT refer to sample 0 of "get_data" in general """
return self.SampleNumbers
[docs] def get_nominal_sample_number(self):
"""return the sample number of the 0th data sample returned by get_data.
Divide by sample_frequency to get time from timestamp of the 0th data sample"""
return self.nominal_sample_number
[docs] def get_nominal_data_lengths(self):
"""return the number of data samples that are usable for each antenna, accounting for different starting
sample numbers. returns array of ints """
return self.nominal_DataLengths
[docs] def get_ITRF_antenna_positions(self, copy=False):
"""returns the ITRF positions of the antennas. Returns a 2D numpy array. If copy is False, then this just
returns the internal array of values """
if copy:
return np.array(self.ITRF_dipole_positions)
else:
return self.ITRF_dipole_positions
[docs] def get_LOFAR_centered_positions(self, out=None):
"""returns the positions (as a 2D numpy array) of the antennas with respect to CS002.
if out is a numpy array, it is used to store the antenna positions, otherwise a new array is allocated"""
return md.convertITRFToLocal(self.ITRF_dipole_positions, self.metadata_dir, out=out)
[docs] def get_timing_callibration_phases(self):
"""only a test function for the moment, do not use"""
fpath = os.path.dirname(self.filename) + "/" + self.StationName
phase_calibration = md.getStationPhaseCalibration(self.StationName, self.antennaSet,
metadata_dir=self.metadata_dir, file_location=fpath)
phase_calibration = phase_calibration[self.antenna_filter]
return phase_calibration
[docs] def get_timing_callibration_delays(self, force_file_delays=False):
"""return the timing calibration of the antennas, as a 1D np array. If not included in the metadata, will look
for a data file in the same directory as this file. Otherwise returns None"""
if (self.have_metadata and not self.forcemetadata_delays) or force_file_delays:
return self.calibrationDelays
else:
fpath = os.path.dirname(self.filename) + "/" + self.StationName
phase_calibration = md.getStationPhaseCalibration(self.StationName, self.antennaSet,
metadata_dir=self.metadata_dir, file_location=fpath)
phase_calibration = phase_calibration[self.antenna_filter]
return md.convertPhase_to_Timing(
phase_calibration, 1.0 / self.SampleFrequency
)
[docs] def get_data(self, start_index, num_points, antenna_index=None, antenna_ID=None):
"""
return the raw data for a specific antenna, as an 1D int16 numpy array, of length num_points.
First point returned is start_index past get_nominal_sample_number().
Specify the antenna by giving the antenna_ID (which
is a string, same as output from get_antenna_names(), or as an integer antenna_index. An antenna_index of 0
is the first antenna in get_antenna_names().
"""
if antenna_index is None:
if antenna_ID is None:
raise LookupError("need either antenna_ID or antenna_index")
antenna_index = self.dipoleNames.index(antenna_ID)
else:
antenna_ID = self.dipoleNames[antenna_index]
initial_point = self.sample_offsets[antenna_index] + start_index
final_point = initial_point + num_points
return self.file[self.stationKey][antenna_ID][initial_point:final_point]
[docs]class MultiFile_Dal1:
"""
A class for reading the data from one station from multiple files
"""
def __init__(
self,
filename_list,
metadata_dir,
force_metadata_ant_pos=False,
polarization_flips=None,
bad_antennas=None,
additional_ant_delays=None,
station_delay=0.0,
only_complete_pairs=True,
pol_flips_are_bad=False,
):
"""
Parameters
----------
filename_list: list
List of filenames for this station for this event
force_metadata_ant_pos : bool, default=False
If True, then load antenna positions from a metadata file and not the raw data file
polarization_flips : list
List of even antennas where it is known that even and odd antenna names are flipped in file. This is
assumed to apply both to data and timing calibration
bad_antennas : list
Antennas that should not be used. Each item in the list is a tuple, first item of tuple is name of even
antenna, second item is a 0 or 1 indicating if even or odd antenna is bad. assumed to be BEFORE antenna flips are accounted for
additional_ant_delays : dict
Each key is name of even antenna, each value is a tuple with additional even and odd antenna delays.
This should rarely be needed. assumed to be found BEFORE antenna flips are accounted for
station_delay : float
A single number that represents the clock offset of this station, as a delay
only_complete_pairs : bool
If True, discards antenna if the other in pair is not present or is bad.
If False, keeps all good antennas with a 'none' value if other antenna in pair is missing
pol_flips_are_bad : bool
If True, antennas that are in pol-flips are included in `bad_antennas`
Notes
-----
This module always defaults to using antenna timing calibration from metadata.
Also, polarization_flips, bad_antennas, additional_ant_delays, and station_delay can now be strings
that are file names. If this is the case, they will be read automatically
"""
self.metadata_dir = metadata_dir
self.files = [
TBBData_Dal1(fname, force_metadata_ant_pos, metadata_dir=self.metadata_dir)
for fname in filename_list
]
if isinstance(polarization_flips, str):
polarization_flips = read_antenna_pol_flips(polarization_flips)
if bad_antennas is None:
bad_antennas = []
elif isinstance(bad_antennas, str):
bad_antennas = read_bad_antennas(bad_antennas)
if isinstance(additional_ant_delays, str):
additional_ant_delays = read_antenna_delays(additional_ant_delays)
if polarization_flips is not None and pol_flips_are_bad:
for even_ant in polarization_flips:
bad_antennas.append((even_ant, 0))
bad_antennas.append((even_ant, 1))
polarization_flips = []
# get some data that should be constant
self.antennaSet = self.files[0].antennaSet
self.StationID = self.files[0].StationID
self.StationName = self.files[0].StationName
self.SampleFrequency = self.files[0].SampleFrequency
self.FilterSelection = self.files[0].FilterSelection
self.Time = self.files[0].Time
self.bad_antennas = bad_antennas
self.odd_pol_additional_timing_delay = (
0.0 # another timing delay to add to all odd-polarized antennas
)
if isinstance(station_delay, str):
station_delay = read_station_delays(station_delay)[self.StationName]
self.station_delay = station_delay
# check consistency of data
for TBB_file in self.files:
if TBB_file.antennaSet != self.antennaSet:
raise IOError(
"antenna set not the same between files for station: "
+ self.StationName
)
if TBB_file.StationID != self.StationID:
raise IOError(
"station ID not the same between files for station: "
+ self.StationName
)
if TBB_file.StationName != self.StationName:
raise IOError(
"station name not the same between files for station: "
+ self.StationName
)
if TBB_file.FilterSelection != self.FilterSelection:
raise IOError(
"filter selection not the same between files for station: "
+ self.StationName
)
if TBB_file.Time != self.Time:
raise IOError(
"antenna set not the same between files for station: "
+ self.StationName
)
# check LBA outer antenna set
if self.antennaSet != "LBA_OUTER":
logger.warning(
f"Antenna set on station {self.StationName} is not LBA_OUTER"
)
# find best files to get antennas from #
# require each antenna shows up once, and even pol is followed by odd pol
self.dipoleNames = []
self.antenna_to_file = (
[]
) # each item is a tuple. First item is file object, second is antenna index in file
unused_antenna_names = []
unused_antenna_to_file = []
bad_PolE_antennas = [ant for ant, pol in bad_antennas if pol == 0]
bad_PolO_antennas = [
ant for ant, pol in bad_antennas if pol == 1
] # note that this is still the name of the even antenna, although it is the ODD antenna that is bad!!!
for TBB_file in self.files:
file_ant_names = TBB_file.get_antenna_names()
for ant_i, ant_name in enumerate(file_ant_names):
if ant_name in self.dipoleNames:
continue
ant_ID = int(ant_name[-3:])
if ant_ID % 2 == 0: # check if antenna is even
if ant_name in bad_PolE_antennas:
continue
odd_ant_name = ant_name[:-3] + str(ant_ID + 1).zfill(3)
if odd_ant_name in unused_antenna_names: # we have the odd antenna
self.dipoleNames.append(ant_name)
self.dipoleNames.append(odd_ant_name)
self.antenna_to_file.append((TBB_file, ant_i))
odd_unused_index = unused_antenna_names.index(odd_ant_name)
self.antenna_to_file.append(
unused_antenna_to_file[odd_unused_index]
)
unused_antenna_names.pop(odd_unused_index)
unused_antenna_to_file.pop(odd_unused_index)
else: # we haven't found the odd antenna, so store info for now
unused_antenna_names.append(ant_name)
unused_antenna_to_file.append((TBB_file, ant_i))
else: # antenna is odd
even_ant_name = ant_name[:-3] + str(ant_ID - 1).zfill(3)
if (
even_ant_name in bad_PolO_antennas
): # note that have to check if EVEN antenna is in bad antenna names...
continue
if (
even_ant_name in unused_antenna_names
): # we have the odd antenna
self.dipoleNames.append(even_ant_name)
self.dipoleNames.append(ant_name)
even_unused_index = unused_antenna_names.index(even_ant_name)
self.antenna_to_file.append(
unused_antenna_to_file[even_unused_index]
)
unused_antenna_names.pop(even_unused_index)
unused_antenna_to_file.pop(even_unused_index)
self.antenna_to_file.append((TBB_file, ant_i))
else: # we haven't found the odd antenna, so store info for now
unused_antenna_names.append(ant_name)
unused_antenna_to_file.append((TBB_file, ant_i))
if not only_complete_pairs:
for ant_name, to_file in zip(unused_antenna_names, unused_antenna_to_file):
ant_ID = int(ant_name[-3:])
if ant_ID % 2 == 0: # check if antenna is even
self.dipoleNames.append(ant_name)
self.antenna_to_file.append(to_file)
self.dipoleNames.append(
ant_name[:-3] + str(ant_ID + 1).zfill(3)
) # add the odd antenna
self.antenna_to_file.append(None) # doesn't exist in a file
else:
self.dipoleNames.append(
ant_name[:-3] + str(ant_ID - 1).zfill(3)
) # add the even antenna
self.antenna_to_file.append(None) # doesn't exist in a file
self.dipoleNames.append(ant_name)
self.antenna_to_file.append(to_file)
self.index_adjusts = np.arange(
len(self.antenna_to_file)
) # used to compensate for polarization flips
# when given an antenna index to open data, use this index instead to open the correct data location
# get sample numbers and offsets and lengths and other related stuff #
self.SampleNumbers = []
self.DataLengths = []
for TBB_file, file_ant_i in self.antenna_to_file:
self.SampleNumbers.append(TBB_file.SampleNumbers[file_ant_i])
self.DataLengths.append(TBB_file.DataLengths[file_ant_i])
self.SampleNumbers = np.array(self.SampleNumbers, dtype=int)
self.DataLengths = np.array(self.DataLengths, dtype=int)
self.nominal_sample_number = np.max(self.SampleNumbers)
self.sample_offsets = self.nominal_sample_number - self.SampleNumbers
self.nominal_DataLengths = self.DataLengths - self.sample_offsets
self.even_ant_pol_flips = None
if polarization_flips is not None:
self.set_polarization_flips(polarization_flips)
self.additional_ant_delays = additional_ant_delays
[docs] def set_polarization_flips(self, even_antenna_names):
"""given a set of names(IDs) of even antennas, flip the data between the even and odd antennas"""
self.even_ant_pol_flips = even_antenna_names
for ant_name in even_antenna_names:
if ant_name in self.dipoleNames:
even_antenna_index = self.dipoleNames.index(ant_name)
self.index_adjusts[even_antenna_index] += 1
self.index_adjusts[even_antenna_index + 1] -= 1
[docs] def set_odd_polarization_delay(self, new_delay):
self.odd_pol_additional_timing_delay = new_delay
[docs] def set_station_delay(self, station_delay):
"""set the station delay, should be a number"""
self.station_delay = station_delay
[docs] def find_and_set_polarization_delay(self, verbose=False, tolerance=1e-9):
fpath = os.path.dirname(self.files[0].filename) + "/" + self.StationName
phase_calibration = md.getStationPhaseCalibration(self.StationName, self.antennaSet,
metadata_dir=self.metadata_dir, file_location=fpath)
all_antenna_calibrations = md.convertPhase_to_Timing(
phase_calibration, 1.0 / self.SampleFrequency
)
even_delays = all_antenna_calibrations[::2]
odd_delays = all_antenna_calibrations[1::2]
odd_offset = odd_delays - even_delays
median_odd_offset = np.median(odd_offset)
if verbose:
print("median offset is:", median_odd_offset)
below_tolerance = np.abs(odd_offset - median_odd_offset) < tolerance
if verbose:
print(
np.sum(below_tolerance),
"antennas below tolerance.",
len(below_tolerance) - np.sum(below_tolerance),
"above.",
)
ave_best_offset = np.average(odd_offset[below_tolerance])
if verbose:
print("average of below-tolerance offset is:", ave_best_offset)
self.set_odd_polarization_delay(-ave_best_offset)
above_tolerance = np.zeros(len(all_antenna_calibrations), dtype=bool)
above_tolerance[::2] = np.logical_not(below_tolerance)
above_tolerance[1::2] = above_tolerance[::2]
above_tolerance = above_tolerance[
md.make_antennaID_filter(self.get_antenna_names())
]
return [AN for AN, AT in zip(self.get_antenna_names(), above_tolerance) if AT]
[docs] def close_file(self):
"""
Properly close all the TBBData_Dal1 files.
"""
for file in self.files:
file.close_file()
# GETTERS
[docs] def get_station_name(self):
"""returns the name of the station, as a string"""
return self.StationName
[docs] def get_station_ID(self):
"""returns the ID of the station, as an integer. This is not the same as StationName. Mapping is given in
utilities """
return self.StationID
[docs] def get_antenna_names(self):
"""return name of antenna as a list of strings. This is really the RCU id, and the physical antenna depends
on the antennaSet """
return self.dipoleNames
[docs] def has_antenna(self, antenna_name):
"""if only_complete_pairs is False, then we could have antenna names without the data. Return True if we
actually have the antenna, False otherwise. Account for polarization flips. """
if antenna_name in self.dipoleNames:
index = self.index_adjusts[self.dipoleNames.index(antenna_name)]
if self.antenna_to_file[index] is None:
return False
else:
return True
else:
return False
[docs] def get_antenna_set(self):
"""return the antenna set as a string. Typically "LBA_OUTER" """
return self.antennaSet
[docs] def get_sample_frequency(self):
"""gets samples per second. Typically 200 MHz."""
return self.SampleFrequency
[docs] def get_filter_selection(self):
"""return a string that represents the frequency filter used. Typically "LBA_10_90"""
return self.FilterSelection
[docs] def get_timestamp(self):
"""return the POSIX timestamp of the first data point"""
return self.Time
[docs] def get_timestamp_as_datetime(self):
"""return the POSIX timestamp of the first data point as a python datetime localized to UTC"""
return datetime.datetime.fromtimestamp(
self.get_timestamp(), tz=datetime.timezone.utc
)
[docs] def get_full_data_lengths(self):
"""get the number of samples stored for each antenna. Note that due to the fact that the antennas do not
start recording at the exact same instant (in general), this full data length is not all usable returns array
of ints """
return self.DataLengths
[docs] def get_all_sample_numbers(self):
"""return numpy array that contains the sample numbers of each antenna. Divide this by the sample frequency
to get time since the timestamp of the first data point. Note that since these are, in general, different,
they do NOT refer to sample 0 of "get_data" """
return self.SampleNumbers
[docs] def get_nominal_sample_number(self):
"""return the sample number of the 0th data sample returned by get_data.
Divide by sample_frequency to get time from timestamp of the 0th data sample"""
return self.nominal_sample_number
[docs] def get_nominal_data_lengths(self):
"""return the number of data samples that are usable for each antenna, accounting for different starting
sample numbers. returns array of ints """
return self.nominal_DataLengths
[docs] def get_ITRF_antenna_positions(self, out=None):
"""returns the ITRF positions of the antennas. Returns a 2D numpy array.
if out is a numpy array, it is used to store the antenna positions, otherwise a new array is allocated.
Does not account for polarization flips, but shouldn't need too."""
if out is None:
out = np.empty((len(self.dipoleNames), 3))
for ant_i, (TBB_file, station_ant_i) in enumerate(self.antenna_to_file):
out[ant_i] = TBB_file.ITRF_dipole_positions[station_ant_i]
return out
[docs] def get_LOFAR_centered_positions(self, out=None):
"""returns the positions (as a 2D numpy array) of the antennas with respect to CS002.
if out is a numpy array, it is used to store the antenna positions, otherwise a new array is allocated.
Does not account for polarization flips, but shouldn't need too."""
if out is None:
out = np.empty((len(self.dipoleNames), 3))
md.convertITRFToLocal(self.get_ITRF_antenna_positions(), self.metadata_dir, out=out)
return out
[docs] def get_timing_callibration_phases(self):
"""only a test function for the moment, do not use"""
out = [None for _ in range(len(self.dipoleNames))]
for TBB_file in self.files:
ret = TBB_file.get_timing_callibration_phases()
if ret is None:
return None
for ant_i, (TBB_fileA, station_ant_i) in enumerate(self.antenna_to_file):
if TBB_fileA is TBB_file:
out[ant_i] = ret[station_ant_i]
return np.array(out)
[docs] def get_timing_callibration_delays(self, out=None, force_file_delays=False):
"""
return the timing calibration of the antennas, as a 1D np array.
If not included in the metadata, will look for a data file in the same directory as this file.
Otherwise returns None. If out is a numpy
array, it is used to store the antenna delays, otherwise a new array is allocated. This takes polarization
flips, and additional_ant_delays into account (assuming that both were found BEFORE the pol flip was found).
Also can account for a timing difference between even and odd antennas, if it is set. """
if out is None:
out = np.zeros(len(self.dipoleNames))
for TBB_file in self.files:
ret = TBB_file.get_timing_callibration_delays(force_file_delays)
if ret is None:
return None
for ant_i, adjust_i in enumerate(self.index_adjusts):
TBB_fileA, station_ant_i = self.antenna_to_file[adjust_i]
if TBB_fileA is TBB_file:
out[ant_i] = ret[station_ant_i]
if self.additional_ant_delays is not None:
# additional_ant_delays stores only even antenna names for historical reasons. so we need to be
# clever here
antenna_polarization = 0 if (ant_i % 2 == 0) else 1
even_ant_name = self.dipoleNames[ant_i - antenna_polarization]
if even_ant_name in self.additional_ant_delays:
if even_ant_name in self.even_ant_pol_flips:
antenna_polarization = int(not antenna_polarization)
out[ant_i] += self.additional_ant_delays[even_ant_name][
antenna_polarization
]
out[1::2] += self.odd_pol_additional_timing_delay
return out
[docs] def get_total_delays(self, out=None):
"""Return the total delay for each antenna, accounting for all antenna delays, polarization delay,
station clock offsets, and trigger time offsets (nominal sample number). This function should be preferred
over 'get_timing_callibration_delays', but the offsets can have a large average. It is recommended to pick one
antenna (on your reference station) and use it as a reference antenna so that it has zero timing delay. Note:
this creates two definitions of T=0. I will call 'uncorrected time' is when the result of this function is
used as-is, and a reference antenna is not chosen. (IE, the reference station can have a large total_delay
offset), 'corrected time' will be otherwise. """
delays = self.get_timing_callibration_delays(out)
delays += self.station_delay - self.get_nominal_sample_number() * 5.0e-9
return delays
[docs] def get_time_from_second(self, out=None):
"""return the time (in units of seconds) since the second of each antenna (which should be get_timestamp).
accounting for delays. This is literally just the opposite of get_total_delays """
out = self.get_total_delays(out)
out *= -1
return out
[docs] def get_geometric_delays(self, source_location, out=None, antenna_locations=None):
"""
Calculate travel time from a XYZ location to each antenna.
out can be an array of length equal to number
of antennas. antenna_locations is the table of antenna locations, given by get_LOFAR_centered_positions(). If
None, it is calculated. Note that antenna_locations CAN be modified in this function. If antenna_locations is
less then all antennas, then the returned array will be correspondingly shorter. The output of this function
plus??? get_total_delays plus emission time of the source is the time the source is seen on each antenna.
"""
if antenna_locations is None:
antenna_locations = self.get_LOFAR_centered_positions()
if out is None:
out = np.empty(len(antenna_locations), dtype=np.double)
if len(out) != len(antenna_locations):
logger.error("Arrays are not of same length in geometric_delays()")
return None
antenna_locations -= source_location
antenna_locations *= antenna_locations
np.sum(antenna_locations, axis=1, out=out)
np.sqrt(out, out=out)
out /= util.v_air
return out
[docs] def get_data(self, start_index, num_points, antenna_index=None, antenna_ID=None):
"""
return the raw data for a specific antenna, as an 1D int16 numpy array, of length num_points.
First point returned is start_index past get_nominal_sample_number(). Specify the antenna by giving the antenna_ID (which
is a string, same as output from get_antenna_names()) or as an integer antenna_index. An antenna_index of 0
is the first antenna in get_antenna_names().
"""
if antenna_index is None:
if antenna_ID is None:
raise LookupError("need either antenna_ID or antenna_index")
antenna_index = self.dipoleNames.index(antenna_ID)
antenna_index = self.index_adjusts[
antenna_index
] # in case of polarization flips
initial_point = self.sample_offsets[antenna_index] + start_index
final_point = initial_point + num_points
to_file = self.antenna_to_file[antenna_index]
if to_file is None:
raise LookupError("do not have data for this antenna")
TBB_file, station_antenna_index = to_file
antenna_ID = self.dipoleNames[antenna_index]
if final_point > len(TBB_file.file[TBB_file.stationKey][antenna_ID]):
print(
"WARNING! data point",
final_point,
"is off end of file",
len(TBB_file.file[TBB_file.stationKey][antenna_ID]),
)
return TBB_file.file[TBB_file.stationKey][antenna_ID][initial_point:final_point]