I am fairly new to Python. I am using the multiprocessing module to read lines of text from stdin, convert them in some way and write them into a database. Here's a snippet of my code:
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch, i + 1))
        batch = []
# submit the final partial batch, if any lines are left over
if batch:
    pool.apply_async(insert, args=(batch, i + 1))
pool.close()
pool.join()
This all works fine until I process huge input files (hundreds of millions of lines) that I pipe into my Python program. At some point, when the database gets slower, I see memory filling up.
After some experimenting, it turned out that pool.apply_async and pool.map_async never block, so the queue of pending calls grows without bound.
What is the correct approach to my problem? I would expect a parameter I can set that makes the pool.apply_async call block as soon as a certain queue length has been reached. AFAIR in Java one can give the ThreadPoolExecutor a BlockingQueue with a fixed length for that purpose.
Thanks!
Just in case someone ends up here, this is how I solved the problem: I stopped using multiprocessing.Pool. Here is how I do it now:
import multiprocessing
import sys
# set the number of concurrent processes that insert db data
processes = multiprocessing.cpu_count() * 2
# set up a bounded batch queue; put() blocks once it is full
queue = multiprocessing.Queue(processes * 2)
# start the worker processes
workers = []
for _ in range(processes):
    worker = multiprocessing.Process(target=insert, args=(queue,))
    worker.start()
    workers.append(worker)
# fill the queue with batches
batch = []
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        queue.put((batch, i + 1))
        batch = []
if batch:
    queue.put((batch, i + 1))
# stop the workers with one poison pill each
for _ in range(processes):
    queue.put((None, None))
# wait for the workers to finish before reporting completion
for worker in workers:
    worker.join()
print("all done.")
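Why this caps memory: multiprocessing.Queue(maxsize) refuses new items once maxsize entries are waiting, so a blocking put() stalls the reader loop until a worker catches up. Here is a minimal Python 3 sketch of that limit. It uses put_nowait() so the refusal shows up as an exception instead of a stall. fill_until_full is a made-up helper name for illustration and is not part of the code above.

```python
import multiprocessing
import queue  # only needed for the queue.Full exception (Python 3 module name)


def fill_until_full(maxsize):
    """Put items without blocking until the bounded queue refuses one.

    Returns the number of items the queue accepted.
    """
    q = multiprocessing.Queue(maxsize)
    accepted = 0
    try:
        while True:
            # put_nowait() raises queue.Full instead of blocking
            q.put_nowait(accepted)
            accepted += 1
    except queue.Full:
        pass
    # Drain the queue again so nothing is left buffered at exit.
    for _ in range(accepted):
        q.get(timeout=5)
    return accepted
```

With the blocking put() used in the solution, the same limit simply pauses the loop over sys.stdin, which is the back-pressure that Pool.apply_async lacks.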
In the insert function, the processing of each batch is wrapped in a loop that pulls from the queue until it receives the poison pill:
while True:
    batch, end = queue.get()
    if batch is None and end is None:
        break  # poison pill: no more work, leave the loop
    # [process the batch]
print("worker done.")
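If you would rather keep multiprocessing.Pool, one common workaround is to gate apply_async with a semaphore that is released from the task's callbacks. This is only a sketch under assumptions and not part of the solution above: it targets Python 3 (error_callback does not exist in Python 2), and submit_throttled and max_pending are names made up for illustration.

```python
import threading


def submit_throttled(pool, func, arg_tuples, max_pending=40):
    """Call pool.apply_async(func, args) for each tuple in arg_tuples,
    blocking whenever max_pending tasks are still unfinished.

    Returns the list of exceptions raised by failed tasks.
    """
    slots = threading.BoundedSemaphore(max_pending)
    errors = []

    def on_done(_result):
        # Runs in a helper thread of the parent process when a task succeeds.
        slots.release()

    def on_error(exc):
        # Runs in the same helper thread when a task raises.
        errors.append(exc)
        slots.release()

    for args in arg_tuples:
        slots.acquire()  # blocks while max_pending tasks are outstanding
        pool.apply_async(func, args=args,
                         callback=on_done, error_callback=on_error)

    # Take every slot back: this returns only after all tasks have finished.
    for _ in range(max_pending):
        slots.acquire()
    return errors
```

With a real pool this would be called as submit_throttled(pool, insert, batches), where batches is a generator yielding (batch, end) tuples and insert takes those two arguments as in the question. Because the generator is consumed lazily, at most max_pending batches are held in memory by the pool at any time.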