Flexible CountDownLatch?

Adamski · Oct 28, 2009

I have encountered a problem twice now whereby a producer thread produces N work items, submits them to an ExecutorService and then needs to wait until all N items have been processed.

Caveats

  • N is not known in advance. If it were, I would simply create a CountDownLatch and have the producer thread await() until all work was complete.
  • Using a CompletionService is inappropriate because, although my producer thread needs to block (i.e. by calling take()), there is no way of signalling that all work is complete so that the producer thread stops waiting.
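
For reference, the known-N case from the first caveat might look roughly like the sketch below. The class name KnownCountLatchDemo, the method runAndWait, the pool size and the trivial "work" (incrementing a counter) are all made up for illustration; only CountDownLatch and ExecutorService come from java.util.concurrent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: works only because n is fixed before any task is submitted.
class KnownCountLatchDemo {

    /** Submits n trivial tasks, blocks until all have finished, and returns how many ran. */
    static int runAndWait(int n) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4); // pool size is arbitrary
        CountDownLatch latch = new CountDownLatch(n);               // n must be known here
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                executor.submit(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for real work
                    } finally {
                        latch.countDown();           // count down even if the work throws
                    }
                });
            }
            latch.await(); // producer blocks until the count reaches zero
            return processed.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed: " + runAndWait(10));
    }
}
```

The latch count cannot be raised after construction, which is exactly why this does not fit when N is only discovered while producing.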

My current favoured solution is to use an integer counter, incrementing it whenever an item of work is submitted and decrementing it when a work item is processed. Following the submission of all N tasks, my producer thread will need to wait on a lock, checking whether counter == 0 whenever it is notified. The consumer thread(s) will need to notify the producer whenever they decrement the counter and the new value is 0.
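
A minimal sketch of that counter idea, assuming plain wait()/notifyAll() on a private lock object. The names PendingWorkCounter, workSubmitted, workCompleted, awaitCompletion and runAndWait are hypothetical, chosen only for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical hand-rolled counter; all access to 'pending' is guarded by 'lock'.
class PendingWorkCounter {
    private final Object lock = new Object();
    private int pending;

    /** Called by the producer just before it submits a work item. */
    void workSubmitted() {
        synchronized (lock) {
            pending++;
        }
    }

    /** Called by a consumer once a work item has been processed. */
    void workCompleted() {
        synchronized (lock) {
            if (--pending == 0) {
                lock.notifyAll(); // wake the producer only when nothing is outstanding
            }
        }
    }

    /** Blocks until the counter is zero; call only after the last submission. */
    void awaitCompletion() throws InterruptedException {
        synchronized (lock) {
            while (pending > 0) { // loop guards against spurious wakeups
                lock.wait();
            }
        }
    }

    /** Demo: submits n trivial tasks, waits for them all, returns how many ran. */
    static int runAndWait(int n) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4); // pool size is arbitrary
        PendingWorkCounter counter = new PendingWorkCounter();
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                counter.workSubmitted(); // increment before submit so the count never lags
                executor.submit(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for real work
                    } finally {
                        counter.workCompleted();
                    }
                });
            }
            counter.awaitCompletion();
            return processed.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed: " + runAndWait(25));
    }
}
```

The counter may touch zero while the producer is still submitting (for example, when the first task finishes before the second is submitted), so the wait is only meaningful once submission has finished.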

Is there a better approach to this problem or is there a suitable construct in java.util.concurrent I should be using rather than "rolling my own"?

Thanks in advance.

Answer

John Vint · Oct 28, 2009

java.util.concurrent.Phaser looks like it would work well for you. It is planned to be released in Java 7, but the most stable version can currently be found on the JSR 166 interest group website.

The Phaser is a glorified CyclicBarrier. You can register any number of parties, one at a time, and, when you're ready, await their advance at a specific phase.

A quick example on how it would work:

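import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;

// N and executor are assumed to be defined by the enclosing class.
final Phaser phaser = new Phaser();

public Runnable getRunnable() {
    return new Runnable() {
        public void run() {
            try {
                // ... do stuff ...
            } finally {
                // arrive even if the task throws, so the producer is never left waiting
                phaser.arriveAndDeregister();
            }
        }
    };
}

public void doWork() {
    phaser.register(); // register self (the producer)
    for (int i = 0; i < N; i++) {
        phaser.register(); // register this task prior to execution
        executor.submit(getRunnable());
    }
    // blocks until every registered task has arrived
    phaser.arriveAndAwaitAdvance();
}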
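
A self-contained variant of the snippet above, as a rough sketch you can compile and run on Java 8 or later. The class name PhaserProducerDemo, the method runAndWait, the pool size and the counter standing in for real work are assumptions made for the example; the Phaser calls are the same ones used above, except that the producer is registered through the Phaser(int parties) constructor instead of a separate register() call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; n is passed in here but could equally be discovered while looping.
class PhaserProducerDemo {

    /** Submits n trivial tasks, waits for all of them via a Phaser, returns how many ran. */
    static int runAndWait(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4); // pool size is arbitrary
        Phaser phaser = new Phaser(1); // one party up front: the producer itself
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                phaser.register(); // one extra party per task, registered before submission
                executor.submit(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for real work
                    } finally {
                        phaser.arriveAndDeregister(); // task is done and leaves the phaser
                    }
                });
            }
            // The phase cannot advance until the producer arrives, so tasks that finish
            // early simply deregister; this call returns once every task has arrived.
            phaser.arriveAndAwaitAdvance();
            return processed.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runAndWait(100));
    }
}
```

Because the producer stays registered until it arrives, the phase cannot complete while tasks are still being submitted. Note that a single Phaser supports at most 65535 registered parties, so very large batches would need tiered phasers or a different construct.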