Accumulators
An accumulator is created with SparkContext.accumulator(v); running tasks can then add to it with the "+=" operator. Tasks cannot read the value, however; only the driver program can (via .value), presumably to avoid heavy read/write locking.
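A minimal sketch of this pattern under the Spark 1.x API (the RDD contents and the names AccumulatorBasics and errorCount are illustrative, not from the original post):

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorBasics {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("AccumulatorBasics"))

    // Created on the driver.
    val errorCount = sc.accumulator(0, "errors")

    sc.parallelize(Seq("ok", "error", "ok", "error")).foreach { line =>
      // Tasks may only add; calling errorCount.value inside a task throws.
      if (line == "error") errorCount += 1
    }

    // Only the driver can read the accumulated result.
    println(errorCount.value) // prints 2
    sc.stop()
  }
}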
Custom accumulator types
Beyond the Int, Long, Float, and Double types Spark supports out of the box, developers can define their own accumulator types by extending AccumulatorParam[T], where T is the type to accumulate (Array[Long] in the example below).
Accumulator usage:
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

/**
 * Created by User on 2016/11/2.
 */
object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("AccumulatorDemo")
    val sc: SparkContext = new SparkContext(conf)

    val arrAccu = Array(0L, 0L, 0L, 0L, 0L)
    // The custom AccumulatorParam is passed explicitly here.
    val accumulatorArr = sc.accumulator(arrAccu, "HADOOP")(MyAccumulatorParam)
    val accumulatorMl = sc.accumulator(0, "ML")
    val accumulatorDl = sc.accumulator(0L, "DL")

    val arr = Array("ML", "DL", "CNN", "RNN", "ML", "HADOOP", "SPARK", "ML")
    for (i <- 0 until arr.length) {
      if (arr(i).equals("ML")) {
        accumulatorMl += 1
      } else if (arr(i).equals("DL")) {
        accumulatorDl += 1
      } else if (arr(i).equals("HADOOP")) {
        accumulatorArr += Array(1L, 1L, 1L, 1L, 1L)
      }
    }

    println("ML=" + accumulatorMl.name.get + "、" + accumulatorMl.value)
    println("DL=" + accumulatorDl.name.get + "、" + accumulatorDl.value)
    println("HADOOP=" + accumulatorArr.name.get + "、" + accumulatorArr.value.mkString(","))
  }

  object MyAccumulatorParam extends AccumulatorParam[Array[Long]] {
    // Element-wise addition of two arrays.
    override def addInPlace(r1: Array[Long], r2: Array[Long]): Array[Long] = {
      r1.zip(r2).map(x => x._1 + x._2)
    }
    // The "zero" value: an all-zeros array of the same length as the initial value.
    override def zero(initialValue: Array[Long]): Array[Long] = {
      new Array[Long](initialValue.length)
    }
  }
}
Output:
16/11/02 14:18:35 INFO BlockManagerMaster: Registered BlockManager
ML=ML、3
DL=DL、1
HADOOP=HADOOP、1,1,1,1,1
16/11/02 14:18:35 INFO SparkContext: Invoking stop() from shutdown hook
Without the explicit (MyAccumulatorParam) argument, the code fails to compile:
Error:(15, 38) could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[Array[Long]]
    val accumulatorArr=sc.accumulator(arrAccu,"HADOOP")
Error:(15, 38) not enough arguments for method accumulator: (implicit param: org.apache.spark.AccumulatorParam[Array[Long]])org.apache.spark.Accumulator[Array[Long]].
Unspecified value parameter param.
    val accumulatorArr=sc.accumulator(arrAccu,"HADOOP")
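An alternative to passing the parameter explicitly is to declare the object implicit, so the compiler resolves it on its own. A minimal sketch, with the same addInPlace/zero logic as above (the implicit must be in scope at the call site):

// Marking the object implicit lets the compiler supply it automatically,
// so the explicit (MyAccumulatorParam) argument list can be dropped.
implicit object MyAccumulatorParam extends AccumulatorParam[Array[Long]] {
  override def addInPlace(r1: Array[Long], r2: Array[Long]): Array[Long] =
    r1.zip(r2).map(x => x._1 + x._2)
  override def zero(initialValue: Array[Long]): Array[Long] =
    new Array[Long](initialValue.length)
}

val accumulatorArr = sc.accumulator(arrAccu, "HADOOP") // compiles: implicit found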
Spark source:
/**
 * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
 * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
 * available when you create Accumulators of a specific type.
 *
 * @tparam T type of value to accumulate
 */
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}

object AccumulatorParam {

  // The following implicit objects were in SparkContext before 1.2 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, as there are duplicate codes in SparkContext for backward
  // compatibility, please update them accordingly if you modify the following implicit objects.

  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }

  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }

  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }

  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0f
  }

  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
}
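Because these implicit objects live in the AccumulatorParam companion object, the compiler finds them automatically for the built-in types; this is why the Int and Long accumulators in the demo above need no explicit parameter. A quick sketch, assuming an existing SparkContext sc:

val d = sc.accumulator(0.0) // DoubleAccumulatorParam supplied implicitly
val i = sc.accumulator(0)   // IntAccumulatorParam
val l = sc.accumulator(0L)  // LongAccumulatorParam
val f = sc.accumulator(0f)  // FloatAccumulatorParam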