Apply family

mpire.WorkerPool implements two apply functions, which are very similar to the ones in the multiprocessing module:

mpire.WorkerPool.apply()

Apply a function to a single task. This is a blocking call.

mpire.WorkerPool.apply_async()

A non-blocking variant of apply(). It returns an mpire.async_result.AsyncResult object.
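These functions are described as very similar to their counterparts in the standard multiprocessing module. The sketch below shows those standard-library counterparts for comparison. It uses multiprocessing.pool.ThreadPool, which has the same apply/apply_async interface as multiprocessing.Pool but runs tasks in threads, so nothing needs to be pickled. This is not mpire code, and the helpers add and greet are hypothetical. Note that the standard library names the keyword-arguments parameter kwds, while mpire uses kwargs.

```python
from multiprocessing.pool import ThreadPool


def add(a, b):
    return a + b


def greet(name, punctuation="!"):
    return f"Hello, {name}{punctuation}"


with ThreadPool(processes=2) as pool:
    # Blocking: returns the value only once the task has finished.
    blocking_result = pool.apply(add, args=(1, 2))

    # Non-blocking: each call returns an AsyncResult immediately,
    # so two different functions can run at the same time.
    sum_future = pool.apply_async(add, args=(3, 4))
    greeting_future = pool.apply_async(greet, args=("mpire",), kwds={"punctuation": "?"})

    # Collect the results while the pool is still open.
    async_results = (sum_future.get(), greeting_future.get())

print(blocking_result)
print(async_results)
```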

apply

The apply function is a blocking call, which means that it does not return until the task is completed. If you want to run multiple different tasks in parallel, use the apply_async function instead. If you need to run the same function on many tasks in parallel, use the map functions instead.

The apply function takes a function, positional arguments, and keyword arguments, similar to how multiprocessing does it.

from mpire import WorkerPool

def task(a, b, c, d):
    return a + b + c + d

with WorkerPool(n_jobs=1) as pool:
    # Positional arguments go in args, keyword arguments in kwargs
    result = pool.apply(task, args=(1, 2), kwargs={'d': 4, 'c': 3})
    print(result)  # 10

apply_async

The apply_async function is a non-blocking call, which means that it returns immediately. It returns an mpire.async_result.AsyncResult object, which can be used to retrieve the result of the task later on.

The apply_async function takes the same parameters as the apply function.

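from mpire import WorkerPool

def task(a, b):
    return a + b

with WorkerPool(n_jobs=4) as pool:
    # Submit all tasks first, then collect the results while the pool is still open
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
    results = [async_result.get() for async_result in async_results]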
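Because each apply_async call returns its own result object, collecting the results with a list comprehension as above keeps them in submission order, whichever task happens to finish first. The sketch below demonstrates that property with the standard library's ThreadPool rather than mpire; slow_square is a hypothetical helper whose later submissions sleep less, so they tend to finish earlier.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(x, delay):
    # Later submissions get a shorter delay, so they tend to finish first.
    time.sleep(delay)
    return x * x


with ThreadPool(processes=4) as pool:
    async_results = [
        pool.apply_async(slow_square, args=(i, 0.05 * (4 - i))) for i in range(4)
    ]
    # get() is called in submission order, so the list lines up with the
    # inputs even though the tasks completed in a different order.
    results = [async_result.get() for async_result in async_results]

print(results)
```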

Results must be obtained while the pool is still running! For example, the following will deadlock, because get() is only called after the pool has exited:

with WorkerPool(n_jobs=4) as pool:
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]

# Will wait forever
results = [async_result.get() for async_result in async_results]

You can, however, call mpire.WorkerPool.stop_and_join() to stop the workers and join the pool. This ensures that all tasks are completed before the pool exits, so the results can still be retrieved afterwards:

with WorkerPool(n_jobs=4) as pool:
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
    pool.stop_and_join()

# Will not deadlock
results = [async_result.get() for async_result in async_results]
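The standard library has the same pitfall: leaving a multiprocessing Pool's with block calls terminate(), which abandons unfinished tasks. There, the closest counterpart of stop_and_join() is close() followed by join(). A minimal sketch with ThreadPool, for comparison only:

```python
from multiprocessing.pool import ThreadPool


def task(a, b):
    return a + b


with ThreadPool(processes=4) as pool:
    async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
    # Stop accepting new work, then wait until every queued task has finished.
    pool.close()
    pool.join()

# Safe: every result was computed before the pool was terminated on exit.
results = [async_result.get() for async_result in async_results]
print(results)
```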

AsyncResult

The mpire.async_result.AsyncResult object has the following convenient methods:

with WorkerPool(n_jobs=1) as pool:
    async_result = pool.apply_async(task, args=(1, 1))

    # Check if the task is completed
    is_completed = async_result.ready()

    # Wait until the task is completed, or until the timeout is reached.
    async_result.wait(timeout=10)

    # Get the result of the task. This will block until the task is completed,
    # or until the timeout is reached.
    result = async_result.get(timeout=None)

    # Check if the task was successful (i.e., did not raise an exception).
    # This will raise an exception if the task is not completed yet.
    is_successful = async_result.successful()
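The method names above match those of the standard library's AsyncResult. The sketch below shows how they behave there, using ThreadPool and a hypothetical slow_task. In the standard library (Python 3.7 and later), successful() raises ValueError while the task is still running. The example does not verify mpire's exact exception type.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_task(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPool(processes=1) as pool:
    async_result = pool.apply_async(slow_task, args=(0.3,))

    # Just submitted and still sleeping, so the task is not finished yet.
    ready_at_start = async_result.ready()

    # successful() raises ValueError while the task is still running.
    try:
        async_result.successful()
        raised_value_error = False
    except ValueError:
        raised_value_error = True

    # wait() blocks until the task is done (or the timeout expires) and returns None.
    async_result.wait(timeout=10)
    ready_after_wait = async_result.ready()
    result = async_result.get()
    is_successful = async_result.successful()

print(ready_at_start, raised_value_error, ready_after_wait, result, is_successful)
```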

Callbacks

Both apply functions accept callback and error_callback arguments. These functions are called when the task finishes: callback is called with the result of the task if it completed successfully, and error_callback is called with the exception if the task failed.

from mpire import WorkerPool

def task(a):
    return a + 1

def callback(result):
    print("Task completed successfully with result:", result)

def error_callback(exception):
    print("Task failed with exception:", exception)

with WorkerPool(n_jobs=1) as pool:
    # args must be a tuple, hence the trailing comma for a single argument
    pool.apply(task, args=(42,), callback=callback, error_callback=error_callback)
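For comparison, the standard library also uses the names callback and error_callback, but it accepts them only on apply_async, not on the blocking apply. The sketch below uses ThreadPool with a hypothetical task that fails on negative input. It collects the callback arguments into lists so both paths can be checked. In the standard library, the callback runs before wait() returns.

```python
from multiprocessing.pool import ThreadPool


def task(a):
    if a < 0:
        raise ValueError(f"negative input: {a}")
    return a + 1


successes = []
failures = []

with ThreadPool(processes=2) as pool:
    # Each callback receives either the return value or the raised exception.
    ok = pool.apply_async(task, args=(42,), callback=successes.append)
    bad = pool.apply_async(task, args=(-1,), error_callback=failures.append)
    ok.wait()
    bad.wait()

print(successes)
print([str(exc) for exc in failures])
```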

Worker init and exit

As with the map family of functions, the apply family of functions also has worker_init and worker_exit arguments. These are functions which are called when a worker is started and stopped, respectively. See Worker init and exit for more information on these functions.

def worker_init():
    print("Worker started")

def worker_exit():
    print("Worker stopped")

with WorkerPool(n_jobs=5) as pool:
    pool.apply(task, args=(42,), worker_init=worker_init, worker_exit=worker_exit)

There’s a caveat, though. The first call to apply or apply_async starts the entire pool of workers. In the example above, all five workers are started even though only one is needed. As a result, the worker_init function is set for all workers at once, so you cannot use a different worker_init function for each apply task: a second, different worker_init function is simply ignored.

The same holds for worker_exit: it can only be set once. Moreover, exit functions are only called when a worker exits, which here means when the pool exits. If you call apply or apply_async multiple times, the worker_exit function is therefore only called at the end, not after each task. Use mpire.WorkerPool.stop_and_join() to stop the workers, which triggers the worker_exit function for each worker.
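The standard library's Pool differs in some details. It offers only an init hook (initializer) and no exit hook, and it starts its workers when the pool is created rather than at the first apply call. It does share the consequence described above: the init hook runs once per worker, not once per task. The sketch below uses ThreadPool with five workers and a hypothetical task called three times. It counts the init calls with a lock, since the workers are threads.

```python
import threading
from multiprocessing.pool import ThreadPool

init_calls = 0
init_lock = threading.Lock()


def worker_init():
    # Runs once in each worker thread when that worker starts.
    global init_calls
    with init_lock:
        init_calls += 1


def task(a):
    return a + 1


with ThreadPool(processes=5, initializer=worker_init) as pool:
    # Only three tasks, but all five workers were started with the pool.
    results = [pool.apply(task, args=(i,)) for i in range(3)]
    pool.close()
    pool.join()

print(results, init_calls)
```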

Timeouts

The apply family of functions also accepts task_timeout, worker_init_timeout and worker_exit_timeout arguments. These are timeouts for the task, the worker_init function and the worker_exit function, respectively, and they work similarly to those of the map functions.

When a single task times out, only that task is cancelled and the other tasks continue to run. When a worker_init or worker_exit function times out, however, the entire pool is stopped.

See Timeouts for more information.
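A timeout passed to AsyncResult.get() differs from task_timeout. In the standard library, get(timeout=...) only stops the caller from waiting and leaves the task running. This contrasts with the task cancellation described above for task_timeout. The sketch below shows the standard-library behaviour with ThreadPool and a hypothetical slow_task. It does not demonstrate mpire's own get().

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def slow_task():
    time.sleep(0.3)
    return "done"


with ThreadPool(processes=1) as pool:
    async_result = pool.apply_async(slow_task)
    try:
        async_result.get(timeout=0.05)
        timed_out = False
    except multiprocessing.TimeoutError:
        # Only the wait gave up; the task itself keeps running.
        timed_out = True
    # Waiting again long enough still yields the task's result.
    result = async_result.get(timeout=5)

print(timed_out, result)
```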