I would like to get some response from the broker when I produce a message.
I have tried CallBack mechanism (by implementing CallBack) used in KafkaProducer.send
but it did not work and does not call onCompletion
method.
When I shutdown Kafka server and try to produce message then it does call callback method.
Is there any other way to get acknowledgment?
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("Called Callback method");
if (metadata != null) {
System.out.println("message(" + key + ", " + message
+ ") sent to partition(" + metadata.partition() + "), "
+ "offset(" + metadata.offset() + ") in " + elapsedTime
+ " ms");
} else {
exception.printStackTrace();
}
}
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime();
String ip = "192.168.2."+ rnd.nextInt(255);
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`
i am using kafka-client api 0.9.1 with broker version 0.8.2.
So I'm not 100% sure on which versions work with which in Kafka. Currently I use 0.8.2, I know 0.9 introduced some breaking changes but I couldn't tell you for sure what does/doesn't work now.
One very strong recommendation, I would use the Kafka-Client version that corresponds to your broker version. If you're using broker 0.8.2, I would use kakfa-client 0.8.2 as well.
You never presented any code of how you're using this, so I'm just somewhat guessing in the dark. But I've implemented the Callback feature in Kafka 0.8.2 by using this method in the producer. Below is the method signature.
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
Where will I call that method, I actually pass in the class with the overriden method.
KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = //data to send to kafka
prod.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
//implement logic here, or call another method to process metadata
System.out.println("Callback");
}
}
});
I'm assuming there's a way to also do it as you've done it. But you'd have to provide the code showing how you're actually sending records to Kafka. Other than that I'm just guessing.