Map family

This section describes the different ways of interacting with a mpire.WorkerPool object.

map family of functions

mpire.WorkerPool implements four types of parallel map functions, being:

When using a single worker the unordered versions are equivalent to their ordered counterpart.

Important

Each map function should receive a function pointer and an iterable of arguments, where the elements of the iterable are expected to be iterables that are unpacked as arguments. If the elements are not iterables, the single value is simply passed on as the only argument. However, if a single value is a dictionary the (key, value) pairs will be unpacked with the **-operator.

A few examples:

def square(x):
    return x * x

def multiply(x, y):
    return x * y

with WorkerPool(n_jobs=4) as pool:
    # 1. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
    results = pool.map(square, range(100))

with WorkerPool(n_jobs=4) as pool:
    # 2. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
    # Note: you'll probably don't want to execute this, it will take a long time ...
    results = pool.map(square, range(int(1e30)), iterable_len=int(1e30), chunk_size=1)

with WorkerPool(n_jobs=4) as pool:
    # 3. Multiply the numbers, results should be [0, 101, 204, 309, 416, ...]
    for result in pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100):
        # Do something with this result
        print(result)

with WorkerPool(n_jobs=4) as pool:
    # 4. Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply, [{'x': 0, 'y': 100}, {'y': 101, 'x': 1}, ...]):
        # Do something with this result
        print(result)

The first example should work as expected, the numbers are simply squared. MPIRE knows how many tasks there are because a range object implements the __len__ method (see section below).

In the second example the 1e30 number is too large for Python: try calling len(range(int(1e30))), this will throw an OverflowError (I know …). Therefore, we must use the iterable_len parameter to let MPIRE know how large the tasks list is. We also have to specify a chunk size here as the chunk size should be lower than sys.maxsize.

The third example shows an example of using multiple function arguments. Also note that we use imap in the third example, which allows us to process the results whenever they come available, not having to wait for all results to be ready.

The final example shows the use of an iterable of dictionaries. The (key, value) pairs are unpacked with the **-operator, as you would expect. So it doesn’t matter in what order the keys are stored. This should work for collection.OrderedDict objects as well.

If you want to pass those dictionaries in example 4 as a whole to the following function, for example:

def multiply_dict(d):
    return d['x'] * d['y']

you would have to convert the list of dictionaries to a list of single argument tuples, where each argument is a dictionary:

with WorkerPool(n_jobs=4) as pool:
    # Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply_dict, [({'x': 0, 'y': 100},), ({'y': 101, 'x': 1},), ...]):
        # Do something with this result
        print(result)

There is, however, a utility function that does this transformation for you:

from mpire.utils import make_single_arguments

with WorkerPool(n_jobs=4) as pool:
    # Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply_dict, make_single_arguments([{'x': 0, 'y': 100},
                                                                  {'y': 101, 'x': 1}, ...],
                                                                 generator=False)):
        # Do something with this result
        print(result)

mpire.utils.make_single_arguments() expects an iterable of arguments and converts them to tuples accordingly. The second argument of this function specifies if you want the function to return a generator or a materialized list. If we would like to return a generator we would need to pass on the iterable length as well.

Task chunking

By default, MPIRE chunks the given tasks in to four times the number of jobs chunks. Each worker is given one chunk of tasks at a time before returning its results. This usually makes processing faster when you have rather small tasks (computation wise) and results are pickled/unpickled when they are send to a worker or main process. Chunking the tasks and results ensures that each process has to pickle/unpickle less often.

However, to determine the number of tasks in the argument list the iterable should implement the __len__ method, which is available in default containers like list or tuple, but isn’t available in most generator objects (the range object is one of the exceptions). To allow working with generators each map function has the option to pass the iterable length:

with WorkerPool(n_jobs=4) as pool:
    # 1. This will issue a warning and sets the chunk size to 1
    results = pool.map(square, ((x,) for x in range(100)))

    # 2. This will issue a warning as well and sets the chunk size to 1
    results = pool.map(square, ((x,) for x in range(100)), n_splits=4)

    # 3. Square the numbers using a generator using a specific number of splits
    results = pool.map(square, ((x,) for x in range(100)), iterable_len=100, n_splits=4)

    # 4. Square the numbers using a generator using automatic chunking
    results = pool.map(square, ((x,) for x in range(100)), iterable_len=100)

    # 5. Square the numbers using a generator using a fixed chunk size
    results = pool.map(square, ((x,) for x in range(100)), chunk_size=4)

