When to use ConcurrentKafkaListenerContainerFactory?

Rahul Gupta · Mar 6, 2019

I am new to Kafka. I went through the documentation but couldn't understand it. Can someone please explain when to use the ConcurrentKafkaListenerContainerFactory class? I have used the KafkaConsumer class, but I see ConcurrentKafkaListenerContainerFactory being used in my current project. Please explain what purpose it serves.

Answer

Deadpool · Mar 7, 2019

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.
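To make the failure mode concrete, here is a minimal sketch of a guard that rejects a second thread while the first one is still inside a call. It uses only the JDK and is not Kafka's code: the class SingleThreadGuard and its methods are hypothetical names chosen for illustration, and unlike a real client this simplified guard is not reentrant.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; not part of the Kafka client.
class SingleThreadGuard {
    // Thread currently inside a guarded call, or null when the guard is free.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    // Claim the guard for the calling thread, or fail fast if it is already held.
    void acquire() {
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new ConcurrentModificationException(
                    "object is not safe for multi-threaded access");
        }
    }

    // Free the guard so that another call may enter.
    void release() {
        owner.set(null);
    }

    // Returns true if a second thread is rejected while the caller holds the guard.
    static boolean secondThreadRejected() {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // the calling thread is "inside a poll"
        boolean[] rejected = {false};
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected[0] = true;
            }
        });
        other.start();
        try {
            other.join(); // join makes the write to rejected[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            guard.release();
        }
        return rejected[0];
    }
}
```

The usual way to avoid this situation is to give each thread its own consumer, which is the approach the Spring containers described below take.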

If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.

Spring-kafka

ConcurrentKafkaListenerContainerFactory is used to create containers for methods annotated with @KafkaListener.

There are two MessageListenerContainer implementations in Spring Kafka:

KafkaMessageListenerContainer
ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

Using ConcurrentMessageListenerContainer

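@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // consumerFactory() is assumed to be another @Bean method in the same configuration class
    factory.setConsumerFactory(consumerFactory());
    // Each container built by this factory delegates to three child containers
    factory.setConcurrency(3);
    // Maximum time, in milliseconds, to block in each poll
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}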

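The factory has a concurrency property, which it passes on to the containers it creates. For example, factory.setConcurrency(3) makes each ConcurrentMessageListenerContainer create three KafkaMessageListenerContainer instances.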

If six TopicPartition instances are provided and the concurrency is 3, each container gets two partitions. For five TopicPartition instances, two containers get two partitions and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down so that each container gets one partition.
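The arithmetic above can be sketched in a few lines of plain Java. This reproduces only the partition counts described in the paragraph; it is not Spring Kafka's implementation, and the class PartitionSplit and its split method are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Spring Kafka.
class PartitionSplit {

    // Returns the number of partitions each child container would receive.
    static List<Integer> split(int partitions, int concurrency) {
        if (partitions < 1 || concurrency < 1) {
            throw new IllegalArgumentException("partitions and concurrency must be >= 1");
        }
        // Concurrency is adjusted down when there are fewer partitions than containers.
        int containers = Math.min(concurrency, partitions);
        int base = partitions / containers;   // every container gets at least this many
        int extra = partitions % containers;  // the first 'extra' containers get one more
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < containers; i++) {
            counts.add(i < extra ? base + 1 : base);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(split(6, 3)); // prints [2, 2, 2]
        System.out.println(split(5, 3)); // prints [2, 2, 1]
        System.out.println(split(2, 3)); // prints [1, 1]
    }
}
```

In practice this means that setting the concurrency higher than the number of partitions does not add any parallelism.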

Here is a clear example, along with the documentation.