The JavaDoc for ThreadPoolExecutor is unclear on whether it is acceptable to add tasks directly to the BlockingQueue
backing the executor. The docs say calling executor.getQueue()
is "intended primarily for debugging and monitoring".
I'm constructing a ThreadPoolExecutor
with my own BlockingQueue
. I retain a reference to the queue so I can add tasks to it directly. The same queue is returned by getQueue()
so I assume the admonition in getQueue()
applies to a reference to the backing queue acquired through my means.
General pattern of the code is:
int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
Runnable job = ...;
queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
try {
Thread.sleep(...);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
executor.shutdownNow();
queue.offer()
vs executor.execute()
As I understand it, the typical use is to add tasks via executor.execute()
. The approach in my example above has the benefit of blocking on the queue whereas execute()
fails immediately if the queue is full and rejects my task. I also like that submitting jobs interacts with a blocking queue; this feels more "pure" producer-consumer to me.
An implication of adding tasks to the queue directly: I must call prestartAllCoreThreads()
otherwise no worker threads are running. Assuming no other interactions with the executor, nothing will be monitoring the queue (examination of ThreadPoolExecutor
source confirms this). This also implies for direct enqueuing that the ThreadPoolExecutor
must additionally be configured for > 0 core threads and mustn't be configured to allow core threads to timeout.
Given a ThreadPoolExecutor
configured as follows:
BlockingQueue
backing the executorIs it acceptable to add tasks directly to the queue instead of calling executor.execute()
?
This question ( producer/consumer work queues ) is similar, but doesn't specifically cover adding to the queue directly.
One trick is to implement a custom subclass of ArrayBlockingQueue and to override the offer() method to call your blocking version, then you can still use the normal code path.
queue = new ArrayBlockingQueue<Runnable>(queueSize) {
@Override public boolean offer(Runnable runnable) {
try {
return offer(runnable, 1, TimeUnit.HOURS);
} catch(InterruptedException e) {
// return interrupt status to caller
Thread.currentThread().interrupt();
}
return false;
}
};
(as you can probably guess, i think calling offer directly on the queue as your normal code path is probably a bad idea).