spark之combineByKey


combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

其中的參數:

createCombiner:組合器函數,用於將V類型轉換成C類型,輸入參數為RDD[K,V]中的V,輸出為C

mergeValue:合並值函數,將一個C類型和一個V類型值合並成一個C類型,輸入參數為(C,V),輸出為C

mergeCombiners:合並組合器函數,用於將兩個C類型值合並成一個C類型,輸入參數為(C,C),輸出為C

numPartitions:結果RDD分區數,默認保持原有的分區數

partitioner:分區函數,默認為HashPartitioner

mapSideCombine:是否需要在Map端進行combine操作,類似於MapReduce中的combine,默認為true


 

舉例理解:

假設我們要將一堆的各類水果給榨果汁,並且要求果汁只能是純的,不能有其他品種的水果。那么我們需要一下幾步:

1 定義我們需要什么樣的果汁。

2 定義一個榨果汁機,即給定水果,就能給出我們定義的果汁。--相當於hadoop中的local combiner

3 定義一個果汁混合器,即能將相同類型的水果果汁給混合起來。--相當於全局進行combiner

 

那么對比上述三步,combineByKey的三個函數也就是這三個功能

1 createCombiner就是定義了v如何轉換為c

2 mergeValue 就是定義了如何給定一個V將其與原來的C合並成新的C

3 就是定義了如何將相同key下的C給合並成一個C

var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))

rdd1.combineByKey(
(v : Int) => List(v),             --將1 轉換成 list(1)
(c : List[Int], v : Int) => v :: c,       --將list(1)和2進行組合從而轉換成list(1,2)

(c1 : List[Int], c2 : List[Int]) => c1 ::: c2  --將全局相同的key的value進行組合
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM