multiprocessing queue full

rohanp · Jul 22, 2015

I'm using concurrent.futures to implement multiprocessing. I am getting a queue.Full error, which is odd because I am only assigning 10 jobs.

import numpy as np
from concurrent.futures import ProcessPoolExecutor

A_list = [np.random.rand(2000, 2000) for i in range(10)]

with ProcessPoolExecutor() as pool:
    pool.map(np.linalg.svd, A_list)

error:

Exception in thread Thread-9:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 921, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 869, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 251, in _queue_management_worker
    shutdown_worker()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 209, in shutdown_worker
    call_queue.put_nowait(None)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 131, in put_nowait
    return self.put(obj, False)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 82, in put
    raise Full
queue.Full

Answer

skrrgwasme · Dec 2, 2015

Short Answer
I believe pipe size limits are the underlying cause. There isn't much you can do about this except break up your data into smaller chunks and deal with them iteratively. This means you may need to find a new algorithm that can work on small portions of your 2000x2000 array at a time to find the Singular Value Decomposition.

Details
Let's get one thing straight right away: you're dealing with a lot of information. Just because you're working with only ten items doesn't mean it's trivial. Each of those items is a 2000x2000 array full of 4,000,000 floats, which are usually 64 bits (8 bytes) each, so you're looking at around 32MB (about 30.5MiB) per array, or roughly 320MB across all ten, plus the other data that tags along in Numpy's ndarrays.
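The size arithmetic can be checked directly. The sketch below uses only the standard library and keeps bits and bytes separate, since 4,000,000 × 64 counts bits (roughly 244 Mibit) while the byte count is one eighth of that. The helper name raw_array_bytes is made up for illustration, and it counts only the raw element data, not the ndarray header or any pickling overhead.

```python
import struct

# Size of one C double (what NumPy calls float64); 8 bytes on common platforms.
FLOAT64_BYTES = struct.calcsize("d")


def raw_array_bytes(rows, cols, itemsize=FLOAT64_BYTES):
    """Bytes of raw element data in a rows x cols array (hypothetical helper)."""
    return rows * cols * itemsize


if __name__ == "__main__":
    for shape in ((2000, 2000), (200, 200)):
        n = raw_array_bytes(*shape)
        print(shape, n, "bytes =", round(n / 2**20, 2), "MiB")
```

If NumPy is available, the nbytes attribute of an array reports the same raw figure.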

The ProcessPoolExecutor works by launching a separate thread to manage the worker processes. The management thread uses a multiprocessing.Queue, called _call_queue, to pass jobs to the workers. These multiprocessing.Queues are actually just fancy wrappers around pipes, and the ndarrays you're trying to pass to the workers are likely too large for the pipes to handle properly.

Reading over Python Issue 8426 shows that figuring out exactly how big your pipes are can be difficult, even when you can look up some nominal pipe size limit for your OS. There are too many variables to make it simple. Even the order in which things are pulled off of the queue can induce race conditions in the underlying pipe that trigger odd errors.
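To get a rough feel for the nominal capacity on your own machine, you can fill an unread pipe until the OS refuses more data. This is only a sketch: measure_pipe_capacity is a made-up helper, it assumes a Unix-like system where os.set_blocking works on pipe descriptors (Python 3.5+), and the number it reports is the kernel buffer for one idle pipe, not a limit on what multiprocessing can move through a pipe that is being read.

```python
import os


def measure_pipe_capacity(chunk_size=1024):
    """Write to an unread pipe until it would block; return bytes accepted."""
    read_fd, write_fd = os.pipe()
    try:
        # Non-blocking mode turns "pipe is full" into BlockingIOError
        # instead of hanging forever.
        os.set_blocking(write_fd, False)
        chunk = b"\0" * chunk_size
        total = 0
        while True:
            try:
                total += os.write(write_fd, chunk)
            except BlockingIOError:
                return total
    finally:
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    print("pipe accepted", measure_pipe_capacity(), "bytes before filling up")
```

The result varies with the OS and its configuration, which is part of why a single hard limit is difficult to pin down.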

I suspect that one of your workers is getting an incomplete or corrupted object off of its _call_queue, because that queue's pipe is full of your giant objects. That worker dies in an unclean way, and the work queue manager detects this failure, so it gives up on the work and tells the remaining workers to exit. But it does this by passing them poison pills over _call_queue, which is still full of your giant ndarrays. This is why you got the full queue exception - your data filled up the queue, then the management thread tried to use the same queue to pass control messages to the other workers.
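The last step, a control message failing because the queue is already occupied by data, can be shown in isolation. The sketch below is not the ProcessPoolExecutor code: fill_then_signal is a made-up function, and it only demonstrates that put_nowait on a bounded multiprocessing.Queue raises queue.Full once every slot is taken by pending items. It does not reproduce anything at the pipe level.

```python
import multiprocessing
import queue


def fill_then_signal(maxsize=2):
    """Fill a bounded queue with data, then try to send a poison pill."""
    shared = multiprocessing.Queue(maxsize=maxsize)
    for job_id in range(maxsize):
        shared.put_nowait(("job", job_id))  # data items take every free slot

    try:
        shared.put_nowait(None)  # the control message has nowhere to go
        delivered = True
    except queue.Full:
        delivered = False

    # Drain and close the queue so its background feeder thread exits cleanly.
    for _ in range(maxsize):
        shared.get(timeout=5)
    shared.close()
    shared.join_thread()
    return delivered


if __name__ == "__main__":
    print("poison pill delivered:", fill_then_signal())
```

The failing call is the same put_nowait(None) that appears at the bottom of the traceback in the question.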

I think this is a classic example of the potential dangers of mixing data and control flows between different entities in a program. Your large data not only blocked more data from being received by the workers, it also blocked the manager's control communications with the workers because they use the same path.
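For contrast, here is a minimal sketch of keeping the two flows apart. It is an illustration of the design idea only, not how concurrent.futures is built: control_queue and stop_with_dedicated_channel are hypothetical names, and a real worker would also need to poll the control channel.

```python
import multiprocessing
import queue


def stop_with_dedicated_channel(maxsize=2):
    """Send a stop message on its own queue while the data queue is full."""
    data_queue = multiprocessing.Queue(maxsize=maxsize)
    control_queue = multiprocessing.Queue()  # hypothetical control-only channel

    for job_id in range(maxsize):
        data_queue.put_nowait(("job", job_id))

    # The data queue has no free slot left...
    try:
        data_queue.put_nowait(("job", maxsize))
        data_was_full = False
    except queue.Full:
        data_was_full = True

    # ...but the control message still gets through on its own channel.
    control_queue.put_nowait("stop")
    received = control_queue.get(timeout=5)

    # Drain and close both queues so their feeder threads exit cleanly.
    for _ in range(maxsize):
        data_queue.get(timeout=5)
    for q in (data_queue, control_queue):
        q.close()
        q.join_thread()
    return data_was_full, received


if __name__ == "__main__":
    print(stop_with_dedicated_channel())
```

The trade-off is extra plumbing: every consumer has to watch two channels instead of one.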

I haven't been able to recreate your failure, so I can't be sure that all of this is correct. But the fact that you can make this code work with a 200x200 array (~320KB, or about 0.3MB) seems to support this theory. Nominal pipe size limits seem to be measured in KB or a few MB at most, depending on the OS and architecture. The fact that this amount of data can get through the pipes isn't surprising, especially when you consider that not all of the 320KB needs to actually fit in the pipe at once if a consumer is continuously receiving the data. It suggests a reasonable upper bound on the amount of data that you could get serially through a pipe.