API Reference

WorkerPool

class mpire.WorkerPool(n_jobs=None, daemon=True, cpu_ids=None, shared_objects=None, pass_worker_id=False, use_worker_state=False, start_method='fork', keep_alive=False, use_dill=False, enable_insights=False, order_tasks=False)[source]

A multiprocessing worker pool which acts like a multiprocessing.Pool, but is faster and has more options.

__enter__()[source]

Enable the use of the with statement.

Return type

WorkerPool

__exit__(*_)[source]

Enable the use of the with statement. Gracefully terminates workers, if there are any

Return type

None

__init__(n_jobs=None, daemon=True, cpu_ids=None, shared_objects=None, pass_worker_id=False, use_worker_state=False, start_method='fork', keep_alive=False, use_dill=False, enable_insights=False, order_tasks=False)[source]
Parameters
  • n_jobs (Optional[int]) – Number of workers to spawn. If None, will use mpire.cpu_count()

  • daemon (bool) – Whether to start the child processes as daemon

  • cpu_ids (Optional[List[Union[int, List[int]]]]) – List of CPU IDs to use for pinning child processes to specific CPUs. The list must be as long as the number of jobs used (if n_jobs equals None it must be equal to mpire.cpu_count()), or the list must have exactly one element. In the former case, element i specifies the CPU ID(s) to use for child process i. In the latter case the single element specifies the CPU ID(s) for all child processes to use. A single element can be either a single integer specifying a single CPU ID, or a list of integers specifying that a single child process can make use of multiple CPU IDs. If None, CPU pinning will be disabled

  • shared_objects (Optional[Any]) – Objects to be passed on as shared objects to the workers once. It will be passed on to the target, worker_init, and worker_exit functions. shared_objects is only passed on when it’s not None. Shared objects will be copy-on-write when using fork as start method. When enabled, functions receive the shared objects as second argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args

  • pass_worker_id (bool) – Whether to pass on a worker ID to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker ID as first argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args

  • use_worker_state (bool) – Whether to let a worker have a worker state. The worker state will be passed on to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker state as third argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args

  • start_method (str) – Which process start method to use. Options for multiprocessing: 'fork' (default, if available), 'forkserver' and 'spawn' (default, if 'fork' isn’t available). For multithreading use 'threading'. See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods for more information and https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods for some caveats when using the 'spawn' or 'forkserver' methods

  • keep_alive (bool) – When True it will keep workers alive after completing a map call, allowing to reuse workers

  • use_dill (bool) – Whether to use dill as serialization backend. Some exotic types (e.g., lambdas, nested functions) don’t work well when using spawn as start method. In such cased, use dill (can be a bit slower sometimes)

  • enable_insights (bool) – Whether to enable worker insights. Might come at a small performance penalty (often neglible)

  • order_tasks (bool) – Whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.

__weakref__

list of weak references to the object (if defined)

apply(func, args=(), kwargs=None, callback=None, error_callback=None, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None)[source]

Apply a function to a single task. This is a blocking call.

Parameters
  • func (Callable) – Function to apply to the task. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • args (Any) – Arguments to pass to a worker, which passes it to the function func as func(*args)

  • kwargs (Optional[Dict]) – Keyword arguments to pass to a worker, which passes it to the function func as func(**kwargs)

  • callback (Optional[Callable]) – Callback function to call when the task is finished. The callback function receives the output of the function func as its argument

  • error_callback (Optional[Callable]) – Callback function to call when the task has failed. The callback function receives the exception as its argument

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

Return type

Any

Returns

Result of the function func applied to the task

apply_async(func, args=(), kwargs=None, callback=None, error_callback=None, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None)[source]

Apply a function to a single task. This is a non-blocking call.

Parameters
  • func (Callable) – Function to apply to the task. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • args (Any) – Arguments to pass to a worker, which passes it to the function func as func(*args)

  • kwargs (Optional[Dict]) – Keyword arguments to pass to a worker, which passes it to the function func as func(**kwargs)

  • callback (Optional[Callable]) – Callback function to call when the task is finished. The callback function receives the output of the function func as its argument

  • error_callback (Optional[Callable]) – Callback function to call when the task has failed. The callback function receives the exception as its argument

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

