I am new to kafka and i went through the documentation but I couldn't understand anything. Can someone please explain when to use the ConcurrentKafkaListenerContainerFactory
class? I have used the Kafkaconsumer
class but I see ConcurrentKafkaListenerContainerFactory
being used in my current project. Please explain what purpose it serves.
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.
If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.
Spring-kafka
ConcurrentKafkaListenerContainerFactory
is used to create containers for annotated methods with @KafkaListener
There are two MessageListenerContainer
in spring kafka
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
The
KafkaMessageListenerContainer
receives all message from all topics or partitions on a single thread. TheConcurrentMessageListenerContainer
delegates to one or moreKafkaMessageListenerContainer
instances to provide multi-threaded consumption.
Using ConcurrentMessageListenerContainer
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
It has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer
instances.
If you have six
TopicPartition
instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.
here is the clear example with documentation here