在spark中,我们可能会在某些节点上使用同一个变量的值做加法,最后统计总的结果。这时候spark为我们提供了一个累加器。也就是声明一个累加器,然后我们可以在各个executor对当前的累加器进行计数操作。下面演示一下:
package org.example import com.alibaba.fastjson.JSON import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext} object SparkWordCount { def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate(); val sc = session.sparkContext val acc1 = sc.longAccumulator("AccumulatorTest") val data = sc.parallelize(1 to 10) data.map(e => acc1.add(e)).collect().foreach(println) println("累加器结果是:"+acc1.value) } }
备注:
1、累加器只能被add。
2、累加器变量会被分发给集群的每一个节点。
3、使用累加器变量的话,应该使用${累加器变量属性}.value取值。
还没有评论,来说两句吧...