WorkerPool¶
This section describes the different ways of creating and setting up an mpire.WorkerPool instance.
Starting a WorkerPool¶
The mpire.WorkerPool class controls a pool of worker processes, similar to a multiprocessing.Pool. It contains all the map-like functions (with the addition of mpire.WorkerPool.map_unordered()), but currently lacks the apply and apply_async functions.
An mpire.WorkerPool can be started in two different ways. The first and recommended way to do so is using a context manager:
from mpire import WorkerPool

# Start a pool of 4 workers
with WorkerPool(n_jobs=4) as pool:
    # Do some processing here
    pass
The with statement takes care of properly joining/terminating the spawned worker processes after the block has ended.
The other way is to do it manually:
# Start a pool of 4 workers
pool = WorkerPool(n_jobs=4)

# Do some processing here
pass

# Only needed when keep_alive=True:
# Clean up pool (this will block until all processing has completed)
pool.stop_and_join()

# In case you want to kill the processes, even though they are still busy
pool.terminate()
When using n_jobs=None, MPIRE will spawn as many processes as there are CPUs on your system. Specifying more jobs than you have CPUs is, of course, possible as well.
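For example, a minimal sketch of relying on that default (this assumes mpire.cpu_count(), which is referenced later in this section, reports the number of available CPUs):

from mpire import WorkerPool, cpu_count

# n_jobs=None (the default) spawns one worker per available CPU
with WorkerPool(n_jobs=None) as pool:
    print(f"Started {cpu_count()} workers")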
Warning
In the manual approach, the results queue should be drained before joining the workers, otherwise you can get a deadlock. If you want to join either way, use mpire.WorkerPool.terminate(). For more information, see the warnings in the Python multiprocessing documentation.
Nested WorkerPools¶
By default, the mpire.WorkerPool class spawns daemon child processes, which are not able to create child processes themselves, so nested pools are not allowed. There's an option to create non-daemon child processes to allow for nested structures:
def job(...):
    with WorkerPool(n_jobs=4) as p:
        # Do some work
        results = p.map(...)

with WorkerPool(n_jobs=4, daemon=True) as pool:
    # This will raise an AssertionError telling you daemon processes
    # can't start child processes
    pool.map(job, ...)

with WorkerPool(n_jobs=4, daemon=False) as pool:
    # This will work just fine
    pool.map(job, ...)
Note
Nested pools aren’t supported when using threading.
Note
Due to a strange bug in Python, using forkserver as start method in a nested pool is not allowed when the outer pool is using fork, as the forkserver will not have been started there. For it to work, your outer pool will have to use either spawn or forkserver as start method.
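A hedged sketch of a combination that should work according to this note (outer pool on spawn, inner pool on forkserver; some_func is a hypothetical worker function):

def nested_job(x):
    # The inner pool may use forkserver because the outer pool uses spawn
    with WorkerPool(n_jobs=2, start_method='forkserver') as inner_pool:
        return inner_pool.map(some_func, range(x))

# daemon=False is required for nesting, as explained above
with WorkerPool(n_jobs=2, daemon=False, start_method='spawn') as outer_pool:
    results = outer_pool.map(nested_job, range(1, 6))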
Warning
Nested pools aren’t production ready. Error handling and keyboard interrupts when using nested pools can, on some rare occasions (~1% of the time), still cause deadlocks. Use at your own risk. When a function is guaranteed to finish successfully, using nested pools is absolutely fine.
CPU pinning¶
You can pin the child processes of mpire.WorkerPool to specific CPUs by using the cpu_ids parameter in the constructor:
# Pin the two child processes to CPUs 2 and 3
with WorkerPool(n_jobs=2, cpu_ids=[2, 3]) as pool:
    ...

# Pin the child processes to CPUs 40-59
with WorkerPool(n_jobs=20, cpu_ids=list(range(40, 60))) as pool:
    ...

# All child processes have to share a single core:
with WorkerPool(n_jobs=4, cpu_ids=[0]) as pool:
    ...

# All child processes have to share multiple cores, namely 4-7:
with WorkerPool(n_jobs=4, cpu_ids=[[4, 5, 6, 7]]) as pool:
    ...

# Each child process can use two distinct cores:
with WorkerPool(n_jobs=4, cpu_ids=[[0, 1], [2, 3], [4, 5], [6, 7]]) as pool:
    ...
CPU IDs have to be non-negative integers, not exceeding the number of CPUs available (which can be retrieved by using mpire.cpu_count()). Use None to disable CPU pinning (which is the default).
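A minimal sketch combining the two (assuming cpu_count is importable from the top-level mpire package, matching the mpire.cpu_count() reference above):

from mpire import WorkerPool, cpu_count

# One worker per CPU, each pinned to its own core (valid IDs are 0 .. cpu_count() - 1)
n = cpu_count()
with WorkerPool(n_jobs=n, cpu_ids=list(range(n))) as pool:
    ...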
Note
Pinning processes to CPU IDs doesn’t work when using threading.
Accessing the worker ID¶
Each worker in MPIRE is given an integer ID to distinguish them. Worker #1 will have ID 0, worker #2 will have ID 1, etc. Sometimes it can be useful to have access to this ID. For example, when you have a shared array whose size equals the number of workers and you want worker #1 to only access the first element, worker #2 to only access the second element, and so on.
By default, the worker ID is not passed on. You can enable/disable this by setting the pass_worker_id flag:
from multiprocessing import Array

def square_sum(worker_id, shared_objects, x):
    # Even though the shared objects is a single container, we 'unpack' it anyway
    results_container = shared_objects

    # Square and sum
    results_container[worker_id] += x * x

# Use a shared array of size equal to the number of jobs to store the results
results_container = Array('f', 4, lock=False)

with WorkerPool(n_jobs=4, shared_objects=results_container, pass_worker_id=True) as pool:
    # Square the numbers and store the results in the container
    pool.map_unordered(square_sum, range(100))
Important
The worker ID will always be the first argument passed on to the provided function pointer.
Instead of passing the flag to the mpire.WorkerPool constructor, you can also make use of mpire.WorkerPool.pass_on_worker_id():
with WorkerPool(n_jobs=4, shared_objects=results_container) as pool:
    pool.pass_on_worker_id()
    pool.map_unordered(square_sum, range(100))
Worker state¶
If you want to let each worker have its own state, you can use the use_worker_state flag. The worker state can be combined with the worker_init and worker_exit parameters of each map function, leading to some really useful capabilities:
import numpy as np
import pickle

def load_big_model(worker_state):
    # Load a model which takes up a lot of memory
    with open('./a_really_big_model.p3', 'rb') as f:
        worker_state['model'] = pickle.load(f)

def model_predict(worker_state, x):
    # Predict
    return worker_state['model'].predict(x)

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    # Let the model predict
    data = np.array([[...]])
    results = pool.map(model_predict, data, worker_init=load_big_model)
Important
The worker state is passed on as the third argument, after the worker ID and shared objects (when enabled), to the provided function pointer.
More information about the worker_init and worker_exit parameters can be found at Worker init and exit.
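As a brief, hedged sketch of the worker_exit side (this assumes, per that section, that values returned from the exit function can be retrieved afterwards with mpire.WorkerPool.get_exit_results()):

def free_model(worker_state):
    # Called once per worker on shutdown; the return value is collected
    del worker_state['model']
    return 'done'

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    results = pool.map(model_predict, data, worker_init=load_big_model,
                       worker_exit=free_model)
    exit_results = pool.get_exit_results()  # one entry per worker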
Instead of passing the flag to the mpire.WorkerPool constructor, you can also make use of mpire.WorkerPool.set_use_worker_state():
with WorkerPool(n_jobs=4) as pool:
    pool.set_use_worker_state()
    results = pool.map(model_predict, data, worker_init=load_big_model)
Process start method¶
The multiprocessing package allows you to start processes using a few different methods: 'fork', 'spawn' or 'forkserver'. Threading is also available by using 'threading'. For detailed information on the multiprocessing contexts, please refer to the multiprocessing documentation and caveats section. In short:
- 'fork' (the default on Unix based systems) copies the parent process such that the child process is effectively identical. This includes copying everything currently in memory. This is sometimes useful, but other times useless or even a serious bottleneck.
- 'spawn' (the default on Windows) starts a fresh Python interpreter where only those resources necessary are inherited.
- 'forkserver' first starts a server process (using spawn). Whenever a new process is needed, the parent process requests the server to fork a new process.
- 'threading' starts child threads.
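The start method is selected through the start_method parameter of the constructor, as the examples below also show. A minimal sketch:

# Start workers using the forkserver start method instead of the platform default
with WorkerPool(n_jobs=2, start_method='forkserver') as pool:
    ...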
When using 'spawn' or 'forkserver', be aware that global variables (constants are fine) might have a different value than you might expect. You also have to import packages within the called function:
import os

def failing_job(folder, filename):
    return os.path.join(folder, filename)

# This will fail because 'os' is not copied to the child processes
with WorkerPool(n_jobs=2, start_method='spawn') as pool:
    pool.map(failing_job, [('folder', '0.p3'), ('folder', '1.p3')])

def working_job(folder, filename):
    import os
    return os.path.join(folder, filename)

# This will work
with WorkerPool(n_jobs=2, start_method='spawn') as pool:
    pool.map(working_job, [('folder', '0.p3'), ('folder', '1.p3')])
A lot of effort has been put into making the progress bar, dashboard, and nested pools (with multiple progress bars) work well with spawn and forkserver. So, everything should work fine.
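For instance, a hedged sketch combining spawn with a progress bar (the progress_bar parameter of the map functions is covered elsewhere in the MPIRE docs):

with WorkerPool(n_jobs=2, start_method='spawn') as pool:
    # A progress bar is displayed while the spawned workers process the tasks
    results = pool.map(working_job, [('folder', '0.p3'), ('folder', '1.p3')],
                       progress_bar=True)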
Keep alive¶
Workers can be kept alive between consecutive map calls using the keep_alive flag. This is useful when your workers have a long startup time and you need to call one of the map functions multiple times. When either the function to execute or the worker_lifespan parameter changes, MPIRE will ignore the flag, as it needs to restart the workers anyway.
Building further on the worker state example:
import numpy as np
import pickle

def load_big_model():
    # Load a model which takes up a lot of memory
    with open('./a_really_big_model.p3', 'rb') as f:
        return pickle.load(f)

def model_predict(worker_state, x):
    # Load model
    if 'model' not in worker_state:
        worker_state['model'] = load_big_model()

    # Predict
    return worker_state['model'].predict(x)

with WorkerPool(n_jobs=4, use_worker_state=True, keep_alive=True) as pool:
    # Let the model predict
    data = np.array([[...]])
    results = pool.map(model_predict, data)

    # Do something with the results
    ...

    # Let the model predict some more. In this call the workers are reused,
    # which means the big model doesn't need to be loaded again
    data = np.array([[...]])
    results = pool.map(model_predict, data)

    # Workers are restarted in this case because the function changed
    pool.map(square_sum, range(100))
Instead of passing the flag to the mpire.WorkerPool constructor, you can also make use of mpire.WorkerPool.set_keep_alive():
with WorkerPool(n_jobs=4) as pool:
    pool.set_keep_alive()
    pool.map_unordered(square_sum, range(100))
Dill¶
For some functions or tasks it can be useful to not rely on pickle, but on a more powerful serialization backend like dill. dill isn't installed by default; see Dill for more information on installing the dependencies. For all benefits of dill, please refer to the dill documentation.
Once the dependencies have been installed, you can enable it using the use_dill flag:
with WorkerPool(n_jobs=4, use_dill=True) as pool:
    ...
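As an illustrative sketch of what this buys you (assuming the usual dill capability of serializing objects that pickle cannot, such as lambdas and locally defined functions):

# A lambda can't be pickled, but dill handles it, even with the spawn start method
with WorkerPool(n_jobs=4, use_dill=True, start_method='spawn') as pool:
    results = pool.map(lambda x: x * x, range(10))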
Note
Using dill can potentially slow down processing. This is the cost of having a more reliable and powerful serialization backend.