combineByKey
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
其中的參數:
createCombiner:組合器函數,用於將V類型轉換成C類型,輸入參數為RDD[K,V]中的V,輸出為C
mergeValue:合並值函數,將一個C類型和一個V類型值合並成一個C類型,輸入參數為(C,V),輸出為C
mergeCombiners:合並組合器函數,用於將兩個C類型值合並成一個C類型,輸入參數為(C,C),輸出為C
numPartitions:結果RDD分區數,默認保持原有的分區數
partitioner:分區函數,默認為HashPartitioner
mapSideCombine:是否需要在Map端進行combine操作,類似於MapReduce中的combine,默認為true
舉例理解:
假設我們要將一堆的各類水果給榨果汁,並且要求果汁只能是純的,不能有其他品種的水果。那么我們需要一下幾步:
1 定義我們需要什么樣的果汁。
2 定義一個榨果汁機,即給定水果,就能給出我們定義的果汁。--相當於hadoop中的local combiner
3 定義一個果汁混合器,即能將相同類型的水果果汁給混合起來。--相當於全局進行combiner
那么對比上述三步,combineByKey的三個函數也就是這三個功能
1 createCombiner就是定義了v如何轉換為c
2 mergeValue 就是定義了如何給定一個V將其與原來的C合並成新的C
3 就是定義了如何將相同key下的C給合並成一個C
var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1.combineByKey(
(v : Int) => List(v), --將1 轉換成 list(1)
(c : List[Int], v : Int) => v :: c, --將list(1)和2進行組合從而轉換成list(1,2)
(c1 : List[Int], c2 : List[Int]) => c1 ::: c2 --將全局相同的key的value進行組合
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))