在spark中,对于rdd进行aggregateByKey,那这个rdd一定是一个key-value pair 类型的rdd。在这里使用aggregateByKey的时候,可能会遇到TupleN的参数格式,例如,如果我们得数据格式是:List(("张三",1,1,1,1,1,1,1))这种多Tuple的参数格式,我们需要把他转换成一个key-value这样的键值对的rdd类型。也就是需要转换成:List(("张三",(1,1,1,1,1,1,1,1))),这样就变成了只有一个key和一个value这样的rdd,然后在此rdd的基础上使用aggregateByKey函数。aggregateByKey函数的使用方式如下:
1、aggregateByKey(zeroValue)(seqOp,combOp)
那么如何来理解这3个参数呢?
第一个参数zeroValue的含义是: 给每一个分区中的每一种key一个初始值
第二个参数seqOp是个函数, Seq Function, 这个函数就是用来先对每个分区内的数据按照 key 分别进行定义进行函数定义的操作
第三个参数combOp是个函数, Combiner Function, 对经过 Seq Function 处理过的数据按照 key 分别进行进行函数定义的操作
下面我们举例来说明一下,也就是统计下学生的最高成绩是多少
package org.example import com.alibaba.fastjson.JSON import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount { def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate(); val sc = session.sparkContext //制作用户的成绩信息 val studentRDD = sc.parallelize(List( ("张三", "数学", 83), ("李四", "数学", 74), ("王五", "数学", 91), ("张三", "英语", 82), ("李四", "英语", 69), ("王五", "英语", 62), ("张三", "物理", 97), ("李四", "物理", 80), ("王五", "物理", 78), ("张三", "化学", 73), ("李四", "化学", 68), ("王五", "化学", 87), ("张三", "语文", 87), ("李四", "语文", 93), ("王五", "语文", 91)), 5) //这里我们把("张三", "数学", 83)转换成("张三", ("数学", 83)) val studentArchivement = studentRDD.map(t => (t._1, (t._1,t._2, t._3))) val rdd = studentArchivement //这里主要是初始值给0,然后先进行小范围对比,再进行大范围对比 .aggregateByKey(0)(seqOp, combOp) // 查看输出结果 rdd.collect.foreach(println) } // Combiner Operation : 从所有分区累加器中找出最大成绩 def combOp = (accumulator1: Int, accumulator2: Int) => if (accumulator1 > accumulator2) accumulator1 else accumulator2 // Sequence operation : 从单个分区查找最大成绩 //首先小范围进行对比,这里传进来的数据,在对比的时候,切记,在使用aggregateByKey的话,分区内聚合,他传进来的参数一定是(key,(value,value))这里的value部分,它不会把key传递进来,所以能比较的,使用的都是value这个第二个括号里面内容 def seqOp = (accumulator: Int, element: (String,String, Int)) => { println("element是" + element) if (accumulator > element._3) accumulator else element._3 } }
具体的说明已经在代码里面注释了。
然后看下结果:
备注:
1、这里的seqOp传递进来的是value的值,如果value是多个,则需要使用map提前转换成1对1的key/value对。
2、最后贴一个aggregaByKey的函数的特点:
1、aggregateByKey 函数的特点总结如下: 2、性能方面的 aggregateByKey 是一个优化的转换; 3、aggregateByKey 是一个宽依赖的转换; 4、当聚合需求加上输入和输出 RDD 类型不同时,我们应该使用 aggregateByKey; 5、当聚合需求加上输入和输出 RDD 类型相同时,我们应该使用 reduceByKey。
还没有评论,来说两句吧...