How thread-safe is Thrift? I seem to have requests disrupting one another

juhanic · Nov 22, 2010 · Viewed 9k times

Edit

Apparently what I was hoping to do is outside the scope of Thrift... If I make sure there is never more than one client on the port, everything is fine. Of course this somewhat defeats the purpose, as I'd like to keep several reusable connections open to the server to improve response times and lower overhead.

If anyone has a suggestion for an alternate way to achieve this, it would be appreciated (or a correction, if my conclusion is wrong).

Background

I have a multi-component application that is mostly connected together by Thrift (mostly Java->PHP connections).

So far it has all appeared to be fine, but problems started when I introduced a Java->Java connection where the client end is a servlet that can launch hundreds of requests a second.

The method being accessed has the following interface:

bool pvCheck(1:i32 toolId) throws(1:DPNoToolException nte),

To make sure it wasn't something weird on the service end I replaced the implementation with a trivial one:

    @Override
    public boolean pvCheck(int toolId) throws TException {
        //boolean ret = api.getViewsAndDec(toolId);
        return true;
    }

The errors/possible causes?

As long as there aren't many connections it works fine, but as soon as requests come close together, connections start getting stuck in the reader.

If I pull one of them up in the debugger the stack looks like this:

Daemon Thread [http-8080-197] (Suspended)   
    BufferedInputStream.read(byte[], int, int) line: 308    
    TSocket(TIOStreamTransport).read(byte[], int, int) line: 126    
    TSocket(TTransport).readAll(byte[], int, int) line: 84  
    TBinaryProtocol.readAll(byte[], int, int) line: 314 
    TBinaryProtocol.readI32() line: 262 
    TBinaryProtocol.readMessageBegin() line: 192    
    DumboPayment$Client.recv_pvCheck() line: 120    
    DumboPayment$Client.pvCheck(int) line: 105  
    Receiver.performTask(HttpServletRequest, HttpServletResponse) line: 157 
    Receiver.doGet(HttpServletRequest, HttpServletResponse) line: 109   
    Receiver(HttpServlet).service(HttpServletRequest, HttpServletResponse) line: 617    
    Receiver(HttpServlet).service(ServletRequest, ServletResponse) line: 717    
    ApplicationFilterChain.internalDoFilter(ServletRequest, ServletResponse) line: 290  
    ApplicationFilterChain.doFilter(ServletRequest, ServletResponse) line: 206  
    StandardWrapperValve.invoke(Request, Response) line: 233    
    StandardContextValve.invoke(Request, Response) line: 191    
    StandardHostValve.invoke(Request, Response) line: 127   
    ErrorReportValve.invoke(Request, Response) line: 102    
    StandardEngineValve.invoke(Request, Response) line: 109 
    CoyoteAdapter.service(Request, Response) line: 298  
    Http11AprProcessor.process(long) line: 859  
    Http11AprProtocol$Http11ConnectionHandler.process(long) line: 579   
    AprEndpoint$Worker.run() line: 1555 
    Thread.run() line: 619  

This seems to be triggered by data getting corrupted as I get the following exceptions:

10/11/22 18:38:55 WARN logger.Receiver: pvCheck had an exception
org.apache.thrift.TApplicationException: pvCheck failed: unknown result
    at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:135)
    at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
    at *.Receiver.performTask(Receiver.java:157)
    at *.Receiver.doGet(Receiver.java:109)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:617)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:717)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:298)
    at org.apache.coyote.http11.Http11AprProcessor.process(Http11AprProcessor.java:859)
    at org.apache.coyote.http11.Http11AprProtocol$Http11ConnectionHandler.process(Http11AprProtocol.java:579)
    at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:1555)
    at java.lang.Thread.run(Thread.java:619)

and

10/11/22 17:59:46 ERROR [/ninja_ar].[Receiver]: Servlet.service() for servlet Receiver threw an exception
java.lang.OutOfMemoryError: Java heap space
    at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:296)
    at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:290)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:198)
    at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:120)
    at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
    at *.Receiver.performTask(Receiver.java:157)
    at *.Receiver.doGet(Receiver.java:109)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:269)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:188)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:210)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:172)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:117)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:108)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:151)
    at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:870)
    at org.apache.coyote.http11.Http11BaseProtocol$Http11ConnectionHandler.processConnection(Http11BaseProtocol.java:665)
    at org.apache.tomcat.util.net.PoolTcpEndpoint.processSocket(PoolTcpEndpoint.java:528)
    at org.apache.tomcat.util.net.LeaderFollowerWorkerThread.runIt(LeaderFollowerWorkerThread.java:81)
    at org.apache.tomcat.util.threads.ThreadPool$ControlRunnable.run(ThreadPool.java:685)
    at java.lang.Thread.run(Thread.java:636)

Perhaps I'm way off the mark, but I'm pretty sure these are related to replies getting mixed up between requests: one client keeps trying to read when nothing is being sent to it, while another apparently reads a garbage value as a string length (which would explain the OutOfMemoryError in readStringBody).

Some implementation details

Both the server and the client are using the Java binary protocol (TBinaryProtocol).

I wrote a simple client pool class which lets me reuse clients; these are the main functions:

public synchronized Client getClient() {
    if(clientQueue.isEmpty()) {
        return newClient();
    } else {
        return clientQueue.getLast();
    }
}

private synchronized Client newClient() {
    int leftToTry = serverArr.length;
    Client cli = null;
    while(leftToTry > 0 && cli == null) {
        log.info("Creating new connection to " + 
                serverArr[roundRobinPos] + port);
        TTransport transport = new TSocket(serverArr[roundRobinPos], port);
        TProtocol protocol = new TBinaryProtocol(transport);
        cli = new Client(protocol);
        try {
            transport.open();
        } catch (TTransportException e) {
            cli = null;
            log.warn("Failed connection to " + 
                    serverArr[roundRobinPos] + port);
        }

        roundRobinPos++;
        if(roundRobinPos >= serverArr.length) {
            roundRobinPos = 0;
        }
        leftToTry--;
    }

    return cli;
}

public void returnClient(Client cli) {
    clientQueue.addFirst(cli);
}

The client applications (namely Tomcat servlets) access it in the following way:

    Client dpayClient = null;
    if(dpay != null
            && (dpayClient = dpay.getClient()) != null) {

        try {
            dpayClient.pvCheck(requestParameters.getId());
        } catch (DPNoToolException e) {
            return;
        } catch (TException e) {
            log.warn("pvCheck had an exception", e);
        } finally {
            if(dpayClient != null) {
                dpay.returnClient(dpayClient);
            }
        }
    }

The actual Thrift server is brought up in the following manner:

private boolean initThrift(int port, Configuration conf) {
    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
    DPaymentHandler handler = new DPaymentHandler(conf);

    DumboPayment.Processor processor = 
        new DumboPayment.Processor(handler);

    InetAddress listenAddress;
    try {
        listenAddress = InetAddress.getLocalHost();
    } catch (UnknownHostException e) {
        LOG.error("Failed in thrift init", e);
        return false;
    }
    TServerTransport serverTransport;
    try {
        serverTransport = new TServerSocket(
                new InetSocketAddress(listenAddress, port));
    } catch (TTransportException e) {
        LOG.error("Failed in thrift init", e);
        return false;
    }

    TTransportFactory transportFactory = new TTransportFactory();
    TServer server = new TThreadPoolServer(processor, serverTransport,
            transportFactory, protocolFactory);

    LOG.info("Starting Dumbo Payment thrift server on " + 
            listenAddress + ":" + Integer.toString(port));
    server.serve();

    return true;
}

Finally

I've been stuck on this for a while now... It may well be that I'm missing something obvious. I'd really appreciate any help with this.

If any additional information is needed, please let me know. There's a lot of material here, so I've tried to keep it to the (hopefully) most relevant parts.

Answer

Nthalk · Jan 20, 2011

My guess is that you have multiple threads trying to use the same clients at the same time, and I'm not entirely sure your pool is bulletproof. You might try the async interface instead, together with a thread-safe resource pool for accessing your clients.
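
The key property such a pool needs is that a client is removed from the pool while a thread is using it and only put back when that thread is finished, so two requests can never interleave on the same socket. As an aside, the getClient() in your pool returns clientQueue.getLast() without removing the client from the queue, so two concurrent requests can end up sharing one connection, which would match the corrupted replies you are seeing. Below is a minimal sketch of an exclusive-checkout pool for the ordinary blocking clients (BlockingClientPool, borrow and giveBack are made-up names; Client means your generated DumboPayment.Client):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public class BlockingClientPool {
    // Idle clients; a client is absent from this queue while a thread is using it.
    private final BlockingQueue<Client> idle = new LinkedBlockingQueue<Client>();

    public BlockingClientPool(int size, String host, int port)
            throws TTransportException {
        for (int i = 0; i < size; i++) {
            TTransport transport = new TSocket(host, port);
            transport.open();
            idle.add(new Client(new TBinaryProtocol(transport)));
        }
    }

    // Blocks until a client is free; the caller owns it exclusively until giveBack().
    public Client borrow() throws InterruptedException {
        return idle.take();
    }

    public void giveBack(Client cli) {
        idle.add(cli);
    }
}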

Using Thrift 0.5.0, here is an example of creating an AsyncClient from your Thrift-generated code:

Factory fac = new AsyncClient.Factory(new TAsyncClientManager(), new TProtocolFactory() {
    @Override
    public TProtocol getProtocol( TTransport trans ) {
        return new TBinaryProtocol(trans);
    }
});
AsyncClient cl = fac.getAsyncClient( new TNonblockingSocket( "127.0.0.1", 12345 ));

However, if you look through the source you'll notice that it has a single message-handling thread: even though it uses an NIO socket, you may find this to be a bottleneck. To get more throughput, you'll have to create more async clients, check them out, and return them correctly.

To simplify this, I've made a quick little class to manage them. The only thing you need to do to fit it to your needs is to include your own packages, and it should work for you, even though I haven't really tested it much (at all, really):

public class Thrift {

    // A unit of work; on() receives an AsyncClient checked out exclusively for this request
    public static abstract class ThriftRequest {

        private void go( final Thrift thrift, final AsyncClient cli ) {
            on( cli );
            thrift.ret( cli );
        }

        public abstract void on( AsyncClient cli );
    }

    // Holds all of our Async Clients
    private final ConcurrentLinkedQueue<AsyncClient>   instances = new ConcurrentLinkedQueue<AsyncClient>();
    // Holds all of our postponed requests
    private final ConcurrentLinkedQueue<ThriftRequest> requests  = new ConcurrentLinkedQueue<ThriftRequest>();
    // Holds our executor, if any
    private Executor                                 exe       = null;

    /**
     * This factory runs in thread bounce mode, meaning that if you call it from 
     * many threads, execution bounces between calling threads depending on when        
     * execution is needed.
     */
    public Thrift(
            final int clients,
            final int clients_per_message_processing_thread,
            final String host,
            final int port ) throws IOException {

        // We only need one protocol factory
        TProtocolFactory proto_fac = new TProtocolFactory() {

            @Override
            public TProtocol getProtocol( final TTransport trans ) {
                return new TBinaryProtocol( trans );
            }
        };

        // Create our clients
        Factory fac = null;
        for ( int i = 0; i < clients; i++ ) {

            if ( fac == null || i % clients_per_message_processing_thread == 0 ) {
                fac = new AsyncClient.Factory(
                    new TAsyncClientManager(),
                    proto_fac );
            }

            instances.add( fac.getAsyncClient( new TNonblockingSocket(
                host,
                port ) ) );
        }
    }
    /**
     * This factory runs callbacks in whatever mode the executor is setup for,
     * not on calling threads.
     */
    public Thrift( Executor exe,
            final int clients,
            final int clients_per_message_processing_thread,
            final String host,
            final int port ) throws IOException {
        this( clients, clients_per_message_processing_thread, host, port );
        this.exe = exe;
    }

    // Call this to grab an instance
    public void req( final ThriftRequest req ) {
        final AsyncClient cli;
        synchronized ( instances ) {
            cli = instances.poll();
        }
        if ( cli != null ) {
            if ( exe != null ) {
                // Executor mode
                exe.execute( new Runnable() {

                    @Override
                    public void run() {
                        req.go( Thrift.this, cli );
                    }

                } );
            } else {
                // Thread bounce mode
                req.go( this, cli );
            }
            return;
        }
        // No clients immediately available
        requests.add( req );
    }

    private void ret( final AsyncClient cli ) {
        final ThriftRequest req;
        synchronized ( requests ) {
            req = requests.poll();
        }
        if ( req != null ) {
            if ( exe != null ) {
                // Executor mode
                exe.execute( new Runnable() {

                    @Override
                    public void run() {
                        req.go( Thrift.this, cli );
                    }
                } );
            } else {
                // Thread bounce mode
                req.go( this, cli );
            }
            return;
        }
        // We did not need this immediately, hold onto it
        instances.add( cli );

    }

}

An example of how to use it:

// Make the pool: 10 clients, 5 clients per message-processing thread
Thrift t = new Thrift( 10, 5, "localhost", 8000 );
// Use the pool
t.req( new Thrift.ThriftRequest() {

    @Override
    public void on( AsyncClient cli ) {
        cli.MyThriftMethod( "stringarg", 111, new AsyncMethodCallback<AsyncClient.MyThriftMethod_call>() {
            @Override
            public void onError( Throwable throwable ) {
            }

            @Override
            public void onComplete( MyThriftMethod_call response ) {
            }
        });
    }
} );
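
If you would rather have callbacks run on a dedicated thread pool instead of bouncing between calling threads, use the Executor constructor. A quick sketch (ExecutorService and Executors are from java.util.concurrent; the pool size here is arbitrary):

// Callbacks now run on this executor rather than on whichever thread happens to drive the pool.
ExecutorService callbackPool = Executors.newFixedThreadPool( 4 );
Thrift pooled = new Thrift( callbackPool, 10, 5, "localhost", 8000 );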

You might also want to experiment with different server modes, like the THsHaServer, to see what works best for your environment.
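
For instance, a half-sync/half-async setup on the server side might look roughly like the sketch below. This is only a sketch: initThriftHsHa is an illustrative name, and the THsHaServer constructor signature differs between Thrift releases (newer versions use a THsHaServer.Args builder), so adjust it to your version. Note also that the nonblocking servers use framed messages, so clients have to wrap their sockets in a TFramedTransport.

private boolean initThriftHsHa(int port, Configuration conf) {
    DumboPayment.Processor processor =
            new DumboPayment.Processor(new DPaymentHandler(conf));
    TNonblockingServerTransport serverTransport;
    try {
        // Nonblocking server socket (org.apache.thrift.transport.TNonblockingServerSocket)
        serverTransport = new TNonblockingServerSocket(port);
    } catch (TTransportException e) {
        LOG.error("Failed in thrift init", e);
        return false;
    }
    // One selector thread for I/O plus a worker pool for invoking handler methods
    TServer server = new THsHaServer(processor, serverTransport,
            new TFramedTransport.Factory(), new TBinaryProtocol.Factory());
    LOG.info("Starting Dumbo Payment HsHa thrift server on port " + port);
    server.serve();
    return true;
}

// Matching client-side transport: frame the socket before opening it
// TTransport transport = new TFramedTransport(new TSocket(host, port));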