I am using Kafka 0.8.2 to receive data from AdExchange, and Spark Streaming 1.4.1 to store that data in MongoDB.
My problem is that when I restart my Spark Streaming job (for instance to deploy a new version, fix a bug, or add new features), it continues reading from the latest Kafka offset at that moment, so I lose the data AdX pushed to Kafka while the job was restarting.
I tried auto.offset.reset -> smallest, but then it consumes everything from offset 0 up to the latest, so the data is huge and duplicated in the DB.
I also tried setting a specific group.id and consumer.id for Spark, but it made no difference.
How can I save the latest offset Spark has consumed to ZooKeeper or Kafka, so that the job can read back from that offset after a restart?
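For reference, here is roughly how I pass the Kafka parameters at the moment (the broker list, group.id, and topic names below are placeholders):

// Roughly what I pass to KafkaUtils; broker list, group.id and topic are placeholders
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset"    -> "smallest",          // what I tried; it re-reads from offset 0
  "group.id"             -> "adx-consumer-group"
)
val topics = Set("adx-events")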
One of the overloads of the createDirectStream function takes a map whose keys are topic-and-partition pairs and whose values are the offsets from which to start consuming. See the API here: http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html. The map I am talking about is usually called fromOffsets.
You can insert entries into the map:
startOffsetsMap.put(TopicAndPartition(topicName, partitionId), startOffset)
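For example, a minimal sketch of building that map, assuming savedOffsets holds the (topic, partition, offset) triples you persisted from the previous run (savedOffsets is a hypothetical name):

import kafka.common.TopicAndPartition
import scala.collection.mutable

// fromOffsets must end up as a Map[TopicAndPartition, Long]
val startOffsetsMap = mutable.Map[TopicAndPartition, Long]()

// savedOffsets: (topic, partition, offset) triples loaded from wherever you persisted them
for ((topicName, partitionId, startOffset) <- savedOffsets)
  startOffsetsMap.put(TopicAndPartition(topicName, partitionId), startOffset)

// createDirectStream expects an immutable Map, so pass startOffsetsMap.toMap to it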
And use it when you create the direct stream:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))
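For completeness, a sketch of what messageHandler can look like; here it simply keeps the key and payload of each record:

import kafka.message.MessageAndMetadata

// Turns each consumed record into the (key, value) pair carried by the stream
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)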
After each batch, you can get the offsets that were just processed using:
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
You can then use this data to construct the fromOffsets map for the next run of the job.
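A sketch of how that typically looks, assuming stream is the DStream returned by createDirectStream above; saveOffsets is a hypothetical helper that writes to ZooKeeper, Kafka, or your MongoDB:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // The cast works because the direct stream's RDDs carry their offset ranges;
  // do it on the stream returned by createDirectStream, before any transformation
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch and write the results to MongoDB ...

  // Persist (topic, partition, untilOffset) so the next run can rebuild fromOffsets
  saveOffsets(offsetRanges.map(r => (r.topic, r.partition, r.untilOffset)))
}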
You can see the full code and usage at the end of this page: https://spark.apache.org/docs/latest/streaming-kafka-integration.html