map family of functions
mpire.WorkerPool implements four types of parallel map functions, being:

mpire.WorkerPool.map()
    Blocks until results are ready; results are ordered in the same way as the provided arguments.
mpire.WorkerPool.map_unordered()
    The same as mpire.WorkerPool.map(), but results are ordered by task completion time. Usually faster than mpire.WorkerPool.map().
mpire.WorkerPool.imap()
    Lazy version of mpire.WorkerPool.map(), returns a generator. The generator will give back results whenever new results are ready. Results are ordered in the same way as the provided arguments.
mpire.WorkerPool.imap_unordered()
    The same as mpire.WorkerPool.imap(), but results are ordered by task completion time. Usually faster than mpire.WorkerPool.imap().
When using a single worker the unordered versions are equivalent to their ordered counterparts.
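With multiple workers, the unordered variants can yield results in any order. A minimal sketch of the difference (the actual order of the unordered results varies between runs):

from mpire import WorkerPool

def square(x):
    return x * x

with WorkerPool(n_jobs=4) as pool:
    ordered = pool.map(square, range(5))              # always [0, 1, 4, 9, 16]
    unordered = pool.map_unordered(square, range(5))  # same values, in completion order
    assert sorted(unordered) == ordered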
Iterable of arguments
Each map function should receive a function and an iterable of arguments, where the elements of the iterable can be single values or iterables that are unpacked as arguments. If an element is a dictionary, the (key, value) pairs will be unpacked with the **-operator.
from mpire import WorkerPool

def square(x):
    return x * x

with WorkerPool(n_jobs=4) as pool:
    # 1. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
    results = pool.map(square, range(100))
The first example should work as expected: the numbers are simply squared. MPIRE knows how many tasks there are because a range object implements the __len__ method (see Task chunking).
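You can verify this yourself: a range object knows its own length, whereas a plain generator does not, which is exactly when the iterable_len parameter (used below) becomes necessary:

print(len(range(100)))                              # 100, so MPIRE knows the task count
print(hasattr((x for x in range(100)), '__len__'))  # False: a generator has no length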
with WorkerPool(n_jobs=4) as pool:
    # 2. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
    # Note: don't execute this, it will take a long time ...
    results = pool.map(square, range(int(1e30)), iterable_len=int(1e30), chunk_size=1)
In the second example the number 1e30 is too large for Python: try calling len(range(int(1e30))), this will throw an OverflowError (don't get me started …). Therefore, we must use the iterable_len parameter to let MPIRE know how large the task list is. We also have to specify a chunk size here, as the chunk size should be lower than sys.maxsize.
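For the curious, the OverflowError is easy to reproduce (the exact message may differ per platform):

try:
    len(range(int(1e30)))
except OverflowError as e:
    print(e)  # e.g. "Python int too large to convert to C ssize_t"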
def multiply(x, y):
    return x * y

with WorkerPool(n_jobs=4) as pool:
    # 3. Multiply the numbers, results should be [0, 101, 204, 309, 416, ...]
    for result in pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100):
        ...
The third example demonstrates the use of multiple function arguments. Note that we use imap in this example, which allows us to process the results whenever they become available, without having to wait for all results to be ready.
with WorkerPool(n_jobs=4) as pool:
    # 4. Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply, [{'x': 0, 'y': 100}, {'y': 101, 'x': 1}, ...]):
        ...
The final example shows the use of an iterable of dictionaries. The (key, value) pairs are unpacked with the **-operator, as you would expect. So it doesn't matter in what order the keys are stored. This should work for collections.OrderedDict objects as well.
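As a plain-Python illustration of the unpacking that happens for each dictionary element (a sketch, not MPIRE's internals):

# Both calls are equivalent to multiply(1, 101); the key order doesn't matter
assert multiply(**{'x': 1, 'y': 101}) == multiply(**{'y': 101, 'x': 1}) == 101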
Circumvent argument unpacking
If you want to avoid unpacking and instead pass the tuples in example 3 or the dictionaries in example 4 as a whole, you can. We'll continue with example 4, but the workaround for example 3 is similar.
Suppose we have the following function which expects a dictionary:
def multiply_dict(d):
    return d['x'] * d['y']
Then you would have to convert the list of dictionaries to a list of single argument tuples, where each argument is a dictionary:
with WorkerPool(n_jobs=4) as pool:
    # Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply_dict, [({'x': 0, 'y': 100},),
                                            ({'y': 101, 'x': 1},),
                                            ...]):
        ...
There is a utility function available that does this transformation for you:
from mpire.utils import make_single_arguments

with WorkerPool(n_jobs=4) as pool:
    # Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply_dict, make_single_arguments([{'x': 0, 'y': 100},
                                                                  {'y': 101, 'x': 1}, ...],
                                                                 generator=False)):
        ...
mpire.utils.make_single_arguments() expects an iterable of arguments and converts them to tuples accordingly. The second argument of this function specifies whether you want the function to return a generator or a materialized list. If we would like to return a generator, we would need to pass on the iterable length as well.
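A sketch of that generator variant, where we now must tell imap the iterable length ourselves because a generator has no __len__:

from mpire.utils import make_single_arguments

# generator=True returns a lazy generator of single-argument tuples
args = make_single_arguments(({'x': i, 'y': 100 + i} for i in range(100)), generator=True)

with WorkerPool(n_jobs=4) as pool:
    for result in pool.imap(multiply_dict, args, iterable_len=100):
        ...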
Mixing map functions

map functions cannot be used while another map function is still running. E.g., the following will raise an exception:
with WorkerPool(n_jobs=4) as pool:
    imap_results = pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100)
    next(imap_results)  # We actually have to start the imap function

    # Will raise because the imap function is still running
    map_results = pool.map(square, range(100))
Make sure to first finish the imap function before starting a new map function. This holds for all map functions.
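A pattern that does work is to exhaust the first map function before starting the next one, for example:

with WorkerPool(n_jobs=4) as pool:
    # Exhausting the lazy imap results first ...
    imap_results = list(pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100))

    # ... makes it safe to start a new map function on the same pool
    map_results = pool.map(square, range(100))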
Not exhausting a lazy imap function

If you don't exhaust a lazy imap function, but do close the pool, the remaining tasks and results will be lost. E.g., the following will raise an exception:
with WorkerPool(n_jobs=4) as pool:
    imap_results = pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100)
    first_result = next(imap_results)  # We actually have to start the imap function
    pool.terminate()

    # This will raise
    results = list(imap_results)
Similarly, exiting the with block terminates the pool as well:
with WorkerPool(n_jobs=4) as pool:
    imap_results = pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100)
    first_result = next(imap_results)  # We actually have to start the imap function

# This will raise
results = list(imap_results)
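To avoid losing results, simply exhaust the generator while the pool is still alive, i.e. inside the with block:

with WorkerPool(n_jobs=4) as pool:
    imap_results = pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100)
    # Consume all results before the pool is terminated
    results = list(imap_results)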