作者:Syn良子 出處:http://www.cnblogs.com/cssdongl 轉載請注明出處
用spark來快速計算分組的平均值,寫法很便捷,話不多說上代碼
object ColumnValueAvg extends App { /** * ID,Name,ADDRESS,AGE * 001,zhangsan,chaoyang,20 * 002,zhangsa,chaoyang,27 * 003,zhangjie,chaoyang,35 * 004,lisi,haidian,24 * 005,lier,haidian,40 * 006,wangwu,chaoyang,90 * 007,wangchao,haidian,80 */ val conf = new SparkConf().setAppName("test column value sum and avg").setMaster("local[1]") val sc = new SparkContext(conf) val textRdd = sc.textFile(args(0)) //be careful the toInt here is necessary ,if no cast ,then it will be age string append val addressAgeMap = textRdd.map(x => (x.split(",")(2), x.split(",")(3).toInt)) val sumAgeResult = addressAgeMap.reduceByKey(_ + _).collect().foreach(println) val avgAgeResult = addressAgeMap.combineByKey( (v) => (v, 1), (accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1), (accu1: (Int, Int), accu2: (Int, Int)) => (accu1._1 + accu2._1, accu1._2 + accu2._2) ).mapValues(x => (x._1 / x._2).toDouble).collect().foreach(println) println("Sum and Avg calculate successfuly") sc.stop() }
用textFile讀取數據后,以address進行分組來求age的平均值,這里用combineByKey來計算,這是一個抽象層次很高的函數.稍微總結一下自己的理解
查看源代碼會發現combineByKey定義如下
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) }
combineByKey函數需要傳遞三個函數做為參數,分別為createCombiner、mergeValue、mergeCombiner,需要理解這三個函數的意義
結合數據來講的話,combineByKey默認按照key來進行元素的combine,這里三個參數都是對value的一些操作
1>第一個參數createCombiner,如代碼中定義的是 : (v) => (v, 1)
這里是創建了一個combiner,作用是當遍歷rdd的分區時,遇到第一次出現的key值,那么生成一個(v,1)的combiner,比如這里key為address,當遇到第一個
chaoyang,20 的時候,(v,1)中的v就是age的值20,1是address出現的次數
2>第2個參數是mergeValue,顧名思義就是合並value,如代碼中定義的是:(accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1)
這里的作用是當處理當前分區時,遇到已經出現過的key,那么合並combiner中的value,注意這里accu: (Int, Int)對應第一個參數中出現的combiner,即(v,1),注意類型要一致
那么(accu._1 + v, accu._2 + 1)就很好理解了,accu._1即使需要合並的age的值,而acc._2是需要合並的key值出現的次數,出現一次即加1
3>第三個參數是mergeCombiners,用來合並各個分區上的累加器,因為各個分區分別運行了前2個函數后需要最后合並分區結果.
ok,運行代碼,結果如下,分別按照address來計算出age的平均值
(haidian,48.0)
(chaoyang,43.0)
由於combineByKey抽象程度很高,可以自己custom一些函數做為計算因子,因此可以靈活的完成更多的計算功能.