Batch

When run job action is triggered and all transformations have been executed in DAGScheduler, the last step is to do checkpoint. RDDCheckpointData is responsible for doing the checkpoint. Once RDD.checkpoint() is called, current RDD is saved into RDDCheckpointData. When DAGScheduler triggers doCheckpoint, current RDD will be checkpointed and save data into checkpoint directory, then its dependencies' checkpoint will also be triggered.

There are two implementation for RDDCheckpointData: ReliableRDDCheckpointData and LocalRDDCheckpointData. For ReliableRDDCheckpointData, all data in RDD is serialized and saved into the checkpoint directory. For LocalRDDCheckpointData, all data in RDD is saved at the Executor. Once some executor is done, the result has to be re-calculated.

Checkpoint directory should be set if we want to enable checkpoint. This is done by SparkContext.setCheckpointDir().

Streaming

Streaming checkpoint also uses RDD.checkpoint(). For each streaming interval(mini batch interval), once the accumulated batch interval reaches checkpoint interval, RDDs will checkpoint its data and checkpoint meta data will be saved into Checkpoint object and save to the external directory.

Difference between Checkpoint and Persist / Cache

While processing data in each RDD(like getOrCompute), Spark tries to check the storage level for current RDD. If storage level is None(not set), before processing each data, Spark will check whether it is checkpointed. If it is, skip processing this RDD. If not, Spark will compute it as first run on it. In other case, if storage level is set, like Memory or Disk, Spark tries to get saved data from BlockManager. Once there is a history data in BlockManager, an iterator containing saved data will be returned, and Spark will not traverse to the head RDD to re-process each record.

As we can see, checkpoint may also be treated like persisted / cached data to avoid duplicated process. For any RDD which will be used by multiple children RDDs, caching(persisting) data saves computing time and improve the efficiency. But for the persisted data, it can not be used as checkpoint. Checkpoint acts a global state snapshot to recover from failover, particular for Streaming job.

Failover Recover from Checkpoint

Streaming

While checkpoint is enabled for DStream through DStream.checkpoint(interval), ReliableRDDCheckpointData is created in the DStream. At the end of job running, when all data transformations have been done, RDD.doCheckpoint() will be invoked, which will call RDDCheckpointData.checkpoint(). RDDCheckpointData has two implementations: ReliableRDDCheckpointData and LocalRDDCheckpointData.

  • ReliableRDDCheckpointData: Save finished RDD info out to reliable storage, like HDFS

  • LocalRDDCheckpointData: Save finished RDD info onto local disk storage.

While starting a new Spark Streaming, we can specify checkpoint path when creating StreamingContext. CheckpointReader tries to read the checkpoint data from file system and save it in Checkpoint object. If checkpoint data exists, DStreamGraph in StreamingContext is from checkpoint(deserialized from checkpoint). DStreamingGraph.restoreCheckpointData() will try to restore all RDDs checkpoint from source to sink(actions). ReliableCheckpointRDD, which contains deserialized data from checkpoint, is created for each DStream and saved into generated RDD set. While JobGenerator generates Job based on batch interval, if there are existed RDDs recovered from checkpoint data, the computing step reads data from checkpoint directly, instead of computing them from source.

Batch

results matching ""

    No results matching ""