1 前言
combineByKey是使用Spark無法避免的一個方法,總會在有意或無意,直接或間接的調用到它。從它的字面上就可以知道,它有聚合的作用,對於這點不想做過多的解釋,原因很簡單,因為reduceByKey、aggregateByKey、foldByKey等函數都是使用它來實現的。
combineByKey是一個高度抽象的聚合函數,可以用於數據的聚合和分組,由它牽出的shuffle也是Spark中重中之重,現在就讓我們去看看它到底是怎么去實現的。
不足或錯誤之處, 煩請指出更正。
2 方法源碼介紹
這是PairRDDFunctions里面的combineByKey的方法片段,這兩個方法放在一塊,就是說明了,調用該方法若不填分區函數Partitioner則使用HashPartitioner,默認情況下會使用Map段合並(這個是對shuffle而言的)。
3 方法源碼走讀
廢話不多說,直接貼源碼,
有注釋,則看注釋,注釋要表達的意思就是combineByKey是一個范函數,使用一組自定義聚合函數以Key為聚合條件進行聚合,至於其他的就不多說了,往下看代碼。
首先就進行了判斷,Key是否為數組,假如是數組則不能使用Map段合並和HashPartitioner,原因:
要想進行Map段合並和Hash分區,那么Key就必須可以通過比較內容是否相同來確定Key是否相等以及通過內容計算hash值,進而進行合並和分區,然而數組判斷相等和計算hash值並不是根據它里面的內容,而是根據數組在堆棧中的信息來實現的。
接着往下,構造了一個Aggregator,這玩意可以說是combineByKey的核心,因為聚合全是交給它來完成的。進去看看下Aggregator。
上面是Aggregator的默認構造器,需要傳入三個自定義的方法,現在重點說說這三個方法的意義:
首先緊跟着Aggregator的三個泛型,第一個K,這個是你進行combineByKey也就是聚合的條件Key,可以是任意類型。后面的V,C兩個泛型是需要聚合的值的類型,和聚合后的值的類型,兩個類型是可以一樣,也可以不一樣,例如,Spark中用的多的reduceByKey這個方法,若聚合前的值為long,那么聚合后仍為long。再比如groupByKey,若聚合前為String,那么聚合后為Iterable<String>。
再看三個自定義方法:
- createCombiner
這個方法會在每個分區上都執行的,而且只要在分區里碰到在本分區里沒有處理過的Key,就會執行該方法。執行的結果就是在本分區里得到指定Key的聚合類型C(可以是數組,也可以是一個值,具體還是得看方法的定義了。)
2. mergeValue
這方法也會在每個分區上都執行的,和createCombiner不同,它主要是在分區里碰到在本分區內已經處理過的Key才執行該方法,執行的結果就是將目前碰到的Key的值聚合到已有的聚合類型C中。
其實方法1和2放在一起看,就是一個if判斷條件,進來一個Key,就去判斷一下若以前沒出現過就執行方法1,否則執行方法2.
3. mergeCombiner
前兩個方法是實現分區內部的相同Key值的數據合並,而這個方法主要用於分區間的相同Key值的數據合並,形成最終的結果。
接下來就看看Aggregator實現了哪些方法。
從它的方法列表上來看,其實就它只有三個方法,那就依次來看看這三個方法是干嘛的:
- combineValuesByKey
看到這個名字,再根據構造器,就可以猜出,這個方法主要實現的就是分區內部的數據合並。看它的代碼:
這里根據是否可以刷磁盤分了兩條路,其實做的事情都是一樣的,區別是在存儲數據的時候一個當內存不夠是直接oom,一個是可以刷磁盤。代碼的實現很簡單,就是迭代一個分區的數據,然后不斷插入或更新Map里面的數據,這里就不再細說。
2. combineCombinersByKey
這個方法主要是實現分區間的數據合並,也就是合並combineValuesByKey的結果,看它是怎么實現的:
代碼就不說了,和combineValuesByKey如出一轍,只是使用的自定義的方法不同而已。
3. updateMetrics
這個方法和刷磁盤有關,
就是記錄下,當前是否刷了磁盤,刷了多少。
到這里Aggregator就結束了,接着combineByKey往下。
實例化Aggregator后,接着就是判斷,是否需要重新分區(shuffle):
- 不需要分區
當self.partitioner == Some(partitioner)時,也就是分區實例是同一個的時候,就不需要分區了,因此只需要對先用的分區進行combineValuesByKey操作就好了,沒有分區間的合並了,也不需要shuffle了。
2. 需要分區
兩個分區器不一樣,需要對現在分區的零散數據按Key重新分區,目的就是在於將相同的Key匯集到同一個分區上,由於數據分布的不確定性,因此有可能現在的每個分區的數據是由重新分區后的所有分區的部分數據構成的(寬依賴),因此需要shuffle,則構建ShuffledRDD,
其實到這里,我們就應該意識到,combineByKey的關鍵在於分區器partitioner,它是針對分區的一個操作,分區器的選擇就決定了執行combineByKey后的結果,如果所給的分區器不能保證相同的Key值被分區到同一個分區,那么最終的合並的結果可能存在多個分區里有相同的Key。
Shuffle的目的就是將零散於所有分區的數據按Key分區並集中。
需要shuffle的部分下部分再細說。