Default ForkJoinPool executor taking long time

KayV · Aug 2, 2017

I am using CompletableFuture to asynchronously execute tasks streamed from a list.

I am testing the two overloads of CompletableFuture's "supplyAsync" method: one takes only a Supplier, and the other takes a Supplier and an Executor. Here is the documentation for both:

First:

supplyAsync(Supplier<U> supplier)

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.

Second:

supplyAsync(Supplier<U> supplier, Executor executor)

Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.

And here is my test class:

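import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TestCompleteableAndParallelStream {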

    public static void main(String[] args) {
        List<MyTask> tasks = IntStream.range(0, 10)
                .mapToObj(i -> new MyTask(1))
                .collect(Collectors.toList());
        
        useCompletableFuture(tasks);
        
        useCompletableFutureWithExecutor(tasks);

    }
    
    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
        long start = System.nanoTime();
        // Dedicated pool with one thread per task (capped at 10).
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
        // Submit every task first...
        List<CompletableFuture<Integer>> futures =
            tasks.stream()
                 .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                 .collect(Collectors.toList());

        // ...then wait for all of the results.
        List<Integer> result =
            futures.stream()
                   .map(CompletableFuture::join)
                   .collect(Collectors.toList());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
        System.out.println(result);
        executor.shutdown();
    }

    public static void useCompletableFuture(List<MyTask> tasks) {
        long start = System.nanoTime();
        // No executor given, so the tasks run in ForkJoinPool.commonPool().
        List<CompletableFuture<Integer>> futures =
            tasks.stream()
                 .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
                 .collect(Collectors.toList());

        List<Integer> result =
            futures.stream()
                   .map(CompletableFuture::join)
                   .collect(Collectors.toList());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
        System.out.println(result);
    }
}


class MyTask {
    private final int duration;

    public MyTask(int duration) {
        this.duration = duration;
    }

    // Simulates a blocking task: prints the worker thread's name,
    // sleeps for `duration` seconds, then returns `duration`.
    public int calculate() {
        System.out.println(Thread.currentThread().getName());
        try {
            Thread.sleep(duration * 1000);
        } catch (final InterruptedException e) {
            throw new RuntimeException(e);
        }
        return duration;
    }
}

While the "useCompletableFuture" method takes around 4 seconds to complete, the "useCompletableFutureWithExecutor" method takes only 1 second.

Now my question is: what does ForkJoinPool.commonPool() do differently that could cause this overhead? And given that, shouldn't we always prefer a custom executor pool over the ForkJoinPool?

Answer

Szymon Stepniak · Aug 2, 2017

Check the size of ForkJoinPool.commonPool(). By default it creates a pool whose size (parallelism) is

Runtime.getRuntime().availableProcessors() - 1
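A quick way to check this on your own machine is sketched below. The class name CommonPoolSize and the helper defaultParallelism are made up for illustration; ForkJoinPool.getCommonPoolParallelism() and Runtime.availableProcessors() are standard JDK calls. The helper assumes the documented default (one less than the processor count, with a floor of 1) and does not account for the java.util.concurrent.ForkJoinPool.common.parallelism system property, which overrides it.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSize {

    // Hypothetical helper: the default common-pool parallelism for a given
    // processor count, assuming no system property overrides it.
    public static int defaultParallelism(int availableProcessors) {
        return Math.max(1, availableProcessors - 1);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("availableProcessors     = " + cpus);
        System.out.println("expected default size   = " + defaultParallelism(cpus));
        // The value actually in effect; differs from the line above if the
        // java.util.concurrent.ForkJoinPool.common.parallelism property is set.
        System.out.println("common pool parallelism = " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```

If the last two numbers differ, the parallelism was most likely set explicitly when the JVM was started.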

I ran your example on my Intel i7-4800MQ (4 cores + 4 virtual cores), where the common pool has 7 threads. Seven tasks run at once and the remaining three have to wait for a free worker, so the whole computation took ~2000 ms:

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
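The timing follows from simple arithmetic, sketched below as a rough model rather than a measurement. The class WaveEstimate and the method estimateMillis are hypothetical names; the model assumes equal-length tasks that only block (as MyTask does) and ignores scheduling overhead.

```java
public class WaveEstimate {

    // Equal-length blocking tasks run in "waves" of at most poolSize at a time,
    // so the elapsed time is roughly ceil(tasks / poolSize) * taskMillis.
    public static long estimateMillis(int tasks, int poolSize, long taskMillis) {
        int waves = (tasks + poolSize - 1) / poolSize; // integer ceiling division
        return waves * taskMillis;
    }

    public static void main(String[] args) {
        System.out.println(estimateMillis(10, 7, 1000));  // 2000: 7 workers, two waves
        System.out.println(estimateMillis(10, 10, 1000)); // 1000: 10 threads, one wave
        System.out.println(estimateMillis(10, 3, 1000));  // 4000: 3 workers, four waves
    }
}
```

Under this model, the ~4 seconds reported in the question would be consistent with a common pool of 3 workers, for example on a machine with 4 logical processors. That is an inference from the numbers, not something the question states.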

In the second case you used

Executors.newFixedThreadPool(Math.min(tasks.size(), 10));

so the pool has 10 threads ready and all 10 tasks run at the same time, finishing in ~1000 ms:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Difference between ForkJoinPool and ExecutorService

Eugene also mentioned one more important thing in his comment: ForkJoinPool uses a work-stealing approach:

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.

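whereas an ExecutorService created with Executors.newFixedThreadPool() has a fixed number of threads that all take tasks from a single shared queue. ForkJoinPool is the one designed for divide-and-conquer workloads, where tasks split themselves into subtasks.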

How to determine pool size?

There is a question about choosing the best thread pool size; you may find useful information there:

Setting Ideal size of Thread Pool

This thread is also worth reading:

Custom thread pool in Java 8 parallel stream
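As a starting point for sizing, one commonly cited heuristic (from the book Java Concurrency in Practice) is threads = CPUs * target utilization * (1 + wait time / compute time). The sketch below only illustrates that formula; the class PoolSizeHeuristic, the method suggestedThreads, and its parameters are hypothetical, and the wait and compute times are values you would have to measure yourself.

```java
public class PoolSizeHeuristic {

    // threads = cpus * targetUtilization * (1 + wait / compute), rounded up, at least 1.
    // targetUtilization is a fraction between 0 and 1; the two times may use any
    // unit as long as it is the same for both.
    public static int suggestedThreads(int cpus, double targetUtilization,
                                       double waitTime, double computeTime) {
        if (computeTime <= 0) {
            // Pure waiting (such as Thread.sleep) makes the ratio unbounded.
            throw new IllegalArgumentException("computeTime must be positive");
        }
        double raw = cpus * targetUtilization * (1.0 + waitTime / computeTime);
        return Math.max(1, (int) Math.ceil(raw));
    }

    public static void main(String[] args) {
        // 8 CPUs, full utilization, tasks that wait 90 ms for every 10 ms of computing.
        System.out.println(suggestedThreads(8, 1.0, 90, 10)); // 80
    }
}
```

For tasks like MyTask that only sleep, the compute time is effectively zero and the formula gives no useful upper bound, so the number of tasks becomes the practical cap, as in Math.min(tasks.size(), 10) in the question.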