簡單介紹
combineByKey()是最通用的對key-value型rdd進行聚集操作的聚集函數(aggregation function)。類似於aggregate(),combineByKey()允許用戶返回值的類型與輸入不一致。
其定義如下,我們可以根據這個形式來分別定義createCombiner、mergeValue和mergeCombiners三個函數:
def combineByKey[C](
createCombiner: V => C, ##A
mergeValue: (C, V) => C, ##B
mergeCombiners: (C, C) => C,##C
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null
)
自定義combineByKey
以實現一個計算平均值的功能為例來分別說明createCombiner、mergeValue和mergeCombiners三個函數的作用和定義方法。
##A createCombiner(value)
createCombiner: V => C ,這個函數把當前rdd中的值(value)作為參數,此時我們可以對其做些附加操作(類型轉換)並把它返回 (這一步類似於初始化操作,分區內操作)
def createCombiner(value):
(value, 1)
##B mergeValue(acc, value)
mergeValue: (C, V) => C,該函數把元素V合並到之前的元素C(createCombiner)上 (每個分區內合並)
def mergeValue(acc, value):
# 注意,這里的acc即為createCombiner產生的C。
# 這里,用acc[0]表明為acc這個元組中的第一個元素,在scala中acc._1表示
(acc[0]+value, acc[1]+1)
###C mergeCombiners: (acc1, acc2)
mergeCombiners: (C, C) => C,該函數把2個元素C合並 (此函數作用范圍在rdd的不同分區間內,跨分區合並)
def mergeCombiners(acc1, acc2):
# 注意,到這一步,表明這個rdd的每條數據都已經被###A和###B捕獲匹配完畢
(acc1[0]+acc2[0], acc1[1]+acc2[1])
案例:
如圖,有兩個分區,key-value(類別-數量)形式也清楚,我們想知道coffee的平均數量和panda的平均數量。以scala形式寫法如下:
val init_data = Array(("coffee", 1), ("coffee", 2), ("panda", 3), ("coffee", 9)) |
分析:
Partition 1 trace:
(coffee, 1) => new key
accumulators[coffee] = createCombiner(1)
得到:(coffee, (1, 1))
(coffee, 2) => existing key
accumulators[coffee] = mergeValue(accumulators[coffee], 2)
得到:(coffee, (2, 3))
顯然(panda, 3) => new key,調用createCombiner方法。
得到:(panda, (1, 3))
Partition 2 trace:
(coffee, 9) => new key
accumulators[coffee] = createCombiner(9)
得到:(coffee, (1, 9))
接下來,mergeCombiners來合並分區:
Merge Partitions:
mergeCombiners(partition1.accumulators[coffee], partition2.accumulators[coffee])
得到:(coffee, (3,12))
---------------------------------------------細心看 反復看 不然是假懂--------------------------------