Source code for NuRadioReco.utilities.minimization

import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool


[docs] class Minimizer: """ Class for radio signal reconstruction and general minimization tasks. This class unifies the interfaces of different minimization algorithms like SciPy and Minuit. The class has two "modes". If no signal_function is provided, it behaves like a standard minimizer, which minimizes an objective function with respect to its parameters. If a signal_function is provided, the class behaves like a minimizer specialized for fitting radio signals to traces. In this case, the signal_function should take the parameters as input and return the expected signal in a number of channels. This is then compared to data traces in the objective_function, which should take the data traces and the signal traces as input. When a minimization is run, the objective function is then minimized with respect to the parameters of the signal_function. This class implements additional functionality, such as user-defined scaling of the parameters, which can improve the stability of the minimization process. Parameters ---------- objective_function : function If no signal_function is provided, this is a function which takes a number of parameters (as one numpy.ndarray) as input and returns the objective value to minimize, e.g., a minus two log likelihood or chi-square, i.e., objective_function(parameters). If a signal_function is provided, the objective_function takes the the output of the signal_function and the data as input and returns the objective value to minimize, i.e., objective_function(data, signal). parameters_initial : numpy.ndarray Values of parameters for initialization of the minimization with dimensions [n_parameters] parameters_bounds : numpy.ndarray Upper and lower bounds on parameters in the minimization. Should have dimensions [2,n_parameters] signal_function : function (optional) Function which takes a numpy array with parameter values as input (i.e., signal_function(parameters)) and returns a signal. The type and shape of the output depends on what happens in the user-defined objective_function, but it could be a numpy array with shape [n_antennas, n_samples] containing a radio signal in a number channels, or a list of channels. normalization : numpy.ndarray (optional) Set normalization factors for the parameters before fitting. Each parameter is divided by this value internally, which can help with minimizer stability when parameters have different scales. This can, e.g., be set to the natural units of a problem (PeV, km, deg, ...) or to the expected/estimated uncertainties on the parameters. Should be a numpy array with length n_parameters. If set to None, a normalization of 1 (i.e., no normalization will be used). fixed : list(bool) Wheter to fix some of the parameters in the optimization. Should be a list of bools with length n_parameters. If left as None, all parameters will be free in the optimization. save_history : bool (optional) Whether to save the history of the parameters in the minimization process. This should only be used for debugging as it can create very large arrays and make the minimization slow. debug : bool (optional) Whether to print debug information during the minimization process. """ def __init__(self, objective_function, parameters_initial, parameters_bounds, normalization=None, fixed=None, signal_function=None, save_history=False, debug=False): self.data = None self.signal_function = signal_function self.objective_function = objective_function self.parameters_initial = parameters_initial self.n_parameters = len(parameters_initial) self.parameters_bounds = parameters_bounds self.fixed = np.zeros(self.n_parameters,dtype=bool) self.normalization = normalization if normalization is not None else np.ones(self.n_parameters) self.n_function_calls = 0 self.save_history = save_history if self.save_history: self.history = [] self.debug = debug # Reconstruction results: self.result = None self.parameters = None self.covariance_matrix = None self.history = np.array([parameters_initial]) self.success = None if fixed is not None: self.fix_parameters(fixed) def _function_to_minimize(self, parameters_normalized): parameters_physical = parameters_normalized * self.normalization if self.signal_function is not None: signal = self.signal_function(parameters_physical) self.n_function_calls += 1 result = self.objective_function(self.data, signal) else: self.n_function_calls += 1 result = self.objective_function(parameters_physical) if self.save_history: self.history = np.append(self.history, [parameters_physical], axis = 0) if self.debug: print(f"Function call {self.n_function_calls}: parameters={parameters_physical}, result={result}") return result
[docs] def fix_parameters(self, fixed): assert len(fixed) == self.n_parameters, "Fixed parameters should match number of parameters" self.fixed = fixed
[docs] def run_minimization(self, method, data=None, **method_kwargs): """ Run minimization algorithm for the objective function. If a signal_function is provided, a data array must be provided which the signal is fitted using the objective_function. Parameters ---------- method : str Name of the method used to run the minimization data : numpy.ndarray, any (optional) Should only be provided if a signal_function is set. The type and shape depends on the objective_function, but it could be a numpy array with shape [n_antennas, n_samples] containing data traces to fit the signal to, or a list of channels. """ if self.signal_function is not None: assert data is not None, "Data must be provided for minimization if a signal_function is set" self.data = data if method == "scipy": result_object = self._scipy_minimization(**method_kwargs) elif method == "minuit": result_object = self._minuit_minimization(**method_kwargs) elif method == "noisyopt": result_object = self._noisyopt_minimization(**method_kwargs) elif method == "skopt": result_object = self._skopt_minimization(**method_kwargs) elif method == "simple_minimizer": result_object = self._simple_minimizer(**method_kwargs) else: raise ValueError(f"Unknown minimizer: {method}") return result_object
[docs] def profile_scan_1D(self, method, i_parameter_scan, parameter_grid_x, data=None, profile=True,true_value=None, plot=True, **method_kwargs): """ Perform minimization and a 1D profile (likelihood) scan around the minimum of the i_parameter_scan'th parameter by fixing it to different values on a grid and optimizing the objective function with respect to the other parameters. """ n_x = len(parameter_grid_x) objective_values = np.zeros(n_x) parameters_initial_copy = np.copy(self.parameters_initial) # Get best fit point: self.run_minimization(method=method, data=data, **method_kwargs) best_fit_x = self.parameters[i_parameter_scan] best_fit_value = self.result # Now fix parameters which are being scanned: fixed_copy = np.copy(self.fixed) self.fixed[i_parameter_scan] = True if not profile: self.fixed[:] = True for i_x in range(n_x): self.parameters_initial[i_parameter_scan] = parameter_grid_x[i_x] self.run_minimization(method=method, data=data, **method_kwargs) objective_values[i_x] = self.result if plot: plt.figure(figsize=[4,3]) plt.plot(parameter_grid_x, objective_values-best_fit_value, "b-", label=r"$-2 \Delta LLH$") axis = plt.axis() plt.plot([min(parameter_grid_x), max(parameter_grid_x)], [2,2], ":", label=r"$1\sigma$") plt.plot([min(parameter_grid_x), max(parameter_grid_x)], [4,4], ":", label=r"$2\sigma$") plt.plot([min(parameter_grid_x), max(parameter_grid_x)], [6,6], ":", label=r"$3\sigma$") plt.plot([best_fit_x,best_fit_x], [0,100], "y--", label="Fit") if true_value is not None: plt.axvline(true_value, color='r', linestyle='--', label="True") plt.axis([parameter_grid_x[0], parameter_grid_x[-1], 0, axis[3]*1.2]) plt.xlabel(r"Parameter [au]") plt.ylabel(r"Objective value") plt.legend() plt.tight_layout() # Set initial and fixed parameters back to initial: self.parameters_initial = parameters_initial_copy self.fixed = fixed_copy return objective_values
[docs] def profile_scan_2D(self, method, i_parameter_x, i_parameter_y, parameter_grid_x, parameter_grid_y, data=None, profile=True, true_values=None, plot=True, cmap="Blues_r", vmax=60, **method_kwargs): """ Perform minimization and a 2D profile (likelihood) scan around the minimum of the i_parameter_x'th and i_parameter_y'th parameters by fixing them to different values on a grid and optimizing the objective function with respect to the other parameters. """ n_x = len(parameter_grid_x) n_y = len(parameter_grid_y) objective_values = np.zeros([n_x,n_y]) parameters_initial_copy = np.copy(self.parameters_initial) # Get best fit point: self.run_minimization(method=method, data=data, **method_kwargs) best_fit_x = self.parameters[i_parameter_x] best_fit_y = self.parameters[i_parameter_y] best_fit_value = self.result # Now fix parameters which are being scanned: fixed_copy = np.copy(self.fixed) self.fixed[i_parameter_x] = True self.fixed[i_parameter_y] = True if not profile: self.fixed[:] = True for i in range(n_x): for j in range(n_y): self.parameters_initial[i_parameter_x] = parameter_grid_x[i] self.parameters_initial[i_parameter_y] = parameter_grid_y[j] self.run_minimization(method=method, data=data, **method_kwargs) objective_values[i,j] = self.result if plot: plt.figure(figsize=[4.2,3]) plt.pcolormesh(parameter_grid_x, parameter_grid_y, objective_values.T-best_fit_value, cmap=cmap, vmax=vmax) plt.colorbar(label=r"Objective value") CS = plt.contour(parameter_grid_x, parameter_grid_y, objective_values.T-best_fit_value, levels=[1.15*2, 3.09*2, 5.91*2]) if true_values is not None: plt.plot(true_values[0], true_values[1], "r*",label="True") plt.plot(best_fit_x, best_fit_y, "g*", label="Fit") plt.legend() plt.xlabel(r"Parameter 1 [au]") plt.ylabel(r"Parameter 2 [au]") plt.tight_layout() # Contour labels: fmt = {} strs = [r'$1\sigma$', r'$2\sigma$', r'$3\sigma$'] for l, s in zip(CS.levels, strs): fmt[l] = s # Label every other level using strings plt.clabel(CS, CS.levels[::2], inline=True, fmt=fmt, fontsize=10) # Set initial and fixed parameters back to initial: self.parameters_initial = parameters_initial_copy self.fixed = fixed_copy return objective_values
### Minimization methods: ### def _scipy_minimization(self, tol=1e-3, scipy_method="L-BFGS-B", options={}): import scipy.optimize as opt # Fix parameters: bounds_scipy = np.copy(self.parameters_bounds) / np.array([self.normalization, self.normalization]).T bounds_scipy[self.fixed,0] = self.parameters_initial[self.fixed] / self.normalization[self.fixed] bounds_scipy[self.fixed,1] = self.parameters_initial[self.fixed] / self.normalization[self.fixed] # Perform minimization: result = opt.minimize( self._function_to_minimize, x0 = self.parameters_initial / self.normalization, tol = tol, bounds = bounds_scipy, method = scipy_method, options=options ) # Save results: self.success = result.success self.result = result.fun self.parameters = result.x * self.normalization return result def _minuit_minimization(self, minuit_method = "migrad"): from iminuit import Minuit # Initialze minimizer: m = Minuit( self._function_to_minimize, self.parameters_initial / self.normalization ) # Set bounds: m.limits = self.parameters_bounds / np.array([self.normalization, self.normalization]).T # Fix parameters: for i_param in range(self.n_parameters): if self.fixed[i_param]: m.fixed[i_param] = True # Run minimization: if minuit_method == "migrad": m.migrad() elif minuit_method == "simplex": m.simplex() else: raise ValueError(f"Minuit method {minuit_method} not recognized") # Save results: self.success = m.valid self.result = m.fval self.parameters = np.array(m.values) * self.normalization return m def _noisyopt_minimization(self, deltatol=0.1, paired=False): from noisyopt import minimizeCompass res = minimizeCompass( self._function_to_minimize, bounds = self.parameters_bounds / np.array([self.normalization, self.normalization]).T, x0 = self.parameters_initial / self.normalization, deltatol = deltatol, paired = paired) self.success = res.success self.result = res.fun self.parameters = np.array(res.x) * self.normalization return res def _skopt_minimization(self, n_calls=1000, n_initial_points=20, random_state=None): from skopt import gp_minimize # Convert bounds to list of tuples bounds_scaled = self.parameters_bounds / np.array([self.normalization, self.normalization]).T dimensions = [(bounds_scaled[i_param, 0], bounds_scaled[i_param, 1]) for i_param in range(len(bounds_scaled))] # Ensure x0 is a list of scalars x0_scaled = (self.parameters_initial / self.normalization).tolist() res = gp_minimize( self._function_to_minimize, dimensions=dimensions, x0=x0_scaled, n_calls=n_calls, n_initial_points=n_initial_points, random_state=random_state ) self.success = None self.result = res.fun self.parameters = np.array(res.x) * self.normalization return res def _simple_minimizer(self, initial_step_size, decrease_rate, max_calls, epsilon, tolerance=None, print_steps=False): """ This is a very simple minimizer, which has not been thoroughly tested. It can be used for debugging. """ import scipy.optimize as opt # Initialize variables current_step_size = np.copy(initial_step_size) current_parameters = self.parameters_initial / self.normalization self.success = None old_result = 0 best_result = None best_parameters = None best_call = None result = np.inf for call in range(max_calls): # Check for convergence if tolerance is not None and call > 0 and abs(result - old_result) < tolerance: self.success = True break # Get gradient gradient = opt.approx_fprime(current_parameters, self._function_to_minimize, epsilon=epsilon / self.normalization) step_direction = -gradient / np.linalg.norm(gradient) # Update parameters current_parameters += current_step_size * step_direction # Clip parameters to bounds if any(current_parameters < self.parameters_bounds[:, 0] / self.normalization) or any(current_parameters > self.parameters_bounds[:, 1] / self.normalization): current_parameters = np.clip(current_parameters, self.parameters_bounds[:, 0] / self.normalization, self.parameters_bounds[:, 1] / self.normalization) # Calculate objective function result = self._function_to_minimize(current_parameters) if print_steps: print(call, best_call, current_parameters, result) # Update step size current_step_size *= decrease_rate # save result #old_result = np.copy(result) if best_result is None or result < best_result: best_result = np.copy(result) best_parameters = np.copy(current_parameters) best_call = np.copy(call) self.result = best_result self.parameters = best_parameters * self.normalization return best_result