What's the best way to implement a non-blocking socket in Java?
Or is there such a thing? I have a program that communicates with a server through socket but I don't want the socket call to block/cause delay if there is a problem with the data/connection.
Java non-blocking socket, introduced in Java 2 Standard Edition 1.4, allow net communication between applications without blocking the processes using the sockets. But what a non-blocking socket is, in which contexts it can be useful, and how it works?
A non-blocking socket allows I/O operation on a channel without blocking the processes using it. This means, we can use a single thread to handle multiple concurrent connections and gain an "asynchronous high-performance" read/write operations (some people may not agreed with that)
Ok, in which contexts it can be useful?
Suppose you would like to implement a server accepting diverse client connections. Suppose, as well, that you would like the server to be able to process multiple requests simultaneously. Using the traditional way you have two choices to develop such a server:
Both solutions work, but adopting the first one you have to develop the whole thread-management solution, with related concurrency and conflict troubles. The second solution makes the application dependent on a non-JDK external module and probably you have to adapt the library to your necessities. By means of the non-blocking socket, you can implement a non-blocking server without directly managing threads or resorting to external modules.
Before going into details, there are few terms that you need to understand:
Java NIO has a class called Selector
that allows a single thread to examine I/O events on multiple channels. How is this possible? Well, the selector
can check the "readiness" of a channel for events such as a client attempting a connection, or a read/write operation. This is, each instance of Selector
can monitor more socket channels and thus more connections. Now, when something happens on the channel (an event occurs), the selector
informs the application to process the request. The selector
does it by creating event keys (or selection keys), which are instances of the SelectionKey
class. Each key
holds information about who is making the request and what type of the request is, as shown in the Figure 1.
A server implementation consists of an infinite loop in which the selector
waits for events and creates the event keys. There are four possible types for a key:
Usually acceptable
keys are created on the server side. In fact, this kind of key simply informs the server that a client required a connection, then the server individuates the socket channel and associates this to the selector for read/write operations. After this, when the accepted client reads or writes something, the selector will create readable
or writeable
keys for that client..
Now you are ready to write the server in Java, following the proposed algorithm. The creation of the socket channel, the selector
, and the socket-selector registration can be made in this way:
final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;
// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));
// This is how you open a Selector
selector = Selector.open();
/*
* Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
* This means that you just told your selector that this channel will be used to accept connections.
* We can change this operation later to read/write, more on this later.
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
First we create an instance of SocketChannel
with ServerSocketChannel.open()
method. Next, configureBlocking(false)
invocation sets this channel
as nonblocking. The connection to the server is made by serverChannel.socket().bind()
method. The HOSTNAME
represents the IP address of the server, and PORT
is the communication port. Finally, invoke Selector.open()
method to create a selector
instance and register it to the channel
and registration type. In this example, the registration type is OP_ACCEPT
, which means the selector merely reports that a client attempts a connection to the server. Other possible options are: OP_CONNECT
, which will be used by the client; OP_READ
; and OP_WRITE
.
Now we need to handle this requests using an infinite loop. A simple way is the following:
// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
/*
* selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
* For example, if a client connects right this second, then it will break from the select()
* call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
* block undefinable.
*/
selector.select(TIMEOUT);
/*
* If we are here, it is because an operation happened (or the TIMEOUT expired).
* We need to get the SelectionKeys from the selector to see what operations are available.
* We use an iterator for this.
*/
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
// remove the key so that we don't process this OPERATION again.
keys.remove();
// key could be invalid if for example, the client closed the connection.
if (!key.isValid()) {
continue;
}
/*
* In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
* If the key from the keyset is Acceptable, then we must get ready to accept the client
* connection and do something with it. Go read the comments in the accept method.
*/
if (key.isAcceptable()) {
System.out.println("Accepting connection");
accept(key);
}
/*
* If you already read the comments in the accept() method, then you know we changed
* the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
* a channel that is writable (key.isWritable()). The write() method will explain further.
*/
if (key.isWritable()) {
System.out.println("Writing...");
write(key);
}
/*
* If you already read the comments in the write method then you understand that we registered
* the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
* that is ready to read (key.isReadable()). The read() method will explain further.
*/
if (key.isReadable()) {
System.out.println("Reading connection");
read(key);
}
}
}
You can find the implementation source here
An alternative to the the Non-blocking implementation we can deploy an Asynchronous Server. For instance, you can use the AsynchronousServerSocketChannel
class, which provides an asynchronous channel for stream-oriented listening sockets.
To use it, first execute its static open()
method and then bind()
it to a specific port. Next, you'll execute its accept()
method, passing to it a class that implements the CompletionHandler
interface. Most often, you'll find that handler created as an anonymous inner class.
From this AsynchronousServerSocketChannel
object, you invoke accept()
to tell it to start listening for connections, passing to it a custom CompletionHandler
instance. When we invoke accept()
, it returns immediately. Note that this is different from the traditional blocking approach; whereas the accept()
method blocked until a client connected to it, the AsynchronousServerSocketChannel
accept()
method handles it for you.
Here you have an example:
public class NioSocketServer
{
public NioSocketServer()
{
try {
// Create an AsynchronousServerSocketChannel that will listen on port 5000
final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
.open()
.bind(new InetSocketAddress(5000));
// Listen for a new request
listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
{
@Override
public void completed(AsynchronousSocketChannel ch, Void att)
{
// Accept the next connection
listener.accept(null, this);
// Greet the client
ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));
// Allocate a byte buffer (4K) to read from the client
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
try {
// Read the first line
int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
boolean running = true;
while (bytesRead != -1 && running) {
System.out.println("bytes read: " + bytesRead);
// Make sure that we have data to read
if (byteBuffer.position() > 2) {
// Make the buffer ready to read
byteBuffer.flip();
// Convert the buffer into a line
byte[] lineBytes = new byte[bytesRead];
byteBuffer.get(lineBytes, 0, bytesRead);
String line = new String(lineBytes);
// Debug
System.out.println("Message: " + line);
// Echo back to the caller
ch.write(ByteBuffer.wrap(line.getBytes()));
// Make the buffer ready to write
byteBuffer.clear();
// Read the next line
bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
} else {
// An empty line signifies the end of the conversation in our protocol
running = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
// The user exceeded the 20 second timeout, so close the connection
ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
System.out.println("Connection timed out, closing connection");
}
System.out.println("End of conversation");
try {
// Close the connection if we need to
if (ch.isOpen()) {
ch.close();
}
} catch (I/OException e1)
{
e1.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void att)
{
///...
}
});
} catch (I/OException e) {
e.printStackTrace();
}
}
public static void main(String[] args)
{
NioSocketServer server = new NioSocketServer();
try {
Thread.sleep(60000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
You can find the full code here