Lei Mao's Log Book – Progress Bars for Python Multiprocessing Tasks
source link: https://leimao.github.io/blog/Python-tqdm-Multiprocessing/
Introduction
It is natural to employ progress bars in our programs to show the progress of tasks. tqdm is one of my favorite progress bar tools in Python. It can easily be incorporated into Python code: use trange to replace range, or wrap an iterator with tqdm.tqdm, to show a progress bar for a for loop.
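The two usage patterns just mentioned can be sketched in a few lines (a minimal sketch; the sleep durations and descriptions are arbitrary):

```python
import time
from tqdm import tqdm, trange

# trange(n) is a convenient shorthand for tqdm(range(n))
total = 0
for i in trange(5, desc="trange demo"):
    total += i
    time.sleep(0.01)

# tqdm() wraps any iterable to show a progress bar for a for loop
squares = []
for x in tqdm([1, 2, 3], desc="tqdm demo"):
    squares.append(x * x)
```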
Multiprocessing tasks should also have progress bars to show their progress. However, the combination of tqdm and multiprocessing is not well documented. In this blog post, I would like to present several ways of using multiprocessing with tqdm.
Python Multiprocessing
Let’s first take a look at some of the basic methods in the Python multiprocessing library. The commonly used multiprocessing.Pool methods can be broadly categorized as apply and map. apply calls a function with a given set of arguments. map is a higher-level abstraction over apply: it applies the same function to each element of an iterable.
More specifically, the commonly used multiprocessing.Pool methods are:
apply_async
map
map_async
imap
imap_unordered
apply_async and map_async return a “future result” immediately; we need to collect the results using get. map, although it runs the function calls in parallel, blocks until all the results are ready. imap is a lazier version of map that returns an iterator over the results. imap_unordered is similar to imap, but the execution order and the order of the returned results do not necessarily follow the order of the provided arguments.
Based on the nature of these methods, apply_async, imap, and imap_unordered are naturally compatible with tqdm for showing progress bars.
Python Multiprocessing tqdm Examples
Many Small Processes
Sometimes the entire task consists of many small jobs, each of which finishes quickly. The number of jobs is much larger than the number of worker processes we can assign to the multiprocessing.Pool, and we would like to monitor the progress of the entire task with a single progress bar.
The bottom line is that we should not have to modify the functions we want to run with multiprocessing and tqdm. In the following examples, I show how to use apply_async, imap, and imap_unordered with tqdm for functions that take a single argument or multiple arguments.
# multiprocess_examples_1.py
from tqdm import tqdm
from multiprocessing import Pool
from functools import partial
import time
import random


def func_single_argument(n):
    time.sleep(0.5)
    return n


def func_multiple_argument(n, m, *args, **kwargs):
    time.sleep(0.5)
    return n, m


def run_imap_multiprocessing(func, argument_list, num_processes):
    pool = Pool(processes=num_processes)
    result_list_tqdm = []
    for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)):
        result_list_tqdm.append(result)
    return result_list_tqdm


def run_imap_unordered_multiprocessing(func, argument_list, num_processes):
    pool = Pool(processes=num_processes)
    result_list_tqdm = []
    for result in tqdm(pool.imap_unordered(func=func, iterable=argument_list), total=len(argument_list)):
        result_list_tqdm.append(result)
    return result_list_tqdm


def run_apply_async_multiprocessing(func, argument_list, num_processes):
    pool = Pool(processes=num_processes)
    # A tuple argument is spread as positional arguments; anything else is
    # passed as a single argument
    jobs = [
        pool.apply_async(func=func, args=argument if isinstance(argument, tuple) else (argument,))
        for argument in argument_list
    ]
    pool.close()
    result_list_tqdm = []
    for job in tqdm(jobs):
        result_list_tqdm.append(job.get())
    return result_list_tqdm


def main():
    num_processes = 10
    num_jobs = 100
    random_seed = 0
    random.seed(random_seed)

    # imap, imap_unordered
    # These only support functions with one dynamic argument
    func = func_single_argument
    argument_list = [random.randint(0, 100) for _ in range(num_jobs)]

    print("Running imap multiprocessing for single-argument functions ...")
    result_list = run_imap_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
    assert result_list == argument_list

    print("Running imap_unordered multiprocessing for single-argument functions ...")
    result_list = run_imap_unordered_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)

    # Partial functions (one dynamic argument, one or more fixed arguments)
    partial_func = partial(func_multiple_argument, m=10)

    print("Running imap multiprocessing for single-argument partial functions ...")
    result_list = run_imap_multiprocessing(func=partial_func, argument_list=argument_list, num_processes=num_processes)

    print("Running imap_unordered multiprocessing for single-argument partial functions ...")
    result_list = run_imap_unordered_multiprocessing(func=partial_func, argument_list=argument_list, num_processes=num_processes)
    # Since it is unordered, this assertion might not hold
    # assert result_list == argument_list

    # apply_async
    # One dynamic argument
    func = func_single_argument
    argument_list = [random.randint(0, 100) for _ in range(num_jobs)]

    print("Running apply_async multiprocessing for single-argument functions ...")
    result_list = run_apply_async_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
    assert result_list == argument_list

    # More than one dynamic argument
    func = func_multiple_argument
    argument_list = [(random.randint(0, 100), random.randint(0, 100)) for _ in range(num_jobs)]

    print("Running apply_async multiprocessing for multi-argument functions ...")
    result_list = run_apply_async_multiprocessing(func=func, argument_list=argument_list, num_processes=num_processes)
    assert result_list == argument_list

    # Partial functions (multiple dynamic arguments, one or more fixed arguments)
    partial_func = partial(func_multiple_argument, x=1, y=2, z=3)  # Fixed arguments passed via kwargs

    print("Running apply_async multiprocessing for multi-argument partial functions ...")
    result_list = run_apply_async_multiprocessing(func=partial_func, argument_list=argument_list, num_processes=num_processes)
    assert result_list == argument_list


if __name__ == "__main__":
    main()
During the execution of the multiprocessing program, we can see that the 100 jobs run in 10 batches of 10 jobs each. Each batch takes roughly 0.5 seconds, and the total execution time is roughly 5 seconds.
$ python multiprocess_examples_1.py
Running imap multiprocessing for single-argument functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running imap_unordered multiprocessing for single-argument functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.96it/s]
Running imap multiprocessing for single-argument partial functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running imap_unordered multiprocessing for single-argument partial functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running apply_async multiprocessing for single-argument functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running apply_async multiprocessing for multi-argument functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.97it/s]
Running apply_async multiprocessing for multi-argument partial functions ...
100%|██████████████████████████████████████████████████████████████| 100/100 [00:05<00:00, 19.96it/s]
Few Large Processes
Sometimes the entire task consists of a few large jobs, each of which takes a long time to finish. We would like to monitor the progress of each job with its own progress bar.
In the following example, I show how to use apply_async with tqdm. imap and imap_unordered should work similarly.
# multiprocess_examples_2.py
import time
import random
from tqdm import tqdm
from multiprocessing import Pool, freeze_support, RLock


def func(pid, n):
    tqdm_text = "#" + "{}".format(pid).zfill(3)
    current_sum = 0
    # position reserves a fixed line on the terminal for this job's bar
    with tqdm(total=n, desc=tqdm_text, position=pid + 1) as pbar:
        for i in range(1, n + 1):
            current_sum += i
            time.sleep(0.05)
            pbar.update(1)
    return current_sum


def main():
    freeze_support()  # For Windows support
    num_processes = 10
    num_jobs = 30
    random_seed = 0
    random.seed(random_seed)
    # Share one lock across processes so the bars do not garble each other
    pool = Pool(processes=num_processes, initargs=(RLock(),), initializer=tqdm.set_lock)
    argument_list = [random.randint(0, 100) for _ in range(num_jobs)]
    jobs = [pool.apply_async(func, args=(i, n)) for i, n in enumerate(argument_list)]
    pool.close()
    result_list = [job.get() for job in jobs]
    # Important: print blank lines so the prompt appears below the progress bars
    print("\n" * (len(argument_list) + 1))


if __name__ == "__main__":
    main()
During execution, 30 progress bars, one per job, are displayed; up to 10 of them (one per worker process) update simultaneously.
$ python multiprocess_examples_2.py
#000: 100%|█████████████████████████████████████| 49/49 [00:02<00:00, 19.95it/s]
#001: 100%|█████████████████████████████████████| 97/97 [00:04<00:00, 19.92it/s]
#002: 100%|█████████████████████████████████████| 53/53 [00:02<00:00, 19.94it/s]
#003: 100%|███████████████████████████████████████| 5/5 [00:00<00:00, 19.94it/s]
#004: 100%|█████████████████████████████████████| 33/33 [00:01<00:00, 19.95it/s]
#005: 100%|█████████████████████████████████████| 65/65 [00:03<00:00, 19.93it/s]
#006: 100%|█████████████████████████████████████| 62/62 [00:03<00:00, 19.91it/s]
#007: 100%|█████████████████████████████████████| 51/51 [00:02<00:00, 19.94it/s]
#008: 100%|███████████████████████████████████| 100/100 [00:05<00:00, 19.90it/s]
#009: 100%|█████████████████████████████████████| 38/38 [00:01<00:00, 19.95it/s]
#010: 100%|█████████████████████████████████████| 61/61 [00:03<00:00, 19.93it/s]
#011: 100%|█████████████████████████████████████| 45/45 [00:02<00:00, 19.91it/s]
#012: 100%|█████████████████████████████████████| 74/74 [00:03<00:00, 19.91it/s]
#013: 100%|█████████████████████████████████████| 27/27 [00:01<00:00, 19.89it/s]
#014: 100%|█████████████████████████████████████| 64/64 [00:03<00:00, 19.90it/s]
#015: 100%|█████████████████████████████████████| 17/17 [00:00<00:00, 19.87it/s]
#016: 100%|█████████████████████████████████████| 36/36 [00:01<00:00, 19.90it/s]
#017: 100%|█████████████████████████████████████| 17/17 [00:00<00:00, 19.91it/s]
#018: 100%|█████████████████████████████████████| 96/96 [00:04<00:00, 19.92it/s]
#019: 100%|█████████████████████████████████████| 12/12 [00:00<00:00, 19.92it/s]
#020: 100%|█████████████████████████████████████| 79/79 [00:03<00:00, 19.91it/s]
#021: 100%|█████████████████████████████████████| 32/32 [00:01<00:00, 19.89it/s]
#022: 100%|█████████████████████████████████████| 68/68 [00:03<00:00, 19.93it/s]
#023: 100%|█████████████████████████████████████| 90/90 [00:04<00:00, 19.93it/s]
#024: 100%|█████████████████████████████████████| 77/77 [00:03<00:00, 19.91it/s]
#025: 100%|█████████████████████████████████████| 18/18 [00:00<00:00, 19.90it/s]
#026: 100%|█████████████████████████████████████| 39/39 [00:01<00:00, 19.90it/s]
#027: 100%|█████████████████████████████████████| 12/12 [00:00<00:00, 19.90it/s]
#028: 100%|█████████████████████████████████████| 93/93 [00:04<00:00, 19.92it/s]
#029: 100%|███████████████████████████████████████| 9/9 [00:00<00:00, 19.89it/s]
Conclusions
imap and imap_unordered can be used with tqdm for simple multiprocessing tasks where a single function takes a single dynamic argument. For one or more functions that take multiple dynamic arguments, we should use apply_async with tqdm.
Progress Bars for Python Multiprocessing Tasks was published on May 31, 2020 and last modified on May 31, 2020 by Lei Mao.