1.定義 【aggregate】 /** * Aggregate the elements of each partition, and then the results for all the partitions, using * given combine functions and a neutral "zero value". This function can return a different result * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are * allowed to modify and return their first argument instead of creating a new U to avoid memory * allocation. */ 即: aggregate需要三個參數(初始值zeroValue,函數seqOp和函數combOp),返回值類型U同初始值zeroValue一樣。 處理過程: 1.在rdd的每個分區上應用seqOp函數(應用初始值zeroValue)並返回分區的結果值(U類型)。 2.分區的結果值返回到driver端做reduce處理,也就是說在分區的結果集上應用函數combOp(應用初始值zeroValue), 並返回最終結果值(U類型)。 函數頭: def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 【treeAggregate】 /** * Aggregates the elements of this RDD in a multi-level tree pattern. * @param depth suggested depth of the tree (default: 2) * @see [[org.apache.spark.rdd.RDD#aggregate]] */ 即:treeAggregate和aggregate可以一樣用,只是多了一個參數depth,但此參數默認為2,可以不指定。 treeAggregate和aggregate的參數,返回值及用法完全一樣。只是處理過程及最終的結果集處理有些微不同,下面詳細說明。 函數頭: def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U,combOp: (U, U) => U,depth: Int = 2): U 2.用法示例 【aggregate】 scala> def seq(a:Int,b:Int):Int={ | println("seq:"+a+":"+b) | math.min(a,b)} seq: (a: Int, b: Int)Int scala> def comb(a:Int,b:Int):Int={ | println("comb:"+a+":"+b) | a+b} comb: (a: Int, b: Int)Int val z =sc.parallelize(List(1,2,4,5,8,9),3) scala> z.aggregate(3)(seq,comb) seq:3:4 seq:3:1 seq:1:2 seq:3:8 seq:3:5 seq:3:9 comb:3:1 comb:4:3 comb:7:3 res0: Int = 10 【treeAggregate】 scala> def seq(a:Int,b:Int):Int={ | println("seq:"+a+":"+b) | math.min(a,b)} seq: (a: Int, b: Int)Int scala> def comb(a:Int,b:Int):Int={ | println("comb:"+a+":"+b) | a+b} comb: (a: Int, b: Int)Int val z =sc.parallelize(List(1,2,4,5,8,9),3) scala> z.treeAggregate(3)(seq,comb) seq:3:4 //3 分區1 seq:3:1 //1 分區1 seq:1:2 //1 分區1 seq:3:8 //3 分區2 seq:3:5 //3 分區2 seq:3:9 //3 分區3 comb:1:3 comb:4:3 res1: Int = 7 由上可見,形式上兩種用法一致,只是aggregate 比 treeAggregate在最后結果的reduce操作時,多使用了一次初始值。 3.區別 查看aggregate的代碼和treeAggregate的代碼實現會發現,確實如上現象所反映,整理結果如下: (1)最終結果上,aggregate會比treeAggregate多做一次對於初始值的combOp操作。但從參數名字上就可以看到, 一般要傳入類似0或者空的集合的zeroValue初始值。 (2)aggregate會把分區的結果直接拿到driver端做reduce操作。treeAggregate會先把分區結果做reduceByKey, 最后再把結果拿到driver端做reduce,算出最終結果。reduceByKey需要幾層,由參數depth決定,也就是相當於 做了depth層的reduceByKey,這也是treeAggregate名字的由來。 4.源碼解釋 源碼邏輯如上分析,較簡單,不贅述了。
借鑒圖一張(http://blog.csdn.net/lookqlp/article/details/52121057)
5.優缺點 (1) aggregate在combine上的操作,復雜度為O(n). treeAggregate的時間復雜度為O(lg n)。n為分區數。
(2) aggregate把數據全部拿到driver端,存在內存溢出的風險。treeAggregate則不會。
因此,筆者覺得就用treeAggregate好了,如有不對之處,敬請留言指正。
