Source code for NuRadioMC.utilities.Veff

import numpy as np
import h5py
from scipy import interpolate
from scipy.interpolate import interp1d
import glob
from six import iteritems
import json
import os
import copy
import time

from NuRadioReco.utilities import units

import logging
logger = logging.getLogger("Veff")
logging.basicConfig()
logger.setLevel(logging.INFO)


# collection of utility function regarding the calculation of the effective volume of a neutrino detector
[docs]def remove_duplicate_triggers(triggered, gids): """ remove duplicate entried from triggered array The hdf5 file contains a line per shower. One event can contain many showers, i.e. if we count all triggeres from all showers we overestimate the effective volume. This function modifies the triggered array such that it contains not more than one True value for each event group. Parameters ---------- triggered: array of bools gids: array of ints the event group ids Returns ------- array of bools the corrected triggered array """ gids = np.array(gids) triggered = np.array(triggered) # shift the integer gids (just within the function) by 0.5 to avoid zeros gids_shifted = gids + 0.5 # triggered gids, 0 where not triggered triggered_gids = triggered*gids_shifted unique_values, unique_indices = np.unique(triggered_gids, return_index=True) # create output boolean array and set True for first triggered indices first_occurences = np.zeros_like(triggered, dtype=bool) np.put(first_occurences, unique_indices, True) # the line above will also put the first untriggered, hence need to require "& triggered" return first_occurences&triggered
[docs]def FC_limits(counts): """ Returns the 68% confidence belt for a number of counts, using the Feldman-Cousins method. Published in Phys. Rev. D 57, 3873, DOI 10.1103/PhysRevD.57.3873 Parameters ---------- counts: integer or float Number of counts. Can be non-integer (weighted counts) Returns ------- (low_limit, upper_limit): float tuple Lower and upper limits for the confidence belt. """ count_list = np.arange(0, 21) lower_limits = [0.00, 0.37, 0.74, 1.10, 2.34, 2.75, 3.82, 4.25, 5.30, 6.33, 6.78, 7.81, 8.83, 9.28, 10.30, 11.32, 12.33, 12.79, 13.81, 14.82, 15.83] upper_limits = [1.29, 2.75, 4.25, 5.30, 6.78, 7.81, 9.28, 10.30, 11.32, 12.79, 13.81, 14.82, 16.29, 17.30, 18.32, 19.32, 20.80, 21.81, 22.82, 23.82, 25.30] if counts > count_list[-1]: return (counts - np.sqrt(counts), counts + np.sqrt(counts)) elif counts < 0: return (0.00, 1.29) low_interp = interp1d(count_list, lower_limits) up_interp = interp1d(count_list, upper_limits) return (low_interp(counts), up_interp(counts))
[docs]def get_Veff_water_equivalent(Veff, density_medium=0.917 * units.g / units.cm ** 3, density_water=1 * units.g / units.cm ** 3): """ convenience function to converte the effective volume of a medium with density `density_medium` to the water equivalent effective volume Parameters ---------- Veff: float or array the effective volume dentity_medium: float (optional) the density of the medium of the Veff simulation (default deep ice) density water: float (optional) the density of water Returns ------- water equivalen effective volume """ return Veff * density_medium / density_water
[docs]def get_veff_output(volume, counts, all_events): v_eff = volume * counts / all_events fc_low, fc_high = FC_limits(counts) if counts: v_eff_error = v_eff / np.sqrt(counts) else: v_eff_error = 0 v_eff_low = volume * fc_low / all_events v_eff_high = volume * fc_high / all_events return [v_eff, v_eff_error, counts, v_eff_low, v_eff_high]
[docs]def get_Veff_Aeff_single( filename, trigger_names, trigger_names_dict, trigger_combinations, deposited, station, veff_aeff="veff", bounds_theta=[0, np.pi]): """ Calculates the effective volume or effective area from surface muons from a single NuRadioMC hdf5 file The effective volume is NOT normalized to a water equivalent. It is also NOT multiplied with the solid angle (typically 4pi). Parameters ---------- filename: string filename of the hdf5 file trigger_names: list of strings list of the trigger names contained in the file trigger_names_dict: dict map from trigger name to index trigger_combinations: dict, optional keys are the names of triggers to calculate. Values are dicts again: * 'triggers': list of strings name of individual triggers that are combined with an OR the following additional options are optional * 'efficiency': dict allows to apply an (analysis) efficiency cut for calculating effective volumes * 'func': function a function that paramaterized the efficiency as a function of SNR (=Vmax/Vrms) * 'channel_ids': array on ints the channels for which the maximum signal amplitude should be determined * 'scale': float rescaling of the efficiency curve by SNR' = SNR * scale * 'n_reflections': int the number of bottom reflections of the ray tracing solution that likely triggered assuming that the solution with the shortest travel time caused the trigger, only considering channel 0 station: int the station that should be considered veff_aeff: string specifiy if the effective volume or the effective area for surface muons is calculated can be * "veff" (default) * "aeff_surface_muons" bounds_theta: list of floats restrict theta to sub-range wrt. the simulated range in the file Note: assumes events were simulated uniformly in cos(theta) bounds_theta should be a (two-item) list, but will care only about the min/max values Returns ------- list of dictionaries Each file is one entry. The dictionary keys store all relevant properties """ if(veff_aeff not in ["veff", "aeff_surface_muons"]): raise AttributeError(f"the paramter `veff_aeff` needs to be one of either `veff` or `aeff_surface_muons`") logger.warning(f"processing file {filename}") fin = h5py.File(filename, 'r') n_events = fin.attrs['n_events'] out = {} Emin = fin.attrs['Emin'] Emax = fin.attrs['Emax'] out['energy'] = 10 ** (0.5 * (np.log10(Emin) + np.log10(Emax))) out['energy_min'] = Emin out['energy_max'] = Emax # calculate effective phimin = fin.attrs['phimin'] phimax = fin.attrs['phimax'] thetamin = fin.attrs['thetamin'] thetamax = fin.attrs['thetamax'] theta_width_file = abs(np.cos(thetamin) - np.cos(thetamax)) # restrict the theta range, if requested if min(bounds_theta) > thetamin: logger.info("restricting thetamin from {} to {}".format(thetamin, min(bounds_theta))) thetamin = min(bounds_theta) if max(bounds_theta) < thetamax: logger.info("restricting thetamax from {} to {}".format(thetamax, max(bounds_theta))) thetamax = max(bounds_theta) # The restriction assumes isotropic event generation in cos(theta) band theta_fraction = abs(np.cos(thetamin) - np.cos(thetamax)) / theta_width_file if theta_fraction < 1: # adjust n_events to account for solid angle fraction in the requested theta range n_events *= theta_fraction if veff_aeff == "veff": volume_proj_area = fin.attrs['volume'] elif veff_aeff == "aeff_surface_muons": area = fin.attrs['area'] # The used area must be the projected area, perpendicular to the incoming # flux, which leaves us with the following correction. Remember that the # zenith bins must be small for the effective area to be correct. volume_proj_area = area * 0.5 * (np.abs(np.cos(thetamin)) + np.abs(np.cos(thetamax))) else: raise AttributeError(f"attributes do neither contain volume nor area") Vrms = fin.attrs['Vrms'] # Solid angle needed for the effective volume calculations out['domega'] = np.abs(phimax - phimin) * np.abs(np.cos(thetamin) - np.cos(thetamax)) out['thetamin'] = thetamin out['thetamax'] = thetamax out['deposited'] = deposited out[veff_aeff] = {} out['n_triggered_weighted'] = {} out['SNRs'] = {} fc_low_0, fc_high_0 = FC_limits(0) v_eff_low_0 = volume_proj_area * fc_low_0 / n_events v_eff_high_0 = volume_proj_area * fc_high_0 / n_events if 'weights' not in fin.keys(): logger.warning(f"file {filename} is empty") for iT, trigger_name in enumerate(trigger_names): out[veff_aeff][trigger_name] = [0, 0, 0, v_eff_low_0, v_eff_high_0] for trigger_name, values in iteritems(trigger_combinations): out[veff_aeff][trigger_name] = [0, 0, 0, v_eff_low_0, v_eff_high_0] return out triggered = np.array(fin['triggered']) if 'trigger_names' in fin.attrs: if np.any(trigger_names != fin.attrs['trigger_names']): if triggered.size == 0 and fin.attrs['trigger_names'].size == 0: logger.warning("file {} has no triggering events. " "Using trigger names from another file".format(filename)) else: error_msg = ("file {} has inconsistent trigger names: " "{}\ncurrent trigger names {}").format( filename, fin.attrs['trigger_names'], trigger_names) logger.error(error_msg) raise AttributeError(error_msg) else: logger.warning(f"file {filename} has no triggering events. " "Using trigger names from a different file: {trigger_names}") weights = np.array(fin['weights']) # if theta range is restricted, select events in that range if theta_fraction < 1: # generate boolean mask for events in fin inside selected theta range mask_theta = np.array((fin['zeniths'] > thetamin) & (fin['zeniths'] < thetamax)) # multiply events with mask, events outside are zero-weighted weights *= mask_theta if triggered.size == 0: for iT, trigger_name in enumerate(trigger_names): out[veff_aeff][trigger_name] = [0, 0, 0, v_eff_low_0, v_eff_high_0] for trigger_name, values in iteritems(trigger_combinations): out[veff_aeff][trigger_name] = [0, 0, 0, v_eff_low_0, v_eff_high_0] return out for iT, trigger_name in enumerate(trigger_names): triggered = np.array(fin['multiple_triggers'][:, iT], dtype=bool) triggered = remove_duplicate_triggers(triggered, fin['event_group_ids']) out[veff_aeff][trigger_name] = get_veff_output(volume_proj_area, np.sum(weights[triggered]), n_events) for trigger_name, values in iteritems(trigger_combinations): indiv_triggers = values['triggers'] triggered = np.zeros_like(fin['multiple_triggers'][:, 0], dtype=bool) if isinstance(indiv_triggers, str): triggered = triggered | \ np.array(fin['multiple_triggers'][:, trigger_names_dict[indiv_triggers]], dtype=bool) else: for indiv_trigger in indiv_triggers: triggered = triggered | \ np.array(fin['multiple_triggers'][:, trigger_names_dict[indiv_trigger]], dtype=bool) if 'triggerAND' in values: triggered = triggered & \ np.array(fin['multiple_triggers'][:, trigger_names_dict[values['triggerAND']]], dtype=bool) if 'notriggers' in values: indiv_triggers = values['notriggers'] if(isinstance(indiv_triggers, str)): triggered = triggered & \ ~np.array(fin['multiple_triggers'][:, trigger_names_dict[indiv_triggers]], dtype=bool) else: for indiv_trigger in indiv_triggers: triggered = triggered & \ ~np.array(fin['multiple_triggers'][:, trigger_names_dict[indiv_trigger]], dtype=bool) if 'min_sigma' in values.keys(): if isinstance(values['min_sigma'], list): if trigger_name not in out['SNR']: out['SNR'][trigger_name] = {} masks = np.zeros_like(triggered) for iS in range(len(values['min_sigma'])): As = np.max(np.nan_to_num(fin['max_amp_ray_solution']), axis=-1) # we use the this quantity because it is always computed before noise is added! As_sorted = np.sort(As[:, values['channels'][iS]], axis=1) # the smallest of the three largest amplitudes max_amplitude = As_sorted[:, -values['n_channels'][iS]] mask = np.sum( As[:, values['channels'][iS]] >= (values['min_sigma'][iS] * Vrms), axis=1) >= values['n_channels'][iS] masks = masks | mask out['SNR'][trigger_name][iS] = max_amplitude[mask] / Vrms triggered = triggered & masks else: As = np.max(np.nan_to_num(fin['max_amp_ray_solution']), axis=-1) # we use the this quantity because it is always computed before noise is added! As_sorted = np.sort(As[:, values['channels']], axis=1) max_amplitude = As_sorted[:, -values['n_channels']] # the smallest of the three largest amplitudes mask = np.sum(As[:, values['channels']] >= (values['min_sigma'] * Vrms), axis=1) >= values['n_channels'] out['SNR'][trigger_name] = As_sorted[mask] / Vrms triggered = triggered & mask if 'ray_solution' in values.keys(): As = np.array(fin['max_amp_ray_solution']) max_amps = np.argmax(As[:, values['ray_channel']], axis=-1) sol = np.array(fin['ray_tracing_solution_type']) mask = np.array([sol[i, values['ray_channel'], max_amps[i]] == values['ray_solution'] for i in range(len(max_amps))], dtype=bool) triggered = triggered & mask if 'n_reflections' in values.keys(): if(np.sum(triggered)): As = np.array(fin[f'station_{station:d}/max_amp_ray_solution']) # find the ray tracing solution that produces the largest amplitude max_amps = np.argmax(np.argmax(As[:,:], axis=-1), axis=-1) # advanced indexing: selects the ray tracing solution per event with the highest amplitude triggered = triggered & (np.array(fin[f'station_{station:d}/ray_tracing_reflection'])[..., max_amps, 0][:, 0] == values['n_reflections']) triggered = remove_duplicate_triggers(triggered, fin['event_group_ids']) v_eff, v_eff_err, count, v_eff_low, v_eff_high = \ get_veff_output(volume_proj_area, np.sum(weights[triggered]), n_events) if 'efficiency' in values.keys() and v_eff > 0: get_efficiency = values['efficiency']['func'] channel_ids = values['efficiency']['channel_ids'] gids = np.array(fin['event_group_ids']) ugids = np.unique(np.array(fin['event_group_ids'])) # calculate the group event ids that triggered ugids_triggered_index = [] ugids_triggered = [] for i_ugid, ugid in enumerate(ugids): mask = ugid == gids if(np.any(triggered[mask])): ugids_triggered_index.append(i_ugid) ugids_triggered.append(ugid) ugids_triggered = np.array(ugids_triggered) ugids_triggered_index = np.array(ugids_triggered_index) n_unique_gids = len(ugids_triggered) sorter = np.argsort(ugids_triggered) max_amplitudes = np.zeros(n_unique_gids) for key in fin.keys(): if key.startswith("station_"): if 'event_group_ids' not in fin[key]: continue # the station might have no triggers sgids = np.array(fin[key]['event_group_ids']) usgids = np.unique(sgids) usgids, comm1, comm2 = np.intersect1d(usgids, ugids_triggered, assume_unique=True, return_indices=True) # select only the gids that triggered common_mask = np.isin(sgids, usgids) sgids = sgids[common_mask] # also reduce gids array to the event groups that triggered if(len(usgids) == 0): # skip stations that don't have any trigger for this trigger combination continue usgids_index = sorter[np.searchsorted(ugids_triggered, usgids, sorter=sorter)] # each station might have multiple triggeres per event group id. We need to select the one # event with the largest amplitude. Let's first check if one event group created more than one event max_amps_per_event_channel = np.nan_to_num(np.array(fin[key]['maximum_amplitudes_envelope'])[common_mask]) max_amps_per_event = np.amax(max_amps_per_event_channel[:, channel_ids], axis=1) # select the maximum amplitude of all considered channels if len(sgids) != len(usgids): # at least one event group created more than one event. Let's calculate it the slow but correct way for sgid in np.unique(sgids): # loop over all event groups which triggered this station if(sgid not in usgids): continue mask_gid = sgid == sgids # select all event that are part of this event group index = np.squeeze(np.argwhere(ugids_triggered == sgid)) max_amplitudes[index] = max(max_amplitudes[index], max_amps_per_event[mask_gid].max()) else: max_amplitudes[usgids_index] = np.maximum(max_amplitudes[usgids_index], max_amps_per_event) if 'scale' in values['efficiency']: max_amplitudes *= values['efficiency']['scale'] if "Vrms" in values['efficiency']: Vrms = values['efficiency']['Vrms'] e = get_efficiency(max_amplitudes / Vrms) # we calculated the maximum amplitudes for all gids, now we select only those that triggered v_eff, v_eff_err, count, v_eff_low, v_eff_high = \ get_veff_output(volume_proj_area, np.sum(weights[triggered] * e), n_events) out[veff_aeff][trigger_name] = [v_eff, v_eff_err, count, v_eff_low, v_eff_high] return out
[docs]def get_Veff_Aeff_single_wrapper(args): return get_Veff_Aeff_single(*args)
[docs]def get_Veff_Aeff(folder, trigger_combinations={}, station=101, veff_aeff="veff", n_cores=1, oversampling_theta=1): """ calculates the effective volume or effective area from surface muons from NuRadioMC hdf5 files the effective volume is NOT normalized to a water equivalent. It is also NOT multiplied with the solid angle (typically 4pi). Parameters ---------- folder: string folder conaining the hdf5 files, one per energy OR filename trigger_combinations: dict, optional keys are the names of triggers to calculate. Values are dicts again: * 'triggers': list of strings name of individual triggers that are combined with an OR the following additional options are optional * 'efficiency': dict allows to apply an (analysis) efficiency cut for calculating effective volumes * 'func': function a function that paramaterized the efficiency as a function of SNR (=Vmax/Vrms) * 'channel_ids': array on ints the channels for which the maximum signal amplitude should be determined * 'scale': float rescaling of the efficiency curve by SNR' = SNR * scale * 'n_reflections': int the number of bottom reflections of the ray tracing solution that likely triggered assuming that the solution with the shortest travel time caused the trigger, only considering channel 0 station: int the station that should be considered veff_aeff: string specifiy if the effective volume or the effective area for surface muons is calculated can be * "veff" (default) * "aeff_surface_muons" n_cores: int the number of cores to use oversampling_theta: int calculate the effective volume for finer binning (<oversampling_theta> data points per file): * 1: no oversampling * >1: oversampling with <oversampling_theta> equal-size cos(theta) bins within thetamin/max of the input file .. Note:: oversampling assumes that events were simulated uniformly in cos(theta) Returns ------- list of dictionaries. Each file is one entry. The dictionary keys store all relevant properties """ trigger_combinations = copy.copy(trigger_combinations) trigger_names = None trigger_names_dict = {} prev_deposited = None deposited = False if os.path.isfile(folder): filenames = [folder] else: filenames = glob.glob(os.path.join(folder, '*.hdf5')) if len(filenames) == 0: raise FileNotFoundError(f"couldnt find any hdf5 file in folder {folder}") filenames = sorted(filenames) for iF, filename in enumerate(filenames): fin = h5py.File(filename, 'r') if 'deposited' in fin.attrs: deposited = fin.attrs['deposited'] if prev_deposited is None: prev_deposited = deposited elif prev_deposited != deposited: raise AttributeError("The deposited parameter is not consistent among the input files!") for iF, filename in enumerate(filenames): fin = h5py.File(filename, 'r') if 'trigger_names' in fin.attrs: trigger_names = fin.attrs['trigger_names'] if len(trigger_names) > 0: for iT, trigger_name in enumerate(trigger_names): trigger_names_dict[trigger_name] = iT logger.info(f"first file with triggernames {filename}: {trigger_names}") break trigger_combinations['all_triggers'] = {'triggers': trigger_names} logger.info(f"Trigger names: {trigger_names}") for key in trigger_combinations: i = -1 for value in trigger_combinations[key]['triggers']: i += 1 if value not in trigger_names: logger.warning(f"trigger {value} not available, removing this trigger from the trigger combination {key}") trigger_combinations[key]['triggers'].pop(i) i -= 1 from multiprocessing import Pool logger.warning(f"running {len(filenames)} jobs on {n_cores} cores") args = [] if oversampling_theta == 1: for f in filenames: args.append([f, trigger_names, trigger_names_dict, trigger_combinations, deposited, station, veff_aeff]) else: # get the thetamin, thetamax from the files and do oversampling logger.info("Calculating effective volumes with finer binning, {} bins per input file".format(oversampling_theta)) for f in filenames: fin = h5py.File(f, 'r') costhetamin = np.cos(fin.attrs['thetamin']) costhetamax = np.cos(fin.attrs['thetamax']) thetas = np.arccos(np.linspace(costhetamin, costhetamax, oversampling_theta + 1)) thetas_min = thetas[:-1] thetas_max = thetas[1:] for bounds_theta in zip(thetas_min, thetas_max): args.append([f, trigger_names, trigger_names_dict, trigger_combinations, deposited, station, veff_aeff, bounds_theta]) if n_cores == 1: output = [] for arg in args: output.append(get_Veff_Aeff_single_wrapper(arg)) return output else: with Pool(n_cores) as p: output = p.map(get_Veff_Aeff_single_wrapper, args) print("output") print(output) return output
[docs]def get_Veff_Aeff_array(data): """ calculates a multi dimensional array of effective volume or effective area from surface muons calculations for fast slicing the array dimensions are (energy, zenith bin, triggername, 5) where the last tuple is the effective volume, its uncertainty, the weighted sum of triggered events, lower 68% uncertainty, upper 68% uncertainty Parameters ---------- data: dict the result of the `get_Veff` function Returns ------- * (n_energy, n_zenith_bins, n_triggernames, 5) dimensional array of floats * array of unique mean energies (the mean is calculated in the logarithm of the energy) * array of unique lower bin edges of energies * array of unique upper bin edges of energies * array of unique zenith bins * array of unique trigger names Examples -------- To plot the full sky effective volume for 'all_triggers' do .. code-block:: output, uenergies, uzenith_bins, utrigger_names, zenith_weights = get_Veff_array(data) fig, ax = plt.subplots(1, 1) tname = "all_triggers" Veff = np.average(output[:,:,get_index(tname, utrigger_names),0], axis=1, weights=zenith_weights) Vefferror = Veff / np.sum(output[:,:,get_index(tname, utrigger_names),2], axis=1)**0.5 ax.errorbar(uenergies/units.eV, Veff/units.km**3 * 4 * np.pi, yerr=Vefferror/units.km**3 * 4 * np.pi, fmt='-o', label=tname) ax.legend() ax.semilogy(True) ax.semilogx(True) fig.tight_layout() plt.show() To plot the effective volume for different declination bands do .. code-block:: python fig, ax = plt.subplots(1, 1) tname = "LPDA_2of4_100Hz" iZ = 9 Veff = output[:,iZ,get_index(tname, utrigger_names)] ax.errorbar(uenergies/units.eV, Veff[:,0]/units.km**3, yerr=Veff[:,1]/units.km**3, label=f"zenith bin {uzenith_bins[iZ][0]/units.deg:.0f} - {uzenith_bins[iZ][1]/units.deg:.0f}") iZ = 8 Veff = output[:,iZ,get_index(tname, utrigger_names)] ax.errorbar(uenergies/units.eV, Veff[:,0]/units.km**3, yerr=Veff[:,1]/units.km**3, label=f"zenith bin {uzenith_bins[iZ][0]/units.deg:.0f} - {uzenith_bins[iZ][1]/units.deg:.0f}") iZ = 7 Veff = output[:,iZ,get_index(tname, utrigger_names)] ax.errorbar(uenergies/units.eV, Veff[:,0]/units.km**3, yerr=Veff[:,1]/units.km**3, label=f"zenith bin {uzenith_bins[iZ][0]/units.deg:.0f} - {uzenith_bins[iZ][1]/units.deg:.0f}") iZ = 10 Veff = output[:,iZ,get_index(tname, utrigger_names)] ax.errorbar(uenergies/units.eV, Veff[:,0]/units.km**3, yerr=Veff[:,1]/units.km**3, label=f"zenith bin {uzenith_bins[iZ][0]/units.deg:.0f} - {uzenith_bins[iZ][1]/units.deg:.0f}") ax.legend() ax.semilogy(True) ax.semilogx(True) fig.tight_layout() plt.show() """ energies = [] energies_min = [] energies_max = [] zenith_bins = [] trigger_names = [] veff_aeff = None for d in data: if(veff_aeff is None): if "veff" in d: veff_aeff = "veff" print(f"data contains effective volume") elif "aeff_surface_muons" in d: veff_aeff = "aeff_surface_muons" print(f"data contains effective area for surface muons") else: print(f"dictionary does neither contain key `veff` nor `aeff_surface_muons`") raise AttributeError(f"dictionary does neither contain key `veff` nor `aeff_surface_muons`") energies.append(d['energy']) energies_min.append(d['energy_min']) energies_max.append(d['energy_max']) zenith_bins.append([d['thetamin'], d['thetamax']]) for triggername in d[veff_aeff]: trigger_names.append(triggername) energies = np.array(energies) energies_min = np.array(energies_min) energies_max = np.array(energies_max) zenith_bins = np.array(zenith_bins) trigger_names = np.array(trigger_names) uenergies = np.unique(energies) uenergies_min = np.unique(energies_min) uenergies_max = np.unique(energies_max) uzenith_bins = np.unique(zenith_bins, axis=0) utrigger_names = np.unique(trigger_names) output = np.zeros((len(uenergies), len(uzenith_bins), len(utrigger_names), 5)) logger.debug(f"unique energies {uenergies}") logger.debug(f"unique zenith angle bins {uzenith_bins/units.deg}") logger.debug(f"unique energies {utrigger_names}") for d in data: iE = np.squeeze(np.argwhere(d['energy'] == uenergies)) iT = np.squeeze(np.argwhere([d['thetamin'], d['thetamax']] == uzenith_bins))[0][0] for triggername, Veff in d[veff_aeff].items(): iTrig = np.squeeze(np.argwhere(triggername == utrigger_names)) output[iE, iT, iTrig] = Veff for d in data: iE = np.squeeze(np.argwhere(d['energy'] == uenergies)) iT = np.squeeze(np.argwhere([d['thetamin'], d['thetamax']] == uzenith_bins))[0][0] return output, uenergies, uenergies_min, uenergies_max, uzenith_bins, utrigger_names
[docs]def get_index(value, array): return np.squeeze(np.argwhere(value == array))
[docs]def export(filename, data, trigger_names=None, export_format='yaml'): """ export effective volumes (or effective areas) into a human readable JSON or YAML file Parameters ---------- filename: string the output filename of the JSON file data: array the output of the `getVeff` function trigger_names: list of strings (optional, default None) save only specific trigger names, if None all triggers are exported export_format: string (default "yaml") specify output format, choose * "yaml" * "json" """ output = [] for i in range(len(data)): tmp_out = {} for key in data[i]: if (key not in ['veffs', 'aeff_surface_muons']): if isinstance(data[i][key], np.generic): tmp_out[key] = data[i][key].item() else: tmp_out[key] = data[i][key] for key in ["veffs", "aeff_surface_muons"]: if(key in data[i]): tmp_out[key] = {} for trigger_name in data[i][key]: if(trigger_names is None or trigger_name in trigger_names): logger.info(trigger_name) tmp_out[key][trigger_name] = [] for value in data[i][key][trigger_name]: tmp_out[key][trigger_name].append(float(value)) output.append(tmp_out) with open(filename, 'w') as fout: if(export_format == 'yaml'): import yaml yaml.dump(output, fout) elif(export_format == 'json'): json.dump(output, fout, sort_keys=True, indent=4)