
Python multiprocessing: how to limit the number of pending processes?

source link: https://www.codesd.com/item/python-multiprocessing-how-to-limit-the-number-of-pending-processes.html


When running a large number of tasks (with large parameters) using Pool.apply_async, every submitted task is queued immediately, and there is no limit on the number of pending tasks. Since each pending task keeps its arguments alive, this can end up eating all memory, as in the example below:

import multiprocessing
import numpy as np

def f(a, b):
    return np.linalg.solve(a, b)

def test():
    p = multiprocessing.Pool()
    for _ in range(1000):
        # every pending task holds a 1000x1000 argument array in memory
        p.apply_async(f, (np.random.rand(1000, 1000), np.random.rand(1000)))
    p.close()
    p.join()

if __name__ == '__main__':
    test()

I'm searching for a way to limit the waiting queue, so that only a limited number of tasks can be pending at once, and Pool.apply_async blocks while the waiting queue is full.


multiprocessing.Pool has a _taskqueue member, which on CPython up to 3.7 is a plain queue.Queue (Queue.Queue on Python 2), not a multiprocessing.Queue. queue.Queue takes an optional maxsize parameter; unfortunately the pool constructs it without maxsize set. (On Python 3.8+ the task queue is a queue.SimpleQueue, which cannot be bounded at all, so the approaches below only apply to older versions; on newer ones, see the semaphore alternative at the end of this answer.)

I'd recommend subclassing the pool and bounding _taskqueue after the parent __init__ has run. Note that multiprocessing.Pool is a factory function, so the class to subclass is multiprocessing.pool.Pool.
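A minimal sketch of that idea (assuming CPython <= 3.7, where _taskqueue is a plain queue.Queue whose maxsize attribute is checked on every put(); BoundedPool and the maxsize parameter are my own names):

import multiprocessing.pool

class BoundedPool(multiprocessing.pool.Pool):
    def __init__(self, maxsize, *args, **kwargs):
        super(BoundedPool, self).__init__(*args, **kwargs)
        # queue.Queue reads self.maxsize on every put(), so bounding
        # the already-constructed queue is enough: apply_async will
        # block once maxsize tasks are waiting.
        self._taskqueue.maxsize = maxsize

With pool = BoundedPool(10), the loop from the question blocks whenever ten tasks are already waiting, instead of queueing all 1000 argument arrays at once.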

Monkey-patching the pool would also work, and since the task queue is a plain queue.Queue it amounts to patching a single attribute (the _maxsize/_sem pair sometimes suggested belongs to multiprocessing.Queue and does not exist here). It is still brittle, because nothing guarantees _taskqueue keeps this type across Python versions:

pool._taskqueue.maxsize = maxsize
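If you'd rather not touch private attributes at all, you can get the same blocking behavior on any Python version by gating apply_async with a semaphore that the result callback releases. A sketch of that pattern applied to the question's example (the maxpending parameter and done helper are my own names; error_callback requires Python 3):

import multiprocessing
from threading import BoundedSemaphore

import numpy as np

def f(a, b):
    return np.linalg.solve(a, b)

def test(maxpending=10):
    pool = multiprocessing.Pool()
    sem = BoundedSemaphore(maxpending)

    def done(result):
        sem.release()  # a task finished or failed: free one slot

    for _ in range(1000):
        sem.acquire()  # blocks while maxpending tasks are queued or running
        pool.apply_async(f, (np.random.rand(1000, 1000), np.random.rand(1000)),
                         callback=done, error_callback=done)
    pool.close()
    pool.join()

if __name__ == '__main__':
    test()

Each acquire pairs with exactly one release, so at most maxpending argument arrays exist at any time, regardless of how the pool manages its internal queue.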

