A few days ago I had a discussion with one of my colleagues about synchronization points between several Java threads. Of course, there are CountDownLatch and CyclicBarrier for solving this kind of problem. But even with them we need to know beforehand how many threads we are going to start, because both CountDownLatch and CyclicBarrier expect the number of parties to be synchronized as a constructor argument.
new CountDownLatch(count); new CyclicBarrier(parties);
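To make the limitation concrete, here is a minimal sketch (the class and method names are made up for illustration, and it uses Java 8 lambdas for brevity). The latch only works if the number passed to the constructor matches the number of threads that will call countDown().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPartiesSketch {

    // Starts `workers` threads and waits for all of them; returns how many finished.
    static int runWithLatch(int workers) {
        // The count is fixed here and cannot be increased later.
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stand-in for real work
                done.countDown();           // signal that this worker is done
            }).start();
        }

        try {
            done.await();                   // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWithLatch(3) + " workers finished");
    }
}
```

If one more thread were started than the count given to the constructor, await() could return before that extra thread had finished.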
Then I came across Phaser. With a Phaser you don't need to pass the number of parties as a constructor argument.
new Phaser();
It is one of the concurrency features introduced in Java 7. It is designed so that a thread that needs to be in sync with other threads can register() itself. Like a CyclicBarrier, a Phaser can be reused (across several phases of a task). For this, use arriveAndAwaitAdvance(): after completing each phase you arrive and wait for all the other threads, and once they have all arrived you advance (move) to the next phase.
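As a rough sketch of the arriveAndAwaitAdvance() pattern just described (the class name, method name, and log format are my own, and the lambdas require Java 8), each worker could look like this. Note that in this sketch the parties are registered by the starting thread before any worker runs, which is an assumption I make so that no phase can advance before everyone is registered.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

public class PhaserBarrierSketch {

    // Runs `workers` threads through `phases` phases and returns the event log.
    static List<String> runPhases(int workers, int phases) {
        Phaser phaser = new Phaser();                 // no party count given up front
        List<String> log = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            final int id = i;
            phaser.register();                        // one registration per worker
            threads[i] = new Thread(() -> {
                for (int p = 0; p < phases; p++) {
                    log.add("worker " + id + " finished phase " + p);
                    phaser.arriveAndAwaitAdvance();   // arrive, then wait for the others
                }
                phaser.arriveAndDeregister();         // leave the phaser when done
            });
        }

        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        runPhases(3, 2).forEach(System.out::println);
    }
}
```

The order of workers within a phase varies from run to run, but every "finished phase 0" line is logged before any "finished phase 1" line, because no worker gets past the barrier until all of them have arrived.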
In this example code, 3 threads are started. Once a task begins, its thread registers itself with the Phaser. After completing each phase of the task, each thread tells the Phaser that it has completed that phase by calling arrive() and then waits for the others with awaitAdvance(). At any phase, a party can ask the Phaser how many parties have not yet arrived by calling getUnarrivedParties().
@Override
public void run() {
    _phaser.register(); // register on the fly

    // First phase
    doSomeWork();
    int arr1 = _phaser.arrive(); // let the phaser know that you have arrived at this point of the task
    int unarrivedParties1 = _phaser.getUnarrivedParties(); // ask the phaser how many parties are not yet here
    if (_phaser.getPhase() == arr1) {
        System.out.println(_taskId + " completed phase " + arr1 + " and waiting arraival of "
                + unarrivedParties1 + " threads so that it can enter phase " + (arr1 + 1));
    } else {
        System.out.println(_taskId + " completed phase " + (arr1)
                + " (the last one to reach) and now all tasks will proceed to phase " + (arr1 + 1));
    }
    _phaser.awaitAdvance(arr1); // be in sync with other threads

    // Second phase
    doSomeWork();
    int arr2 = _phaser.arrive(); // let the phaser know that you have arrived at this point of the task
    int unarrivedParties2 = _phaser.getUnarrivedParties(); // ask the phaser how many parties are not yet here
    if (_phaser.getPhase() == arr2) {
        System.out.println(_taskId + " completed phase " + arr2 + " and waiting arraival of "
                + unarrivedParties2 + " threads so that it can enter phase " + (arr2 + 1));
    } else {
        System.out.println(_taskId + " completed phase " + (arr2)
                + " (the last one to reach) and now all tasks will proceed to phase " + (arr2 + 1));
    }
    _phaser.awaitAdvance(arr2); // be in sync with other threads

    // and so on ...

    // at some point a task could de-register itself from the phaser
    _phaser.arriveAndDeregister(); // a de-registered thread no longer needs to stay in sync with the other threads
}
And this is the output:
Thread_1 completed phase 0 and waiting arraival of 2 threads so that it can enter phase 1
Thread_2 completed phase 0 and waiting arraival of 1 threads so that it can enter phase 1
Thread_0 completed phase 0 (the last one to reach) and now all tasks will proceed to phase 1
Thread_0 completed phase 1 and waiting arraival of 2 threads so that it can enter phase 2
Thread_1 completed phase 1 and waiting arraival of 1 threads so that it can enter phase 2
Thread_2 completed phase 1 (the last one to reach) and now all tasks will proceed to phase 2
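The numbers in this output follow from what arrive(), getUnarrivedParties() and getPhase() return. The sketch below is my own single-threaded illustration (hypothetical class name, no real worker threads): it registers three parties and lets one thread call arrive() on behalf of each, so the values are deterministic.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

public class ArriveTraceSketch {

    // Returns {firstArrivalPhase, unarrivedAfterFirst, unarrivedAfterSecond,
    //          lastArrivalPhase, phaseAfterLastArrival}.
    static int[] trace() {
        Phaser phaser = new Phaser();
        phaser.bulkRegister(3);                     // three parties, all registered up front

        int first = phaser.arrive();                // arrive() returns the phase arrived at
        int unarrivedAfterFirst = phaser.getUnarrivedParties();

        phaser.arrive();
        int unarrivedAfterSecond = phaser.getUnarrivedParties();

        int last = phaser.arrive();                 // the last arrival triggers the advance
        int phaseAfterLast = phaser.getPhase();     // so the phase has already moved on

        return new int[] {first, unarrivedAfterFirst, unarrivedAfterSecond, last, phaseAfterLast};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(trace())); // prints [0, 2, 1, 0, 1]
    }
}
```

The last party still gets 0 back from arrive(), but getPhase() already reports 1, which is why comparing the two identifies "the last one to reach" in the listing above. With real threads that comparison is only a snapshot: another party may arrive between the arrive() call and the getPhase() call, so a thread that was not last can occasionally print the "last one" message.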