I want to use Kafka Streams for real-time processing in my Spring Boot project. I need a Kafka Streams configuration so that I can use a KStream or KTable, but I could not find an example on the internet.
I have already implemented a producer and a consumer; now I want to process the stream in real time.
Let me start by saying that if you are new to Kafka Streams, adding Spring Boot on top of it adds another level of complexity, and Kafka Streams has a steep learning curve as it is. Here are the basics to get you going, starting with the pom:
<!-- NOTE: every Kafka artifact below resolves its version from ${kafka.version}; this fragment does not define it, so it is assumed to be declared in the POM's <properties> section (confirm). -->
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.10.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
</dependency>
Now the configuration object. The code below assumes you are creating two stream apps, and keep in mind that each app represents its own processing topology:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that declares the Kafka Streams settings for this application.
 *
 * <p>It exposes a default {@link StreamsConfig} bean plus one
 * {@link StreamsBuilderFactoryBean} per stream application ("app1" and "app2").
 * Each factory bean carries its own {@code application.id} and therefore backs its own
 * processing topology.
 */
@Configuration
public class KafkaStreamConfig {

    /** Commit interval, in milliseconds, applied to the app1/app2 stream applications. */
    private static final int COMMIT_INTERVAL_MS = 30000;

    /** Number of stream threads per application; falls back to 1 when the property is absent. */
    @Value("${delivery-stats.stream.threads:1}")
    private int threads;

    /** Replication factor handed to Kafka Streams; falls back to 1 when the property is absent. */
    @Value("${delivery-stats.kafka.replication-factor:1}")
    private int replicationFactor;

    /** Kafka bootstrap servers; falls back to {@code localhost:9092} when the property is absent. */
    @Value("${messaging.kafka-dp.brokers.url:localhost:9092}")
    private String brokersUrl;

    /**
     * Builds the default streams configuration, registered under Spring Kafka's
     * well-known default bean name.
     *
     * @return a {@link StreamsConfig} with application id {@code "default"} and the shared defaults
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
        setDefaults(config);
        return new StreamsConfig(config);
    }

    /**
     * Writes the settings shared by every stream application into {@code config}:
     * bootstrap servers, String key/value serdes, {@code earliest} offset reset and the
     * {@link FailOnInvalidTimestamp} timestamp extractor.
     *
     * @param config mutable map that receives the shared settings; existing entries with the
     *               same keys are overwritten
     */
    public void setDefaults(Map<String, Object> config) {
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
    }

    /**
     * Factory bean that supplies the {@code StreamsBuilder} for the "app1" topology.
     *
     * @return a factory bean configured with application id {@code "app1"}
     */
    @Bean("app1StreamBuilder")
    public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() {
        return exactlyOnceFactoryBean("app1");
    }

    /**
     * Factory bean that supplies the {@code StreamsBuilder} for the "app2" topology.
     *
     * @return a factory bean configured with application id {@code "app2"}
     */
    @Bean("app2StreamBuilder")
    public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() {
        return exactlyOnceFactoryBean("app2");
    }

    /**
     * Creates a factory bean for one stream application. Both applications share every
     * setting except the application id, so the common wiring lives here.
     *
     * @param applicationId value for {@code application.id}
     * @return a new factory bean using the shared defaults, exactly-once processing, the
     *         fixed commit interval, and the configured thread count and replication factor
     */
    private StreamsBuilderFactoryBean exactlyOnceFactoryBean(String applicationId) {
        Map<String, Object> config = new HashMap<>();
        setDefaults(config);
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
        return new StreamsBuilderFactoryBean(config);
    }
}
Now comes the fun part: using the StreamsBuilder to build your app (app1 in this example).
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Declares the "app1" stream topology: it reads numbers from the {@code toSquare} topic,
 * squares each value and publishes the result to the {@code squared} topic.
 */
@Component
@Slf4j
public class App1 {

    /**
     * Wires the topology onto the builder supplied by the {@code app1StreamBuilder} bean.
     *
     * <p>The key is passed through untouched, so {@code mapValues} is used instead of
     * {@code map}: it expresses a value-only transformation and does not flag the stream
     * as needing repartitioning.
     *
     * <p>NOTE: a record whose value is {@code null} makes {@code value * value} throw a
     * {@link NullPointerException} on unboxing; filter such records upstream if the
     * topic can contain them.
     *
     * @param builder the streams builder for app1
     * @return the source stream read from {@code toSquare} (not the squared stream)
     */
    @Bean("app1StreamTopology")
    public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) {
        final KStream<String, Long> toSquare =
                builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));

        toSquare
                .mapValues(value -> value * value) // square each value, key unchanged
                .to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic

        return toSquare;
    }
}
Hope this helps.