Return type

AsyncResult

Returns

Result of the function func applied to the task

get_exit_results()[source]

Obtain a list of exit results when an exit function is defined.

Return type

List

Returns

Exit results list

get_insights()[source]

Creates insights from the raw insight data

Return type

Dict

Returns

Dictionary containing worker insights

imap(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)[source]

Same as multiprocessing.imap_unordered(), but ordered. Also allows a user to set the maximum number of tasks available in the queue.

Parameters
  • func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func

  • iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks

  • max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2

  • chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.

  • worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time

  • progress_bar (bool) – When True it will display a progress bar

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.

  • progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'

Return type

Generator[Any, None, None]

Returns

Generator yielding ordered results

imap_unordered(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)[source]

Same as multiprocessing.imap_unordered(). Also allows a user to set the maximum number of tasks available in the queue.

Parameters
  • func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func

  • iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks

  • max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2

  • chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.

  • worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time

  • progress_bar (bool) – When True it will display a progress bar

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.

  • progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'

Return type

Generator[Any, None, None]

Returns

Generator yielding unordered results

join(keep_alive=False)

When keep_alive=False: inserts a poison pill, grabs the exit results, waits until the tasks/results queues are done, and waits until all workers are finished. When keep_alive=True: inserts a non-lethal poison pill, and waits until the tasks/results queues are done.

join``and ``stop_and_join are aliases.

Parameters

keep_alive (bool) – Whether to keep the workers alive

Return type

None

map(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, concatenate_numpy_output=True, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)[source]

Same as multiprocessing.map(). Also allows a user to set the maximum number of tasks available in the queue. Note that this function can be slower than the unordered version.

Parameters
  • func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func

  • iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks

  • max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2

  • chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.

  • worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time

  • progress_bar (bool) – When True it will display a progress bar

  • concatenate_numpy_output (bool) – When True it will concatenate numpy output to a single numpy array

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.

  • progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'

Return type

Any

Returns

List with ordered results

map_unordered(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)[source]

Same as multiprocessing.map(), but unordered. Also allows a user to set the maximum number of tasks available in the queue.

Parameters
  • func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func

  • iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks

  • max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2

  • chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.

  • worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time

  • progress_bar (bool) – When True it will display a progress bar

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.

  • progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'

Return type

Any

Returns

List with unordered results

pass_on_worker_id(pass_on=True)[source]

Set whether to pass on the worker ID to the function to be executed or not (default= False).

Parameters

pass_on (bool) – Whether to pass on a worker ID to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker ID depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args

Return type

None

print_insights()[source]

Prints insights per worker

Return type

None

set_keep_alive(keep_alive=True)[source]

Set whether workers should be kept alive in between consecutive map calls.

Parameters

keep_alive (bool) – When True it will keep workers alive after completing a map call, allowing to reuse workers

Return type

None

set_order_tasks(order_tasks=True)[source]

Set whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.

Parameters

order_tasks (bool) – Whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.

Return type

None

set_shared_objects(shared_objects=None)[source]

Set shared objects to pass to the workers.

Parameters

