Apply family¶
mpire.WorkerPool implements two apply functions, which are very similar to the ones in the
multiprocessing module:
mpire.WorkerPool.apply(): Apply a function to a single task. This is a blocking call.
mpire.WorkerPool.apply_async(): A variant of the above, but which is non-blocking. This returns an
mpire.async_result.AsyncResult object.
apply¶
The apply function is a blocking call, which means that it will not return until the task is completed. If you want
to run multiple different tasks in parallel, you should use the apply_async function instead. If you need
to run the same function for many tasks in parallel, use the map functions instead.
The apply function takes a function, positional arguments, and keyword arguments, similar to how
multiprocessing does it.
from mpire import WorkerPool

def task(a, b, c, d):
    return a + b + c + d

with WorkerPool(n_jobs=1) as pool:
    # Positional arguments go in args, keyword arguments in kwargs
    result = pool.apply(task, args=(1, 2), kwargs={'d': 4, 'c': 3})
    print(result)  # 10
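For comparison, here is a minimal sketch of the standard-library counterpart of the call above. It uses multiprocessing.pool.ThreadPool, which exposes the same apply interface as multiprocessing.Pool, purely so that the snippet runs without any process start-up considerations; that substitution is a choice made for this illustration only. Note that the standard library names the keyword-arguments parameter kwds, whereas the mpire call above uses kwargs.

```python
from multiprocessing.pool import ThreadPool


def task(a, b, c, d):
    return a + b + c + d


# ThreadPool is used here only for illustration; multiprocessing.Pool
# offers the same apply() signature.
with ThreadPool(processes=1) as pool:
    # The standard library spells the keyword-arguments parameter `kwds`
    stdlib_result = pool.apply(task, args=(1, 2), kwds={'d': 4, 'c': 3})

print(stdlib_result)  # 10
```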
apply_async¶
The apply_async function is a non-blocking call, which means that it will return immediately. It returns an
mpire.async_result.AsyncResult object, which can be used to get the result of the task at a later moment in time.
The apply_async function takes the same parameters as the apply function.
def task(a, b):
    return a + b

with WorkerPool(n_jobs=4) as pool:
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
    results = [async_result.get() for async_result in async_results]
Obtaining the results should happen while the pool is still running! E.g., the following will deadlock:
with WorkerPool(n_jobs=4) as pool:
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]

# Will wait forever
results = [async_result.get() for async_result in async_results]
You can, however, make use of the mpire.WorkerPool.stop_and_join() function to stop the workers and join the
pool. This will make sure that all tasks are completed before the pool exits.
with WorkerPool(n_jobs=4) as pool:
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
    pool.stop_and_join()

# Will not deadlock
results = [async_result.get() for async_result in async_results]
AsyncResult¶
The mpire.async_result.AsyncResult object has the following convenient methods:
with WorkerPool(n_jobs=1) as pool:
    async_result = pool.apply_async(task, args=(1, 1))

    # Check if the task is completed
    is_completed = async_result.ready()

    # Wait until the task is completed, or until the timeout is reached.
    async_result.wait(timeout=10)

    # Get the result of the task. This will block until the task is completed,
    # or until the timeout is reached.
    result = async_result.get(timeout=None)

    # Check if the task was successful (i.e., did not raise an exception).
    # This will raise an exception if the task is not completed yet.
    is_successful = async_result.successful()
Callbacks¶
Each apply function has a callback and error_callback argument. These are functions which are called when
the task is finished. The callback function is called with the result of the task when the task was completed
successfully, and the error_callback is called with the exception when the task failed.
def task(a):
    return a + 1

def callback(result):
    print("Task completed successfully with result:", result)

def error_callback(exception):
    print("Task failed with exception:", exception)

with WorkerPool(n_jobs=1) as pool:
    pool.apply(task, 42, callback=callback, error_callback=error_callback)
Worker init and exit¶
As with the map family of functions, the apply family of functions also has worker_init and worker_exit
arguments. These are functions which are called when a worker is started and stopped, respectively. See
Worker init and exit for more information on these functions.
def worker_init():
    print("Worker started")

def worker_exit():
    print("Worker stopped")

with WorkerPool(n_jobs=5) as pool:
    pool.apply(task, 42, worker_init=worker_init, worker_exit=worker_exit)
There is a caveat, though. When apply or apply_async is called for the first time, the entire pool of workers
is started. In the above example all five workers are started, even though only one was needed. The
worker_init function is therefore set for all of those workers at once, so you cannot use a different
worker_init function for each apply task: a second, different worker_init function will simply be ignored.
Similarly, the worker_exit function can only be set once. Additionally, exit functions are only called when
a worker exits, which in this case means when the pool exits. If you call apply or apply_async multiple
times, the worker_exit function is therefore only called once, at the end. Use
mpire.WorkerPool.stop_and_join() to stop the workers, which will cause the worker_exit function to be
triggered for each worker.
Timeouts¶
The apply family of functions also has task_timeout, worker_init_timeout and worker_exit_timeout
arguments. These are timeouts for the task, the worker_init function and the worker_exit function, respectively.
They work similarly to those for the map functions.
When a single task times out, only that task is cancelled. The other tasks will continue to run. When a worker init or exit times out, the entire pool is stopped.
See Timeouts for more information.