Java High-load NIO TCP server

Juriy · Jul 9, 2013

As a part of my research I'm writing a high-load TCP/IP echo server in Java. I want to serve about 3-4k clients and see the maximum number of messages per second that I can squeeze out of it. Message size is quite small: up to 100 bytes. This work doesn't have any practical purpose; it is purely research.

According to numerous presentations that I've seen (HornetQ benchmarks, LMAX Disruptor talks, etc.), real-world high-load systems tend to serve millions of transactions per second (I believe the Disruptor talk mentioned about 6 million and HornetQ about 8.5 million). For example, this post states that it is possible to achieve up to 40M MPS. So I took that as a rough estimate of what modern hardware should be capable of.

I wrote the simplest single-threaded NIO server and launched a load test. I was a little surprised that I could get only about 100k MPS on localhost and 25k with actual networking. These numbers look quite small. I was testing on Win7 x64, Core i7. Looking at CPU load, only one core is busy (which is expected for a single-threaded app), while the rest sit idle. However, even if I load all 8 cores (including virtual ones) I will get no more than 800k MPS, not even close to 40 million :)

My question is: what is a typical pattern for serving massive amounts of messages to clients? Should I distribute networking load over several different sockets inside a single JVM and use some sort of load balancer like HAProxy to distribute load to multiple cores? Or should I look towards using multiple Selectors in my NIO code? Or maybe even distribute the load between multiple JVMs and use Chronicle to build inter-process communication between them? Will testing on a proper server-side OS like CentOS make a big difference (maybe it is Windows that slows things down)?

Below is the sample code of my server. It always answers with "ok" to any incoming data. I know that in the real world I'd need to track the size of each message and be prepared for one message to be split across multiple reads, but I'd like to keep things super simple for now.

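import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

public class EchoServer {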

private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

private InetAddress hostAddress = null;

private int port;
private Selector selector;

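// Start of the current 100k-message measurement window (initialized so the first report is meaningful)
private long loopTime = System.currentTimeMillis();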
private long numMessages = 0;

public EchoServer() throws IOException {
    this(DEFAULT_PORT);
}

public EchoServer(int port) throws IOException {
    this.port = port;
    selector = initSelector();
    loop();
}

private void loop() {
    while (true) {
        try{
            selector.select();
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                } else if (key.isWritable()) {
                    write(key);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

private void accept(SelectionKey key) throws IOException {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    SocketChannel socketChannel = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
    socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
    socketChannel.register(selector, SelectionKey.OP_READ);

    System.out.println("Client is connected");
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(readBuffer);
    } catch (IOException e) {
        key.cancel();
        socketChannel.close();

        System.out.println("Forceful shutdown");
        return;
    }

    if (numRead == -1) {
        System.out.println("Graceful shutdown");
        key.channel().close();
        key.cancel();

        return;
    }

    // The channel is already registered with this selector; just switch its interest set
    key.interestOps(SelectionKey.OP_WRITE);

    numMessages++;
    if (numMessages%100000 == 0) {
        long elapsed = System.currentTimeMillis() - loopTime;
        loopTime = System.currentTimeMillis();
        System.out.println(elapsed);
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));

    socketChannel.write(dummyResponse);
    if (dummyResponse.remaining() > 0) {
        System.err.print("Filled UP");
    }

    key.interestOps(SelectionKey.OP_READ);
}

private Selector initSelector() throws IOException {
    Selector socketSelector = SelectorProvider.provider().openSelector();

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
    serverChannel.socket().bind(isa);
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
    return socketSelector;
}

public static void main(String[] args) throws IOException {
    System.out.println("Starting echo server");
    new EchoServer();
}
}

Answer

Rajiv · Jan 8, 2014
what is a typical pattern for serving massive amounts of messages to clients?

There are many possible patterns. An easy way to utilize all cores without going through multiple JVMs is:

  1. Have a single thread accept connections and read using a selector.
  2. Once you have enough bytes to constitute a single message, pass it on to another core using a construct like a ring buffer. The Disruptor Java framework is a good match for this. This is a good pattern if the processing needed to determine whether a message is complete is lightweight. For example, if you have a length-prefixed protocol you could wait until you get the expected number of bytes and then send the message to another thread. If parsing the protocol is very heavy, you might overwhelm this single thread, preventing it from accepting connections or reading bytes off the network.
  3. On your worker thread(s), which receive data from a ring buffer, do the actual processing.
  4. You write out the responses either on your worker threads or through some other aggregator thread.
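A minimal sketch of steps 1 to 4, assuming the selector thread has already framed a complete message as a `byte[]`. It uses `java.util.concurrent.ArrayBlockingQueue` as a simple bounded stand-in for a ring buffer; it is not the Disruptor's API, which is different and considerably faster. The class and method names (`HandoffPipeline`, `onCompleteMessage`, `handle`) are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class HandoffPipeline {
    // Bounded queue standing in for a ring buffer between the I/O thread and one worker
    private final BlockingQueue<byte[]> toWorker = new ArrayBlockingQueue<>(1024);
    private final AtomicLong processed = new AtomicLong();

    public HandoffPipeline() {
        Thread worker = new Thread(this::workerLoop, "worker");
        worker.setDaemon(true); // do not keep the JVM alive just for this thread
        worker.start();
    }

    // Step 2: called by the reader (selector) thread once a message is complete.
    // put() blocks while the queue is full, which applies back-pressure to the reader;
    // a real server might prefer offer() and stop reading from that client instead.
    public void onCompleteMessage(byte[] message) throws InterruptedException {
        toWorker.put(message);
    }

    // Step 3: the worker does the actual processing off the I/O thread
    private void workerLoop() {
        try {
            while (true) {
                byte[] msg = toWorker.take();
                handle(msg);
                processed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit quietly when interrupted
        }
    }

    // Hypothetical application logic. Step 4 (writing the reply) would start here,
    // either directly on this thread or by handing the reply to a writer thread.
    private void handle(byte[] msg) {
        // intentionally empty in this sketch
    }

    public long processedCount() {
        return processed.get();
    }
}
```

Keeping the queue bounded matters: with an unbounded queue a slow worker could accumulate an unlimited backlog in memory instead of slowing the reader down.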

That's the gist of it. There are many more possibilities here and the answer really depends on the type of application you are writing. A few examples are:

  1. A CPU-heavy stateless application, say an image processing application. The amount of CPU/GPU work done per request will probably be significantly higher than the overhead generated by a very naive inter-thread communication solution. In this case an easy solution is a bunch of worker threads pulling work from a single queue. Notice that this is a single queue instead of one queue per worker. The advantage is that it is inherently load-balanced: each worker finishes its work and then just polls the single-producer multiple-consumer queue. Even though this queue is a source of contention, the image-processing work (seconds?) should be far more expensive than any synchronization overhead.
  2. A pure IO application, e.g. a stats server which just increments some counters for a request. Here you do almost no CPU-heavy work; most of the work is just reading and writing bytes. A multi-threaded application might not give you significant benefit here. In fact it might even slow things down if the time it takes to queue items is more than the time it takes to process them. A single-threaded Java server should be able to saturate a 1 Gbit/s link easily.
  3. Stateful applications which require moderate amounts of processing e.g. a typical business application: Here every client has some state that determines how each request is handled. Assuming we go multi-threaded since the processing is non-trivial, we could affinitize clients to certain threads. This is a variant of the actor architecture:

    i) When a client first connects, hash it to a worker. You might want to do this with some client id, so that if it disconnects and reconnects it is still assigned to the same worker/actor.

    ii) When the reader thread reads a complete request, put it on the ring buffer for the right worker/actor. Since the same worker always processes a particular client, all the state should be thread-local, making all the processing logic simple and single-threaded.

    iii) The worker thread can write responses out. Always attempt a plain write() first; only if not all of your data could be written out should you register for OP_WRITE. The worker thread only needs to make select calls if there is actually something outstanding. Most writes should just succeed, making this unnecessary. The trick here is balancing between select calls and polling the ring buffer for more requests. You could also employ a single writer thread whose only responsibility is to write responses out. Each worker thread can put its responses on a ring buffer connecting it to this single writer thread. The single writer thread polls each incoming ring buffer round-robin and writes the data out to clients. Again, the caveat about trying write() before select applies, as does the trick about balancing between multiple ring buffers and select calls.
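A rough sketch of steps i to iii of the stateful pattern, under these assumptions: clients are identified by a string id, each worker keeps a per-client request counter as its stand-in for "state", and `ArrayBlockingQueue` replaces the per-worker ring buffer. `AffinityRouter`, `Worker`, `Request` and `writeOrRegister` are hypothetical names. `writeOrRegister` assumes only one thread ever writes to a given channel and that some selector thread finishes pending writes using the attached buffer when OP_WRITE fires.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AffinityRouter {
    // A complete request tagged with the id of the client that sent it
    public static final class Request {
        final String clientId;
        final String payload;

        public Request(String clientId, String payload) {
            this.clientId = clientId;
            this.payload = payload;
        }
    }

    // One worker (actor) per thread; only it touches its state, so no locks are needed
    static final class Worker implements Runnable {
        final BlockingQueue<Request> inbox = new ArrayBlockingQueue<>(1024);
        private final Map<String, Long> requestsPerClient = new HashMap<>();

        @Override
        public void run() {
            try {
                while (true) {
                    Request r = inbox.take();
                    requestsPerClient.merge(r.clientId, 1L, Long::sum); // per-client state
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private final Worker[] workers;

    public AffinityRouter(int nWorkers) {
        workers = new Worker[nWorkers];
        for (int i = 0; i < nWorkers; i++) {
            workers[i] = new Worker();
            Thread t = new Thread(workers[i], "worker-" + i);
            t.setDaemon(true);
            t.start();
        }
    }

    // Step i: String.hashCode() is deterministic, so a reconnecting client maps to the same worker
    public int workerIndexFor(String clientId) {
        return Math.floorMod(clientId.hashCode(), workers.length);
    }

    // Step ii: the reader thread puts each complete request on its client's worker queue
    public void route(Request r) throws InterruptedException {
        workers[workerIndexFor(r.clientId)].inbox.put(r);
    }

    // Step iii: try a plain write first; only if the socket send buffer fills up, keep the
    // leftover bytes as the key's attachment and register interest in OP_WRITE
    public static boolean writeOrRegister(SelectionKey key, ByteBuffer response) throws IOException {
        ((WritableByteChannel) key.channel()).write(response);
        if (!response.hasRemaining()) {
            return true; // the common case: no select() round trip needed
        }
        key.attach(response);
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        key.selector().wakeup(); // let a select() blocked on another thread see the change
        return false;
    }
}
```

A production version would also queue any new response behind an unfinished one on the same key, so that replies to one client cannot be reordered.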

As you point out there are many other options:

Should I distribute networking load over several different sockets inside a single JVM and use some sort of load balancer like HAProxy to distribute load to multiple cores?

You can do this, but IMHO this is not the best use for a load balancer. It does buy you independent JVMs that can fail on their own, but it will probably be slower than writing a single multi-threaded JVM app. The application itself might be easier to write, though, since it will be single-threaded.

Or should I look towards using multiple Selectors in my NIO code?

You can do this too. Look at the Nginx architecture for some hints on how to do this.
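A compact sketch of the multiple-selectors option: one blocking acceptor thread hands accepted sockets round-robin to N event loops, each owning its own `Selector` and thread. This is only loosely analogous to Nginx, which runs one event loop per worker process rather than per thread. The names are hypothetical and, like the question's server, it replies "ok" to every read and ignores message framing and partial writes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MultiSelectorEchoServer {
    private static final byte[] OK = {'o', 'k'};

    // One event loop = one thread + one Selector; every socket is owned by exactly one loop
    static final class EventLoop implements Runnable {
        private final Selector selector;
        private final Queue<SocketChannel> newChannels = new ConcurrentLinkedQueue<>();
        private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);

        EventLoop() throws IOException {
            selector = Selector.open();
        }

        // Called from the acceptor thread: enqueue, then wake the loop so it registers the
        // channel itself (on older JDKs, register() from another thread can block behind select())
        void assign(SocketChannel ch) {
            newChannels.add(ch);
            selector.wakeup();
        }

        @Override
        public void run() {
            try {
                while (true) {
                    selector.select();
                    SocketChannel ch;
                    while ((ch = newChannels.poll()) != null) {
                        ch.configureBlocking(false);
                        ch.register(selector, SelectionKey.OP_READ);
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isValid() && key.isReadable()) {
                            onReadable(key);
                        }
                    }
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        private void onReadable(SelectionKey key) throws IOException {
            SocketChannel ch = (SocketChannel) key.channel();
            readBuffer.clear();
            int n;
            try {
                n = ch.read(readBuffer);
            } catch (IOException e) {
                n = -1; // treat a reset connection like end-of-stream
            }
            if (n < 0) {
                ch.close(); // closing the channel also cancels its key
                return;
            }
            // Simplification: assumes the 2-byte reply always fits in the socket send buffer
            ch.write(ByteBuffer.wrap(OK));
        }
    }

    // Starts nLoops event loops plus one blocking acceptor thread that hands new sockets
    // out round-robin. Port 0 picks a free port; the bound port is returned.
    public static int start(int port, int nLoops) throws IOException {
        EventLoop[] loops = new EventLoop[nLoops];
        for (int i = 0; i < nLoops; i++) {
            loops[i] = new EventLoop();
            startDaemon(loops[i], "event-loop-" + i);
        }
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(port));
        startDaemon(() -> {
            try {
                for (int next = 0; ; next = (next + 1) % loops.length) {
                    loops[next].assign(server.accept()); // blocking accept on its own thread
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, "acceptor");
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    private static void startDaemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        t.start();
    }
}
```

Whether more loops actually help depends on the workload; as the rest of this answer stresses, measure before assuming one loop per core is the right number.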

Or maybe even distribute the load between multiple JVMs and use Chronicle to build inter-process communication between them?

This is also an option. Chronicle has the advantage that memory-mapped files are more resilient to a process quitting midway. You still get plenty of performance since all communication is done through shared memory.

Will testing on a proper server-side OS like CentOS make a big difference (maybe it is Windows that slows things down)?

I don't know about this. Unlikely. If Java uses the native Windows APIs to the fullest, it shouldn't matter as much. I am highly doubtful of the 40 million transactions/sec figure (without a user space networking stack + UDP) but the architectures I listed should do pretty well.

These architectures tend to do well since they are single-writer architectures that use bounded, array-based data structures for inter-thread communication. Determine whether multi-threading is even the answer; in many cases it is not needed and can lead to a slowdown.
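To make "single-writer, bounded, array-based" concrete, here is a minimal single-producer/single-consumer ring buffer sketch. It assumes exactly one producer thread calls `offer` and exactly one consumer thread calls `poll`; with more threads on either side it is incorrect. It is far simpler than the Disruptor (no batching, no padding against false sharing), and `SpscRingBuffer` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

public final class SpscRingBuffer<E> {
    private final Object[] slots;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next slot to read; written only by the consumer
    private final AtomicLong tail = new AtomicLong(); // next slot to write; written only by the producer

    public SpscRingBuffer(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of two");
        }
        slots = new Object[capacity];
        mask = capacity - 1; // index = sequence & mask, avoiding a modulo
    }

    // Producer thread only. Returns false instead of blocking when the buffer is full.
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException("null is reserved to signal 'empty'");
        }
        long t = tail.get();
        if (t - head.get() == slots.length) {
            return false; // full: the consumer has not freed a slot yet
        }
        slots[(int) (t & mask)] = e;
        tail.lazySet(t + 1); // ordered store: the slot write becomes visible before the new tail
        return true;
    }

    // Consumer thread only. Returns null when the buffer is empty.
    @SuppressWarnings("unchecked")
    public E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null; // empty
        }
        int idx = (int) (h & mask);
        E e = (E) slots[idx];
        slots[idx] = null; // drop the reference so the element can be garbage collected
        head.lazySet(h + 1); // hand the slot back to the producer
        return e;
    }
}
```

Each index has a single writer (the producer owns `tail`, the consumer owns `head`), so no compare-and-swap is needed; that is the property that makes single-writer designs cheap.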

Another area to look into is memory allocation schemes. Specifically, the strategy used to allocate and reuse buffers could yield significant benefits. The right buffer reuse strategy depends on the application. Look at schemes like buddy memory allocation, arena allocation, etc. to see if they can benefit you. The JVM GC does plenty fine for most workloads, though, so always measure before you go down this route.
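As one of the simplest possible reuse strategies (not a buddy or arena allocator), a fixed-size buffer pool can be sketched as below. It assumes all buffers have the same size and that the pool is used from a single thread, such as the selector thread; `BufferPool` is a hypothetical name. It pools direct buffers because those are comparatively expensive to allocate and free, which is what makes reusing them attractive.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

public final class BufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;
    private final int maxPooled;
    private int allocations; // how many buffers were actually allocated (useful when measuring)

    public BufferPool(int bufferSize, int maxPooled) {
        this.bufferSize = bufferSize;
        this.maxPooled = maxPooled;
    }

    // Reuse a released buffer if one is available, otherwise allocate a new one
    public ByteBuffer acquire() {
        ByteBuffer b = free.pollFirst();
        if (b == null) {
            allocations++;
            return ByteBuffer.allocateDirect(bufferSize);
        }
        return b;
    }

    // Reset the buffer and keep it for reuse; beyond maxPooled it is left to the GC
    public void release(ByteBuffer b) {
        if (b.capacity() != bufferSize) {
            throw new IllegalArgumentException("buffer was not acquired from this pool");
        }
        b.clear();
        if (free.size() < maxPooled) {
            free.addFirst(b); // LIFO: the most recently used buffer is handed out next
        }
    }

    public int allocations() {
        return allocations;
    }
}
```

Checking `allocations()` under load is a cheap way to confirm that pooling actually avoids allocation before committing to a more elaborate scheme.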

Protocol design has a big effect on performance too. I tend to prefer length-prefixed protocols because they let you allocate buffers of the right size, avoiding lists of buffers and/or buffer merging. Length-prefixed protocols also make it easy to decide when to hand over a request: just check num bytes == expected. The actual parsing can be done by the worker threads. Serialization and deserialization concerns extend beyond length-prefixed protocols. Patterns such as flyweights over buffers, instead of allocating objects, help here. Look at SBE (Simple Binary Encoding) for some of these principles.
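A minimal sketch of the "num bytes == expected" check, assuming a hypothetical framing of a 4-byte big-endian length followed by that many payload bytes. The reader thread feeds it whatever each `read()` returned and gets back only complete frames, which it could then hand to a worker. `LengthPrefixedDecoder` and `maxFrame` are hypothetical; the `maxFrame` cap guards against a corrupt or hostile length field.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public final class LengthPrefixedDecoder {
    private static final int HEADER_BYTES = 4;
    private final int maxFrame;
    private final ByteBuffer header = ByteBuffer.allocate(HEADER_BYTES);
    private ByteBuffer body; // null while the length header is still being read

    public LengthPrefixedDecoder(int maxFrame) {
        this.maxFrame = maxFrame;
    }

    // Feeds bytes just read from the socket (in read mode, i.e. after flip()).
    // Returns every frame completed by these bytes; a partial frame is kept for the next call.
    public List<byte[]> feed(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.hasRemaining()) {
            if (body == null) {
                copy(in, header);
                if (header.hasRemaining()) {
                    break; // length prefix split across reads: wait for more bytes
                }
                header.flip();
                int len = header.getInt();
                header.clear();
                if (len < 0 || len > maxFrame) {
                    throw new IllegalStateException("bad frame length " + len);
                }
                body = ByteBuffer.allocate(len); // exact size known up front, no buffer merging
            }
            copy(in, body);
            if (body.hasRemaining()) {
                break; // num bytes < expected: keep accumulating
            }
            frames.add(body.array()); // num bytes == expected: the frame is complete
            body = null;
        }
        return frames;
    }

    // Copies as many bytes as fit from src into dst, advancing both positions
    private static void copy(ByteBuffer src, ByteBuffer dst) {
        int n = Math.min(src.remaining(), dst.remaining());
        ByteBuffer slice = src.duplicate();
        slice.limit(slice.position() + n);
        dst.put(slice);
        src.position(src.position() + n);
    }
}
```

Because the decoder only counts bytes, it stays cheap enough to run on the single reader thread; the actual parsing of each frame is left to the workers.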

As you can imagine, an entire treatise could be written here. This should set you in the right direction. Warning: always measure and make sure you need more performance than the simplest option gives you. It's easy to get sucked into a never-ending black hole of performance improvements.