pyhrf.parallel module

exception pyhrf.parallel.RemoteException

Bases: exceptions.Exception

pyhrf.parallel.dump_func(func, fn)
pyhrf.parallel.merge_default_kwargs(func, kwargs)
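
The two helpers above are undocumented. A plausible sketch of what merge_default_kwargs does, assuming it completes a kwargs dict with the function's declared default values (the implementation below is illustrative, not pyhrf's actual code):

```python
import inspect

def merge_default_kwargs_sketch(func, kwargs):
    # Illustrative stand-in: return a copy of kwargs completed with the
    # default values declared in func's signature (explicit entries win).
    merged = dict(kwargs)
    for name, param in inspect.signature(func).parameters.items():
        if param.default is not inspect.Parameter.empty and name not in merged:
            merged[name] = param.default
    return merged

def foo(a, b=2, c=3):
    return a + b + c

print(merge_default_kwargs_sketch(foo, {'b': 5}))  # {'b': 5, 'c': 3}
```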
pyhrf.parallel.prepare_treatment_jobs(treatment, tmp_local_dir, local_result_path, local_user, local_host, remote_host, remote_user, remote_path, label_for_cluster)

Prepare soma-workflow jobs to perform one treatment (i.e. one subject).

Parameters:
  • treatment (FMRITreatment) – the treatment defining the analysis
  • tmp_local_dir (str) – a path where to store the temporary config file before sending it to the remote host
  • local_result_path (str) – path where to store the final result
  • local_user (str) – the user on the local host who enables SSH connection from the remote cluster
  • local_host (str) – local host (used to send back the result)
  • remote_host (str) – remote machine where the treatment will be run
  • remote_user (str) – user login on the remote machine.
  • remote_path (str) – path on the remote machine where to store ROI data and analysis results
  • label_for_cluster (str) – label prefix to name job in somaworkflow
Returns:

a tuple (job_split, jobs, dependencies, mainGroup)
  • job_split (Job): job handling the splitting of input data into ROI data
  • jobs (list of Job): all jobs except the splitting job -> ROI analyses, result merge, scp of results back to the local host, data cleaning
  • dependencies (list of Job pairs): define the pipeline structure
  • mainGroup (Group): top-level object gathering all jobs for this treatment
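
The shape of the returned jobs and dependencies can be illustrated with plain strings standing in for soma-workflow Job objects (job names here are hypothetical; the real objects come from soma_workflow.client):

```python
# Hypothetical job names illustrating the pipeline structure returned by
# prepare_treatment_jobs; strings stand in for soma-workflow Job objects.
job_split = "split_input_into_roi_data"
roi_jobs = ["analyse_roi_%d" % i for i in range(3)]
merge_job = "merge_results"
scp_job = "scp_results_to_local_host"
clean_job = "clean_remote_data"
jobs = roi_jobs + [merge_job, scp_job, clean_job]

# Each pair (a, b) declares that job b depends on job a:
dependencies = ([(job_split, j) for j in roi_jobs]
                + [(j, merge_job) for j in roi_jobs]
                + [(merge_job, scp_job), (scp_job, clean_job)])
print(len(dependencies))  # 8
```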

pyhrf.parallel.remote_map(func, largs=None, lkwargs=None, mode='serial')

Execute a function in parallel on a list of arguments.

Parameters:
  • func (function) – function to apply on each item; this function must be importable on the remote side
  • largs (list of tuple) – each item in the list is a tuple containing all positional argument values of the function
  • lkwargs (list of dict) – each item in the list is a dict containing all named arguments of the function mapped to their value
  • mode (str) –

    indicates how execution is distributed. Choices are:

    • "serial": single-thread loop on the local machine
    • "local": use joblib to run tasks in parallel. The number of simultaneous jobs is defined in the configuration section ['parallel-local']['nb_procs'] of ~/.pyhrf/config.cfg
    • "remote_cluster": use soma-workflow to run tasks in parallel. The connection setup has to be defined in the configuration section ['parallel-cluster'] of ~/.pyhrf/config.cfg
    • "local_with_dumps": testing purpose only; run each task serially as a subprocess
Returns:

a list of results

Raises:

RemoteException if any remote task has failed

Example:

>>> from pyhrf.parallel import remote_map
>>> def foo(a, b=2):
...     return a + b
>>> remote_map(foo, [(2,), (3,)], [{'b': 5}, {'b': 7}])
[7, 10]
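
For reference, the "serial" mode is semantically a plain loop over the argument lists; a minimal self-contained sketch (the helper name is illustrative, not part of pyhrf):

```python
def serial_map(func, largs, lkwargs=None):
    # Equivalent of remote_map(..., mode='serial'): apply func to each
    # (args, kwargs) pair in order, on the local machine.
    if lkwargs is None:
        lkwargs = [{}] * len(largs)
    return [func(*args, **kwargs) for args, kwargs in zip(largs, lkwargs)]

def foo(a, b=2):
    return a + b

print(serial_map(foo, [(2,), (3,)], [{'b': 5}, {'b': 7}]))  # [7, 10]
```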

pyhrf.parallel.remote_map_marshal(func, largs=None, lkwargs=None, mode='local')
pyhrf.parallel.run_soma_workflow(treatments, exec_cmd, tmp_local_dirs, server_id, remote_host, remote_user, remote_pathes, local_result_pathes, label_for_cluster, wait_ending=False)

Dispatch treatments using soma-workflow.
  • 'treatments' is a dict mapping a treatment name to a treatment object
  • 'exec_cmd' is the command to run on each ROI data
  • 'tmp_local_dirs' is a dict mapping a treatment name to a local tmp dir (used to store a temporary configuration file)
  • 'server_id' is the server ID as expected by WorkflowController
  • 'remote_host' is the remote machine where treatments are processed in parallel
  • 'remote_user' is used to log in to remote_host
  • 'remote_pathes' is a dict mapping a treatment name to an existing remote dir which will be used to store ROI data and result files
  • 'local_result_pathes' is a dict mapping a treatment name to a local path where final results will be stored (the remote host will send them there by scp)
  • 'label_for_cluster' is the base name used to label workflows and sub-jobs
  • 'make_outputs' is a flag telling whether to build outputs from results or not -> not operational yet (#TODO)
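
The per-treatment arguments above must all be keyed by the same treatment names. A hypothetical argument setup (all names, commands, and paths are placeholders, not values pyhrf requires):

```python
# Hypothetical argument setup for run_soma_workflow: every per-treatment
# dict shares the same treatment-name keys (all values are placeholders).
treatments = {"subject1": None, "subject2": None}  # name -> FMRITreatment
exec_cmd = "pyhrf_analysis_cmd"                    # placeholder command
tmp_local_dirs = {name: "/tmp/pyhrf_%s" % name for name in treatments}
remote_pathes = {name: "/scratch/user/pyhrf/%s" % name for name in treatments}
local_result_pathes = {name: "/home/user/results/%s" % name
                       for name in treatments}

# Sanity check: all per-treatment dicts are keyed consistently.
assert (set(treatments) == set(tmp_local_dirs)
        == set(remote_pathes) == set(local_result_pathes))
```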
pyhrf.parallel.save_treatment(t, f)