Spark streaming checkpoints for DStreams

Pablo Francisco Pérez Hidalgo · Dec 31, 2015

In Spark Streaming it is possible (and mandatory if you're going to use stateful operations) to set the StreamingContext to perform checkpoints into a reliable data store (S3, HDFS, ...) of both:

  • Metadata
  • DStream lineage
As described here, to set the target data store you need to call yourSparkStreamingCtx.checkpoint(datastoreURL).

On the other hand, it is possible to set a lineage checkpoint interval for each DStream by calling checkpoint(timeInterval) on it. In fact, the documentation recommends setting the lineage checkpoint interval to between 5 and 10 times the DStream's sliding interval:

dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
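Putting the two calls together, a minimal sketch could look like the following (the master, application name, checkpoint directory, batch interval, and socket source are all placeholder choices, not anything prescribed by the question):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]")
// Batch interval of 2 seconds (placeholder value)
val ssc = new StreamingContext(conf, Seconds(2))

// Metadata checkpointing: point the context at reliable storage (placeholder URL)
ssc.checkpoint("hdfs:///tmp/checkpoints")

// Placeholder source; any DStream works the same way
val lines = ssc.socketTextStream("localhost", 9999)

// Lineage checkpointing: following the 5-10x sliding-interval guideline,
// a 2-second stream would be checkpointed every 10-20 seconds
lines.checkpoint(Seconds(10))
```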

My question is:

When the streaming context has been set to perform checkpointing, and no ds.checkpoint(interval) is called, is lineage checkpointing enabled for all data streams with a default checkpointInterval equal to the batchInterval? Or is, on the contrary, only metadata checkpointing enabled?

Answer

Checking the Spark code (v1.5), I found that DStream checkpointing is enabled under two circumstances:

By an explicit call to the DStream's checkpoint method (not StreamingContext's):

/**
 * Enable periodic checkpointing of RDDs of this DStream
 * @param interval Time interval after which generated RDD will be checkpointed
 */
def checkpoint(interval: Duration): DStream[T] = {
  if (isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change checkpoint interval of an DStream after streaming context has started")
  }
  persist()
  checkpointDuration = interval
  this
}

At DStream initialization, as long as the concrete DStream subclass has overridden the mustCheckpoint attribute (setting it to true):

private[streaming] def initialize(time: Time) {
  ...
  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }
  ...
}
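The arithmetic in that branch can be checked in plain Scala, working in milliseconds as a stand-in for Spark's Duration type: the automatic interval is the smallest multiple of slideDuration that is at least 10 seconds.

```scala
// Mirrors checkpointDuration = slideDuration * ceil(Seconds(10) / slideDuration),
// with durations expressed in milliseconds instead of Spark's Duration
def autoCheckpointIntervalMs(slideMs: Long): Long =
  slideMs * math.ceil(10000.0 / slideMs).toInt

// A 4 s slide is checkpointed every 12 s; a 15 s slide every 15 s
println(autoCheckpointIntervalMs(4000))  // 12000
println(autoCheckpointIntervalMs(15000)) // 15000
```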

The first case is obvious. Performing a naive analysis of the Spark Streaming code:

grep "val mustCheckpoint = true" $(find . -type f -name "*.scala")

> ./org/apache/spark/streaming/api/python/PythonDStream.scala:  override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala:  override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/StateDStream.scala:  override val mustCheckpoint = true

I can see that, in general (ignoring PythonDStream), StreamingContext checkpointing only enables lineage checkpoints for StateDStream and ReducedWindowedDStream instances. These instances are the result of, respectively, the transformations:

  • updateStateByKey: that is, a stream keeping state across several windows.
  • reduceByKeyAndWindow
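These are exactly the operations whose output RDDs depend on every previous batch, so their lineage grows without bound unless it is cut by a checkpoint. A pure-Scala fold (no Spark involved; the names are illustrative) models that dependency for an updateStateByKey-style running count:

```scala
// Each inner Seq is one batch of (key, value) pairs; the state after batch n
// depends on the state after batch n - 1, just like a StateDStream's RDDs
def runUpdateState(batches: Seq[Seq[(String, Int)]]): Map[String, Int] =
  batches.foldLeft(Map.empty[String, Int]) { (state, batch) =>
    batch.foldLeft(state) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0) + v)
    }
  }

val batches = Seq(Seq("a" -> 1, "b" -> 2), Seq("a" -> 3), Seq("b" -> 1))
println(runUpdateState(batches)) // Map(a -> 4, b -> 3)
```

Truncating this ever-growing chain of dependencies is why Spark makes checkpointing mandatory for stateful operations.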