Using Spark Accumulators


Accumulators
An accumulator variable is created with SparkContext.accumulator(v); running tasks can then add to it with the "+=" operator. Tasks cannot read the variable, however: only the driver program can read it (via .value), presumably to avoid heavy read/write locking.
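
For example, a minimal sketch of this task-side/driver-side split (assuming a SparkContext named sc is already available; the accumulator name errorCount is illustrative):

// Minimal sketch: tasks add, only the driver reads.
val errorCount = sc.accumulator(0, "errors")        // created on the driver
sc.parallelize(Seq("ok", "error", "ok", "error")).foreach { line =>
  if (line == "error") errorCount += 1              // tasks may only add to it
}
println(errorCount.value)                           // .value works on the driver only; prints 2
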
Custom accumulator types
Besides the value types Spark supports out of the box (Int, Long, Float, and Double, as the source listing below shows), accumulator value types can also be user-defined. To do so, extend AccumulatorParam[T] for the desired value type T; the example below implements AccumulatorParam[Array[Long]].
Accumulator usage:
 
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

/**
  * Created by User on 2016/11/2.
  */
object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("AccumulatorDemo")
    val sc: SparkContext = new SparkContext(conf)

    // Array-valued accumulator; the custom param object is passed explicitly.
    val arrAccu = Array(0L, 0L, 0L, 0L, 0L)
    val accumulatorArr = sc.accumulator(arrAccu, "HADOOP")(MyAccumulatorParam)

    // Int and Long accumulators use Spark's built-in implicit params.
    val accumulatorMl = sc.accumulator(0, "ML")
    val accumulatorDl = sc.accumulator(0L, "DL")

    val arr = Array("ML", "DL", "CNN", "RNN", "ML", "HADOOP", "SPARK", "ML")
    for (i <- arr.indices) {
      if (arr(i).equals("ML")) {
        accumulatorMl += 1
      } else if (arr(i).equals("DL")) {
        accumulatorDl += 1
      } else if (arr(i).equals("HADOOP")) {
        accumulatorArr += Array(1L, 1L, 1L, 1L, 1L)
      }
    }

    // Only the driver may read an accumulator, via .value.
    println("ML=" + accumulatorMl.name.get + "、" + accumulatorMl.value)
    println("DL=" + accumulatorDl.name.get + "、" + accumulatorDl.value)
    println("HADOOP=" + accumulatorArr.name.get + "、" + accumulatorArr.value.mkString(","))

    sc.stop()
  }

  object MyAccumulatorParam extends AccumulatorParam[Array[Long]] {
    // Merge two partial results element-wise.
    override def addInPlace(r1: Array[Long], r2: Array[Long]): Array[Long] = {
      r1.zip(r2).map(x => x._1 + x._2)
    }
    // The zero value: an all-zero array of the same length as the initial value.
    override def zero(initialValue: Array[Long]): Array[Long] = {
      new Array[Long](initialValue.length)
    }
  }
}

 

Output:
16/11/02 14:18:35 INFO BlockManagerMaster: Registered BlockManager
ML=ML、3
DL=DL、1
HADOOP=HADOOP、1,1,1,1,1
16/11/02 14:18:35 INFO SparkContext: Invoking stop() from shutdown hook
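
Note that each accumulator here is given a name (the second argument to sc.accumulator, read back via .name.get above). Named accumulators are also displayed in Spark's web UI on the stage detail pages, which can be handy for tracking progress and debugging.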

 

If (MyAccumulatorParam) is not passed explicitly, the code fails to compile with:
Error:(15, 38) could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[Array[Long]]
    val accumulatorArr=sc.accumulator(arrAccu,"HADOOP")
Error:(15, 38) not enough arguments for method accumulator: (implicit param: org.apache.spark.AccumulatorParam[Array[Long]])org.apache.spark.Accumulator[Array[Long]].
Unspecified value parameter param.
    val accumulatorArr=sc.accumulator(arrAccu,"HADOOP")
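
An alternative to passing the param explicitly is to put it into implicit scope so the compiler supplies it on its own. A sketch (assuming the object is visible at the call site and sc is an existing SparkContext):

// Sketch: marking the param object implicit lets the compiler fill in the argument.
implicit object MyAccumulatorParam extends AccumulatorParam[Array[Long]] {
  override def addInPlace(r1: Array[Long], r2: Array[Long]): Array[Long] =
    r1.zip(r2).map { case (a, b) => a + b }
  override def zero(initialValue: Array[Long]): Array[Long] =
    new Array[Long](initialValue.length)
}

// The explicit second argument list is no longer needed:
val accumulatorArr = sc.accumulator(Array(0L, 0L, 0L, 0L, 0L), "HADOOP")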

 

The relevant Spark source:
/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
* in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
* available when you create Accumulators of a specific type.
*
* @tparam T type of value to accumulate
*/
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}
 
object AccumulatorParam {
 
  // The following implicit objects were in SparkContext before 1.2 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, as there are duplicate codes in SparkContext for backward
  // compatibility, please update them accordingly if you modify the following implicit objects.
 
  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }
 
  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }
 
  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }
 
  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0f
  }
 
  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
}
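
As the scaladoc above notes, AccumulatorParam is the special case of AccumulableParam where the type being added is the same as the accumulated type. When the two differ, the more general AccumulableParam[R, T] can be used via sc.accumulable. A sketch (names are illustrative, assuming an existing SparkContext sc) that accumulates individual String elements into a Set[String]:

import org.apache.spark.AccumulableParam

// Sketch: accumulate single String elements (T) into a Set[String] (R).
object StringSetParam extends AccumulableParam[Set[String], String] {
  // Task-side: add one element to a partial result.
  def addAccumulator(r: Set[String], t: String): Set[String] = r + t
  // Merge two partial results.
  def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
}

val seen = sc.accumulable(Set.empty[String])(StringSetParam)
sc.parallelize(Seq("ML", "DL", "ML")).foreach(x => seen += x)
println(seen.value) // Set(ML, DL)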

 

 

