在spark应用程序执行的时候,每一个rdd都会有分区,当我们没有指定分区的时候,则分区数量是根据spark的配置项:
spark.default.parallelism
决定的,但是我们常常需要根据spark的集群信息来决定分区数量,也就是根据spark集群节点的CPU核数来决定分区的数量,总体公式是:CPU总核数*3或者CPU总核数*4。
所以我们可以通过parallelize函数来指定RDD的分区数。如果遇到特殊的,例如hdfs的文件,我们还需要通过reparation或者coalesce函数来进行调整分区。代码示例如下:
val data = sc.parallelize(List(1,2,3,4,5,6),5) val data1 = data.repartition(3) val data2 = data.coalesce(2,false)
上面的RDD分区比较简单,因此不做过多的演示,下面重点介绍下key-valuea pair这种rdd如何进行分区的调整。在使用key-value pair对的这种如果需要进行重新分区的话,一般我们使用分区器来进行分区。在spark中默认提供了两种分区器,分别是HashPartitioner和RangePartitioner。这两块分别代表的意思是:
HashPartitioner:
HashPartitioner 是 Spark 的默认分区器,它基于一个元素的 Java 散列码(或者是 Pair RDDs 中的 key的散列码)计算的分区索引。计算公式如下: partitionIndex = key hashCode % numberOfPartitions 分区索引是准随机的;因此,分区很可能不会完全相同大小。然而,在具有相对较少分区的大型数据集中,该算法可能会在其中均匀地分布数据。 当使用 HashPartitioner 时,数据分区的默认数量是由 Spark 配置参数“spark.default.parallelism”决定的。如果该参数没有被用户指定,那么它将被设置为集群中的核的数量。
RangePartitioner:
RangePartitioner 将已排序的 RDD 的数据划分为大致相等的范围。它对传递给它的 RDD 的内容进行 了采样,并根据采样数据确定了范围边界。一般不太可能直接使用 RangePartitioner。
在使用分区器的话,此时的rdd一定是key-value pair对的RDD,同时分区的话,使用partitionBy方法,例如:
val list = List("1", "2", "3", "4") val data = sc.parallelize(list).map(e => (e, 1)) val data1 = data.partitionBy(new HashPartitioner(2)) println(data1.partitioner) println(data1.partitions.size) val data2 = data.partitionBy(new RangePartitioner(2,data)) println(data2.partitioner) println(data2.partitions.size)
当然在使用的过程中,自定义partitioner也经常用到,后面的文章会列举几个案例说明一下。
还没有评论,来说两句吧...