
Thursday, April 28, 2011

A peek into JDK7 - Java Phaser: Taking concurrency to the next level

A few days ago I was discussing synchronization points between several Java threads with a colleague. Of course, there are CountDownLatch and CyclicBarrier for this kind of problem. But with both of them we need to know beforehand how many threads we are going to start, because both CountDownLatch and CyclicBarrier expect the number of parties to synchronize as a constructor argument.

new CountDownLatch(count);
new CyclicBarrier(parties);
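To make the limitation concrete, here is a minimal sketch of a CyclicBarrier whose party count is fixed before any thread is started. The class name FixedPartiesDemo and the method runWithBarrier are made up for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class FixedPartiesDemo {

    // Starts `parties` threads that all meet at one barrier and
    // returns how many of them got past it.
    static int runWithBarrier(int parties) {
        // The party count is fixed here, before any thread exists.
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[parties];

        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await(); // blocks until `parties` threads have called await()
                    passed.incrementAndGet();
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(runWithBarrier(3) + " threads passed the barrier");
    }
}
```

If one more thread had to join later, the barrier could not be resized; a new one would have to be created.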

Then I came across Phaser. With a Phaser you don't need to give the number of parties as a constructor argument.

new Phaser();

It is one of the concurrency features coming in Java 7. It is designed so that threads that need to stay in sync with other threads can register() themselves. Like a CyclicBarrier, a Phaser can be reused (across several phases of a task). For this, use arriveAndAwaitAdvance(): after completing each phase you arrive and wait for all the other threads, and once they have all arrived you advance (move) to the next phase.
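As a rough sketch of that reuse, the snippet below pushes a few workers through several phases with arriveAndAwaitAdvance(). The class PhaserBarrierDemo and the method runPhases are hypothetical names. One choice here is my own assumption rather than a requirement: each party is registered before any worker starts, so that no worker can arrive while another one is still unregistered.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the names are illustrative only.
class PhaserBarrierDemo {

    // Runs `workers` threads through `phases` synchronized phases and
    // returns the phase number the phaser ends up in.
    static int runPhases(int workers, int phases) {
        Phaser phaser = new Phaser(); // no party count given up front
        Thread[] threads = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            phaser.register(); // add one party per worker as we go
            threads[i] = new Thread(() -> {
                for (int p = 0; p < phases; p++) {
                    // ... the work for this phase would go here ...
                    phaser.arriveAndAwaitAdvance(); // arrive, then wait for everyone else
                }
            });
        }

        // Start only after every party is registered.
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return phaser.getPhase();
    }

    public static void main(String[] args) {
        System.out.println("Phaser finished in phase " + runPhases(3, 2));
    }
}
```

The phaser starts in phase 0 and advances once each time all registered parties have arrived, so two rounds of arriveAndAwaitAdvance() leave it in phase 2.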

In this example code, 3 threads are started. Once a task begins, its thread registers itself with the Phaser. After completing each phase of the task, each thread tells the Phaser that it has completed that phase by calling arrive() and then waits for the others with awaitAdvance(). During any phase, a party can ask the Phaser for the number of parties that have not yet arrived by calling getUnarrivedParties().

@Override
  public void run() {

   // register on the fly (a thread that registers late may miss a phase the others have already completed)
   _phaser.register();

   // First phase
   doSomeWork();

   int arr1 = _phaser.arrive(); // let the phaser know that you have arrived at this point of the task
   int unarrivedParties1 = _phaser.getUnarrivedParties(); // ask the phaser how many parties are not yet here

   // if the phase has not advanced yet, other parties are still expected
   if (_phaser.getPhase() == arr1) {
    System.out.println(_taskId + " completed phase " + arr1 + " and awaiting arrival of " + 
         unarrivedParties1 + " threads so that it can enter phase " + (arr1 + 1));
   }
   else {
    System.out.println(_taskId + " completed phase " + (arr1) + 
         " (the last one to reach) and now all tasks will proceed to phase " + (arr1 + 1));
   }

   _phaser.awaitAdvance(arr1); // block until phase arr1 is over, i.e. be in sync with the other threads

   // Second phase
   doSomeWork();

   int arr2 = _phaser.arrive(); // let the phaser know that you have arrived at this point of the task
   int unarrivedParties2 = _phaser.getUnarrivedParties(); // ask the phaser how many parties are not yet here

   // if the phase has not advanced yet, other parties are still expected
   if (_phaser.getPhase() == arr2) {
    System.out.println(_taskId + " completed phase " + arr2 + " and awaiting arrival of " + 
         unarrivedParties2 + " threads so that it can enter phase " + (arr2 + 1));
   }
   else {
    System.out.println(_taskId + " completed phase " + (arr2) + 
         " (the last one to reach) and now all tasks will proceed to phase " + (arr2 + 1));
   }
   _phaser.awaitAdvance(arr2); // block until phase arr2 is over, i.e. be in sync with the other threads

   // and so on ...

   // at some point a task could de-register itself from the phaser
   _phaser.arriveAndDeregister(); // a de-registered party no longer needs to stay in sync with the other threads

  }

And this is the output:
Thread_1 completed phase 0 and awaiting arrival of 2 threads so that it can enter phase 1
Thread_2 completed phase 0 and awaiting arrival of 1 threads so that it can enter phase 1
Thread_0 completed phase 0 (the last one to reach) and now all tasks will proceed to phase 1
Thread_0 completed phase 1 and awaiting arrival of 2 threads so that it can enter phase 2
Thread_1 completed phase 1 and awaiting arrival of 1 threads so that it can enter phase 2
Thread_2 completed phase 1 (the last one to reach) and now all tasks will proceed to phase 2
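
The listing ends with arriveAndDeregister(), so here is a small sketch of what that looks like when one party leaves earlier than the others. The class EarlyLeaverDemo and its methods are hypothetical names, and the number of phases per worker is arbitrary. The sketch assumes the default Phaser behaviour, where the phaser terminates once its last registered party has deregistered.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the names are illustrative only.
class EarlyLeaverDemo {

    // Registers one party and returns a (not yet started) thread that takes
    // part in `phasesToJoin` phases and then leaves the phaser.
    static Thread worker(Phaser phaser, int phasesToJoin) {
        phaser.register();
        return new Thread(() -> {
            for (int p = 0; p < phasesToJoin; p++) {
                // ... the work for this phase would go here ...
                phaser.arriveAndAwaitAdvance();
            }
            // Arrive one last time and drop out; the remaining parties
            // no longer wait for this thread.
            phaser.arriveAndDeregister();
        });
    }

    // Returns true if the phaser has terminated after all workers left.
    static boolean runWithEarlyLeaver() {
        Phaser phaser = new Phaser();
        Thread[] threads = {
            worker(phaser, 1), // leaves after the first phase
            worker(phaser, 3),
            worker(phaser, 3)
        };

        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return phaser.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + runWithEarlyLeaver());
    }
}
```

The two longer-running workers keep advancing with only two parties once the first one has deregistered, which is the part a CyclicBarrier with a fixed party count cannot do.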
