API Reference¶
WorkerPool¶
class mpire.WorkerPool(n_jobs=None, daemon=True, cpu_ids=None, shared_objects=None, pass_worker_id=False, use_worker_state=False, start_method='fork', keep_alive=False, use_dill=False, enable_insights=False, order_tasks=False)[source]¶

A multiprocessing worker pool which acts like a multiprocessing.Pool, but is faster and has more options.
__exit__(*_)[source]¶

Enables the use of the with statement. Gracefully terminates workers, if there are any.

Return type: None
__init__(n_jobs=None, daemon=True, cpu_ids=None, shared_objects=None, pass_worker_id=False, use_worker_state=False, start_method='fork', keep_alive=False, use_dill=False, enable_insights=False, order_tasks=False)[source]¶

Parameters:
- n_jobs (Optional[int]) – Number of workers to spawn. If None, will use mpire.cpu_count()
- daemon (bool) – Whether to start the child processes as daemon
- cpu_ids (Optional[List[Union[int, List[int]]]]) – List of CPU IDs to use for pinning child processes to specific CPUs. The list must be as long as the number of jobs used (if n_jobs equals None it must be equal to mpire.cpu_count()), or the list must have exactly one element. In the former case, element i specifies the CPU ID(s) to use for child process i. In the latter case the single element specifies the CPU ID(s) for all child processes to use. A single element can be either a single integer specifying a single CPU ID, or a list of integers specifying that a single child process can make use of multiple CPU IDs. If None, CPU pinning will be disabled
- shared_objects (Optional[Any]) – Objects to be passed on as shared objects to the workers once. They will be passed on to the target, worker_init, and worker_exit functions. shared_objects is only passed on when it's not None. Shared objects will be copy-on-write when using fork as start method. When enabled, functions receive the shared objects as second argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args
- pass_worker_id (bool) – Whether to pass on a worker ID to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker ID as first argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args
- use_worker_state (bool) – Whether to let a worker have a worker state. The worker state will be passed on to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker state as third argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args
- start_method (str) – Which process start method to use. Options for multiprocessing: 'fork' (default, if available), 'forkserver' and 'spawn' (default, if 'fork' isn't available). For multithreading use 'threading'. See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods for more information and https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods for some caveats when using the 'spawn' or 'forkserver' methods
- keep_alive (bool) – When True it will keep workers alive after completing a map call, allowing workers to be reused
- use_dill (bool) – Whether to use dill as serialization backend. Some exotic types (e.g., lambdas, nested functions) don't work well when using spawn as start method. In such cases, use dill (can be a bit slower sometimes)
- enable_insights (bool) – Whether to enable worker insights. Might come at a small performance penalty (often negligible)
- order_tasks (bool) – Whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.
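As an illustration of the argument order described above (worker_id, shared_objects, worker_state, then the task arguments), here is a minimal sketch; the task function and the shared dictionary are made up for the example::

    from mpire import WorkerPool

    # Illustrative task function. With pass_worker_id, shared_objects and
    # use_worker_state all enabled, arguments arrive in this order:
    # worker_id, shared_objects, worker_state, then the task arguments.
    def task(worker_id, shared, state, x):
        return worker_id, shared['offset'] + x

    with WorkerPool(n_jobs=4, shared_objects={'offset': 10},
                    pass_worker_id=True, use_worker_state=True) as pool:
        results = pool.map(task, range(100))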
__weakref__¶

List of weak references to the object (if defined).
apply(func, args=(), kwargs=None, callback=None, error_callback=None, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None)[source]¶

Apply a function to a single task. This is a blocking call.

Parameters:
- func (Callable) – Function to apply to the task. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- args (Any) – Arguments to pass to a worker, which passes them to the function func as func(*args)
- kwargs (Optional[Dict]) – Keyword arguments to pass to a worker, which passes them to the function func as func(**kwargs)
- callback (Optional[Callable]) – Callback function to call when the task is finished. The callback function receives the output of the function func as its argument
- error_callback (Optional[Callable]) – Callback function to call when the task has failed. The callback function receives the exception as its argument
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn't apply to the worker_init and worker_exit functions; use worker_init_timeout and worker_exit_timeout for those, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)

Return type: Any
Returns: Result of the function func applied to the task
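A minimal sketch of a blocking apply call; the add function is illustrative, not part of the API::

    from mpire import WorkerPool

    def add(x, y):
        return x + y

    with WorkerPool(n_jobs=2) as pool:
        # Blocks until this single task has finished
        result = pool.apply(add, args=(1, 2))   # result == 3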
apply_async(func, args=(), kwargs=None, callback=None, error_callback=None, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None)[source]¶

Apply a function to a single task. This is a non-blocking call.

Parameters:
- func (Callable) – Function to apply to the task. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- args (Any) – Arguments to pass to a worker, which passes them to the function func as func(*args)
- kwargs (Optional[Dict]) – Keyword arguments to pass to a worker, which passes them to the function func as func(**kwargs)
- callback (Optional[Callable]) – Callback function to call when the task is finished. The callback function receives the output of the function func as its argument
- error_callback (Optional[Callable]) – Callback function to call when the task has failed. The callback function receives the exception as its argument
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn't apply to the worker_init and worker_exit functions; use worker_init_timeout and worker_exit_timeout for those, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)

Return type: AsyncResult
Returns: Result of the function func applied to the task
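A sketch of the non-blocking variant, collecting results through the returned AsyncResult objects; the add function is illustrative::

    from mpire import WorkerPool

    def add(x, y):
        return x + y

    with WorkerPool(n_jobs=2) as pool:
        # Non-blocking: schedule several tasks, then collect the results
        async_results = [pool.apply_async(add, args=(i, i)) for i in range(10)]
        results = [r.get() for r in async_results]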
get_exit_results()[source]¶

Obtain a list of exit results when an exit function is defined.

Return type: List
Returns: Exit results list
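A sketch of collecting worker_exit return values; the task and exit functions shown are illustrative::

    from mpire import WorkerPool

    def task(state, x):
        state['n_processed'] = state.get('n_processed', 0) + 1
        return x * 2

    def exit_fn(state):
        # Whatever worker_exit returns is collected, one entry per worker
        return state.get('n_processed', 0)

    with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
        pool.map(task, range(100), worker_exit=exit_fn)
        counts = pool.get_exit_results()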
get_insights()[source]¶

Creates insights from the raw insight data.

Return type: Dict
Returns: Dictionary containing worker insights
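A sketch of enabling and retrieving worker insights; the exact keys of the returned dictionary are not specified here, so treat the printed contents as indicative only::

    from mpire import WorkerPool

    def task(x):
        return x * x

    with WorkerPool(n_jobs=4, enable_insights=True) as pool:
        pool.map(task, range(1000))
        insights = pool.get_insights()
        print(insights)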
imap(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)[source]¶

Same as multiprocessing.imap_unordered(), but ordered. Also allows a user to set the maximum number of tasks available in the queue.

Parameters:
- func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes them to the function func
- iterable_len (Optional[int]) – Number of elements in iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks
- max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2
- chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64
- worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time
- progress_bar (bool) – When True it will display a progress bar
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn't apply to the worker_init and worker_exit functions; use worker_init_timeout and worker_exit_timeout for those, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE
- progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'

Return type: Generator[Any, None, None]
Returns: Generator yielding ordered results
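A sketch of lazily consuming ordered results; the square function is illustrative::

    from mpire import WorkerPool

    def square(x):
        return x * x

    with WorkerPool(n_jobs=4) as pool:
        # Results are yielded as they become available, but in task order
        for result in pool.imap(square, range(100), progress_bar=True):
            print(result)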
imap_unordered(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)[source]¶

Same as multiprocessing.imap_unordered(). Also allows a user to set the maximum number of tasks available in the queue.

Parameters:
- func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes them to the function func
- iterable_len (Optional[int]) – Number of elements in iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks
- max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2
- chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64
- worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time
- progress_bar (bool) – When True it will display a progress bar
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn't apply to the worker_init and worker_exit functions; use worker_init_timeout and worker_exit_timeout for those, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE
- progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'

Return type: Generator[Any, None, None]
Returns: Generator yielding unordered results
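The unordered variant differs only in yield order; a minimal sketch with the same illustrative square function::

    from mpire import WorkerPool

    def square(x):
        return x * x

    with WorkerPool(n_jobs=4) as pool:
        # Results arrive in completion order, not task order
        for result in pool.imap_unordered(square, range(100)):
            print(result)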
join(keep_alive=False)¶

When keep_alive=False: inserts a poison pill, grabs the exit results, waits until the tasks/results queues are done, and waits until all workers are finished. When keep_alive=True: inserts a non-lethal poison pill, and waits until the tasks/results queues are done. join and stop_and_join are aliases.

Parameters:
- keep_alive (bool) – Whether to keep the workers alive

Return type: None
map(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, concatenate_numpy_output=True, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)[source]¶

Same as multiprocessing.map(). Also allows a user to set the maximum number of tasks available in the queue. Note that this function can be slower than the unordered version.

Parameters:
- func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes them to the function func
- iterable_len (Optional[int]) – Number of elements in iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks
- max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2
- chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64
- worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time
- progress_bar (bool) – When True it will display a progress bar
- concatenate_numpy_output (bool) – When True it will concatenate numpy output to a single numpy array
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn't apply to the worker_init and worker_exit functions; use worker_init_timeout and worker_exit_timeout for those, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE
- progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'

Return type: Any
Returns: List with ordered results
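A sketch of a blocking map call with a progress bar; the square function and the tqdm description are illustrative::

    from mpire import WorkerPool

    def square(x):
        return x * x

    with WorkerPool(n_jobs=4) as pool:
        # Blocks until all tasks are done; results keep task order
        results = pool.map(square, range(100), progress_bar=True,
                           progress_bar_options={'desc': 'Squaring'})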
map_unordered(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None, progress_bar_style=None)[source]¶

Same as multiprocessing.map(), but unordered. Also allows a user to set the maximum number of tasks available in the queue.

Parameters:
- func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes them to the function func
- iterable_len (Optional[int]) – Number of elements in iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks
- max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * chunk_size * 2
- chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64
- worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time
- progress_bar (bool) – When True it will display a progress bar
- worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument
- task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn't apply to the worker_init and worker_exit functions; use worker_init_timeout and worker_exit_timeout for those, respectively
- worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
- progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE
- progress_bar_style (Optional[str]) – The progress bar style to use. Can be one of None, 'std', or 'notebook'

Return type: Any
Returns: List with unordered results
pass_on_worker_id(pass_on=True)[source]¶

Set whether to pass on the worker ID to the function to be executed or not (default=False).

Parameters:
- pass_on (bool) – Whether to pass on a worker ID to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker ID depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args

Return type: None
set_keep_alive(keep_alive=True)[source]¶

Set whether workers should be kept alive in between consecutive map calls.

Parameters:
- keep_alive (bool) – When True it will keep workers alive after completing a map call, allowing workers to be reused

Return type: None
set_order_tasks(order_tasks=True)[source]¶

Set whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.

Parameters:
- order_tasks (bool) – Whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.

Return type: None
set_shared_objects(shared_objects=None)[source]¶

Set shared objects to pass to the workers.

Parameters:
- shared_objects (Optional[Any]) – Objects to be passed on as shared objects to the workers once. They will be passed on to the target, worker_init, and worker_exit functions. shared_objects is only passed on when it's not None. Shared objects will be copy-on-write when using fork as start method. When enabled, functions receive the shared objects depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args

Return type: None
set_use_worker_state(use_worker_state=True)[source]¶

Set whether or not each worker should have its own state variable. Each worker has its own state, so it's not shared between the workers.

Parameters:
- use_worker_state (bool) – Whether to let a worker have a worker state. The worker state will be passed on to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker state depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args

Return type: None
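A sketch of the worker-state pattern, using worker_init to set up per-worker state once; load_resource is a hypothetical, expensive setup function::

    from mpire import WorkerPool

    def load_resource():
        # Hypothetical expensive setup, e.g., opening a file or loading a model
        return {'multiplier': 2}

    def init(state):
        state['resource'] = load_resource()

    def task(state, x):
        return state['resource']['multiplier'] * x

    with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
        results = pool.map(task, range(100), worker_init=init)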
stop_and_join(keep_alive=False)[source]¶

When keep_alive=False: inserts a poison pill, grabs the exit results, waits until the tasks/results queues are done, and waits until all workers are finished. When keep_alive=True: inserts a non-lethal poison pill, and waits until the tasks/results queues are done. join and stop_and_join are aliases.

Parameters:
- keep_alive (bool) – Whether to keep the workers alive

Return type: None
AsyncResult¶

class mpire.async_result.AsyncResult(cache, callback, error_callback, job_id=None, delete_from_cache=True, timeout=None)[source]¶

Adapted from multiprocessing.pool.ApplyResult.

__init__(cache, callback, error_callback, job_id=None, delete_from_cache=True, timeout=None)[source]¶

Parameters:
- cache (Dict) – Cache for storing intermediate results
- callback (Optional[Callable]) – Callback function to call when the task is finished. The callback function receives the output of the function as its argument
- error_callback (Optional[Callable]) – Callback function to call when the task has failed. The callback function receives the exception as its argument
- job_id (Optional[int]) – Job ID of the task. If None, a new job ID is generated
- delete_from_cache (bool) – If True, the result is deleted from the cache when the task is finished
- timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default)
__weakref__¶

List of weak references to the object (if defined).
get(timeout=None)[source]¶

Wait until the task is finished and return the output of the function.

Parameters:
- timeout (Optional[float]) – Timeout in seconds. If None, wait indefinitely

Return type: Any
Returns: Output of the function
Raises: TimeoutError if the task is not finished within the timeout. When the task has failed, the exception raised by the function is re-raised
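A sketch of handling a timeout on get; slow_task is illustrative and the example assumes the built-in TimeoutError is raised::

    import time
    from mpire import WorkerPool

    def slow_task(x):
        time.sleep(5)
        return x

    with WorkerPool(n_jobs=2) as pool:
        async_result = pool.apply_async(slow_task, args=(1,))
        try:
            value = async_result.get(timeout=1.0)
        except TimeoutError:
            print("Task did not finish within 1 second")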
Task chunking¶

mpire.utils.chunk_tasks(iterable_of_args, iterable_len=None, chunk_size=None, n_splits=None)[source]¶

Chunks tasks such that individual workers will receive chunks of tasks rather than individual ones, which can speed up processing drastically.

Parameters:
- iterable_of_args (Iterable) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes them to the function
- iterable_len (Optional[int]) – Number of tasks available in iterable_of_args. Only needed when iterable_of_args is a generator
- chunk_size (Union[int, float, None]) – Number of simultaneous tasks to give to a worker. If None, will use n_splits to determine the chunk size
- n_splits (Optional[int]) – Number of splits to use when chunk_size is None

Return type: Generator[Collection, None, None]
Returns: Generator of chunked task arguments
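A sketch of inspecting the chunks this generator produces; the exact grouping of elements shown in the comment is indicative only::

    from mpire.utils import chunk_tasks

    # Split ten tasks into four chunks
    chunks = list(chunk_tasks(range(10), n_splits=4))
    print(chunks)   # four chunks together covering 0..9; exact grouping may vary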
Converting iterable of arguments¶

mpire.utils.make_single_arguments(iterable_of_args, generator=True)[source]¶

Converts an iterable of single arguments to an iterable of single argument tuples.

Parameters:
- iterable_of_args (Iterable) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes them to the function
- generator (bool) – Whether or not to return a generator; otherwise a materialized list will be returned

Return type: Union[List, Generator]
Returns: Iterable of single argument tuples
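A sketch showing why this helper is useful: without it, tuple arguments would be unpacked into multiple function parameters; the count_items function is illustrative::

    from mpire import WorkerPool
    from mpire.utils import make_single_arguments

    def count_items(pair):
        # Receives each tuple as one argument instead of it being unpacked
        return len(pair)

    pairs = [(1, 2), (3, 4), (5, 6)]

    with WorkerPool(n_jobs=2) as pool:
        results = pool.map(count_items, make_single_arguments(pairs, generator=False))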
Dashboard¶

mpire.dashboard.start_dashboard(port_range=range(8080, 8100))[source]¶

Starts a new MPIRE dashboard.

Parameters:
- port_range (Sequence) – Port range to try

Return type: Dict[str, Union[str, int]]
Returns: A dictionary containing the dashboard port number and manager host and port number being used

mpire.dashboard.connect_to_dashboard(manager_port_nr, manager_host=None)[source]¶

Connects to an existing MPIRE dashboard.

Parameters:
- manager_port_nr (int) – Port to use when connecting to a manager
- manager_host (Union[str, bytes, None]) – Host to use when connecting to a manager. If None it will use localhost

Return type: None
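A sketch of starting the dashboard and running a job that shows up on it; inspect the returned dictionary for the actual port numbers, and note that the square function is illustrative::

    from mpire import WorkerPool
    from mpire.dashboard import start_dashboard

    def square(x):
        return x * x

    # Start the dashboard and note the ports it reports
    dashboard_details = start_dashboard()
    print(dashboard_details)

    with WorkerPool(n_jobs=4) as pool:
        # Jobs with a progress bar are tracked on the dashboard
        pool.map(square, range(10000), progress_bar=True)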
Other¶

mpire.cpu_count()¶

Returns the number of CPUs in the system.