Map family¶
This section describes the different ways of interacting with a mpire.WorkerPool
object.
Contents
map family of functions¶
mpire.WorkerPool
implements four types of parallel map
functions, being:
mpire.WorkerPool.map()
: Blocks until results are ready, results are ordered in the same way as the provided argumentsmpire.WorkerPool.map_unordered()
: The same asmpire.WorkerPool.map()
, but results are ordered by task completion time. Usually faster thanmpire.WorkerPool.map()
.mpire.WorkerPool.imap()
: Lazy version ofmpire.WorkerPool.map()
, returns a generator. The generator will give results back whenever new results are ready. Results are ordered in the same way as the provided arguments.mpire.WorkerPool.imap_unordered()
: The same asmpire.WorkerPool.imap()
, but results are ordered by task completion time. Usually faster thanmpire.WorkerPool.imap()
.
When using a single worker the unordered versions are equivalent to their ordered counterpart.
Important
Each map
function should receive a function pointer and an iterable of arguments, where the elements of the
iterable are expected to be iterables that are unpacked as arguments. If the elements are not iterables, the single
value is simply passed on as the only argument. However, if a single value is a dictionary the (key, value) pairs
will be unpacked with the **
-operator.
A few examples:
def square(x):
return x * x
def multiply(x, y):
return x * y
with WorkerPool(n_jobs=4) as pool:
# 1. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
results = pool.map(square, range(100))
with WorkerPool(n_jobs=4) as pool:
# 2. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
# Note: you'll probably don't want to execute this, it will take a long time ...
results = pool.map(square, range(int(1e30)), iterable_len=int(1e30), chunk_size=1)
with WorkerPool(n_jobs=4) as pool:
# 3. Multiply the numbers, results should be [0, 101, 204, 309, 416, ...]
for result in pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100):
# Do something with this result
print(result)
with WorkerPool(n_jobs=4) as pool:
# 4. Multiply the numbers, results should be [0, 101, ...]
for result in pool.imap(multiply, [{'x': 0, 'y': 100}, {'y': 101, 'x': 1}, ...]):
# Do something with this result
print(result)
The first example should work as expected, the numbers are simply squared. MPIRE knows how many tasks there are because
a range
object implements the __len__
method (see section below).
In the second example the 1e30
number is too large for Python: try calling len(range(int(1e30)))
, this will
throw an OverflowError
(I know …). Therefore, we must use the iterable_len
parameter to let MPIRE know how
large the tasks list is. We also have to specify a chunk size here as the chunk size should be lower than
sys.maxsize
.
The third example shows an example of using multiple function arguments. Also note that we use imap
in the third
example, which allows us to process the results whenever they come available, not having to wait for all results to be
ready.
The final example shows the use of an iterable of dictionaries. The (key, value) pairs are unpacked with the
**
-operator, as you would expect. So it doesn’t matter in what order the keys are stored. This should work for
collection.OrderedDict
objects as well.
If you want to pass those dictionaries in example 4 as a whole to the following function, for example:
def multiply_dict(d):
return d['x'] * d['y']
you would have to convert the list of dictionaries to a list of single argument tuples, where each argument is a dictionary:
with WorkerPool(n_jobs=4) as pool:
# Multiply the numbers, results should be [0, 101, ...]
for result in pool.imap(multiply_dict, [({'x': 0, 'y': 100},), ({'y': 101, 'x': 1},), ...]):
# Do something with this result
print(result)
There is, however, a utility function that does this transformation for you:
from mpire.utils import make_single_arguments
with WorkerPool(n_jobs=4) as pool:
# Multiply the numbers, results should be [0, 101, ...]
for result in pool.imap(multiply_dict, make_single_arguments([{'x': 0, 'y': 100},
{'y': 101, 'x': 1}, ...],
generator=False)):
# Do something with this result
print(result)
mpire.utils.make_single_arguments()
expects an iterable of arguments and converts them to tuples accordingly. The
second argument of this function specifies if you want the function to return a generator or a materialized list. If we
would like to return a generator we would need to pass on the iterable length as well.
Task chunking¶
By default, MPIRE chunks the given tasks in to four times the number of jobs chunks. Each worker is given one chunk of tasks at a time before returning its results. This usually makes processing faster when you have rather small tasks (computation wise) and results are pickled/unpickled when they are send to a worker or main process. Chunking the tasks and results ensures that each process has to pickle/unpickle less often.
However, to determine the number of tasks in the argument list the iterable should implement the __len__
method,
which is available in default containers like list
or tuple
, but isn’t available in most generator objects
(the range
object is one of the exceptions). To allow working with generators each map
function has the option
to pass the iterable length:
with WorkerPool(n_jobs=4) as pool:
# 1. This will issue a warning and sets the chunk size to 1
results = pool.map(square, ((x,) for x in range(100)))
# 2. This will issue a warning as well and sets the chunk size to 1
results = pool.map(square, ((x,) for x in range(100)), n_splits=4)
# 3. Square the numbers using a generator using a specific number of splits
results = pool.map(square, ((x,) for x in range(100)), iterable_len=100, n_splits=4)
# 4. Square the numbers using a generator using automatic chunking
results = pool.map(square, ((x,) for x in range(100)), iterable_len=100)
# 5. Square the numbers using a generator using a fixed chunk size
results = pool.map(square, ((x,) for x in range(100)), chunk_size=4)
In the first two examples the function call will fail because MPIRE doesn’t know how large the chunks should be as the total number of tasks is unknown, therefore it will fall back to a chunk size of 1. The third example should work as expected where 4 chunks are used. The fourth example uses 16 chunks (the default four times the number of workers). The last example uses a fixed chunk size of four, so MPIRE doesn’t need to know the iterable length.
You can also call the chunk function manually:
from mpire.utils import chunk_tasks
# Convert to list because chunk_tasks returns a generator
print(list(chunk_tasks(range(10), n_splits=3)))
print(list(chunk_tasks(range(10), chunk_size=2.5)))
print(list(chunk_tasks((x for x in range(10)), iterable_len=10, n_splits=6)))
will output:
[(0, 1, 2, 3), (4, 5, 6), (7, 8, 9)]
[(0, 1, 2), (3, 4), (5, 6, 7), (8, 9)]
[(0, 1), (2, 3), (4,), (5, 6), (7, 8), (9,)]
Numpy arrays¶
Chunking¶
Numpy arrays are treated a little bit differently when passed on to the map
functions. Usually MPIRE uses
itertools.islice
for chunking, which depends on the __iter__
special function of the container object. But
applying that to numpy arrays would yield:
import numpy as np
# Create random array
arr = np.random.rand(10, 3)
# Chunk the array using default chunking
arr_iter = iter(arr)
chunk_size = 3
while True:
chunk = list(itertools.islice(arr_iter, chunk_size))
if chunk:
yield chunk
else:
break
with output:
[array([0.68438994, 0.9701514 , 0.40083965]), array([0.88428556, 0.2083905 , 0.61490443]),
array([0.89249174, 0.39902235, 0.70762541])]
[array([0.18850964, 0.1022777 , 0.41539432]), array([0.07327858, 0.18608165, 0.75862301]),
array([0.69215651, 0.4211941 , 0.31029439])]
[array([0.82571272, 0.72257819, 0.86079131]), array([0.91285817, 0.49398461, 0.27863929]),
array([0.146981 , 0.84671211, 0.30122806])]
[array([0.11783283, 0.12585031, 0.39864368])]
In other words, each row of the array is now in its own array and each one of them is given to the target function individually. Instead, MPIRE will chunk them in to something more reasonable using numpy slicing instead:
from mpire.utils import chunk_tasks
for chunk in chunk_tasks(arr, chunk_size=chunk_size):
print(repr(chunk))
Output:
array([[0.68438994, 0.9701514 , 0.40083965],
[0.88428556, 0.2083905 , 0.61490443],
[0.89249174, 0.39902235, 0.70762541]])
array([[0.18850964, 0.1022777 , 0.41539432],
[0.07327858, 0.18608165, 0.75862301],
[0.69215651, 0.4211941 , 0.31029439]])
array([[0.82571272, 0.72257819, 0.86079131],
[0.91285817, 0.49398461, 0.27863929],
[0.146981 , 0.84671211, 0.30122806]])
array([[0.11783283, 0.12585031, 0.39864368]])
Each chunk is now a single numpy array containing as many rows as the chunk size, except for the last chunk as there aren’t enough rows left.
Return value¶
When the user defined function returns numpy arrays and you’re applying the mpire.WorkerPool.map()
function MPIRE
will concatenate the resulting numpy arrays to a single array by default. For example:
def add_five(x):
return x + 5
with WorkerPool(n_jobs=4) as pool:
results = pool.map(add_five, arr, chunk_size=chunk_size)
will return:
array([[5.68438994, 5.9701514 , 5.40083965],
[5.88428556, 5.2083905 , 5.61490443],
[5.89249174, 5.39902235, 5.70762541],
[5.18850964, 5.1022777 , 5.41539432],
[5.07327858, 5.18608165, 5.75862301],
[5.69215651, 5.4211941 , 5.31029439],
[5.82571272, 5.72257819, 5.86079131],
[5.91285817, 5.49398461, 5.27863929],
[5.146981 , 5.84671211, 5.30122806],
[5.11783283, 5.12585031, 5.39864368]])
This behavior can be cancelled by using the concatenate_numpy_output
flag:
with WorkerPool(n_jobs=4) as pool:
results = pool.map(add_five, arr, chunk_size=chunk_size, concatenate_numpy_output=False)
This will return individual arrays:
[array([[5.68438994, 5.9701514 , 5.40083965],
[5.88428556, 5.2083905 , 5.61490443],
[5.89249174, 5.39902235, 5.70762541]]),
array([[5.18850964, 5.1022777 , 5.41539432],
[5.07327858, 5.18608165, 5.75862301],
[5.69215651, 5.4211941 , 5.31029439]]),
array([[5.82571272, 5.72257819, 5.86079131],
[5.91285817, 5.49398461, 5.27863929],
[5.146981 , 5.84671211, 5.30122806]]),
array([[5.11783283, 5.12585031, 5.39864368]])]
Maximum number of active tasks¶
When you have tasks that take up a lot of memory you can limit the number of jobs or limit the number of active tasks (i.e., the number of tasks currently being available to the workers, tasks that are in the queue ready to be processed). The first option is the most obvious one to save memory when the processes themselves use up much memory. The second is convenient when the argument list takes up too much memory. For example, suppose you want to kick off an enormous amount of jobs (let’s say a billion) of which the arguments take up 1 KB per task (e.g., large strings), then that task queue would take up ~1 TB of memory!
In such cases, a good rule of thumb would be to have twice the amount of active tasks than there are jobs. This means that when all workers complete their task at the same time each would directly be able to continue with another task. When workers take on their new tasks the generator of tasks is iterated to the point that again there would be twice the amount of active tasks.
with WorkerPool(n_jobs=4) as pool:
# Square the numbers using a generator
results = pool.map(square, range(int(1e300)), iterable_len=int(1e300),
chunk_size=int(1e5), max_tasks_active=2*4)
Worker lifespan¶
Occasionally, workers that process multiple, memory intensive tasks do not release their used up memory properly, which results in memory usage building up. This is not a bug in MPIRE, but a consequence of Python’s poor garbage collection. To avoid this type of problem you can set the worker lifespan: the number of tasks (well, actually the number of chunks of tasks) after which a worker should restart.
with WorkerPool(n_jobs=4) as pool:
# Square the numbers using a generator
results = pool.map(square, range(100), worker_lifespan=1)
In this example each worker is restarted after finishing a single chunk of tasks.
Progress bar¶
Progress bar support is added through the tqdm package (installed by default when installing MPIRE). The most easy way
to include a progress bar is by enabling the progress_bar
flag in any of the map
functions:
with WorkerPool(n_jobs=4) as pool:
pool.map(square, range(100), progress_bar=True)
This will display a basic tqdm
progress bar displaying the time elapsed and remaining, number of tasks completed
(including a percentage value) and the speed (i.e., number of tasks completed per time unit).
When inside a Jupyter/IPython notebook, the progress bar will change automatically to a native Jupyter widget.
Note
The Jupyter tqdm
widget requires the Javascript widget to run, which might not be enabled by default. You will
notice a Widget Javascript not detected
error message in your notebook if so. To remedy this, enable the widget
by executing jupyter nbextension enable --py --sys-prefix widgetsnbextension
in your terminal before starting
the notebook.
Note
Please keep in mind that to show real-time progress information MPIRE starts an additional child process, which could consume a bit of the available compute power of your machine.
Multiple progress bars with nested WorkerPools¶
In MPIRE you can easily print a progress bar on a different position on the terminal using the progress_bar_position
parameter in the map functions, which facilitates the use of multiple progress bars. Here’s an example of using multiple
progress bars using nested WorkerPools:
from mpire import tqdm
def dispatcher(worker_id, X):
with WorkerPool(n_jobs=4) as nested_pool:
return nested_pool.map(square, X, progress_bar=True, progress_bar_position=worker_id + 1)
def main():
with WorkerPool(n_jobs=4, daemon=False, pass_worker_id=True) as pool:
pool.map(dispatcher, ((range(x, x + 100),) for x in range(100)), iterable_len=100,
n_splits=4, progress_bar=True)
main()
We use worker_id + 1
here because the worker IDs start at zero, and we reserve position 0 for the progress bar of
the main WorkerPool (which is the default).
Note
Unfortunately, multiple tqdm
progress bars from child processes don’t play that nicely within a Jupyter/IPython
notebook session. It’ll work but you’ll get some additional new lines in your output and it could be that your main
progress bar won’t update as you would expect. Note that you can always use the MPIRE dashboard.