TaskRejectedException in ThreadPoolTaskExecutor

Akshay Chopra picture Akshay Chopra · Mar 15, 2018 · Viewed 10k times · Source

I am trying to call an API asynchronously using Spring's Async and using the ThreadPoolTaskExecutor in my Thread Config which goes:

@Configuration
@EnableAsync
public class ThreadConfig extends AsyncConfigurerSupport {

@Value("${core.pool.size}")
private int corePoolSize;

@Value("${max.pool.size}")
private int maxPoolSize;

@Value("${queue.capacity}")
private int queueCapacity;

@Override
@Bean
public Executor getAsyncExecutor() {

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(corePoolSize);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setQueueCapacity(queueCapacity);
    executor.setThreadNamePrefix("default_task_executor_thread");
    executor.initialize();

    return executor;

}

The settings here are:

corePoolSize = 5;
maxPoolSize = 10;
QueueCapacity = 10;

I'm calling the Async service as follows:

for (String path : testList) {
    Future<Boolean> pro = services.invokeAPI(path);
}

The testList has about 50 records.

When I run this, the compiler processes 10 threads and calls the invokeAPI method 10 times after which it gives:

org.springframework.core.task.TaskRejectedException: Executor[java.util.concurrent.ThreadPoolExecutor@3234ad78[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$1@5c17b70

I was assuming that it will wait for the current tasks to complete and re-assign the threads instead of throwing me the exception and terminating the program.

What should I do to have all my 50 records call the invokeAPI method?

Edit: the number of records in testList can change.

Any suggestions please?

Answer

Sneh picture Sneh · Mar 17, 2018

This is happening because of the size you are using for the pool. Since the size of the queue is 10 and the max threads you can have is 10, therefore after 20 tasks (10 running and 10 in queue) the executor starts rejecting the tasks.

There are various ways to solve this problem.

  1. Use unbounded queue. i.e. Don't specify the size of the queue and hence it will be able to hold all the tasks. Once the threads are free, the tasks will be submitted.
  2. Provide a RejectedExecutionHandler which does something with the tasks. i.e. Runs them on the caller thread or discard them or something else (Depending on the use case). There are some of them already provided by Java like CallerRunsPolicy, AbortPolicy, DiscardPolicy and DiscardOldestPolicy. You can specify them like using executor#setRejectedExecutionHandler.
  3. Provide your own Blocking Thread Pool Executor which blocks till the there is more room for tasks (Uses Semaphore).

Here is an example of Blocking Executor

public class BlockingExecutor extends ThreadPoolExecutor {

    private final Semaphore semaphore;

    public BlockingExecutor(final int corePoolSize, final int poolSize, final int queueSize) {
        super(corePoolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        semaphore = new Semaphore(poolSize + queueSize);
    }

    @Override
    public void execute(final Runnable task) {
        boolean acquired = false;
        do {
            try {
                semaphore.acquire();
                acquired = true;
            } catch (final InterruptedException e) {
                //do something here
            }
        } while (!acquired);

        try {
            super.execute(task);
        } catch (final RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }

    protected void afterExecute(final Runnable r, final Throwable t) {
        super.afterExecute(r, t);
        semaphore.release();
    }
}