Spark累加器|系统累加器|自定义累加器
累加器
累加器:分布式共享只写变量。(Task和Task之间不能读数据)
累加器用来对信息进行聚合,通常在向Spark传递函数时,比如使用map()函数或者用 filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
系统累加器
(1)工作节点上的任务不能相互访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。 (2)对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动操作中。转化操作中累加器可能会发生不止一次更新。
自定义累加器
package com.atguigu.accumlator import org.apache.spark.rdd.RDD import org.apache.spark.{ SparkConf, SparkContext} import org.apache.spark.util.AccumulatorV2 import scala.collection.mutable object Accumlator_define { def main(args: Array[String]): Unit = { //1.创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.创建RDD val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hea", "Haa", "Hello", "Hello", "Spark", "Spark")) // 创建累加器对象 val myAccumlator = new MyAccumlator // 注册累加器到sc sc.register(myAccumlator) // 使用累加器 rdd.foreach( word => { myAccumlator.add(word) } ) // 输出累加器的结果 println(myAccumlator.value) // Map(Hea -> 1, Hello -> 3, Haa -> 1) sc.stop() } } // List("Hello","Haha","Scorpion"),统计以H开头得单词的次数 // 输入:单词 输出:Map[(单词,次数)] class MyAccumlator extends AccumulatorV2[String,mutable.Map[String,Int]]{ // 定义一个Map集合,接受返回得数据.var var map: mutable.Map[String, Int] = mutable.Map[String,Int]() // 判断是否为初始状态 override def isZero: Boolean = map.isEmpty // 拷贝 override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = { val accumlator = new MyAccumlator accumlator.map = this.map accumlator } // 重置,让map变成初始状态 override def reset(): Unit = map.clear() // 业务逻辑 override def add(v: String): Unit = { if(v.startsWith("H")){ map(v) = map.getOrElse(v,0) + 1 } } // 合并不同Task中的累加器 override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = { // 当前Task的累加器的map var map1 = map // 另一个需要合并的Task中累加器的map val map2 = other.value // 两个map集合的合并 map = map1.foldLeft(map2)( // mm2表示map2的集合,kv表示的是map1中的键值对 (mm2,kv) => { val word: String = kv._1 val count: Int = kv._2 mm2(word) = mm2.getOrElse(word,0) + 1 mm2 } ) } // 获取累加器的值 override def value: mutable.Map[String, Int] = map }
上一篇:
JS实现多线程数据分片下载