Intercepting/Logging Requests and Responses in gRPC

Umer Farooq · Nov 7, 2017

I'm developing a chat app using gRPC in which the server receives messages from a client and sends them back out to all the clients connected to it. I used Saturnism's chat-example as a reference and replicated its code. The code compiles and runs, but the server never appears to receive any requests from the client.

My questions are:

  1. Is there a way to enable verbose server-side and client-side logging in gRPC to see which requests and responses are going in and out, and what might be failing?
  2. I'm using the following code for the server and client. What is missing or wrong in it that results in no communication between the client and the server?

WingokuServer.java

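import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;

public class WingokuServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        // recordRequestHeadersInterceptor() is not shown in the question; it is
        // assumed to return an io.grpc.ServerInterceptor.
        Server server = ServerBuilder.forPort(8091)
                .intercept(recordRequestHeadersInterceptor())
                .addService(new WingokuServiceImpl())
                .build();

        System.out.println("Starting server...");
        server.start();
        System.out.println("Server started!");
        // Block the main thread so the JVM stays alive while the server runs.
        server.awaitTermination();
    }
}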

WingokuServiceImpl.java

public class WingokuServiceImpl extends WingokuServiceGrpc.WingokuServiceImplBase {
    private static Set<StreamObserver<Response>> observers =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    public WingokuServiceImpl() {
        System.out.println("WingokuServiceImp");
    }

    @Override
    public StreamObserver<Request> messages(StreamObserver<Response> responseObserver) {
        System.out.println("messages");
        observers.add(responseObserver);
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request request) {
                System.out.println("Server onNext: ");
                System.out.println("request from client is: "+ request.getRequestMessage());
                Response response = Response.newBuilder().setResponseMessage("new Message From server at time: "+ System.nanoTime()).build();
                for (StreamObserver<Response> observer : observers) {
                    observer.onNext(response);
                }
            }

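            @Override
            public void onError(Throwable throwable) {
                // Drop the failed client's observer so later broadcasts skip it.
                observers.remove(responseObserver);
                System.out.println("Server onError: ");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                observers.remove(responseObserver);
                // Close the server's half of the stream so the client's onCompleted fires.
                responseObserver.onCompleted();
                System.out.println("Server onCompleted ");
            }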
        };
    }
}

WingokuClient.java

public class WingokuClient {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8091).usePlaintext(true).build();
        WingokuServiceGrpc.WingokuServiceStub asyncStub = WingokuServiceGrpc.newStub(channel);
        StreamObserver<Request> requestStreamObserver = asyncStub.messages(new StreamObserver<Response>() {
            @Override
            public void onNext(Response response) {
                System.out.println("Client onNext");
                System.out.println("Response from server is: "+ response.getResponseMessage());
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Client onError");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("Client OnComplete");
            }
        });

        requestStreamObserver.onNext(Request.newBuilder().setRequestMessage("Message From Client").build());
        requestStreamObserver.onCompleted();
        channel.shutdown();
        System.out.println("exiting client");
    }
}

Edit:

There is nothing wrong with the code; it works. I just needed to call awaitTermination on the client's channel after shutting it down. Without it, the client closes the connection to the server almost instantly, probably before the requests even leave the client for the network. That's why the server never received any requests.
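The shutdown-then-wait pattern described here is the same one java.util.concurrent.ExecutorService uses, so it can be tried without a gRPC server. The sketch below is a stand-in, not gRPC code. A single daemon thread plays the role of the channel's transport threads (assumed here to be daemon threads as well, which would explain why the JVM can exit before anything is sent), and the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownThenAwait {

    /**
     * Queues one slow "send" on a daemon thread, then shuts down and waits.
     * Returns true only if the send finished within the timeout.
     */
    static boolean sendAndDrain(long timeoutMillis) {
        AtomicBoolean sent = new AtomicBoolean(false);
        // Daemon thread: on its own it does not keep the JVM alive.
        ExecutorService transport = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "stand-in-transport");
            thread.setDaemon(true);
            return thread;
        });
        transport.execute(() -> {
            try {
                Thread.sleep(200); // pretend the write takes a while
                sent.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        transport.shutdown(); // stop accepting work; queued work still runs
        try {
            // Without this wait, the caller would return while the send is still pending.
            boolean drained = transport.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            return drained && sent.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("sent before exit: " + sendAndDrain(5_000));
    }
}
```

Applied to WingokuClient, the corresponding change would be to replace channel.shutdown(); with something like channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); and to let main declare InterruptedException. The 5-second timeout is an arbitrary choice.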

However, my question about enabling verbose logging and/or adding some sort of interceptor on the server side remains unanswered, so I'm looking forward to pointers from the experts here.

Answer

Shoohei · Jul 12, 2019

I found a way to log requests and responses on both the server and the client by using interceptors, which makes the code cleaner. It is also possible to use Sleuth for tracing.

This approach uses Spring Boot. Add the LogNet gRPC starter as a dependency:

implementation 'io.github.lognet:grpc-spring-boot-starter'

Server part

You can then annotate a ServerInterceptor with GRpcGlobalInterceptor so that it is applied to every gRPC service:

import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@GRpcGlobalInterceptor
public class GrpcInterceptor implements ServerInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ServerCall.Listener<M> interceptCall(
            ServerCall<M, R> call, Metadata headers, ServerCallHandler<M, R> next) {
        String traceId = headers.get(TRACE_ID_KEY);
        // TODO: Add traceId to sleuth
        logger.warn("traceId from client: {}. TODO: Add traceId to sleuth", traceId);

        GrpcServerCall<M, R> grpcServerCall = new GrpcServerCall<>(call);

        ServerCall.Listener<M> listener = next.startCall(grpcServerCall, headers);

        return new GrpcForwardingServerCallListener<M>(call.getMethodDescriptor(), listener) {
            @Override
            public void onMessage(M message) {
                logger.info("Method: {}, Message: {}", methodName, message);
                super.onMessage(message);
            }
        };
    }

    // Extends SimpleForwardingServerCall so that every method not overridden here
    // (isReady, setCompression, getAttributes, ...) is delegated to the wrapped call.
    private class GrpcServerCall<M, R> extends io.grpc.ForwardingServerCall.SimpleForwardingServerCall<M, R> {

        protected GrpcServerCall(ServerCall<M, R> serverCall) {
            super(serverCall);
        }

        @Override
        public void sendMessage(R message) {
            logger.info("Method: {}, Response: {}", getMethodDescriptor().getFullMethodName(), message);
            super.sendMessage(message);
        }
    }

    private class GrpcForwardingServerCallListener<M> extends io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener<M> {

        String methodName;

        protected GrpcForwardingServerCallListener(MethodDescriptor<?, ?> method, ServerCall.Listener<M> listener) {
            super(listener);
            methodName = method.getFullMethodName();
        }
    }
}

Client part

Interceptor:

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@Component
public class BackendInterceptor implements ClientInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ClientCall<M, R> interceptCall(
            final MethodDescriptor<M, R> method, CallOptions callOptions, Channel next) {
        return new BackendForwardingClientCall<M, R>(method,
                next.newCall(method, callOptions.withDeadlineAfter(10000, TimeUnit.MILLISECONDS))) {

            @Override
            public void sendMessage(M message) {
                logger.info("Method: {}, Message: {}", methodName, message);
                super.sendMessage(message);
            }

            @Override
            public void start(Listener<R> responseListener, Metadata headers) {
                // TODO: Use the sleuth traceId instead of 999
                headers.put(TRACE_ID_KEY, "999");

                BackendListener<R> backendListener = new BackendListener<>(methodName, responseListener);
                super.start(backendListener, headers);
            }
        };
    }

    private class BackendListener<R> extends ClientCall.Listener<R> {

        String methodName;
        ClientCall.Listener<R> responseListener;

        protected BackendListener(String methodName, ClientCall.Listener<R> responseListener) {
            super();
            this.methodName = methodName;
            this.responseListener = responseListener;
        }

        @Override
        public void onMessage(R message) {
            logger.info("Method: {}, Response: {}", methodName, message);
            responseListener.onMessage(message);
        }

        @Override
        public void onHeaders(Metadata headers) {
            responseListener.onHeaders(headers);
        }

        @Override
        public void onClose(Status status, Metadata trailers) {
            responseListener.onClose(status, trailers);
        }

        @Override
        public void onReady() {
            responseListener.onReady();
        }
    }

    private class BackendForwardingClientCall<M, R> extends io.grpc.ForwardingClientCall.SimpleForwardingClientCall<M, R> {

        String methodName;

        protected BackendForwardingClientCall(MethodDescriptor<M, R> method, ClientCall<M, R> delegate) {
            super(delegate);
            methodName = method.getFullMethodName();
        }
    }
}

Add the interceptor to the channel:

ManagedChannel managedChannel = ManagedChannelBuilder
                .forAddress(_URL_, _PORT_).usePlaintext().intercept(backendInterceptor).build();
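For the verbose-logging half of the question, a lighter option that needs no interceptor is to turn up the log level. grpc-java logs through java.util.logging, so lowering the io.grpc logger to FINE should surface its internal messages. The sketch below uses only the JDK. The class name and the example child logger name are hypothetical, and how much detail appears (for example, frame-level output from the Netty transport) depends on the gRPC version, the transport, and which logging backend Netty picks up.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

final class GrpcVerboseLogging {

    // Strong reference: LogManager holds loggers weakly, so a logger configured
    // only through a local variable could be collected and lose its level.
    private static final Logger GRPC_LOGGER = Logger.getLogger("io.grpc");

    /** Lowers the io.grpc logger tree to FINE and attaches a console handler that passes FINE records. */
    static synchronized void enable() {
        GRPC_LOGGER.setLevel(Level.FINE);
        for (Handler existing : GRPC_LOGGER.getHandlers()) {
            if (existing instanceof ConsoleHandler) {
                existing.setLevel(Level.FINE);
                return; // already installed by an earlier call
            }
        }
        ConsoleHandler console = new ConsoleHandler(); // writes to System.err
        console.setLevel(Level.FINE);
        GRPC_LOGGER.addHandler(console);
        // The root console handler defaults to INFO; bypass it so INFO and
        // higher records are not printed twice.
        GRPC_LOGGER.setUseParentHandlers(false);
    }

    public static void main(String[] args) {
        enable();
        // Hypothetical child logger: it inherits the level and handler from io.grpc.
        Logger.getLogger("io.grpc.netty.Example").fine("visible once FINE is enabled");
    }
}
```

Calling GrpcVerboseLogging.enable() at the top of both WingokuServer.main and WingokuClient.main, before the server or channel is built, would apply it to both sides. The same effect can usually be had from a logging.properties file instead of code.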