Process.join() and queue don't work with large numbers

Puibo · Jul 29, 2015 · Viewed 7.2k times

I am trying to split a for loop, i.e.

N = 1000000
for i in xrange(N):
    #do something

using multiprocessing.Process, and it works well for small values of N. The problem arises when I use bigger values of N: something strange happens before or during p.join() and the program stops responding. If I put print i instead of q.put(i) in the definition of the function f, everything works well.

I would appreciate any help. Here is the code.

from multiprocessing import Process, Queue

def f(q, nMin, nMax): # function for multiprocessing
    for i in xrange(nMin,nMax):
        q.put(i)

if __name__ == '__main__':

    nEntries = 1000000

    nCpu = 10
    nEventsPerCpu = nEntries/nCpu
    processes = []

    q = Queue()

    for i in xrange(nCpu):
        processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print q.qsize()

Answer

Patrick Maupin · Jul 29, 2015

You are trying to grow your queue without bound, and you are joining a subprocess that is waiting for room in the queue (a multiprocessing.Queue feeds its items through a pipe with a finite buffer, and a child process will not exit until everything it has put has been flushed through that pipe). So your main process is stalled waiting for that child to complete, and it never will.

If you pull data out of the queue before the join, it will work fine.

One technique you could use is something like this:

while 1:
    running = any(p.is_alive() for p in processes)
    while not q.empty():
        process_queue_data(q.get())   # placeholder: do whatever you need with each item
    if not running:                   # liveness was checked before the drain, so nothing is lost
        break

According to the documentation, p.is_alive() performs an implicit join, but the docs also seem to imply that the best practice is still to explicitly join all of the processes afterwards.
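
For example (a minimal sketch, assuming the same processes list as above), that explicit clean-up after the polling loop could simply be:

# every worker has been seen as not-alive, so none of them is still
# blocked trying to put data; joining is now safe
for p in processes:
    p.join()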

Edit: although that approach is pretty clear, it may not be all that performant. How you make it perform better will be highly task- and machine-specific (and, in general, you shouldn't be creating that many processes at a time anyway, unless some of them are going to be blocked on I/O).
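
For instance (a sketch, reusing the variable names from your code), the worker count could be taken from the machine instead of being hard-coded to 10:

import multiprocessing

nCpu = multiprocessing.cpu_count()   # e.g. 4 or 8, depending on the machine
nEventsPerCpu = nEntries / nCpu      # integer division; any remainder would need handling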

Besides reducing the number of processes to the number of CPUs, some easy fixes to make it a bit faster (again, depending on circumstances) might look like this:

import time
from Queue import Empty    # Python 2 stdlib module; the exception q.get(False) raises

liveprocs = list(processes)
while liveprocs:
    try:
        while 1:
            process_queue_data(q.get(False))   # drain everything currently in the queue
    except Empty:
        pass

    time.sleep(0.5)    # Give tasks a chance to put more data in
    if not q.empty():
        continue
    liveprocs = [p for p in liveprocs if p.is_alive()]
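
Putting the pieces together with the code from the question (a sketch, not a drop-in; process_queue_data here is just a stand-in that counts the items it receives), the whole thing might look roughly like this:

from multiprocessing import Process, Queue
from Queue import Empty    # Python 2 stdlib module providing the Empty exception
import time

def f(q, nMin, nMax):                # same worker as in the question
    for i in xrange(nMin, nMax):
        q.put(i)

def process_queue_data(item):        # stand-in consumer: just count what we receive
    global nSeen
    nSeen += 1

if __name__ == '__main__':
    nEntries = 1000000
    nCpu = 4                         # keep this close to your real CPU count
    nEventsPerCpu = nEntries / nCpu
    nSeen = 0

    q = Queue()
    processes = [Process(target=f, args=(q, i * nEventsPerCpu, (i + 1) * nEventsPerCpu))
                 for i in xrange(nCpu)]
    for p in processes:
        p.start()

    # Drain the queue while the workers are alive so their put() calls
    # never block the join below.
    liveprocs = list(processes)
    while liveprocs:
        try:
            while 1:
                process_queue_data(q.get(False))
        except Empty:
            pass

        time.sleep(0.5)              # give the workers a chance to put more data in
        if not q.empty():
            continue
        liveprocs = [p for p in liveprocs if p.is_alive()]

    # Nothing is blocked on the queue any more, so joining is safe.
    for p in processes:
        p.join()

    print nSeen                      # should print nEntries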