combineByKey:
Generic function to combine the elements for each key using a custom set of aggregation functions.
概述
.combineByKey
方法是基於鍵進行聚合的函數(大多數基於鍵聚合的函數都是用它實現的),所以這個方法還是挺重要的。
我們設聚合前Pair RDD的鍵值對格式為:鍵為K,鍵值格式為V;而聚合后,鍵格式不便,鍵值格式為C。
combineByKey
函數的定義為:
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc35dbc8e60>)
該函數的參數主要為前三個:
- createCombiner
- mergeValue
- mergeCombiners
示意圖如下:
一個例子
還是先看一個例子,暫時看不懂可以先看下面再回來。
>>> test = sc.parallelize([('panda', (1,2)), ('pink',(7,2)), ('pirate',(3,1))])
>>> xx = test.combineByKey((lambda x : (x,1)),\
... (lambda x,y: (x[0] + y, x[1]+ 1)),\
... (lambda x,y : (x[0] + y[0], x[1] + y[1])) )
>>> xx.collect()
[('coffee', (3, 2)), ('panda', (3, 1))]
這里,三個參數分別用了3個lambda表達式代替,分別為:
- createCombiner : lambda x : (x,1)
- mergeValue : lambda x , y : (x[0] + y , x[1] + 1 )
- mergeCombiners : lambda x , y : (x[0] + y[0], x[1] + y[1])
下面解釋這三個參數。
createCombiner
由於聚合操作會遍分區中所有的元素,因此每個元素(這里指的是鍵值對)的鍵只有兩種情況:
- 以前沒出現過
- 以前出現過
如果以前沒出現過,則執行的是createCombiner
方法;否則執行mergeValue方法
,即:
.createCombiner()
會在新遇到的鍵對應的累加器中賦予初始值。
該函數在格式上是由 V -> C
的,在上面的例子里面,是由 整數類型 -> 二元元組類型,這個二元元組第二個元素為1。
mergeValue
對於已經出現過的鍵(key),調用mergeValue來進行聚合操作,對該鍵的累加器對應的當前值(C格式)於這個新的值(V格式)進行合並。
mergeCombiners
如果有兩個或者更多的分區(這里的例子里沒提到)都有對應同一個鍵的累加器,就需要使用用戶提供的mergeCombiners()
方法將各個分區的結果(全是C格式)進行合並。