python pool apply_async and map_async do not block on full queue

konstantin · Mar 7, 2012

I am fairly new to Python. I am using the multiprocessing module to read lines of text from stdin, convert them in some way, and write them into a database. Here is a snippet of my code:

import sys
import multiprocessing

batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        # insert(batch, end) writes the batch into the database
        pool.apply_async(insert, args=(batch, i + 1))
        batch = []
if batch:
    pool.apply_async(insert, args=(batch, i + 1))
pool.close()
pool.join()

Now, all of that works fine until I process huge input files (hundreds of millions of lines) that I pipe into my Python program. At some point, when my database slows down, I see memory usage climbing until it is full.

After some experimenting, it turned out that pool.apply_async as well as pool.map_async never block, so the queue of pending calls grows bigger and bigger.

What is the correct approach to my problem? I would expect a parameter that I can set that blocks the pool.apply_async call as soon as a certain queue length has been reached. AFAIR in Java one can give the ThreadPoolExecutor a BlockingQueue with a fixed length for that purpose.
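For illustration, the kind of throttling I have in mind could be approximated by guarding apply_async with a semaphore that the result callback releases. This is only an untested sketch (the limit of 40 is made up, insert is my database writer from above, and the callback only fires when a task succeeds, so a failing batch would leak a slot); it is not the route I ended up taking, see my answer below.

import sys
import multiprocessing

def release(result):
    throttle.release()  # free one slot when a batch has been inserted

pool = multiprocessing.Pool(20)
throttle = multiprocessing.BoundedSemaphore(40)  # at most 40 batches in flight

batch = []
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        throttle.acquire()  # blocks here once 40 batches are outstanding
        pool.apply_async(insert, args=(batch, i + 1), callback=release)
        batch = []
# (handling of the final partial batch omitted for brevity)
pool.close()
pool.join()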

Thanks!

Answer

konstantin · Mar 8, 2012

Just in case someone ends up here: this is how I solved the problem. I stopped using multiprocessing.Pool. Here is how I do it now:

import sys
import multiprocessing

# set the number of concurrent processes that insert db data
processes = multiprocessing.cpu_count() * 2

# set up the batch queue; queue.put() blocks when the queue is full
queue = multiprocessing.Queue(processes * 2)

# start the worker processes
for _ in range(processes):
    multiprocessing.Process(target=insert, args=(queue,)).start()

# fill the queue with batches
batch = []
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        queue.put((batch, i + 1))
        batch = []
if batch:
    queue.put((batch, i + 1))

# stop the workers with one poison pill per process
for _ in range(processes):
    queue.put((None, None))

print "all done."

In the insert method, the processing of each batch is wrapped in a loop that pulls from the queue until it receives the poison pill:

def insert(queue):
    while True:
        batch, end = queue.get()
        if batch is None and end is None:
            break  # poison pill! complete!
        # ... process the batch ...
    print 'worker done.'
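One thing my code above does not do is wait for the workers to finish their batches before printing "all done."; the poison pills are queued and the main process simply moves on. If you need to wait, keep references to the Process objects and join them after sending the pills, roughly like this (a small addition, not part of my original solution):

workers = [multiprocessing.Process(target=insert, args=(queue,))
           for _ in range(processes)]
for worker in workers:
    worker.start()

# ... fill the queue and send the poison pills as above ...

for worker in workers:
    worker.join()
print "all done."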