Twitter

Tuesday, June 2, 2009

Java nio non blocking server & client

Java has supported non-blocking I/O (the java.nio package, "NIO") since Java 1.4.

But only now am I really using it in one of my projects.

The difference is that a server socket's accept() and a normal socket's read() and write() no longer have to block the calling thread.
E.g. with normal (blocking) I/O, while a single-threaded server is waiting inside accept(), it cannot do anything with the sockets it has already accepted.

NIO changes this: a server can now handle, within a single thread,
1. several existing connections and
2. new incoming connections

Let us see an example.

The server (Server.java) listens on port 9999 for any incoming connections.
$>javac Server.java
$>java Server

The client (Client.java) repeatedly sends the text "I am Client : XXX" to the server, once every 3 seconds. XXX is the command line argument; it is just an identifier to distinguish (on the server's console) different clients.
$>javac Client.java
$>java Client 354
$>java Client dfdsfsd

Sample Output (server console):
serverSocketChannel's registered key is : sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:9999]

Server is listening on: 127.0.0.1:9999
Key ready to perform accept() : sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:9999]
Key ready to perform read() : java.nio.channels.SocketChannel[connected local=/127.0.0.1:9999 remote=/127.0.0.1:2633]
I am Client : 354
Key ready to perform read() : java.nio.channels.SocketChannel[connected local=/127.0.0.1:9999 remote=/127.0.0.1:2633]
I am Client : 354
Key ready to perform read() : java.nio.channels.SocketChannel[connected local=/127.0.0.1:9999 remote=/127.0.0.1:2633]
I am Client : 354
Key ready to perform accept() : sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:9999]
Key ready to perform read() : java.nio.channels.SocketChannel[connected local=/127.0.0.1:9999 remote=/127.0.0.1:2636]
I am Client : dfdsfsd
Key ready to perform read() : java.nio.channels.SocketChannel[connected local=/127.0.0.1:9999 remote=/127.0.0.1:2633]
I am Client : 354
Key ready to perform read() : java.nio.channels.SocketChannel[connected local=/127.0.0.1:9999 remote=/127.0.0.1:2636]
I am Client : dfdsfsd
Key ready to perform read() : java.nio.channels.SocketChannel[connected local=/127.0.0.1:9999 remote=/127.0.0.1:2633]
I am Client : 354


Server.java






import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded, non-blocking echo-to-console server.
 *
 * <p>One {@link Selector} multiplexes both the listening socket (new
 * connections) and every accepted client socket (incoming data), so a single
 * thread can serve all of them without blocking in {@code accept()} or
 * {@code read()}.
 *
 * <p>Received bytes are decoded with the platform default charset, which
 * matches what the companion client sends when both run on the same machine.
 */
public class Server {

    // this is equivalent to the server socket in the non nio world
    ServerSocketChannel serverSocketChannel;

    // this is the multiplexer which multiplexes the messages received from different clients
    Selector selector;

