# Source code for NuRadioMC.utilities.runner

import numpy as np
from NuRadioMC.simulation import simulation
import os
import multiprocessing
from multiprocessing import Queue
import time


class NuRadioMCRunner(object):
    """
    The purpose of this class is to run NuRadioMC on a cluster using a full
    node submission, i.e., a full node with X cores is reserved. As NuRadioMC
    is a single-core process, this class will start multiple NuRadioMC
    simulations and will distribute them on the X cores. As soon as one job is
    finished, a new one is started so that all X cores are kept busy. The job
    ends after a fixed time, or after enough triggers are acquired, or after
    X crashes occurred.
    """

    def __init__(self, n_worker, task, output_path, kwargs=None,
                 max_runtime=3600 * 24 * 8, n_triggers_max=1e6, max_crashes=10):
        """
        Parameters
        ----------
        n_worker: int
            number of worker processes to keep busy simultaneously
        task: callable
            callable executed in each worker process; it is called as
            ``task(queue, i_task, **kwargs)`` and is expected to put the
            number of triggered events into ``queue``
        output_path: str
            directory where the output files are written
        kwargs: dict, optional
            additional keyword arguments passed to ``task``; this dict is also
            used to communicate the per-task output filename
            (default: a fresh empty dict per instance)
        max_runtime: float
            maximum runtime in seconds after which no new jobs are submitted
            (default: 8 days)
        n_triggers_max: float
            stop submitting new jobs once this many triggers were acquired
        max_crashes: int
            stop processing after more than this many worker crashes
        """
        self.q = Queue()
        self.task = task
        self.n_worker = n_worker
        self.i_task = 0
        self.worker = []
        self.n_triggers = 0
        self.iworker = 0
        self.i_skipped = 0
        self.stop_processing = False
        self.max_runtime = max_runtime
        self.n_triggers_max = n_triggers_max
        self.output_path = output_path
        # avoid the shared-mutable-default pitfall: run() mutates this dict
        # (it injects "output_filename"), so each instance needs its own copy
        self.kwargs = {} if kwargs is None else kwargs
        self.start_time = time.time()
        self.crashed_counter = 0
        self.max_crashes = max_crashes

    def get_outputfilename(self):
        """
        Define how output files are named.

        Returns
        -------
        str
            path of the form ``<output_path>/<log10(nu_energy)>_<i_task>.hdf5``
        """
        return os.path.join(
            self.output_path,
            f"{np.log10(self.kwargs['nu_energy']):.2f}_{self.i_task:06d}.hdf5")

    def run(self):
        """
        Start ``n_worker`` simulations and keep all cores busy by replacing
        finished workers with new tasks until one of the stop conditions
        (runtime, trigger count, crash count) is reached.
        """
        # submit the initial batch of jobs, skipping tasks whose output
        # file (hdf5 or nur) already exists from a previous run
        while True:
            outputfilename = self.get_outputfilename()
            self.kwargs["output_filename"] = outputfilename
            if os.path.exists(outputfilename):
                print(f"outputfile {outputfilename} for task {self.i_task} already exists", flush=True)
            elif os.path.exists(outputfilename + ".nur"):
                print(f"outputfile {outputfilename}.nur for task {self.i_task} already exists", flush=True)
            else:
                print(f"starting job {self.i_task}", flush=True)
                n = multiprocessing.Process(name=f'worker-{self.i_task}', target=self.task,
                                            args=(self.q, self.i_task), kwargs=self.kwargs)
                n.start()
                self.worker.append(n)
                self.iworker += 1
            self.i_task += 1
            if self.iworker >= self.n_worker:
                break

        stop_processing = False
        while True:
            # check on all processes and restart a new task for every
            # worker that has finished (or crashed)
            for iN, n in enumerate(self.worker):
                if not n.is_alive():
                    print(f"job is not alive, getting results {n}, exitcode = {n.exitcode}", flush=True)
                    n_trig = 0
                    if n.exitcode != 0:
                        self.crashed_counter += 1
                    else:
                        if not self.q.empty():
                            n_trig = self.q.get_nowait()
                            self.n_triggers += n_trig
                    print(f"{iN} has finished with {n_trig} events, total number of triggered events is {self.n_triggers}", flush=True)
                    outputfilename = self.get_outputfilename()
                    self.kwargs["output_filename"] = outputfilename
                    if os.path.exists(outputfilename):
                        print(f"outputfile {outputfilename} for task {self.i_task} already exists", flush=True)
                    else:
                        n = multiprocessing.Process(name=f'worker-{self.i_task}', target=self.task,
                                                    args=(self.q, self.i_task), kwargs=self.kwargs)
                        n.start()
                        self.worker[iN] = n
                    self.i_task += 1
            time.sleep(10)
            if self.crashed_counter > self.max_crashes:
                print(f"more than {self.max_crashes} jobs crashed. Exiting...", flush=True)
                stop_processing = True
            if stop_processing or self.n_triggers > self.n_triggers_max or ((time.time() - self.start_time) > self.max_runtime):
                if self.n_triggers > self.n_triggers_max:
                    print(f"more than {self.n_triggers_max} triggers, waiting for workers to stop \n\n\n\n\n\n", flush=True)
                else:
                    print(f"{simulation.pretty_time_delta(time.time()-self.start_time)} passed. No more jobs will be submitted.", flush=True)
                # wait for the remaining workers to finish and collect
                # their results
                for iN, n in enumerate(self.worker):
                    # initialize n_trig so the print below cannot hit an
                    # unbound name when the queue is empty for this worker
                    n_trig = 0
                    if not self.q.empty():
                        n_trig = self.q.get()
                        self.n_triggers += n_trig
                    n.join()
                    print(f"{iN} has finished with {n_trig} events, total number of triggered events is {self.n_triggers}", flush=True)
                break