Spark計算均值


作者:Syn良子 出處:http://www.cnblogs.com/cssdongl 轉載請注明出處

用spark來快速計算分組的平均值,寫法很便捷,話不多說上代碼

object ColumnValueAvg extends App {
  /**
    * ID,Name,ADDRESS,AGE
    * 001,zhangsan,chaoyang,20
    * 002,zhangsa,chaoyang,27
    * 003,zhangjie,chaoyang,35
    * 004,lisi,haidian,24
    * 005,lier,haidian,40
    * 006,wangwu,chaoyang,90
    * 007,wangchao,haidian,80
    */
  val conf = new SparkConf().setAppName("test column value sum and avg").setMaster("local[1]")
  val sc = new SparkContext(conf)

  val textRdd = sc.textFile(args(0))

  //be careful the toInt here is necessary ,if no cast ,then it will be age string append
  val addressAgeMap = textRdd.map(x => (x.split(",")(2), x.split(",")(3).toInt))

  val sumAgeResult = addressAgeMap.reduceByKey(_ + _).collect().foreach(println)

  val avgAgeResult = addressAgeMap.combineByKey(
    (v) => (v, 1),
    (accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1),
    (accu1: (Int, Int), accu2: (Int, Int)) => (accu1._1 + accu2._1, accu1._2 + accu2._2)
  ).mapValues(x => (x._1 / x._2).toDouble).collect().foreach(println)

  println("Sum and Avg calculate successfuly")

  sc.stop()

}

用textFile讀取數據后,以address進行分組來求age的平均值,這里用combineByKey來計算,這是一個抽象層次很高的函數.稍微總結一下自己的理解

查看源代碼會發現combineByKey定義如下

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
    : RDD[(K, C)] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

combineByKey函數需要傳遞三個函數做為參數,分別為createCombiner、mergeValue、mergeCombiner,需要理解這三個函數的意義

結合數據來講的話,combineByKey默認按照key來進行元素的combine,這里三個參數都是對value的一些操作

1>第一個參數createCombiner,如代碼中定義的是 : (v) => (v, 1)

這里是創建了一個combiner,作用是當遍歷rdd的分區時,遇到第一次出現的key值,那么生成一個(v,1)的combiner,比如這里key為address,當遇到第一個

chaoyang,20 的時候,(v,1)中的v就是age的值20,1是address出現的次數
 
2>第2個參數是mergeValue,顧名思義就是合並value,如代碼中定義的是:(accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1)
這里的作用是當處理當前分區時,遇到已經出現過的key,那么合並combiner中的value,注意這里accu: (Int, Int)對應第一個參數中出現的combiner,即(v,1),注意類型要一致
那么(accu._1 + v, accu._2 + 1)就很好理解了,accu._1即使需要合並的age的值,而acc._2是需要合並的key值出現的次數,出現一次即加1
 
3>第三個參數是mergeCombiners,用來合並各個分區上的累加器,因為各個分區分別運行了前2個函數后需要最后合並分區結果.
 
ok,運行代碼,結果如下,分別按照address來計算出age的平均值
 
(haidian,48.0)
(chaoyang,43.0)
 
由於combineByKey抽象程度很高,可以自己custom一些函數做為計算因子,因此可以靈活的完成更多的計算功能.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM