Aggregate函數
一、源碼定義
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using
* given combine functions and a neutral "zero value". This function can return a different result
* type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
*
* @param zeroValue the initial value for the accumulated result of each partition for the
* `seqOp` operator, and also the initial value for the combine results from
* different partitions for the `combOp` operator - this will typically be the
* neutral element (e.g. `Nil` for list concatenation or `0` for summation)
* @param seqOp an operator used to accumulate results within a partition
* @param combOp an associative operator used to combine results from different partitions
*/
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
首先,大致解釋一下源碼中的定義:
因為通常為我們的spark程序計算是分布式的,所以我們通常需要聚合的數據都分部在不同的分區,不同的機器上。
該函數它會首先對每個分區內的數據基於初始值進行一個首次聚合,然后將每個分區聚合的結果,通過使用給定的聚合函數,再次基於初始值進行分區之間的聚合,並且最終干函數的返回結果的類型,可以是與該RDD的類型相同。什么意思呢,其實這樣說,還是有點蒙圈的,下面詳細道來。
從源碼中可以看出,該函數是一個柯里化函數,它需要接受一共三個參數,分別是:
- zeroValue:U
這個值代表的是我們需要設置的初始值,該初始值可以是不與原RDD的元素的類型相同,可以是Int,String,元組等等任何我們所需要的類型,根據自己的需求來,為了方便后面的表示,假設我把它定義為數值類型的元組(0,0),注意,這里必須是具體的值,並非函數
- seqOp: (U, T) => U
這里需要定義一個函數,注意,是函數,U的類型與我們在第一步中定義的初始值得類型相同,所以,這里的U指的就是(Int,Int)類型
這里的T代表的即為RDD中每個元素的值。
該函數的功能是,在每個分區內遍歷每個元素,將每個元素與U進行聚合,具體的聚合方式,我們可以自定義,不過有一點需要注意,這里聚合的時候依然要基於初始值來進行計算
- combOp: (U, U) => U
這里同樣需要定義一個函數,這里的U即為每個分區內聚合之后的結果,如上,上一步中的U為(Int,Int)類型,則這里的U也為該類型
該函數的主要作用就是對每個分區聚合之后的結果進行再次合並,即分區之間的合並,但是,同樣,在合並的開始,也是要基於初始進行合並,其實這里我們可以發現,這里U的類型是與初始值的類型是相同的。
上面啰里啰嗦的說了這么所,感覺還是不太直觀,上代碼瞧瞧:
案例1:求取給定RDD的平均數
object TestAggreate { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) sc.setLogLevel("warn") val rdd1 = sc.parallelize(Seq(("a", 2), ("a", 5), ("a", 4), ("b", 5), ("c", 3), ("b", 3), ("c", 6), ("a", 8)), 1) val r1 = rdd1.aggregate((0, 0))( (u, c) => (u._1 + 1, u._2 + c._2), (r1, r2) => (r1._1 + r2._1, r1._2 + r2._2) ) println(r1) sc.stop() } }
這里先給出運行結果,再來解釋:
首先我們需要確定,該RDD的分區數為1,也就是說所有的數據都是在一個分區內進行計算,其次該RDD的類型是RDD[(String,Int)],我的目標是求該RDD總個數以及第二個值得總和
1、首先定義初始值,該例子中我定義為了(0,0),是一個(Int,Int)類型的,我准備第一個0代表計數,第二個0代表對每個元素進行求和
2、(u,c),這個函數,這里的u類型就是(Int,Int)類型,c指的就是RDD中的每個元素,每遍歷一個元素c,u的第一個元素就會加1,也就是u._1 + 1,同時u的第二個元素會對c的第二個元素進行累加,也就是u._2 + c._2,不過這里的累加都是要基於初始值進行累加的,順序是這樣的:
第一次 0+1,0+2
第二次 1+1,2+5
第三次 2+1,7+4
第四次 3+1,11+5
第五次 4+1,16+3
第六次 5+1,19+3
第七次 6+1,22+6
第八次 7+1,28+8
最終結果就是(8,36)
3、(r1.r2),該函數是實現每個分區內的數據進行合並,因為這里只有一個分區,所以只是分區0與另外一個空分區進行合並。
這里如果我們將分區數設置為超過1個的情況下,會怎樣呢,來看下:
bject TestAggreate { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) sc.setLogLevel("warn") val rdd1 = sc.parallelize(Seq(("a", 2), ("a", 5), ("a", 4), ("b", 5), ("c", 3), ("b", 3), ("c", 6), ("a", 8)), 4) val r1 = rdd1.aggregate((0, 0))( (u, c) => (u._1 + 1, u._2 + c._2), (r1, r2) => (r1._1 + r2._1, r1._2 + r2._2) ) println(r1) sc.stop() } }
這種情況下,我們將RDD分在了四個分區內,每個分區內分配兩個數據,具體每個分區內有哪幾個元素,可以這樣查看:
rdd1.foreachPartition(part => { val partitionId = TaskContext.getPartitionId() part.foreach(x => { println((partitionId, x._1, x._2)) }) })
從上面可以看出,分區數據分別是存在了part0((a,2),(a,5)),part1((a,4),(b,5)),part1((c,3),(b,3)),part3((c,6),(a,8)),這種情況下的合並過程是這樣的:
1、每個分區內合並,結果就是 part0(0+1+1,0+2+5) part1(0+1+1,0+4+5) part2(0+1+1,0+3+3) part3(0+1+1,0+6+8)
2、(part0,part1) => (part0._1 + part1._1,part0._2+part1._2),然后使用該結果,在依次與part2,part3進行合並,結果就為(0+2+2+2+2,0+7+9+6+14),結果(8,36)
這里我在測試的過程中發現一個問題,就是說在分區數大於1的情況下,當我最后將分區合並的函數中的聚合過程,相互顛倒過來的話,也就是,正常,我應該得到(8,36),但是我聚合的時候想得到(36,8)這個結果,下面這段代碼:
val r1 = rdd1.aggregate((0, 0))(
(u, c) => (u._1 + 1, u._2 + c._2),
(r1, r2) => (r1._2 + r2._2, r1._1 + r2._1)
)
上面標紅的代碼,我顛倒了順序,我的預期的得到(36,8),但是結果卻是隨機產生的結果,像這樣:
上面是執行了兩次,產生了兩次不同的結果,但是顯然是錯誤的。但是具體它是怎么計算出來的,博主現在目前還沒有研究出來。
案例2:求和
該案例主要是測試一下初始值的變化對結果產生的影響,進一步證明,不管是在分區內進行聚合還是分區之間進行聚合的時候,都會使用到初始值,案例1中的初始值我們都設置的是0,此時我將其設置成2在來看看結果,測試代碼:
object TestAggreate { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) sc.setLogLevel("warn") val rdd1 = sc.parallelize(Seq(("a", 2), ("a", 5), ("a", 4), ("b", 5), ("c", 3), ("b", 3), ("c", 6), ("a", 8)), 4) val r1 = rdd1.aggregate((0))( (u, c) => (u + c._2), (r1, r2) => (r1 + r2) ) println(r1) sc.stop() } }
結果:(46),但是實際之和加起來是36,顯然多出了10,這個10是怎么來的呢?
計算方式如下:
1、首先這里是4個分區,每個分區進行聚合,而之前說過,分區內聚合都是要以初始值為基准的,也就是說要在初始值得基礎上進行相加:
part0 (2+2+5)
part1(2+4+5)
part2(2+3+3)
part3(2+6+8)
2、其次是分區之間的聚合,分區之間的聚合也是要在初始值的基礎上相加的,即
2+part0+part1+part2+part3
結果即為46,
所以說,如果我們想要得到預想的結果,對於該函數生成的結果還要減去如下數:
result-initValue*(partitions+1)
總結一下:
該函數是spark中的一個高性能的算子,它實現了先進性分區內的聚合之后在進行了對每個分區的聚合結果再次進行聚合的操作,這樣的在大數據量的情況下,大大減少了數據在各個節點之間不必要的網絡IO,大大提升了性能,相比於groupBy的函數,在特定情況下,性能提升數十倍不止,不過在使用的過程中一定要對該函數所對應的每個參數的含義了如指掌,這樣運用起來才能得心應手。