Map family¶
This section describes the different ways of interacting with a mpire.WorkerPool
instance.
Contents
map family of functions¶
mpire.WorkerPool
implements four types of parallel map
functions, being:
mpire.WorkerPool.map()
: Blocks until results are ready, results are ordered in the same way as the provided argumentsmpire.WorkerPool.map_unordered()
: The same asmpire.WorkerPool.map()
, but results are ordered by task completion time. Usually faster thanmpire.WorkerPool.map()
.mpire.WorkerPool.imap()
: Lazy version ofmpire.WorkerPool.map()
, returns a generator. The generator will give results back whenever new results are ready. Results are ordered in the same way as the provided arguments.mpire.WorkerPool.imap_unordered()
: The same asmpire.WorkerPool.imap()
, but results are ordered by task completion time. Usually faster thanmpire.WorkerPool.imap()
.
When using a single worker the unordered versions are equivalent to their ordered counterparts.
Important
Each map
function should receive a function pointer and an iterable of arguments, where the elements of the
iterable can be single values or iterables that are unpacked as arguments. If an element is a dictionary, the
(key, value) pairs will be unpacked with the **
-operator. Look at the examples below on ways to circumvent this
behavior.
A few examples:
def square(x):
return x * x
def multiply(x, y):
return x * y
with WorkerPool(n_jobs=4) as pool:
# 1. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
results = pool.map(square, range(100))
with WorkerPool(n_jobs=4) as pool:
# 2. Square the numbers, results should be: [0, 1, 4, 9, 16, 25, ...]
# Note: don't execute this, it will take a long time ...
results = pool.map(square, range(int(1e30)), iterable_len=int(1e30), chunk_size=1)
with WorkerPool(n_jobs=4) as pool:
# 3. Multiply the numbers, results should be [0, 101, 204, 309, 416, ...]
for result in pool.imap(multiply, zip(range(100), range(100, 200)), iterable_len=100):
# Do something with this result
print(result)
with WorkerPool(n_jobs=4) as pool:
# 4. Multiply the numbers, results should be [0, 101, ...]
for result in pool.imap(multiply, [{'x': 0, 'y': 100}, {'y': 101, 'x': 1}, ...]):
# Do something with this result
print(result)
The first example should work as expected, the numbers are simply squared. MPIRE knows how many tasks there are because
a range
object implements the __len__
method (see Task chunking).
In the second example the 1e30
number is too large for Python: try calling len(range(int(1e30)))
, this will
throw an OverflowError
(don’t get me started …). Therefore, we must use the iterable_len
parameter to let
MPIRE know how large the tasks list is. We also have to specify a chunk size here as the chunk size should be lower than
sys.maxsize
.
The third example shows an example of using multiple function arguments. Note that we use imap
in the third example,
which allows us to process the results whenever they come available, not having to wait for all results to be ready.
The final example shows the use of an iterable of dictionaries. The (key, value) pairs are unpacked with the
**
-operator, as you would expect. So it doesn’t matter in what order the keys are stored. This should work for
collection.OrderedDict
objects as well.
If you want to pass those dictionaries in example 4 as a whole to the following function, for example:
def multiply_dict(d):
return d['x'] * d['y']
you would have to convert the list of dictionaries to a list of single argument tuples, where each argument is a dictionary:
with WorkerPool(n_jobs=4) as pool:
# Multiply the numbers, results should be [0, 101, ...]
for result in pool.imap(multiply_dict, [({'x': 0, 'y': 100},),
({'y': 101, 'x': 1},),
...]):
# Do something with this result
print(result)
There is a utility function available that does this transformation for you:
from mpire.utils import make_single_arguments
with WorkerPool(n_jobs=4) as pool:
# Multiply the numbers, results should be [0, 101, ...]
for result in pool.imap(multiply_dict, make_single_arguments([{'x': 0, 'y': 100},
{'y': 101, 'x': 1}, ...],
generator=False)):
# Do something with this result
print(result)
mpire.utils.make_single_arguments()
expects an iterable of arguments and converts them to tuples accordingly. The
second argument of this function specifies if you want the function to return a generator or a materialized list. If we
would like to return a generator we would need to pass on the iterable length as well.
Task chunking¶
By default, MPIRE chunks the given tasks in to 64 * n_jobs
chunks. Each worker is given one chunk of tasks at a time
before returning its results. This usually makes processing faster when you have rather small tasks (computation wise)
and results are pickled/unpickled when they are send to a worker or main process. Chunking the tasks and results ensures
that each process has to pickle/unpickle less often.
However, to determine the number of tasks in the argument list the iterable should implement the __len__
method,
which is available in default containers like list
or tuple
, but isn’t available in most generator objects
(the range
object is one of the exceptions). To allow working with generators each map
function has the option
to pass the iterable length:
with WorkerPool(n_jobs=4) as pool:
# 1. This will issue a warning and sets the chunk size to 1
results = pool.map(square, ((x,) for x in range(1000)))
# 2. This will issue a warning as well and sets the chunk size to 1
results = pool.map(square, ((x,) for x in range(1000)), n_splits=4)
# 3. Square the numbers using a generator using a specific number of splits
results = pool.map(square, ((x,) for x in range(1000)), iterable_len=1000, n_splits=4)
# 4. Square the numbers using a generator using automatic chunking
results = pool.map(square, ((x,) for x in range(1000)), iterable_len=1000)
# 5. Square the numbers using a generator using a fixed chunk size
results = pool.map(square, ((x,) for x in range(1000)), chunk_size=4)
In the first two examples the function call will issue a warning because MPIRE doesn’t know how large the chunks should be as the total number of tasks is unknown, therefore it will fall back to a chunk size of 1. The third example should work as expected where 4 chunks are used. The fourth example uses 256 chunks (the default 64 times the number of workers). The last example uses a fixed chunk size of four, so MPIRE doesn’t need to know the iterable length.
You can also call the chunk function manually:
from mpire.utils import chunk_tasks
# Convert to list because chunk_tasks returns a generator
print(list(chunk_tasks(range(10), n_splits=3)))
print(list(chunk_tasks(range(10), chunk_size=2.5)))
print(list(chunk_tasks((x for x in range(10)), iterable_len=10, n_splits=6)))
will output:
[(0, 1, 2, 3), (4, 5, 6), (7, 8, 9)]
[(0, 1, 2), (3, 4), (5, 6, 7), (8, 9)]
[(0, 1), (2, 3), (4,), (5, 6), (7, 8), (9,)]
Numpy arrays¶
Chunking¶
Numpy arrays are treated a little bit differently when passed on to the map
functions. Usually MPIRE uses
itertools.islice
for chunking, which depends on the __iter__
special function of the container object. But
applying that to numpy arrays would yield:
import numpy as np
# Create random array
arr = np.random.rand(10, 3)
# Chunk the array using default chunking
arr_iter = iter(arr)
chunk_size = 3
while True:
chunk = list(itertools.islice(arr_iter, chunk_size))
if chunk:
yield chunk
else:
break
with output:
[array([0.68438994, 0.9701514 , 0.40083965]), array([0.88428556, 0.2083905 , 0.61490443]),
array([0.89249174, 0.39902235, 0.70762541])]
[array([0.18850964, 0.1022777 , 0.41539432]), array([0.07327858, 0.18608165, 0.75862301]),
array([0.69215651, 0.4211941 , 0.31029439])]
[array([0.82571272, 0.72257819, 0.86079131]), array([0.91285817, 0.49398461, 0.27863929]),
array([0.146981 , 0.84671211, 0.30122806])]
[array([0.11783283, 0.12585031, 0.39864368])]
In other words, each row of the array is now in its own array and each one of them is given to the target function individually. Instead, MPIRE will chunk them in to something more reasonable using numpy slicing instead:
from mpire.utils import chunk_tasks
for chunk in chunk_tasks(arr, chunk_size=chunk_size):
print(repr(chunk))
Output:
array([[0.68438994, 0.9701514 , 0.40083965],
[0.88428556, 0.2083905 , 0.61490443],
[0.89249174, 0.39902235, 0.70762541]])
array([[0.18850964, 0.1022777 , 0.41539432],
[0.07327858, 0.18608165, 0.75862301],
[0.69215651, 0.4211941 , 0.31029439]])
array([[0.82571272, 0.72257819, 0.86079131],
[0.91285817, 0.49398461, 0.27863929],
[0.146981 , 0.84671211, 0.30122806]])
array([[0.11783283, 0.12585031, 0.39864368]])
Each chunk is now a single numpy array containing as many rows as the chunk size, except for the last chunk as there aren’t enough rows left.
Return value¶
When the user defined function returns numpy arrays and you’re applying the mpire.WorkerPool.map()
function MPIRE
will concatenate the resulting numpy arrays to a single array by default. For example:
def add_five(x):
return x + 5
with WorkerPool(n_jobs=4) as pool:
results = pool.map(add_five, arr, chunk_size=chunk_size)
will return:
array([[5.68438994, 5.9701514 , 5.40083965],
[5.88428556, 5.2083905 , 5.61490443],
[5.89249174, 5.39902235, 5.70762541],
[5.18850964, 5.1022777 , 5.41539432],
[5.07327858, 5.18608165, 5.75862301],
[5.69215651, 5.4211941 , 5.31029439],
[5.82571272, 5.72257819, 5.86079131],
[5.91285817, 5.49398461, 5.27863929],
[5.146981 , 5.84671211, 5.30122806],
[5.11783283, 5.12585031, 5.39864368]])
This behavior can be cancelled by using the concatenate_numpy_output
flag:
with WorkerPool(n_jobs=4) as pool:
results = pool.map(add_five, arr, chunk_size=chunk_size, concatenate_numpy_output=False)
This will return individual arrays:
[array([[5.68438994, 5.9701514 , 5.40083965],
[5.88428556, 5.2083905 , 5.61490443],
[5.89249174, 5.39902235, 5.70762541]]),
array([[5.18850964, 5.1022777 , 5.41539432],
[5.07327858, 5.18608165, 5.75862301],
[5.69215651, 5.4211941 , 5.31029439]]),
array([[5.82571272, 5.72257819, 5.86079131],
[5.91285817, 5.49398461, 5.27863929],
[5.146981 , 5.84671211, 5.30122806]]),
array([[5.11783283, 5.12585031, 5.39864368]])]
Maximum number of active tasks¶
When you have tasks that take up a lot of memory you can limit the number of jobs or limit the number of active tasks (i.e., the number of tasks currently being available to the workers, tasks that are in the queue ready to be processed). The first option is the most obvious one to save memory when the processes themselves use up much memory. The second is convenient when the argument list takes up too much memory. For example, suppose you want to kick off an enormous amount of jobs (let’s say a billion) of which the arguments take up 1 KB per task (e.g., large strings), then that task queue would take up ~1 TB of memory!
In such cases, a good rule of thumb would be to have twice the amount of active tasks than there are jobs. This means that when all workers complete their task at the same time each would directly be able to continue with another task. When workers take on their new tasks the generator of tasks is iterated to the point that again there would be twice the amount of active tasks.
with WorkerPool(n_jobs=4) as pool:
# Square the numbers using a generator
results = pool.map(square, range(int(1e300)), iterable_len=int(1e300),
chunk_size=int(1e5), max_tasks_active=2*4)
Worker lifespan¶
Occasionally, workers that process multiple, memory intensive tasks do not release their used up memory properly, which results in memory usage building up. This is not a bug in MPIRE, but a consequence of Python’s poor garbage collection. To avoid this type of problem you can set the worker lifespan: the number of tasks after which a worker should restart.
with WorkerPool(n_jobs=4) as pool:
# Square the numbers using a generator
results = pool.map(square, range(100), worker_lifespan=1, chunk_size=1)
In this example each worker is restarted after finishing a single task.
Note
When the worker lifespan has been reached, a worker will finish the current chunk of tasks before restarting. I.e.,
based on the chunk_size
a worker could end up completing more tasks than is allowed by the worker lifespan.
Progress bar¶
Progress bar support is added through the tqdm package (installed by default when installing MPIRE). The most easy way
to include a progress bar is by enabling the progress_bar
flag in any of the map
functions:
with WorkerPool(n_jobs=4) as pool:
pool.map(square, range(100), progress_bar=True)
This will display a basic tqdm
progress bar displaying the time elapsed and remaining, number of tasks completed
(including a percentage value) and the speed (i.e., number of tasks completed per time unit).
When inside a Jupyter/IPython notebook, the progress bar will change automatically to a native Jupyter widget.
Note
The Jupyter tqdm
widget requires the Javascript widget to run, which might not be enabled by default. You will
notice a Widget Javascript not detected
error message in your notebook if so. To remedy this, enable the widget
by executing jupyter nbextension enable --py --sys-prefix widgetsnbextension
in your terminal before starting
the notebook.
Note
Please keep in mind that to show real-time progress information MPIRE starts an additional child process, which could consume a bit of the available compute power of your machine.
Multiple progress bars with nested WorkerPools¶
In MPIRE you can easily print a progress bar on a different position on the terminal using the progress_bar_position
parameter in the map functions, which facilitates the use of multiple progress bars. Here’s an example of using multiple
progress bars using nested WorkerPools:
from mpire import tqdm
def dispatcher(worker_id, X):
with WorkerPool(n_jobs=4) as nested_pool:
return nested_pool.map(square, X, progress_bar=True, progress_bar_position=worker_id + 1)
def main():
with WorkerPool(n_jobs=4, daemon=False, pass_worker_id=True) as pool:
pool.map(dispatcher, ((range(x, x + 100),) for x in range(100)), iterable_len=100,
n_splits=4, progress_bar=True)
main()
We use worker_id + 1
here because the worker IDs start at zero, and we reserve position 0 for the progress bar of
the main WorkerPool (which is the default).
Note
Unfortunately, multiple tqdm
progress bars from child processes don’t play that nicely within a Jupyter/IPython
notebook session. It’ll work but you’ll get some additional new lines in your output and it could be that your main
progress bar won’t update as you would expect. Note that you can always use the MPIRE dashboard.
Worker init and exit¶
When you want to initialize a worker you can make use of the worker_init
parameter of any map
function. This
will call the initialization function only once per worker. Similarly, if you need to clean up the worker at the end of
its lifecycle you can use the worker_exit
parameter. Additionally, the exit function can return anything you like,
which can be collected using mpire.WorkerPool.get_exit_results()
after the workers are done.
For example:
def init_func(worker_state):
# Initialize a counter for each worker
worker_state['count_even'] = 0
def square_and_count_even(worker_state, x):
# Count number of even numbers and return the square
if x % 2 == 0:
worker_state['count_even'] += 1
return x**x
def exit_func(worker_state):
# Return the counter
return worker_state['count_even']
with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
pool.map(square_and_count_even, range(100), worker_init=init_func, worker_exit=exit_func)
print(pool.get_exit_results()) # Output, e.g.: [13, 13, 12, 12]
print(sum(pool.get_exit_results())) # Output: 50
Both init and exit functions receive the worker ID, shared objects, and worker state in the same way as the task function does, given they’re enabled.
Important
When the worker_lifespan
option is used to restart workers during execution, the exit function will be called
for the worker that’s shutting down and the init function will be called again for the new worker. Therefore, the
number of elements in the list that’s returned from mpire.WorkerPool.get_exit_results()
does not always equal
n_jobs
.
Important
When keep_alive
is enabled the workers won’t be terminated after a map
call. This means the exit function
won’t be called until it’s time for cleaning up the entire pool. You will have to explicitly call
mpire.WorkerPool.stop_and_join()
to receive the exit results.
One usecase for these functions is to load a big dataset or model that is needed for every task only once per worker (see Worker state for this example). Another usecase would be to set up and close a database connection on init and exit.
Worker insights¶
Worker insights gives you insight in your multiprocessing efficiency by tracking worker start up time, waiting time and
time spend on executing tasks. Tracking is disabled by default, but can be enabled by setting enable_insights
in the
map functions:
with WorkerPool(n_jobs=4) as pool:
pool.map(square, range(100), enable_insights=True)
The overhead is very minimal and you shouldn’t really notice it, even on very small tasks. You can view the tracking
results using mpire.WorkerPool.get_insights()
or use mpire.WorkerPool.print_insights()
to directly print
the insights to console:
import time
def sleep_and_square(x):
# For illustration purposes
time.sleep(x / 1000)
return x * x
with WorkerPool(n_jobs=4) as pool:
pool.map(sleep_and_square, range(100), enable_insights=True)
insights = pool.get_insights()
print(insights)
# Output:
{'n_completed_tasks': [28, 24, 24, 24],
'total_start_up_time': '0:00:00.038',
'total_init_time': '0:00:00',
'total_waiting_time': '0:00:00.798',
'total_working_time': '0:00:04.980',
'total_exit_time': '0:00:00',
'total_time': '0:00:05.816',
'start_up_time': ['0:00:00.010', '0:00:00.008', '0:00:00.008', '0:00:00.011'],
'start_up_time_mean': '0:00:00.009',
'start_up_time_std': '0:00:00.001',
'start_up_ratio': 0.006610452621805033,
'init_time': ['0:00:00', '0:00:00', '0:00:00', '0:00:00'],
'init_time_mean': '0:00:00',
'init_time_std': '0:00:00',
'init_ratio': 0.0,
'waiting_time': ['0:00:00.309', '0:00:00.311', '0:00:00.165', '0:00:00.012'],
'waiting_time_mean': '0:00:00.199',
'waiting_time_std': '0:00:00.123',
'waiting_ratio': 0.13722942739284952,
'working_time': ['0:00:01.142', '0:00:01.135', '0:00:01.278', '0:00:01.423'],
'working_time_mean': '0:00:01.245',
'working_time_std': '0:00:00.117',
'working_ratio': 0.8561601182661567,
'exit_time': ['0:00:00', '0:00:00', '0:00:00', '0:00:00']
'exit_time_mean': '0:00:00',
'exit_time_std': '0:00:00',
'exit_ratio': 0.0,
'top_5_max_task_durations': ['0:00:00.099', '0:00:00.098', '0:00:00.097', '0:00:00.096',
'0:00:00.095'],
'top_5_max_task_args': ['Arg 0: 99', 'Arg 0: 98', 'Arg 0: 97', 'Arg 0: 96', 'Arg 0: 95']}
We specified 4 workers, so there are 4 entries in the n_completed_tasks
, start_up_time
, init_time
,
waiting_time
, working_time
, and exit_time
containers. They show per worker the number of completed tasks,
the total start up time, the total time spend on the worker_init
function, the total time waiting for new tasks,
total time spend on main function, and the total time spend on the worker_exit
function, respectively. The insights
also contain mean, standard deviation, and ratio of the tracked time. The ratio is the time for that part divided by the
total time. In general, the higher the working ratio the more efficient your multiprocessing setup is. Of course, your
setup might still not be optimal because the task itself is inefficient, but timing that is beyond the scope of MPIRE.
Additionally, the insights keep track of the top 5 tasks that took the longest to run. The data is split up in two
containers: one for the duration and one for the arguments that were passed on to the task function. Both are sorted
based on task duration (desc), so index 0
of the args list corresponds to index 0
of the duration list, etc.
When using the MPIRE Dashboard you can track these insights in real-time. See Dashboard for more information.
Note
When you use imap or imap_unordered you can view the insights during execution. Simply call get_insights()
or print_insights()
inside your loop where you process the results.