Asynchronous HTTP client with Netty

Nitzan Tomer · Mar 17, 2012

I'm new to Netty and still struggling to find my way. I want to create an HTTP client that works asynchronously. The Netty HTTP examples only show how to wait for I/O operations, not how to use addListener, so I've been trying to figure this out for the last few days.

I'm trying to create a request class that handles all of the different states of a request: connecting, sending the data, handling the response, and then closing the connection. To do that, my class extends SimpleChannelUpstreamHandler and implements ChannelFutureListener. I use a ChannelPipelineFactory that adds this instance of the class (as a SimpleChannelUpstreamHandler) to the pipeline as a handler.

The connection is created like this:

this.state = State.Connecting;
this.clientBootstrap.connect(this.address).addListener(this);

Then the operationComplete method:

@Override
public void operationComplete(ChannelFuture future) throws Exception {
    State oldState = this.state;

    if (!future.isSuccess()) {
        this.status = Status.Failed;
        future.getChannel().disconnect().addListener(this);
    }
    else if (future.isCancelled()) {
        this.status = Status.Canceled;
        future.getChannel().disconnect().addListener(this);
    }
    else switch (this.state) {
        case Connecting:
            this.state = State.Sending;
            Channel channel = future.getChannel();
            channel.write(this.createRequest()).addListener(this);
            break;

        case Sending:
            this.state = State.Disconnecting;
            future.getChannel().disconnect().addListener(this);
            break;

        case Disconnecting:
            this.state = State.Closing;
            future.getChannel().close().addListener(this);
            break;

        case Closing:
            this.state = State.Finished;
            break;
    }
    System.out.println("request operationComplete start state: " + oldState + ", end state: " + this.state + ", status: " + this.status);
}

private HttpRequest createRequest() {
    String url = this.url.toString();

    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
    request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);

    return request;
}

The class also overrides the messageReceived method:

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    System.out.println("messageReceived");
    HttpResponse response = (HttpResponse) e.getMessage();

    ChannelBuffer content = response.getContent();
    if (content.readable()) {
        System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
    }
}

The problem is that I get this output:

request operationComplete start state: Connecting, end state: Sending, status: Unknown
request operationComplete start state: Sending, end state: Disconnecting, status: Unknown
request operationComplete start state: Closing, end state: Finished, status: Unknown
request operationComplete start state: Disconnecting, end state: Finished, status: Unknown

As you can see, messageReceived is never executed for some reason, even though the pipeline factory adds this instance of the class to the pipeline.

Any ideas what I'm missing here? Thanks.


Edit

I finally got this working thanks to the help of @JestanNirojan. In case anyone is interested, here is the solution:

public class ClientRequest extends SimpleChannelUpstreamHandler {

    ....

    public void connect() {
        this.state = State.Connecting;
        System.out.println(this.state);
        this.clientBootstrap.connect(this.address);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Sending;
        System.out.println(this.state);
        ctx.getChannel().write(this.createRequest());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        HttpResponse response = (HttpResponse) e.getMessage();

        ChannelBuffer content = response.getContent();
        if (content.readable()) {
            System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
        }

        this.state = State.Disconnecting;
        System.out.println(this.state);
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Closing;
        System.out.println(this.state);
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Finished;
        System.out.println(this.state);
    }

    private HttpRequest createRequest() {
        String url = this.url.toString();

        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
        request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
        request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
        request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);

        return request;
    }
}

Answer

Jestan Nirojan · Mar 17, 2012

You are using a ChannelFutureListener to drive every operation on the channel (which is bad), and the future listener runs as soon as each of those channel operations completes.

The problem is that the channel is disconnected immediately after the message is sent, so the handler cannot receive the response, which arrives later.

        ........
    case Sending:
        this.state = State.Disconnecting;
        future.getChannel().disconnect().addListener(this);
        break;
        ........
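To make the ordering concrete, here is a toy model in plain Java. It is not Netty: ToyChannel, deliverResponse and EarlyDisconnectDemo are hypothetical names invented for this sketch, and a CompletableFuture stands in for the ChannelFuture returned by write. It only assumes, as described above, that the write future completes before the response arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Toy stand-in for a channel; it only mimics the ordering of events, not Netty itself. */
class ToyChannel {
    final List<String> log = new ArrayList<>();
    private boolean connected = true;

    /** Completes as soon as the request is "flushed", before any response exists. */
    CompletableFuture<Void> write(String request) {
        log.add("written: " + request);
        return CompletableFuture.completedFuture(null);
    }

    void disconnect() {
        connected = false;
        log.add("disconnected");
    }

    /** Called by the simulated remote peer some time after the write. */
    void deliverResponse(String response) {
        if (connected) {
            log.add("messageReceived: " + response);
        } else {
            log.add("response dropped");
        }
    }
}

class EarlyDisconnectDemo {
    /** Mirrors the question: disconnect from the write-complete listener. */
    static List<String> disconnectOnWriteComplete() {
        ToyChannel ch = new ToyChannel();
        // The future is already complete, so the listener runs right here.
        ch.write("GET /").thenRun(ch::disconnect);
        ch.deliverResponse("200 OK");
        return ch.log;
    }

    /** Mirrors the fix: leave the channel open until the response has arrived. */
    static List<String> disconnectAfterResponse() {
        ToyChannel ch = new ToyChannel();
        ch.write("GET /");
        ch.deliverResponse("200 OK");
        ch.disconnect();
        return ch.log;
    }

    public static void main(String[] args) {
        System.out.println(disconnectOnWriteComplete());
        System.out.println(disconnectAfterResponse());
    }
}
```

In this model the first variant logs the response as dropped, while the second logs messageReceived before the disconnect. A real connection has more moving parts, so treat this purely as an illustration of the sequencing.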

You should not block the channel future thread at all. The best approach is to override SimpleChannelUpstreamHandler's

    channelConnected(..) {} 
    messageReceived(..) {} 
    channelDisconnected(..) {} 

methods and react to those events. You can keep the state in that handler too.
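As a rough sketch of keeping the state in the handler, the class below tracks the request lifecycle with one method per upstream event. It deliberately uses no Netty types so it can run on its own; RequestLifecycle and its on... methods are hypothetical names, and in a real handler each would be called from the matching callback (channelConnected, messageReceived, channelDisconnected, channelClosed). It assumes a single response message and a successful connect, which a production handler could not take for granted.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the handler's state tracking; no Netty types are used here. */
class RequestLifecycle {
    enum State { CONNECTING, SENDING, DISCONNECTING, CLOSING, FINISHED }

    private State state = State.CONNECTING;
    private final List<State> history = new ArrayList<>();

    RequestLifecycle() {
        history.add(state);
    }

    // Each method mirrors one upstream callback named in the answer.
    void onChannelConnected()    { advance(State.CONNECTING, State.SENDING); }
    void onMessageReceived()     { advance(State.SENDING, State.DISCONNECTING); }
    void onChannelDisconnected() { advance(State.DISCONNECTING, State.CLOSING); }
    void onChannelClosed()       { advance(State.CLOSING, State.FINISHED); }

    /** Moves to the next state, rejecting events that arrive out of order. */
    private void advance(State expected, State next) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
        state = next;
        history.add(next);
    }

    State state() { return state; }

    List<State> history() { return history; }
}

class RequestLifecycleDemo {
    public static void main(String[] args) {
        RequestLifecycle request = new RequestLifecycle();
        request.onChannelConnected();
        request.onMessageReceived();
        request.onChannelDisconnected();
        request.onChannelClosed();
        System.out.println(request.history());
    }
}
```

Because every transition is triggered by an event that has already happened, nothing here waits on or chains futures, which is the point of reacting to handler callbacks instead of driving the channel from a listener.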