In the first two examples the function call will fail because MPIRE doesn’t know how large the chunks should be as the total number of tasks is unknown, therefore it will fall back to a chunk size of 1. The third example should work as expected where 4 chunks are used. The fourth example uses 16 chunks (the default four times the number of workers). The last example uses a fixed chunk size of four, so MPIRE doesn’t need to know the iterable length.

You can also call the chunk function manually:

from mpire.utils import chunk_tasks

# Convert to list because chunk_tasks returns a generator
print(list(chunk_tasks(range(10), n_splits=3)))
print(list(chunk_tasks(range(10), chunk_size=2.5)))
print(list(chunk_tasks((x for x in range(10)), iterable_len=10, n_splits=6)))

will output:

[(0, 1, 2, 3), (4, 5, 6), (7, 8, 9)]
[(0, 1, 2), (3, 4), (5, 6, 7), (8, 9)]
[(0, 1), (2, 3), (4,), (5, 6), (7, 8), (9,)]

Numpy arrays

Chunking

Numpy arrays are treated a little bit differently when passed on to the map functions. Usually MPIRE uses itertools.islice for chunking, which depends on the __iter__ special function of the container object. But applying that to numpy arrays would yield:

import numpy as np

# Create random array
arr = np.random.rand(10, 3)

# Chunk the array using default chunking
arr_iter = iter(arr)
chunk_size = 3
while True:
    chunk = list(itertools.islice(arr_iter, chunk_size))
    if chunk:
        yield chunk
    else:
        break

with output:

[array([0.68438994, 0.9701514 , 0.40083965]), array([0.88428556, 0.2083905 , 0.61490443]),
 array([0.89249174, 0.39902235, 0.70762541])]
[array([0.18850964, 0.1022777 , 0.41539432]), array([0.07327858, 0.18608165, 0.75862301]),
 array([0.69215651, 0.4211941 , 0.31029439])]
[array([0.82571272, 0.72257819, 0.86079131]), array([0.91285817, 0.49398461, 0.27863929]),
 array([0.146981  , 0.84671211, 0.30122806])]
[array([0.11783283, 0.12585031, 0.39864368])]

In other words, each row of the array is now in its own array and each one of them is given to the target function individually. Instead, MPIRE will chunk them in to something more reasonable using numpy slicing instead:

from mpire.utils import chunk_tasks

for chunk in chunk_tasks(arr, chunk_size=chunk_size):
    print(repr(chunk))

Output:

array([[0.68438994, 0.9701514 , 0.40083965],
       [0.88428556, 0.2083905 , 0.61490443],
       [0.89249174, 0.39902235, 0.70762541]])
array([[0.18850964, 0.1022777 , 0.41539432],
       [0.07327858, 0.18608165, 0.75862301],
       [0.69215651, 0.4211941 , 0.31029439]])
array([[0.82571272, 0.72257819, 0.86079131],
       [0.91285817, 0.49398461, 0.27863929],
       [0.146981  , 0.84671211, 0.30122806]])
array([[0.11783283, 0.12585031, 0.39864368]])

Each chunk is now a single numpy array containing as many rows as the chunk size, except for the last chunk as there aren’t enough rows left.

Return value

When the user defined function returns numpy arrays and you’re applying the mpire.WorkerPool.map() function MPIRE will concatenate the resulting numpy arrays to a single array by default. For example:

def add_five(x):
    return x + 5

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(add_five, arr, chunk_size=chunk_size)

will return:

array([[5.68438994, 5.9701514 , 5.40083965],
       [5.88428556, 5.2083905 , 5.61490443],
       [5.89249174, 5.39902235, 5.70762541],
       [5.18850964, 5.1022777 , 5.41539432],
       [5.07327858, 5.18608165, 5.75862301],
       [5.69215651, 5.4211941 , 5.31029439],
       [5.82571272, 5.72257819, 5.86079131],
       [5.91285817, 5.49398461, 5.27863929],
       [5.146981  , 5.84671211, 5.30122806],
       [5.11783283, 5.12585031, 5.39864368]])

This behavior can be cancelled by using the concatenate_numpy_output flag:

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(add_five, arr, chunk_size=chunk_size, concatenate_numpy_output=False)

