Map family

This section describes the different ways of interacting with a mpire.WorkerPool instance.

map family of functions

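mpire.WorkerPool implements four types of parallel map functions: map, map_unordered, imap, and imap_unordered. map blocks until all results are ready and returns them in the order of the input arguments. imap is its lazy counterpart and yields results as they become available. The unordered variants return results in whatever order the workers finish them.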

When using a single worker the unordered versions are equivalent to their ordered counterparts.

Important

Each map function should receive a function and an iterable of arguments, where the elements of the iterable can be single values or iterables that are unpacked as arguments. If an element is a dictionary, the (key, value) pairs will be unpacked with the **-operator. See the examples below for ways to circumvent this behavior.

A few examples:

from mpire import WorkerPool

def square(x):
    return x * x

def multiply(x, y):
    return x * y

with WorkerPool(n_jobs=4) as pool:
    # 1. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
    results = pool.map(square, range(100))

with WorkerPool(n_jobs=4) as pool:
    # 2. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
    # Note: don't execute this, it will take a long time ...
    results = pool.map(square, range(int(1e30)), iterable_len=int(1e30), chunk_size=1)

with WorkerPool(n_jobs=4) as pool:
    # 3. Multiply the numbers, results should be [0, 101, 204, 309, 416, ...]
    for result in pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100):
        # Do something with this result
        print(result)

with WorkerPool(n_jobs=4) as pool:
    # 4. Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply, [{'x': 0, 'y': 100}, {'y': 101, 'x': 1}, ...]):
        # Do something with this result
        print(result)

The first example should work as expected: the numbers are simply squared. MPIRE knows how many tasks there are because a range object implements the __len__ method (see Task chunking).

In the second example the number 1e30 is too large for Python’s len(): calling len(range(int(1e30))) raises an OverflowError (don’t get me started …). Therefore, we must use the iterable_len parameter to let MPIRE know how long the task list is. We also have to specify a chunk size here, as the chunk size should be lower than sys.maxsize.

The third example shows the use of multiple function arguments. Note that we use imap here, which allows us to process each result as soon as it becomes available instead of waiting for all results to be ready.

The final example shows the use of an iterable of dictionaries. The (key, value) pairs are unpacked with the **-operator, as you would expect, so it doesn’t matter in what order the keys are stored. This should work for collections.OrderedDict objects as well.
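The unpacking rules used in the four examples can be sketched in plain Python. This is only an illustration of the behavior described above, not MPIRE's actual code: call_with_unpacking is a hypothetical helper, and treating strings and bytes as single values is an assumption.

```python
from collections.abc import Iterable

def call_with_unpacking(func, element):
    """Hypothetical helper: call func with one element of the task iterable."""
    if isinstance(element, dict):
        # Dictionaries are unpacked as keyword arguments
        return func(**element)
    if isinstance(element, Iterable) and not isinstance(element, (str, bytes)):
        # Other iterables are unpacked as positional arguments
        # (assumption: strings and bytes count as single values)
        return func(*element)
    # Anything else is passed on as a single argument
    return func(element)

def square(x):
    return x * x

def multiply(x, y):
    return x * y

single = call_with_unpacking(square, 3)                      # square(3)
positional = call_with_unpacking(multiply, (2, 101))         # multiply(2, 101)
keyword = call_with_unpacking(multiply, {'y': 101, 'x': 1})  # multiply(x=1, y=101)
# Wrapping a dictionary in a one-element tuple passes it on as a whole
wrapped = call_with_unpacking(lambda d: d['x'] * d['y'], ({'x': 2, 'y': 5},))
```

The last call shows why a one-element tuple protects a dictionary: the tuple is unpacked, and the dictionary inside it arrives untouched.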

If you want to pass those dictionaries in example 4 as a whole to the following function, for example:

def multiply_dict(d):
    return d['x'] * d['y']

you would have to convert the list of dictionaries to a list of single argument tuples, where each argument is a dictionary:

with WorkerPool(n_jobs=4) as pool:
    # Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply_dict, [({'x': 0, 'y': 100},),
                                            ({'y': 101, 'x': 1},),
                                            ...]):
        # Do something with this result
        print(result)

There is a utility function available that does this transformation for you:

from mpire.utils import make_single_arguments

with WorkerPool(n_jobs=4) as pool:
    # Multiply the numbers, results should be [0, 101, ...]
    for result in pool.imap(multiply_dict, make_single_arguments([{'x': 0, 'y': 100},
                                                                  {'y': 101, 'x': 1}, ...],
                                                                 generator=False)):
        # Do something with this result
        print(result)

mpire.utils.make_single_arguments() expects an iterable of arguments and converts them to tuples accordingly. The second argument of this function specifies whether it returns a generator or a materialized list. If you ask for a generator, you need to pass the iterable length to the map function as well.
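Conceptually, the transformation is small. The sketch below uses a hypothetical stand-in called to_single_arguments to show the idea. It is not the library function itself, and its default for generator is an arbitrary choice for this sketch.

```python
def to_single_arguments(iterable_of_args, generator=True):
    """Hypothetical stand-in: wrap every element in a one-element tuple."""
    wrapped = ((arg,) for arg in iterable_of_args)
    # A generator has no length, a materialized list does
    return wrapped if generator else list(wrapped)

dicts = [{'x': 0, 'y': 100}, {'y': 101, 'x': 1}]
as_list = to_single_arguments(dicts, generator=False)
as_generator = to_single_arguments(dicts, generator=True)
```

The generator variant avoids building the whole list in memory, at the price of losing len(), which is why the iterable length then has to be supplied separately.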

Task chunking

By default, MPIRE chunks the given tasks into 64 * n_jobs chunks. Each worker is given one chunk of tasks at a time before returning its results. This usually makes processing faster when you have rather small tasks (computation-wise), because tasks and results are pickled/unpickled whenever they are sent to a worker or to the main process. Chunking the tasks and results ensures that each process has to pickle/unpickle less often.

However, to determine the number of tasks in the argument list the iterable should implement the __len__ method, which is available in default containers like list or tuple, but not in generators (a range object does implement it, as it is a lazy sequence rather than a generator). To allow working with generators, each map function has the option to pass the iterable length:

with WorkerPool(n_jobs=4) as pool:
    # 1. This will issue a warning and sets the chunk size to 1
    results = pool.map(square, ((x,) for x in range(1000)))

    # 2. This will issue a warning as well and sets the chunk size to 1
    results = pool.map(square, ((x,) for x in range(1000)), n_splits=4)

    # 3. Square the numbers using a generator using a specific number of splits
    results = pool.map(square, ((x,) for x in range(1000)), iterable_len=1000, n_splits=4)

    # 4. Square the numbers using a generator using automatic chunking
    results = pool.map(square, ((x,) for x in range(1000)), iterable_len=1000)

    # 5. Square the numbers using a generator using a fixed chunk size
    results = pool.map(square, ((x,) for x in range(1000)), chunk_size=4)

In the first two examples the function call will issue a warning: the total number of tasks is unknown, so MPIRE doesn’t know how large the chunks should be and falls back to a chunk size of 1. The third example should work as expected, using 4 chunks. The fourth example uses 256 chunks (the default 64 times the number of workers). The last example uses a fixed chunk size of four, so MPIRE doesn’t need to know the iterable length.
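The decision logic behind these five cases can be summarized in a few lines. This is a sketch of the rules as described in this section, not MPIRE's implementation: resolve_chunk_size is a hypothetical name, and returning a fractional chunk size is an assumption.

```python
import warnings

def resolve_chunk_size(n_jobs, iterable_len=None, chunk_size=None, n_splits=None):
    """Hypothetical helper mirroring the chunking rules described above."""
    if chunk_size is not None:
        # A fixed chunk size needs no knowledge of the iterable length
        return chunk_size
    if iterable_len is None:
        # Unknown number of tasks: warn and fall back to a chunk size of 1
        warnings.warn("Iterable length unknown, falling back to chunk_size=1")
        return 1
    if n_splits is None:
        # Default number of chunks
        n_splits = 64 * n_jobs
    return iterable_len / n_splits

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    case_1 = resolve_chunk_size(n_jobs=4)
    case_2 = resolve_chunk_size(n_jobs=4, n_splits=4)
case_3 = resolve_chunk_size(n_jobs=4, iterable_len=1000, n_splits=4)
case_4 = resolve_chunk_size(n_jobs=4, iterable_len=1000)
case_5 = resolve_chunk_size(n_jobs=4, chunk_size=4)
```

With 1000 tasks and 4 workers, the default of 256 chunks works out to an average of just under four tasks per chunk.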

You can also call the chunk function manually:

from mpire.utils import chunk_tasks

# Convert to list because chunk_tasks returns a generator
print(list(chunk_tasks(range(10), n_splits=3)))
print(list(chunk_tasks(range(10), chunk_size=2.5)))
print(list(chunk_tasks((x for x in range(10)), iterable_len=10, n_splits=6)))

will output:

[(0, 1, 2, 3), (4, 5, 6), (7, 8, 9)]
[(0, 1, 2), (3, 4), (5, 6, 7), (8, 9)]
[(0, 1), (2, 3), (4,), (5, 6), (7, 8), (9,)]
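One way to arrive at chunks of alternating size is to carry the fractional remainder of the chunk size over to the next chunk. The sketch below reproduces the three outputs shown above. It is a hypothetical re-implementation for illustration (sketch_chunk_tasks is not part of MPIRE) and uses exact fractions to sidestep floating point rounding.

```python
import itertools
import math
from fractions import Fraction

def sketch_chunk_tasks(iterable, iterable_len=None, chunk_size=None, n_splits=None):
    """Hypothetical sketch: split an iterable into chunks of near-equal size."""
    if chunk_size is None:
        if iterable_len is None:
            iterable_len = len(iterable)
        chunk_size = Fraction(iterable_len, n_splits)
    else:
        chunk_size = Fraction(chunk_size)

    iterator = iter(iterable)
    target = chunk_size
    while True:
        # Round the running target up, but always take at least one element
        n_take = max(1, math.ceil(target))
        chunk = tuple(itertools.islice(iterator, n_take))
        if not chunk:
            return
        yield chunk
        # Carry the rounding surplus over to the next chunk
        target = target + chunk_size - n_take

first = list(sketch_chunk_tasks(range(10), n_splits=3))
second = list(sketch_chunk_tasks(range(10), chunk_size=2.5))
third = list(sketch_chunk_tasks((x for x in range(10)), iterable_len=10, n_splits=6))
```

Because the surplus is carried forward, the chunk sizes average out to the requested (possibly fractional) chunk size over the whole iterable.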

Numpy arrays

Chunking

Numpy arrays are treated a little bit differently when passed on to the map functions. Usually MPIRE uses itertools.islice for chunking, which depends on the __iter__ special method of the container object. But applying that to numpy arrays would yield:

import itertools

import numpy as np

# Create random array
arr = np.random.rand(10, 3)

# Chunk the array using default chunking
arr_iter = iter(arr)
chunk_size = 3
while True:
    # Take the next chunk_size rows from the row iterator
    chunk = list(itertools.islice(arr_iter, chunk_size))
    if chunk:
        print(chunk)
    else:
        break

with output:

[array([0.68438994, 0.9701514 , 0.40083965]), array([0.88428556, 0.2083905 , 0.61490443]),
 array([0.89249174, 0.39902235, 0.70762541])]
[array([0.18850964, 0.1022777 , 0.41539432]), array([0.07327858, 0.18608165, 0.75862301]),
 array([0.69215651, 0.4211941 , 0.31029439])]
[array([0.82571272, 0.72257819, 0.86079131]), array([0.91285817, 0.49398461, 0.27863929]),
 array([0.146981  , 0.84671211, 0.30122806])]
[array([0.11783283, 0.12585031, 0.39864368])]

In other words, each row of the array is now in its own array and each one of them is given to the target function individually. Instead, MPIRE chunks them into something more reasonable using numpy slicing:

from mpire.utils import chunk_tasks

for chunk in chunk_tasks(arr, chunk_size=chunk_size):
    print(repr(chunk))

Output:

array([[0.68438994, 0.9701514 , 0.40083965],
       [0.88428556, 0.2083905 , 0.61490443],
       [0.89249174, 0.39902235, 0.70762541]])
array([[0.18850964, 0.1022777 , 0.41539432],
       [0.07327858, 0.18608165, 0.75862301],
       [0.69215651, 0.4211941 , 0.31029439]])
array([[0.82571272, 0.72257819, 0.86079131],
       [0.91285817, 0.49398461, 0.27863929],
       [0.146981  , 0.84671211, 0.30122806]])
array([[0.11783283, 0.12585031, 0.39864368]])

Each chunk is now a single numpy array containing as many rows as the chunk size, except for the last chunk as there aren’t enough rows left.

Return value

When the user-defined function returns numpy arrays and you’re applying the mpire.WorkerPool.map() function, MPIRE will concatenate the resulting numpy arrays into a single array by default. For example:

def add_five(x):
    return x + 5

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(add_five, arr, chunk_size=chunk_size)

will return:

array([[5.68438994, 5.9701514 , 5.40083965],
       [5.88428556, 5.2083905 , 5.61490443],
       [5.89249174, 5.39902235, 5.70762541],
       [5.18850964, 5.1022777 , 5.41539432],
       [5.07327858, 5.18608165, 5.75862301],
       [5.69215651, 5.4211941 , 5.31029439],
       [5.82571272, 5.72257819, 5.86079131],
       [5.91285817, 5.49398461, 5.27863929],
       [5.146981  , 5.84671211, 5.30122806],
       [5.11783283, 5.12585031, 5.39864368]])

This behavior can be disabled by using the concatenate_numpy_output flag:

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(add_five, arr, chunk_size=chunk_size, concatenate_numpy_output=False)

This will return individual arrays:

[array([[5.68438994, 5.9701514 , 5.40083965],
        [5.88428556, 5.2083905 , 5.61490443],
        [5.89249174, 5.39902235, 5.70762541]]),
 array([[5.18850964, 5.1022777 , 5.41539432],
        [5.07327858, 5.18608165, 5.75862301],
        [5.69215651, 5.4211941 , 5.31029439]]),
 array([[5.82571272, 5.72257819, 5.86079131],
        [5.91285817, 5.49398461, 5.27863929],
        [5.146981  , 5.84671211, 5.30122806]]),
 array([[5.11783283, 5.12585031, 5.39864368]])]

Maximum number of active tasks

When you have tasks that take up a lot of memory you can limit the number of jobs or limit the number of active tasks (i.e., the number of tasks currently available to the workers: tasks that are in the queue, ready to be processed). The first option is the most obvious way to save memory when the processes themselves use a lot of memory. The second is convenient when the argument list takes up too much memory. For example, suppose you want to kick off an enormous number of jobs (let’s say a billion) whose arguments take up 1 KB per task (e.g., large strings), then that task queue would take up ~1 TB of memory!

In such cases, a good rule of thumb is to have twice as many active tasks as there are jobs. This means that when all workers complete their task at the same time, each can directly continue with another task. When workers take on their new tasks, the generator of tasks is iterated until there are again twice as many active tasks as jobs.

with WorkerPool(n_jobs=4) as pool:
    # Square the numbers while keeping at most 2 * n_jobs tasks active
    results = pool.map(square, range(int(1e300)), iterable_len=int(1e300),
                       chunk_size=int(1e5), max_tasks_active=2*4)
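The refill behavior can be imitated in a single process to see why memory stays bounded. The following is a simulation only, without any worker processes. run_with_task_limit is a hypothetical helper and says nothing about how MPIRE manages its queues internally.

```python
from collections import deque
from itertools import islice

def run_with_task_limit(func, tasks, max_tasks_active):
    """Hypothetical simulation: never hold more than max_tasks_active pending tasks."""
    task_iter = iter(tasks)
    # Pre-fill the queue up to the limit
    pending = deque(islice(task_iter, max_tasks_active))
    peak = len(pending)
    results = []
    while pending:
        # A "worker" takes one task from the queue and completes it
        results.append(func(pending.popleft()))
        # Advance the task generator just far enough to refill the queue
        pending.extend(islice(task_iter, max_tasks_active - len(pending)))
        peak = max(peak, len(pending))
    return results, peak

n_jobs = 4
results, peak = run_with_task_limit(lambda x: x * x, (x for x in range(1000)),
                                    max_tasks_active=2 * n_jobs)
```

However long the task generator is, the queue never holds more than 2 * n_jobs tasks at once, so memory use depends on the limit rather than on the total number of tasks.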

Worker lifespan

Occasionally, workers that process multiple, memory intensive tasks do not release their used up memory properly, which results in memory usage building up. This is not a bug in MPIRE, but a consequence of Python’s poor garbage collection. To avoid this type of problem you can set the worker lifespan: the number of tasks after which a worker should restart.

with WorkerPool(n_jobs=4) as pool:
    # Square the numbers, restarting each worker after every task
    results = pool.map(square, range(100), worker_lifespan=1, chunk_size=1)

In this example each worker is restarted after finishing a single task.

Note

When the worker lifespan has been reached, a worker will finish the current chunk of tasks before restarting. I.e., based on the chunk_size a worker could end up completing more tasks than is allowed by the worker lifespan.
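A small calculation makes the overshoot concrete. The sketch assumes, based on the note above, that the lifespan is counted in tasks and only checked once a chunk is finished. tasks_until_restart is a hypothetical helper, not an MPIRE function.

```python
def tasks_until_restart(worker_lifespan, chunk_sizes):
    """Hypothetical helper: count the tasks a worker completes before it restarts."""
    n_completed = 0
    for size in chunk_sizes:
        # The current chunk is always finished in full
        n_completed += size
        # Assumption: the lifespan is only checked between chunks
        if n_completed >= worker_lifespan:
            break
    return n_completed

exact = tasks_until_restart(worker_lifespan=1, chunk_sizes=[1, 1, 1])
overshoot = tasks_until_restart(worker_lifespan=5, chunk_sizes=[4, 4, 4])
```

With a lifespan of 5 and chunks of 4 tasks, the worker only restarts after its second chunk, i.e., after 8 tasks. Setting chunk_size=1, as in the example above, avoids the overshoot.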

Progress bar

Progress bar support is added through the tqdm package (installed by default when installing MPIRE). The easiest way to include a progress bar is by enabling the progress_bar flag in any of the map functions:

with WorkerPool(n_jobs=4) as pool:
    pool.map(square, range(100), progress_bar=True)

This will display a basic tqdm progress bar showing the time elapsed and remaining, the number of tasks completed (including a percentage value) and the speed (i.e., number of tasks completed per time unit).

When inside a Jupyter/IPython notebook, the progress bar will change automatically to a native Jupyter widget.

Note

The Jupyter tqdm widget requires the Javascript widget to run, which might not be enabled by default. You will notice a Widget Javascript not detected error message in your notebook if so. To remedy this, enable the widget by executing jupyter nbextension enable --py --sys-prefix widgetsnbextension in your terminal before starting the notebook.

Note

Please keep in mind that to show real-time progress information MPIRE starts an additional child process, which could consume a bit of the available compute power of your machine. However, this is often negligible.

Multiple progress bars with nested WorkerPools

In MPIRE you can easily print a progress bar at a different position in the terminal using the progress_bar_position parameter in the map functions, which facilitates the use of multiple progress bars. Here’s an example of multiple progress bars using nested WorkerPools:

from mpire import WorkerPool

def dispatcher(worker_id, X):
    with WorkerPool(n_jobs=4) as nested_pool:
        return nested_pool.map(square, X, progress_bar=True,
                               progress_bar_position=worker_id + 1)

def main():
    with WorkerPool(n_jobs=4, daemon=False, pass_worker_id=True) as pool:
        pool.map(dispatcher, ((range(x, x + 100),) for x in range(100)), iterable_len=100,
                 n_splits=4, progress_bar=True)

main()

We use worker_id + 1 here because the worker IDs start at zero, and we reserve position 0 for the progress bar of the main WorkerPool (which is the default).

It goes without saying that you shouldn’t specify the same progress bar position multiple times.

Note

The progress bar position is completely ignored when in a Jupyter/IPython notebook session or in the MPIRE dashboard.

Worker init and exit

When you want to initialize a worker you can make use of the worker_init parameter of any map function. This will call the initialization function only once per worker. Similarly, if you need to clean up the worker at the end of its lifecycle you can use the worker_exit parameter. Additionally, the exit function can return anything you like, which can be collected using mpire.WorkerPool.get_exit_results() after the workers are done.

For example:

def init_func(worker_state):
    # Initialize a counter for each worker
    worker_state['count_even'] = 0

def square_and_count_even(worker_state, x):
    # Count number of even numbers and return the square
    if x % 2 == 0:
        worker_state['count_even'] += 1
    return x * x

def exit_func(worker_state):
    # Return the counter
    return worker_state['count_even']

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    pool.map(square_and_count_even, range(100), worker_init=init_func, worker_exit=exit_func)
    print(pool.get_exit_results())  # Output, e.g.: [13, 13, 12, 12]
    print(sum(pool.get_exit_results()))  # Output: 50

Both init and exit functions receive the worker ID, shared objects, and worker state in the same way as the task function does, given they’re enabled.

Important

When the worker_lifespan option is used to restart workers during execution, the exit function will be called for the worker that’s shutting down and the init function will be called again for the new worker. Therefore, the number of elements in the list that’s returned from mpire.WorkerPool.get_exit_results() does not always equal n_jobs.

Important

When keep_alive is enabled the workers won’t be terminated after a map call. This means the exit function won’t be called until it’s time for cleaning up the entire pool. You will have to explicitly call mpire.WorkerPool.stop_and_join() to receive the exit results.

One use case for these functions is to load a big dataset or model that is needed for every task only once per worker (see Worker state for this example). Another use case would be to set up and close a database connection on init and exit.
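The database use case could look roughly like the sketch below. The three functions follow the signatures used earlier in this section (with the worker state enabled). The names init_db, store_square and close_db are made up, and simulate_worker is a hypothetical in-process stand-in for a single worker that only mimics the call order, so the example runs without starting any processes.

```python
import sqlite3

def init_db(worker_state):
    # Called once per worker: open a connection and create a table
    worker_state['conn'] = sqlite3.connect(':memory:')
    worker_state['conn'].execute('CREATE TABLE squares (x INTEGER, y INTEGER)')

def store_square(worker_state, x):
    # Called for every task: reuse the connection from the worker state
    worker_state['conn'].execute('INSERT INTO squares VALUES (?, ?)', (x, x * x))
    return x * x

def close_db(worker_state):
    # Called once at the end: report the number of stored rows and clean up
    n_rows = worker_state['conn'].execute('SELECT COUNT(*) FROM squares').fetchone()[0]
    worker_state['conn'].close()
    return n_rows

def simulate_worker(tasks, task_func, worker_init, worker_exit):
    """Hypothetical stand-in for one worker: init once, run tasks, exit once."""
    worker_state = {}
    worker_init(worker_state)
    results = [task_func(worker_state, task) for task in tasks]
    exit_result = worker_exit(worker_state)
    return results, exit_result

results, exit_result = simulate_worker(range(5), store_square, init_db, close_db)
```

In a real pool the same three functions would be passed as the task function, worker_init and worker_exit, and every worker would hold its own connection.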

Worker insights

Worker insights give you insight into your multiprocessing efficiency by tracking worker start up time, waiting time and time spent on executing tasks. Tracking is disabled by default, but can be enabled by setting enable_insights in the map functions:

with WorkerPool(n_jobs=4) as pool:
    pool.map(square, range(100), enable_insights=True)

The overhead is minimal and you shouldn’t notice it, even on very small tasks. You can view the tracking results using mpire.WorkerPool.get_insights() or use mpire.WorkerPool.print_insights() to directly print the insights to console:

import time

def sleep_and_square(x):
    # For illustration purposes
    time.sleep(x / 1000)
    return x * x

with WorkerPool(n_jobs=4) as pool:
    pool.map(sleep_and_square, range(100), enable_insights=True)
    insights = pool.get_insights()
    print(insights)

# Output:
{'n_completed_tasks': [28, 24, 24, 24],
 'total_start_up_time': '0:00:00.038',
 'total_init_time': '0:00:00',
 'total_waiting_time': '0:00:00.798',
 'total_working_time': '0:00:04.980',
 'total_exit_time': '0:00:00',
 'total_time': '0:00:05.816',
 'start_up_time': ['0:00:00.010', '0:00:00.008', '0:00:00.008', '0:00:00.011'],
 'start_up_time_mean': '0:00:00.009',
 'start_up_time_std': '0:00:00.001',
 'start_up_ratio': 0.006610452621805033,
 'init_time': ['0:00:00', '0:00:00', '0:00:00', '0:00:00'],
 'init_time_mean': '0:00:00',
 'init_time_std': '0:00:00',
 'init_ratio': 0.0,
 'waiting_time': ['0:00:00.309', '0:00:00.311', '0:00:00.165', '0:00:00.012'],
 'waiting_time_mean': '0:00:00.199',
 'waiting_time_std': '0:00:00.123',
 'waiting_ratio': 0.13722942739284952,
 'working_time': ['0:00:01.142', '0:00:01.135', '0:00:01.278', '0:00:01.423'],
 'working_time_mean': '0:00:01.245',
 'working_time_std': '0:00:00.117',
 'working_ratio': 0.8561601182661567,
 'exit_time': ['0:00:00', '0:00:00', '0:00:00', '0:00:00'],
 'exit_time_mean': '0:00:00',
 'exit_time_std': '0:00:00',
 'exit_ratio': 0.0,
 'top_5_max_task_durations': ['0:00:00.099', '0:00:00.098', '0:00:00.097', '0:00:00.096',
                              '0:00:00.095'],
 'top_5_max_task_args': ['Arg 0: 99', 'Arg 0: 98', 'Arg 0: 97', 'Arg 0: 96', 'Arg 0: 95']}

We specified 4 workers, so there are 4 entries in the n_completed_tasks, start_up_time, init_time, waiting_time, working_time, and exit_time containers. They show per worker the number of completed tasks, the total start up time, the total time spent on the worker_init function, the total time waiting for new tasks, the total time spent on the main function, and the total time spent on the worker_exit function, respectively. The insights also contain the mean, standard deviation, and ratio of the tracked time. The ratio is the time for that part divided by the total time. In general, the higher the working ratio the more efficient your multiprocessing setup is. Of course, your setup might still not be optimal because the task itself is inefficient, but timing that is beyond the scope of MPIRE.

Additionally, the insights keep track of the top 5 tasks that took the longest to run. The data is split up in two containers: one for the durations and one for the arguments that were passed on to the task function. Both are sorted by task duration in descending order, so index 0 of the args list corresponds to index 0 of the duration list, etc.
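To post-process the insights yourself, something along the following lines could work. The dictionary is a hand-copied excerpt of the output shown above, and to_seconds is a hypothetical helper that assumes the 'H:MM:SS' or 'H:MM:SS.mmm' string format seen there.

```python
def to_seconds(duration):
    """Hypothetical helper: convert 'H:MM:SS' or 'H:MM:SS.mmm' to seconds."""
    hours, minutes, seconds = duration.split(':')
    return int(hours) * 3600 + int(minutes) * 60 + float(seconds)

# Excerpt of the insights output shown above
insights = {
    'total_working_time': '0:00:04.980',
    'total_time': '0:00:05.816',
    'top_5_max_task_durations': ['0:00:00.099', '0:00:00.098', '0:00:00.097',
                                 '0:00:00.096', '0:00:00.095'],
    'top_5_max_task_args': ['Arg 0: 99', 'Arg 0: 98', 'Arg 0: 97', 'Arg 0: 96',
                            'Arg 0: 95'],
}

# Ratio = time for that part divided by the total time
working_ratio = (to_seconds(insights['total_working_time'])
                 / to_seconds(insights['total_time']))

# Both top 5 lists share the same order, so they can be paired by index
slowest_tasks = list(zip(insights['top_5_max_task_args'],
                         insights['top_5_max_task_durations']))
```

The recomputed ratio is close to, but not exactly, the reported working_ratio, presumably because the time strings are rounded to milliseconds.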

When using the MPIRE Dashboard you can track these insights in real-time. See Dashboard for more information.

Note

When you use imap or imap_unordered you can view the insights during execution. Simply call get_insights() or print_insights() inside your loop where you process the results.