在spark中,我们可能会在某些节点上使用同一个变量的值做加法,最后统计总的结果。这时候spark为我们提供了一个累加器。也就是声明一个累加器,然后我们可以在各个executor对当前的累加器进行计数操作。下面演示一下:
package org.example
import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}
object SparkWordCount {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate();
val sc = session.sparkContext
val acc1 = sc.longAccumulator("AccumulatorTest")
val data = sc.parallelize(1 to 10)
data.map(e => acc1.add(e)).collect().foreach(println)
println("累加器结果是:"+acc1.value)
}
}备注:
1、累加器只能被add。
2、累加器变量会被分发给集群的每一个节点。
3、使用累加器变量的话,应该使用${累加器变量属性}.value取值。









还没有评论,来说两句吧...