This will return individual arrays:

[array([[5.68438994, 5.9701514 , 5.40083965],
        [5.88428556, 5.2083905 , 5.61490443],
        [5.89249174, 5.39902235, 5.70762541]]),
 array([[5.18850964, 5.1022777 , 5.41539432],
        [5.07327858, 5.18608165, 5.75862301],
        [5.69215651, 5.4211941 , 5.31029439]]),
 array([[5.82571272, 5.72257819, 5.86079131],
        [5.91285817, 5.49398461, 5.27863929],
        [5.146981  , 5.84671211, 5.30122806]]),
 array([[5.11783283, 5.12585031, 5.39864368]])]

Maximum number of active tasks

When you have tasks that take up a lot of memory you can limit the number of jobs or limit the number of active tasks (i.e., the number of tasks currently being available to the workers, tasks that are in the queue ready to be processed). The first option is the most obvious one to save memory when the processes themselves use up much memory. The second is convenient when the argument list takes up too much memory. For example, suppose you want to kick off an enormous amount of jobs (let’s say a billion) of which the arguments take up 1 KB per task (e.g., large strings), then that task queue would take up ~1 TB of memory!

In such cases, a good rule of thumb would be to have twice the amount of active tasks than there are jobs. This means that when all workers complete their task at the same time each would directly be able to continue with another task. When workers take on their new tasks the generator of tasks is iterated to the point that again there would be twice the amount of active tasks.

with WorkerPool(n_jobs=4) as pool:
    # Square the numbers using a generator
    results = pool.map(square, range(int(1e300)), iterable_len=int(1e300),
                       chunk_size=int(1e5), max_tasks_active=2*4)

Worker lifespan

Occasionally, workers that process multiple, memory intensive tasks do not release their used up memory properly, which results in memory usage building up. This is not a bug in MPIRE, but a consequence of Python’s poor garbage collection. To avoid this type of problem you can set the worker lifespan: the number of tasks (well, actually the number of chunks of tasks) after which a worker should restart.

with WorkerPool(n_jobs=4) as pool:
    # Square the numbers using a generator
    results = pool.map(square, range(100), worker_lifespan=1)

In this example each worker is restarted after finishing a single chunk of tasks.

Progress bar

Progress bar support is added through the tqdm package (installed by default when installing MPIRE). The most easy way to include a progress bar is by enabling the progress_bar flag in any of the map functions:

with WorkerPool(n_jobs=4) as pool:
    pool.map(square, range(100), progress_bar=True)

This will display a basic tqdm progress bar displaying the time elapsed and remaining, number of tasks completed (including a percentage value) and the speed (i.e., number of tasks completed per time unit).

When inside a Jupyter/IPython notebook, the progress bar will change automatically to a native Jupyter widget.

Note

The Jupyter tqdm widget requires the Javascript widget to run, which might not be enabled by default. You will notice a Widget Javascript not detected error message in your notebook if so. To remedy this, enable the widget by executing jupyter nbextension enable --py --sys-prefix widgetsnbextension in your terminal before starting the notebook.

Note

Please keep in mind that to show real-time progress information MPIRE starts an additional child process, which could consume a bit of the available compute power of your machine.

Multiple progress bars with nested WorkerPools

In MPIRE you can easily print a progress bar on a different position on the terminal using the progress_bar_position parameter in the map functions, which facilitates the use of multiple progress bars. Here’s an example of using multiple progress bars using nested WorkerPools:

from mpire import tqdm

def dispatcher(worker_id, X):
    with WorkerPool(n_jobs=4) as nested_pool:
        return nested_pool.map(square, X, progress_bar=True, progress_bar_position=worker_id + 1)

def main():
    with WorkerPool(n_jobs=4, daemon=False, pass_worker_id=True) as pool:
        pool.map(dispatcher, ((range(x, x + 100),) for x in range(100)), iterable_len=100,
                 n_splits=4, progress_bar=True)

main()

We use worker_id + 1 here because the worker IDs start at zero, and we reserve position 0 for the progress bar of the main WorkerPool (which is the default).

Note

Unfortunately, multiple tqdm progress bars from child processes don’t play that nicely within a Jupyter/IPython notebook session. It’ll work but you’ll get some additional new lines in your output and it could be that your main progress bar won’t update as you would expect. Note that you can always use the MPIRE dashboard.