I am trying to connect to my broker on aws with auto.create.topics.enable=true in my server.properties file. But when I am trying to connect to broker using Java client producer I am getting the following error
.
1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 619631, only 37 bytes available at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73) at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) at java.lang.Thread.run(Unknown Source)
Following is my Client producer code.
public static void main(String[] argv){
Properties props = new Properties();
props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("block.on.buffer.full",true);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
try{ for(int i = 0; i < 10; i++)
{ producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
System.out.println("Tried sending:"+i);}
}
catch (Exception e){
e.printStackTrace();
}
producer.close();
}
Can someone help me resolve this?
I have faced the similar issue. The problem here is, when there is a mismatch between kafka clients version in pom file and kafka server is different. I was using kafka clients 0.10.0.0_1 but the kafka server was still in 0.9.0.0. So i upgraded the kafka server version to 10 the issue got resolved.
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.10.0.0_1</version>
</dependency>