shared_objects (Optional[Any]) – Objects to be passed on as shared objects to the workers once. It will be passed on to the target, worker_init, and worker_exit functions. shared_objects is only passed on when it’s not None. Shared objects will be copy-on-write when using fork as start method. When enabled, functions receive the shared objects depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args`

Return type

None

set_use_worker_state(use_worker_state=True)[source]

Set whether or not each worker should have its own state variable. Each worker has its own state, so it’s not shared between the workers.

Parameters

use_worker_state (bool) – Whether to let a worker have a worker state. The worker state will be passed on to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker state depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args

Return type

None

stop_and_join(keep_alive=False)[source]

When keep_alive=False: inserts a poison pill, grabs the exit results, waits until the tasks/results queues are done, and waits until all workers are finished. When keep_alive=True: inserts a non-lethal poison pill, and waits until the tasks/results queues are done.

join``and ``stop_and_join are aliases.

Parameters

keep_alive (bool) – Whether to keep the workers alive

Return type

None

terminate()[source]

Tries to do a graceful shutdown of the workers, by interrupting them. In the case processes deadlock it will send a sigkill.

Return type

None

AsyncResult

class mpire.async_result.AsyncResult(cache, callback, error_callback, job_id=None, delete_from_cache=True, timeout=None)[source]

Adapted from multiprocessing.pool.ApplyResult.

__init__(cache, callback, error_callback, job_id=None, delete_from_cache=True, timeout=None)[source]
Parameters
  • cache (Dict) – Cache for storing intermediate results

  • callback (Optional[Callable]) – Callback function to call when the task is finished. The callback function receives the output of the function as its argument

  • error_callback (Optional[Callable]) – Callback function to call when the task has failed. The callback function receives the exception as its argument

  • job_id (Optional[int]) – Job ID of the task. If None, a new job ID is generated

  • delete_from_cache (bool) – If True, the result is deleted from the cache when the task is finished

  • timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)

__weakref__

list of weak references to the object (if defined)

get(timeout=None)[source]

Wait until the task is finished and return the output of the function

Parameters

timeout (Optional[float]) – Timeout in seconds. If None, wait indefinitely

Return type

Any

Returns

Output of the function

Raises

TimeoutError if the task is not finished within the timeout. When the task has failed, the exception raised by the function is re-raised

ready()[source]
Return type

bool

Returns

Returns True if the task is finished

successful()[source]
Return type

bool

Returns

Returns True if the task has finished successfully

Raises

ValueError if the task is not finished yet

wait(timeout=None)[source]

Wait until the task is finished

Parameters

timeout (Optional[float]) – Timeout in seconds. If None, wait indefinitely

Return type

None

Task chunking

mpire.utils.chunk_tasks(iterable_of_args, iterable_len=None, chunk_size=None, n_splits=None)[source]

Chunks tasks such that individual workers will receive chunks of tasks rather than individual ones, which can speed up processing drastically.

Parameters
  • iterable_of_args (Iterable) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function

  • iterable_len (Optional[int]) – Number of tasks available in iterable_of_args. Only needed when iterable_of_args is a generator

  • chunk_size (Union[int, float, None]) – Number of simultaneous tasks to give to a worker. If None, will use n_splits to determine the chunk size

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None

Return type

Generator[Collection, None, None]

Returns

Generator of chunked task arguments

Converting iterable of arguments

mpire.utils.make_single_arguments(iterable_of_args, generator=True)[source]

Converts an iterable of single arguments to an iterable of single argument tuples

Parameters
  • iterable_of_args (Iterable) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function

  • generator (bool) – Whether or not to return a generator, otherwise a materialized list will be returned

Return type

Union[List, Generator]

Returns

Iterable of single argument tuples

Dashboard

mpire.dashboard.start_dashboard(port_range=range(8080, 8100))[source]

Starts a new MPIRE dashboard

Parameters

port_range (Sequence) – Port range to try.

Return type

Dict[str, Union[str, int]]

Returns

A dictionary containing the dashboard port number and manager host and port number being used

mpire.dashboard.connect_to_dashboard(manager_port_nr, manager_host=None)[source]

Connects to an existing MPIRE dashboard

Parameters
  • manager_port_nr (int) – Port to use when connecting to a manager

  • manager_host (Union[str, bytes, None]) – Host to use when connecting to a manager. If None it will use localhost

Return type

None

mpire.dashboard.shutdown_dashboard()[source]

Shuts down the dashboard

Return type

None

mpire.dashboard.get_stacklevel()[source]

Gets the stack level to use when obtaining function details (used for the dashboard)

Return type

int

Returns

Stack level

mpire.dashboard.set_stacklevel(stacklevel)[source]

Sets the stack level to use when obtaining function details (used for the dashboard)

Parameters

stacklevel (int) – Stack level

Return type

None

Other

mpire.cpu_count()

Returns the number of CPUs in the system