I'm trying to set up Spark Streaming to get messages from a Kafka queue. I'm getting the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
Here is the code I'm executing (pyspark):
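from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# Assumed setup: the original snippet did not show how ssc was created.
sc = SparkContext(appName="kafka-direct-stream")
ssc = StreamingContext(sc, 2)  # 2-second batch interval (assumed)
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "host.domain:9092"})
directKafkaStream.pprint()  # an output operation is required before ssc.start()
ssc.start()
ssc.awaitTermination()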
There were a couple of similar posts with the same error. In all of those cases the cause was an empty Kafka topic. There are messages in my "test-topic", though. I can read them with
kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100
Does anyone know what might be the problem?
I'm using:
You need to check two things:
1. Check that this topic and partition exist. In your case the topic is test-topic and the partition is 0.
2. Based on your code, you are trying to consume messages starting from offset 0, and messages may no longer be available at that offset. Check what your earliest offset is and try consuming from there.
Below is the command to check the earliest offset (--time -2 requests the earliest offset; --time -1 would return the latest one instead):
sh kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "your broker list" --topic "topic name" --time -2
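Once you have the offsets reported by GetOffsetShell, one way to act on them is to pass explicit starting offsets to createDirectStream. The sketch below is only an illustration, not a confirmed fix for this error: the helper names parse_offset_shell_output and start_stream_from are made up for this example, and the "topic:partition:offset" line format is assumed from the older GetOffsetShell output, so check it against what your Kafka version prints.

```python
def parse_offset_shell_output(text):
    """Parse lines assumed to look like 'topic:partition:offset'."""
    offsets = {}
    for line in text.splitlines():
        # Split from the right: only the last two fields are numeric.
        parts = line.strip().rsplit(":", 2)
        if len(parts) != 3 or not parts[2]:
            continue  # skip blank lines, log noise, or partitions with no offset
        topic, partition, offset = parts
        # If several comma-separated offsets are listed, keep the first one.
        offsets[(topic, int(partition))] = int(offset.split(",")[0])
    return offsets


def start_stream_from(ssc, brokers, offsets):
    """Hypothetical helper: create a direct stream starting at the given offsets."""
    # Imported lazily so the parser above works without pyspark installed.
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

    from_offsets = {TopicAndPartition(t, p): o for (t, p), o in offsets.items()}
    topics = sorted({t for (t, _p) in offsets})
    return KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers}, fromOffsets=from_offsets
    )
```

For example, parse_offset_shell_output("test-topic:0:42") gives {("test-topic", 0): 42}, which you could then hand to start_stream_from(ssc, "host.domain:9092", ...) in place of the plain createDirectStream call. The value 42 here is invented for illustration.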