Java: ExecutorService that blocks on submission after a certain queue size

Tahir Akhtar · Dec 23, 2010 · Viewed 42.2k times

I am trying to code a solution in which a single thread produces I/O-intensive tasks that can be performed in parallel. Each task holds a significant amount of in-memory data, so I want to be able to limit the number of tasks that are pending at any moment.

If I create a ThreadPoolExecutor like this:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

then executor.submit(callable) throws RejectedExecutionException when the queue fills up and all the threads are already busy.
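For reference, a minimal sketch that reproduces this behaviour. The class name RejectionDemo and the method submitUntilRejected are made up for illustration; every task simply waits on a latch so that no worker ever frees up while tasks are being submitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    /** Hypothetical helper: counts the tasks accepted before the first rejection. */
    static int submitUntilRejected(int numWorkerThreads, int maxQueue) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                numWorkerThreads, numWorkerThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(maxQueue));
        int accepted = 0;
        try {
            while (true) {
                // Each task blocks on the latch, so workers stay busy
                // and the queue is never drained during the loop.
                executor.submit(() -> {
                    release.await();
                    return null;
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // All threads are busy and the queue is full.
            return accepted;
        } finally {
            release.countDown();   // let the blocked tasks finish
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // 2 running tasks + 3 queued tasks are accepted, the 6th is rejected.
        System.out.println(submitUntilRejected(2, 3)); // prints 5
    }
}
```

So the executor accepts numWorkerThreads + maxQueue tasks and rejects the next one instead of making the caller wait.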

What can I do to make executor.submit(callable) block when the queue is full and all threads are busy?

EDIT: I tried this:

    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

And it somewhat achieves the effect that I want, but in an inelegant way (basically, rejected tasks are run in the calling thread, so this blocks the calling thread from submitting more).
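A small sketch of what CallerRunsPolicy does here, assuming one worker and a queue of size one. The class name CallerRunsDemo and the method threadOfOverflowTask are invented for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    /** Hypothetical helper: returns the name of the thread that ran the overflow task. */
    static String threadOfOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1));
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            Callable<String> blocker = () -> {
                release.await();
                return Thread.currentThread().getName();
            };
            Callable<String> overflowTask = () -> Thread.currentThread().getName();

            executor.submit(blocker); // occupies the only worker thread
            executor.submit(blocker); // fills the only queue slot
            // Pool and queue are full, so the handler runs this task
            // synchronously in the submitting thread.
            Future<String> overflow = executor.submit(overflowTask);
            return overflow.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints the name of the submitting thread, not of a pool thread.
        System.out.println(threadOfOverflowTask());
    }
}
```

The submitter cannot hand out more work until the overflow task returns, which throttles it, but a long-running task executed this way can leave the pool idle in the meantime.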

EDIT: (5 years after asking the question)

To anyone reading this question and its answers: please don't take the accepted answer as the one correct solution. Please read through all the answers and comments.

Answer

jtahlborn · Dec 23, 2010

I have done this same thing. The trick is to create a BlockingQueue where the offer() method is really a put(). (You can use whatever base BlockingQueue implementation you want.)

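import java.util.concurrent.LinkedBlockingQueue;

public class LimitedQueue<E> extends LinkedBlockingQueue<E>
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // Turn offer() (and add(), which delegates to it) into a blocking
        // call by forwarding to put(), which waits for free space.
        try {
            put(e);
            return true;
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and report the element as not queued.
            Thread.currentThread().interrupt();
        }
        return false;
    }

}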

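Note that this only works for a thread pool where corePoolSize == maximumPoolSize, so be careful there (see comments). ThreadPoolExecutor only starts threads beyond corePoolSize when offer() returns false; since this offer() blocks instead, the pool would not grow past corePoolSize.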
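A usage sketch, assuming the same fixed-size pool as in the question. BlockingSubmitDemo and runAll are hypothetical names, and the queue class is repeated as a nested class only so the example compiles on its own.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingSubmitDemo {

    // Same idea as LimitedQueue above: offer() waits for space instead of failing.
    static class LimitedQueue<E> extends LinkedBlockingQueue<E> {
        LimitedQueue(int maxSize) {
            super(maxSize);
        }

        @Override
        public boolean offer(E e) {
            try {
                put(e);
                return true;
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Hypothetical helper: submits taskCount tasks and returns how many completed. */
    static int runAll(int numWorkerThreads, int maxQueue, int taskCount) {
        // corePoolSize == maximumPoolSize, as the note above requires.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                numWorkerThreads, numWorkerThreads,
                0L, TimeUnit.MILLISECONDS,
                new LimitedQueue<Runnable>(maxQueue));
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                // submit() blocks here while all threads are busy and the
                // queue is full, rather than throwing RejectedExecutionException.
                executor.submit(() -> {
                    Thread.sleep(10); // stand-in for I/O-bound work
                    return completed.incrementAndGet();
                });
            }
            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        // 20 tasks through 2 threads and a queue of 2: none are rejected.
        System.out.println(runAll(2, 2, 20)); // prints 20
    }
}
```

With this setup, at most numWorkerThreads + maxQueue accepted tasks are running or queued at any time, which is the memory bound the question asks for.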