    /**
     * Opens the selector, binds a non-blocking server socket to port 9999 on
     * the local host and registers it for accept events.
     *
     * <p>Any {@link IOException} during setup is printed, not thrown;
     * {@code startListening()} detects the incomplete setup and refuses to run.
     */
    public Server() {
        try {

            // get a selector
            selector = Selector.open();

            // get a server socket channel
            serverSocketChannel = ServerSocketChannel.open();

            // we force the socket to be Non-blocking.
            // if it is set to "true" then this socket acts as a normal (blocking) server socket
            serverSocketChannel.configureBlocking(false);

            // port and ip address where the server listens for connections
            InetSocketAddress add = new InetSocketAddress(InetAddress.getLocalHost(), 9999);

            // bind the server socket to the ip/port
            serverSocketChannel.socket().bind(add);

            // register the serverSocketChannel (for incoming connection events) to the selector.
            // The "SelectionKey.OP_ACCEPT" parameter tells the selector that this serverSocketChannel
            // registers itself for incoming (acceptable) connections
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("serverSocketChannel's registered key is : " + key.channel().toString());

            System.out.println();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.startListening();
    }

    /**
     * Runs the select loop forever: accepts new clients and prints whatever
     * connected clients send. Returns immediately if the constructor failed
     * to set the server up.
     */
    private void startListening() {

        // The constructor swallows setup errors, so verify we really are bound and registered
        // before dereferencing the socket's address below.
        if (selector == null
                || serverSocketChannel == null
                || !serverSocketChannel.socket().isBound()
                || serverSocketChannel.keyFor(selector) == null) {
            System.err.println("Server was not initialised correctly; not listening.");
            return;
        }

        System.out.println("Server is listening on: "
                + serverSocketChannel.socket().getInetAddress().getHostAddress() + ":"
                + serverSocketChannel.socket().getLocalPort());

        while (true) {
            try {

                // this line blocks until some event has occurred on a registered channel
                selector.select();

                // iterate over the keys whose channels are ready
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

                while (iterator.hasNext()) {

                    SelectionKey key = iterator.next();

                    // the selector never removes keys from the selected set itself
                    iterator.remove();

                    // the key may have been cancelled (channel closed) since it was selected
                    if (!key.isValid()) {
                        continue;
                    }

                    if (key.isAcceptable()) {
                        // a client has asked for a new connection
                        acceptClient(key);
                    } else if (key.isReadable()) {
                        // the client has sent something to be read by this server
                        readFromClient(key);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Accepts a pending connection and registers the new client socket for
     * read events with the same selector as the server socket.
     *
     * @param key the acceptable key; only ServerSocketChannels registered for
     *            OP_ACCEPT are expected to receive one
     */
    private void acceptClient(SelectionKey key) {
        System.out.println("Key ready to perform accept() : " + key.channel().toString());

        SocketChannel client = null;
        try {
            // as usual the accept returns the plain socket towards the client
            client = serverSocketChannel.accept();

            // in non-blocking mode accept() returns null when the pending
            // connection has gone away in the meantime
            if (client == null) {
                return;
            }

            // set the client socket to be non blocking
            client.configureBlocking(false);

            // register the client socket with the same selector to which we have registered the
            // serverSocketChannel
            client.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            if (client != null) {
                // do not leak a half-configured socket
                try {
                    client.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

    /**
     * Reads whatever is currently available from a client and prints it.
     * Closes the connection when the client has disconnected or the read fails.
     *
     * @param key the readable key of a client socket
     */
    private void readFromClient(SelectionKey key) {
        System.out.println("Key ready to perform read() : " + key.channel().toString());

        // get the underlying socket
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer bb = ByteBuffer.allocate(1024);

        int bytesRead;
        try {
            // read the msg sent by the client
            bytesRead = client.read(bb);
        } catch (IOException e) {
            // e.g. connection reset: the socket is unusable, drop it
            e.printStackTrace();
            closeClient(key);
            return;
        }

        if (bytesRead < 0) {
            // end of stream: the client closed the connection. Without closing our side
            // the key would stay readable forever and the select loop would spin.
            closeClient(key);
            return;
        }
        if (bytesRead == 0) {
            // nothing to show
            return;
        }

        // display the message
        bb.flip();
        byte[] array = new byte[bb.remaining()];
        bb.get(array);
        System.out.println(new String(array));
    }

    /** Cancels the key and closes its channel, printing (not throwing) any close failure. */
    private void closeClient(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



Client.java







import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Non-blocking client that connects to the server on port 9999 of the local
 * host and then sends the text {@code "I am Client : <identity>"} every three
 * seconds until the connection fails or the thread is interrupted.
 *
 * <p>The message is encoded with the platform default charset.
 */
public class Client {

    // identifier appended to every message so the server console can tell clients apart
    String myIdentity;

    /**
     * @param pIdentity text appended to every message sent to the server
     */
    public Client(String pIdentity) {
        myIdentity = pIdentity;
    }

    /**
     * Connects to the server and sends the identity message in an endless
     * loop. I/O errors are printed; an interrupt ends the loop and restores
     * the thread's interrupt flag. The socket and selector are always closed
     * before returning.
     */
    void talkToServer() {

        SocketChannel mySocket = null;
        Selector selector = null;

        try {

            mySocket = SocketChannel.open();

            // non blocking
            mySocket.configureBlocking(false);

            // connect to a running server; in non-blocking mode this usually returns false
            // ("in progress") but may complete immediately for local connections
            boolean connected = mySocket.connect(new InetSocketAddress(InetAddress.getLocalHost(), 9999));

            if (!connected) {
                // get a selector
                selector = Selector.open();

                // register the client socket with "connect operation" to the selector
                mySocket.register(selector, SelectionKey.OP_CONNECT);

                connected = awaitConnection(selector);
            }

            if (connected) {
                sendForever(mySocket);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // stop sending and let the caller see that we were interrupted
            Thread.currentThread().interrupt();
        } finally {
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (mySocket != null) {
                try {
                    mySocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Blocks on the selector until the pending connection has been established.
     *
     * @param selector selector the connecting socket is registered with for OP_CONNECT
     * @return {@code true} once connected; {@code false} if {@code select()}
     *         returned without any ready key (e.g. the selector was woken up)
     * @throws IOException if the connection attempt fails
     */
    private boolean awaitConnection(Selector selector) throws IOException {

        // select() blocks until something happens on the underlying socket
        while (selector.select() > 0) {

            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();

            while (it.hasNext()) {

                SelectionKey key = it.next();
                it.remove();

                if (!key.isConnectable()) {
                    continue;
                }

                SocketChannel myChannel = (SocketChannel) key.channel();

                if (myChannel.isConnectionPending()) {
                    if (!myChannel.finishConnect()) {
                        // not finished yet; wait for the next connect event
                        continue;
                    }
                    System.out.println("Connection was pending but now is finiehed connecting.");
                }

                // the selector is not needed for anything else
                key.cancel();
                return true;
            }
        }
        return false;
    }

    /**
     * Sends the identity message every three seconds; only ends by exception.
     *
     * @param channel connected, non-blocking channel to the server
     * @throws IOException if writing fails (e.g. the server went away)
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    private void sendForever(SocketChannel channel) throws IOException, InterruptedException {

        byte[] message = ("I am Client : " + myIdentity).getBytes();

        while (true) {
            ByteBuffer bb = ByteBuffer.wrap(message);

            // a non-blocking write may accept only part of the buffer; the message is tiny,
            // so simply retry until everything has been handed to the socket
            while (bb.hasRemaining()) {
                channel.write(bb);
            }

            Thread.sleep(3000);
        }
    }

    /**
     * @param args {@code args[0]} is the identity this client reports to the server
     */
    public static void main(String[] args) {

        if (args.length < 1) {
            System.err.println("Usage: java Client <identity>");
            return;
        }

        Client client = new Client(args[0]);
        client.talkToServer();
    }

}