Sorry for the question being too generic, but someone has some tutorial or guide on how to perform producer and consumer testing with kafka embedded. I've tried several, but there are several versions of dependencies and none actually works =/
I'm using spring cloud stream kafka.
We generally recommend using the Test Binder in tests but if you want to use an embedded kafka server, it can be done...
Add this to your POM...
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
Test app...
@SpringBootApplication
@EnableBinding(Processor.class)
public class So43330544Application {
public static void main(String[] args) {
SpringApplication.run(So43330544Application.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public byte[] handle(byte[] in){
return new String(in).toUpperCase().getBytes();
}
}
application.properties...
spring.cloud.stream.bindings.output.destination=so0544out
spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=so0544
Test case...
@RunWith(SpringRunner.class)
@SpringBootTest
public class So43330544ApplicationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
@Autowired
private KafkaTemplate<byte[], byte[]> template;
@Autowired
private KafkaProperties properties;
@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
}
@Test
public void testSendReceive() {
template.send("so0544in", "foo".getBytes());
Map<String, Object> configs = properties.buildConsumerProperties();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
Consumer<byte[], byte[]> consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton("so0544out"));
ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
consumer.commitSync();
assertThat(records.count()).isEqualTo(1);
assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
}
}