I'm using Spring-Kafka version 1.2.1 and, when the Kafka server is down/unreachable, the asynchronous send calls block for a time. It seems to be the TCP timeout. The code is something like this:
ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
@Override
public void onSuccess(SendResult<K, V> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
I've taken a really quick look at the Spring-Kafka code and it seems to just pass the task along to the kafka client library, translating a callback interaction to a future object interaction. Looking at the kafka client library, the code gets more complex and I didn't take the time to understand it all, but I guess it may be making remote calls (metadata, at least?) in the same thread.
As a user, I expected the Spring-Kafka methods that return a future to return immediately, even if the remote kafka server is unreachable.
Any confirmation if my understanding is wrong or if this is a bug would be welcome. I ended up making it asynchronous on my end for now.
Another problem is that Spring-Kafka documentation says, at the beginning, that it provides synchronous and asynchronous send methods. I couldn't find any methods that do not return futures, maybe the documentation needs updating.
I'm happy to provide any further details if needed. Thanks.
In addition to the @EnableAsync annotation on a configuration class, the @Async annotation needs to be used on the method were you invoke this code.
http://www.baeldung.com/spring-async
Here some code fragements. Kafka producer config:
@EnableAsync
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${kafka.brokers}")
private String servers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
}
@Bean
public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
}
@Bean
public Producer producer() {
return new Producer();
}
}
And the producer itself:
public class Producer {
public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, GenericMessage> kafkaTemplate;
@Async
public void send(String topic, GenericMessage message) {
ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {
@Override
public void onSuccess(final SendResult<String, GenericMessage> message) {
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message= " + message, throwable);
}
});
}
}