在spark应用程序执行的时候,整个链路会产生非常多的rdd。在前面我们介绍过使用transformation的时候,只有遇到action的时候才会执行相关的transformation,也就是这些rdd是惰性执行的。那也就是当遇到action的时候,就会重复去执行这些transformation的操作。那也就是会重复生成相同的rdd对象。此时带来的就有2个问题:
1、生成重复的rdd会占据大量内存,同时会导致频繁的jvm垃圾回收。 2、重复执行生成相同的rdd,会浪费很多时间,程序执行效率会很低。
所以那有没有办法可以让我们重复使用这个rdd即可呢,这就是这篇文章介绍的rdd的持久化。在spark中rdd的持久化操作有两种方式,第一种是cache,第二种是persist。这两种方式的区别就是:
1、cache只支持memory_only级别。 2、persist可以支持其他的缓存级别。
基于上诉信息,cache的用法如下:
dataRdd.cache()
persist的用法如下:
data.persist() data.persist(${持久化级别})
下面列举个详细的案例说明一下:
package org.example import com.alibaba.fastjson.JSON import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount { def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate(); val sc = session.sparkContext val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val data = sc.parallelize(list).filter(e => e % 2 == 0) data.persist(StorageLevel.MEMORY_ONLY) println(data.count()) println("我是分割线") data.collect().foreach(println) } }
上面的案例是把data这个rdd对象给使用persist进行持久化,持久化级别是inmemory。
下面再列举下spark rdd持久化的级别信息.
序号 | 持久化级别 | 内存使用情况 | CPU使用时间 | 是否位于内存中 | 是否位于磁盘中 | 说明 |
1 | memory_only | 高 | 低 | 是 | 否 | RDD作为反序列化的Java对象存储在JVM中。如果 RDD不适合内存,那么一些分区将不会被缓存, 并在每次需要它们时动态地重新计算。这是默认级 别。 |
2 | MEMORY_ONLY_SER (Java和Scala) | 低 | 高 | 是 | 否 | 将RDD存储为序列化的Java对象(每个分区一个字 节数组)。这比反序列化对象更节省空间,特别是 在使用快速序列化器时,但读取时需要更多CPU。 |
3 | MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 将RDD作为反序列化的Java对象存储在JVM中。如 果RDD不适合内存,那么将不适合的分区存储在 磁盘上,并在需要的时候从那里读取它们。 |
4 | MEMORY_AND_DISK_SER (Java和Scala) | 低 | 高 | 部分 | 部分 | 类似于MEMORY_ONLY_SER,但是将不适合内 存的分区溢出到磁盘,而不是在每次需要它们时动 态地重新计算它们。 |
5 | DISK_ONLY | 低 | 高 | 否 | 是 | 仅在磁盘上存储RDD分区 |
6 | MEMORY_ONLY_2, MEMORY_AND_DISK_2 | 与上面的级别相同,但是在两个集群节点上复制每 个分区。 | ||||
7 | OFF_HEAP | 与MEMORY_ONLY_SER类似,但将数据存储在 堆外内存中。这需要启用堆外内存。 |
上面列举了很多的持久化级别,那么我们在真实情况下如何选择呢?
1、如果 RDDs 适合默认存储级别(MEMORY_ONLY),那么就保留它们。这是 CPU 效率最高的选项,允许 RDDs 上的操作尽可能快地运行。 2、如果没有,可以尝试使用 MEMORY_ONLY_SER 并选择一个快速序列化库,以使对象更节省空间,但访问速度仍然相当快。(Java 和 Scala) 3、不要溢出到磁盘,除非计算数据集的函数非常昂贵,或者它们过滤了大量数据。否则,重新计算分区的速度可能与从磁盘读取分区的速度一样快。 4、如果希望快速恢复故障(例如,如果使用 Spark 为来自 web 应用程序的请求提供服务),请使用复制的存储级别。通过重新计算丢失的数据,所有存储级别都提供了完全的容错能力,但是复制的存储级别允许在 RDD 上继续运行任务,而无需等待重新计算丢失的分区。
备注:
1、在spark中,spark会自动缓存部分的rdd,这些rdd是不需要我们来标记持久化操作的。
2、在spark中,会定期删除废弃的rdd缓存,这块使用的算法是LRU。
还没有评论,来说两句吧...