在spark中,特别是流计算的时候使用这种RDD的检查点会比较多。在spark的整个运行中,我们可以知道整个rdd会形成一个拓扑图,里面主要是各个rdd的依赖信息。如下图:
这篇文章我们介绍spark的检查点,也就是checkpoint,这个就相当于截断RDD的依赖关系,然后把当前的rdd信息保存到检查点上,如果集群节点出现故障的时候,重新启动后不用再挨个计算rdd的依赖关系从而得到当前的rdd,而是直接从检查点获取对应的rdd信息即可,节省了重新计算的时间和算力。提高了容错性的保障,同时也提高了程序的执行效率。
下面我们演示一下rdd的checkpoint案例:
package org.example
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object SparkWordCount {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate();
val sc = session.sparkContext
//需要在最开始的context里面设置检查点
sc.setCheckpointDir("hdfs://master:9000/wordcountcheckpoint")
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val data = sc.parallelize(list).filter(e => e % 2 == 0)
//只要是rdd都可以写入检查点。根据实际情况使用即可。
data.checkpoint()
println(data.count())
println("我是分割线")
data.collect().foreach(println)
}
}备注:
1、需要在context里面设置检查点的目录,所有检查点的rdd都会存储到这个目录上。
2、这里的目录我们使用hdfs目录即可。
3、在后面的rdd里面,调用checkpoint()方法即可,这样就可以存储对应的rdd信息
4、当rdd的chekpoint方法执行的时候,当前rdd的父依赖关系会被全部清除掉。
5、这种checkpoint在流计算的场景里面使用频率较高。
6、chekpoint也是一个transformation操作,只有在rdd执行action操作的时候,当前的rdd的chekpoint方法才会被执行。










还没有评论,来说两句吧...