How to use invokeAll() to let all threads in a pool do their tasks?

user1948313 · Aug 13, 2013 · Viewed 54.1k times
    ExecutorService pool = Executors.newFixedThreadPool(7);
    List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();

    for (int i = 0; i <= diff; i++) {
        String str = "2013-" + (liDates.get(i).get(Calendar.MONTH) + 1) + "-" + liDates.get(i).get(Calendar.DATE);
        callList.add(new HotelCheapestFare(str));
    }

    List<Future<Hotel>> future = pool.invokeAll(callList);
    // Use < rather than <= here: future.get(future.size()) would throw IndexOutOfBoundsException.
    for (int i = 0; i < future.size(); i++) {
        System.out.println("name is:" + future.get(i).get().getName());
    }

I want the pool to finish all the tasks passed to invokeAll before execution reaches the for loop, but when I run this program the for loop seems to run before invokeAll has finished, and it throws this exception:

java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source)
    at java.util.concurrent.FutureTask.get(Unknown Source)
    at com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:65)

Caused by: java.lang.NullPointerException
    at com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheapestFare(HotelCheapestFare.java:166)
    at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:219)
    at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:1)
    at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run

Answer

Boris the Spider · Aug 13, 2013

The way an ExecutorService works is that when you call invokeAll it waits for all tasks to complete:

"Executes the given tasks, returning a list of Futures holding their status and results when all complete. Future.isDone() is true for each element of the returned list. Note that a completed task could have terminated either normally or by throwing an exception. The results of this method are undefined if the given collection is modified while this operation is in progress." (Javadoc for ExecutorService.invokeAll)

What that means is that your tasks are all done but some may have thrown an exception. This exception is part of the Future - calling get causes the exception to be rethrown wrapped in an ExecutionException.
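To see the blocking behaviour in isolation, here is a minimal sketch. The class name InvokeAllBlocks and the three toy tasks are made up for illustration and are not taken from the question; it also assumes Java 8+ for the lambda syntax. It runs three tasks, one of which throws, and checks isDone() on every Future that invokeAll hands back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the question's code.
class InvokeAllBlocks {

    // Runs three toy tasks (the second one throws) and returns the Futures from invokeAll.
    static List<Future<String>> runAll() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                final int id = i;
                tasks.add(() -> {
                    Thread.sleep(50L * id); // simulate work of varying length
                    if (id == 1) {
                        throw new IllegalStateException("task " + id + " failed");
                    }
                    return "task " + id + " ok";
                });
            }
            // Blocks until every task has either returned or thrown.
            return pool.invokeAll(tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (Future<String> f : runAll()) {
            // isDone() is true for the failed task as well as the successful ones.
            System.out.println("done? " + f.isDone());
        }
    }
}
```

Note that the failed task also reports isDone() as true; the failure only surfaces once get is called on its Future.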

From your stack trace:

java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source)
    at java.util.concurrent.FutureTask.get(Unknown Source)
                                       ^^^ <-- from get

You can see that this is indeed the case. One of your tasks has failed with an NPE. The ExecutorService caught the exception and is telling you about it by throwing an ExecutionException when you call Future.get.
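If you want the result loop to survive a failed task and tell you which one failed, one option is to catch the ExecutionException per Future and look at getCause(). The following is only a sketch under assumptions: the class InspectFailures, the describe helper and the two toy tasks are hypothetical stand-ins for your HotelCheapestFare callables, and the lambdas assume Java 8+.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the tasks stand in for the question's HotelCheapestFare.
class InspectFailures {

    // Returns one line per task, in task order: either its result or the exception it threw.
    static List<String> describe(List<Callable<String>> tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> report = new ArrayList<>();
        try {
            // invokeAll returns the Futures in the same order as the task list.
            for (Future<String> f : pool.invokeAll(tasks)) {
                try {
                    report.add("ok: " + f.get());
                } catch (ExecutionException e) {
                    // getCause() is the exception thrown inside call().
                    report.add("failed: " + e.getCause());
                }
            }
        } finally {
            pool.shutdown();
        }
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "Hotel A");
        tasks.add(() -> { throw new NullPointerException("missing fare"); });
        for (String line : describe(tasks)) {
            System.out.println(line);
        }
    }
}
```

This only reports the failure; the NullPointerException itself still has to be fixed inside the task.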

Now, if you want to take tasks as they complete you need an ExecutorCompletionService. This acts as a BlockingQueue that will allow you to poll for tasks as and when they finish.

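import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// The class name is arbitrary; it is only here so the example compiles on its own.
public class App {
    public static void main(String[] args) throws Exception {
        final ExecutorService executorService = Executors.newFixedThreadPool(10);
        final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
        // Consumer task: takes each Future as soon as its task has finished.
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; ++i) {
                    try {
                        final Future<String> myValue = completionService.take();
                        //do stuff with the Future
                        final String result = myValue.get();
                        System.out.println(result);
                    } catch (InterruptedException ex) {
                        return;
                    } catch (ExecutionException ex) {
                        System.err.println("TASK FAILED");
                    }
                }
            }
        });
        // Producer: submit 100 tasks, roughly half of which fail.
        for (int i = 0; i < 100; ++i) {
            completionService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    if (Math.random() > 0.5) {
                        throw new RuntimeException("FAILED");
                    }
                    return "SUCCESS";
                }
            });
        }
        // Already-submitted tasks still run; the pool just stops accepting new ones.
        executorService.shutdown();
    }
}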

In this example, one task calls take on the ExecutorCompletionService, which returns the Futures as they become available; the main thread then submits the tasks to the ExecutorCompletionService.

This will allow you to get the failed task as soon as it fails rather than having to wait for all the tasks to either fail or succeed together.

The only complication is that it is difficult to tell the polling thread that all the tasks are done, as everything is now asynchronous. In this instance I have used the knowledge that 100 tasks will be submitted, so that it only needs to poll 100 times. A more general way would be to collect the Futures from the submit method as well and then loop over them to see if everything is completed.
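One way to sketch that more general approach is shown here. It is a simplified variant, not a definitive implementation: it keeps the Futures returned by submit and calls take exactly once per collected Future, on the submitting thread rather than in a separate polling task. The class DrainCompletionService, the runAndCount helper and the rule that odd-numbered tasks fail are all assumptions for illustration; the lambdas assume Java 8+.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class illustrating "collect the Futures from submit".
class DrainCompletionService {

    // Submits taskCount toy tasks and returns {succeeded, failed}.
    static int[] runAndCount(int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> completionService = new ExecutorCompletionService<>(pool);
        List<Future<String>> submitted = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                // Keep every Future so we know how many results to wait for.
                submitted.add(completionService.submit(() -> {
                    if (id % 2 == 1) { // assumption: odd tasks fail, to keep the demo deterministic
                        throw new RuntimeException("FAILED " + id);
                    }
                    return "SUCCESS " + id;
                }));
            }
            int ok = 0;
            int failed = 0;
            // One take() per submitted Future; results arrive in completion order.
            for (int i = 0; i < submitted.size(); i++) {
                Future<String> done = completionService.take();
                try {
                    done.get();
                    ok++;
                } catch (ExecutionException e) {
                    failed++;
                }
            }
            return new int[] {ok, failed};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = runAndCount(10);
        System.out.println("succeeded=" + counts[0] + ", failed=" + counts[1]);
    }
}
```

Because the loop is bounded by submitted.size(), the number of tasks no longer has to be hard-coded in the consumer.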