Non-blocking sockets

jasonline picture jasonline · Oct 9, 2010 · Viewed 21.4k times · Source

What's the best way to implement a non-blocking socket in Java?

Or is there such a thing? I have a program that communicates with a server through socket but I don't want the socket call to block/cause delay if there is a problem with the data/connection.

Answer

Teocci picture Teocci · Jun 21, 2017

Java non-blocking socket, introduced in Java 2 Standard Edition 1.4, allow net communication between applications without blocking the processes using the sockets. But what a non-blocking socket is, in which contexts it can be useful, and how it works?

What a non-blocking socket is?

A non-blocking socket allows I/O operation on a channel without blocking the processes using it. This means, we can use a single thread to handle multiple concurrent connections and gain an "asynchronous high-performance" read/write operations (some people may not agreed with that)

Ok, in which contexts it can be useful?

Suppose you would like to implement a server accepting diverse client connections. Suppose, as well, that you would like the server to be able to process multiple requests simultaneously. Using the traditional way you have two choices to develop such a server:

  • Implement a multi-thread server that manually handles a thread for each connection.
  • Using an external third-party module.

Both solutions work, but adopting the first one you have to develop the whole thread-management solution, with related concurrency and conflict troubles. The second solution makes the application dependent on a non-JDK external module and probably you have to adapt the library to your necessities. By means of the non-blocking socket, you can implement a non-blocking server without directly managing threads or resorting to external modules.

How it works?

Before going into details, there are few terms that you need to understand:

  • In NIO based implementations, instead of writing data onto output streams and reading data from input streams, we read and write data from buffers. A buffer can be defined as a temporary storage.
  • Channel transports bulk of data into and out of buffers. Also, it can be viewed as an endpoint for communication.
  • Readiness Selection is a concept that refers to “the ability to choose a socket that will not block when data is read or written.”

Java NIO has a class called Selector that allows a single thread to examine I/O events on multiple channels. How is this possible? Well, the selector can check the "readiness" of a channel for events such as a client attempting a connection, or a read/write operation. This is, each instance of Selector can monitor more socket channels and thus more connections. Now, when something happens on the channel (an event occurs), the selector informs the application to process the request. The selector does it by creating event keys (or selection keys), which are instances of the SelectionKey class. Each key holds information about who is making the request and what type of the request is, as shown in the Figure 1.

Figure 1: Structure diagram Figure 1: Structure diagram

A basic implementation

A server implementation consists of an infinite loop in which the selector waits for events and creates the event keys. There are four possible types for a key:

  • Acceptable: the associated client requests a connection.
  • Connectable: the server accepted the connection.
  • Readable: the server can read.
  • Writeable: the server can write.

Usually acceptable keys are created on the server side. In fact, this kind of key simply informs the server that a client required a connection, then the server individuates the socket channel and associates this to the selector for read/write operations. After this, when the accepted client reads or writes something, the selector will create readable or writeable keys for that client..

Now you are ready to write the server in Java, following the proposed algorithm. The creation of the socket channel, the selector, and the socket-selector registration can be made in this way:

final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;

// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));

// This is how you open a Selector
selector = Selector.open();
/*
 * Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
 * This means that you just told your selector that this channel will be used to accept connections.
 * We can change this operation later to read/write, more on this later.
 */
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

First we create an instance of SocketChannel with ServerSocketChannel.open() method. Next, configureBlocking(false) invocation sets this channel as nonblocking. The connection to the server is made by serverChannel.socket().bind() method. The HOSTNAME represents the IP address of the server, and PORT is the communication port. Finally, invoke Selector.open() method to create a selector instance and register it to the channel and registration type. In this example, the registration type is OP_ACCEPT, which means the selector merely reports that a client attempts a connection to the server. Other possible options are: OP_CONNECT, which will be used by the client; OP_READ; and OP_WRITE.

Now we need to handle this requests using an infinite loop. A simple way is the following:

// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
    /*
     * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
     * For example, if a client connects right this second, then it will break from the select()
     * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
     * block undefinable.
     */
    selector.select(TIMEOUT);

    /*
     * If we are here, it is because an operation happened (or the TIMEOUT expired).
     * We need to get the SelectionKeys from the selector to see what operations are available.
     * We use an iterator for this.
     */
    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

    while (keys.hasNext()) {
        SelectionKey key = keys.next();
        // remove the key so that we don't process this OPERATION again.
        keys.remove();

        // key could be invalid if for example, the client closed the connection.
        if (!key.isValid()) {
            continue;
        }
        /*
         * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
         * If the key from the keyset is Acceptable, then we must get ready to accept the client
         * connection and do something with it. Go read the comments in the accept method.
         */
        if (key.isAcceptable()) {
            System.out.println("Accepting connection");
            accept(key);
        }
        /*
         * If you already read the comments in the accept() method, then you know we changed
         * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
         * a channel that is writable (key.isWritable()). The write() method will explain further.
         */
        if (key.isWritable()) {
            System.out.println("Writing...");
            write(key);
        }
        /*
         * If you already read the comments in the write method then you understand that we registered
         * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
         * that is ready to read (key.isReadable()). The read() method will explain further.
         */
        if (key.isReadable()) {
            System.out.println("Reading connection");
            read(key);
        }
    }
}

You can find the implementation source here

NOTE: Asynchronous Server

An alternative to the the Non-blocking implementation we can deploy an Asynchronous Server. For instance, you can use the AsynchronousServerSocketChannel class, which provides an asynchronous channel for stream-oriented listening sockets.

To use it, first execute its static open() method and then bind() it to a specific port. Next, you'll execute its accept() method, passing to it a class that implements the CompletionHandler interface. Most often, you'll find that handler created as an anonymous inner class.

From this AsynchronousServerSocketChannel object, you invoke accept() to tell it to start listening for connections, passing to it a custom CompletionHandler instance. When we invoke accept(), it returns immediately. Note that this is different from the traditional blocking approach; whereas the accept() method blocked until a client connected to it, the AsynchronousServerSocketChannel accept() method handles it for you.

Here you have an example:

public class NioSocketServer
{
    public NioSocketServer()
    {
        try {
            // Create an AsynchronousServerSocketChannel that will listen on port 5000
            final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
                    .open()
                    .bind(new InetSocketAddress(5000));

            // Listen for a new request
            listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
            {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void att)
                {
                    // Accept the next connection
                    listener.accept(null, this);

                    // Greet the client
                    ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));

                    // Allocate a byte buffer (4K) to read from the client
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
                    try {
                        // Read the first line
                        int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);

                        boolean running = true;
                        while (bytesRead != -1 && running) {
                            System.out.println("bytes read: " + bytesRead);

                            // Make sure that we have data to read
                            if (byteBuffer.position() > 2) {
                                // Make the buffer ready to read
                                byteBuffer.flip();

                                // Convert the buffer into a line
                                byte[] lineBytes = new byte[bytesRead];
                                byteBuffer.get(lineBytes, 0, bytesRead);
                                String line = new String(lineBytes);

                                // Debug
                                System.out.println("Message: " + line);

                                // Echo back to the caller
                                ch.write(ByteBuffer.wrap(line.getBytes()));

                                // Make the buffer ready to write
                                byteBuffer.clear();

                                // Read the next line
                                bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
                            } else {
                                // An empty line signifies the end of the conversation in our protocol
                                running = false;
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        // The user exceeded the 20 second timeout, so close the connection
                        ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
                        System.out.println("Connection timed out, closing connection");
                    }

                    System.out.println("End of conversation");
                    try {
                        // Close the connection if we need to
                        if (ch.isOpen()) {
                            ch.close();
                        }
                    } catch (I/OException e1)
                    {
                        e1.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, Void att)
                {
                    ///...
                }
            });
        } catch (I/OException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
    {
        NioSocketServer server = new NioSocketServer();
        try {
            Thread.sleep(60000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

You can find the full code here