how to get acknowledgement from Kafka broker if message is produced by producer?

usman picture usman · Dec 4, 2015 · Viewed 20.4k times · Source

I would like to get some response from the broker when I produce a message. I have tried CallBack mechanism (by implementing CallBack) used in KafkaProducer.send but it did not work and does not call onCompletion method.

When I shutdown Kafka server and try to produce message then it does call callback method.

Is there any other way to get acknowledgment?

@Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        System.out.println("Called Callback method");
        if (metadata != null) {
            System.out.println("message(" + key + ", " + message
                    + ") sent to partition(" + metadata.partition() + "), "
                    + "offset(" + metadata.offset() + ") in " + elapsedTime
                    + " ms");
        } else {
            exception.printStackTrace();
        }

    }

props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime(); 
String ip = "192.168.2."+ rnd.nextInt(255); 
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`

i am using kafka-client api 0.9.1 with broker version 0.8.2.

Answer

Morgan Kenyon picture Morgan Kenyon · Dec 7, 2015

So I'm not 100% sure on which versions work with which in Kafka. Currently I use 0.8.2, I know 0.9 introduced some breaking changes but I couldn't tell you for sure what does/doesn't work now.

One very strong recommendation, I would use the Kafka-Client version that corresponds to your broker version. If you're using broker 0.8.2, I would use kakfa-client 0.8.2 as well.

You never presented any code of how you're using this, so I'm just somewhat guessing in the dark. But I've implemented the Callback feature in Kafka 0.8.2 by using this method in the producer. Below is the method signature.

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

Where will I call that method, I actually pass in the class with the overriden method.

KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = //data to send to kafka
prod.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null) {
      e.printStackTrace();
    } else {
      //implement logic here, or call another method to process metadata
      System.out.println("Callback");
    }
  }
}); 

I'm assuming there's a way to also do it as you've done it. But you'd have to provide the code showing how you're actually sending records to Kafka. Other than that I'm just guessing.