講明白combineByKey()算子,不是談源碼


簡單介紹

 combineByKey()是最通用的對key-value型rdd進行聚集操作的聚集函數(aggregation function)。類似於aggregate(),combineByKey()允許用戶返回值的類型與輸入不一致。

其定義如下,我們可以根據這個形式來分別定義createCombiner、mergeValue和mergeCombiners三個函數:

def combineByKey[C](
  createCombiner: V => C, ##A
  mergeValue: (C, V) => C, ##B
  mergeCombiners: (C, C) => C,##C 
  partitioner: Partitioner,   
  mapSideCombine: Boolean = true,
  serializer: Serializer = null

)

自定義combineByKey

以實現一個計算平均值的功能為例來分別說明createCombiner、mergeValue和mergeCombiners三個函數的作用和定義方法。

##A createCombiner(value)

createCombiner: V => C ,這個函數把當前rdd中的值(value)作為參數,此時我們可以對其做些附加操作(類型轉換)並把它返回 (這一步類似於初始化操作,分區內操作)

def createCombiner(value):

   (value, 1)

##B mergeValue(acc, value)

 mergeValue: (C, V) => C,該函數把元素V合並到之前的元素C(createCombiner)上 (每個分區內合並)

def mergeValue(acc, value):
# 注意,這里的acc即為createCombiner產生的C。
# 這里,用acc[0]表明為acc這個元組中的第一個元素,在scala中acc._1表示
  (acc[0]+value, acc[1]+1)
###C   mergeCombiners: (acc1, acc2)

 mergeCombiners: (C, C) => C,該函數把2個元素C合並 (此函數作用范圍在rdd的不同分區間內,跨分區合並)

def mergeCombiners(acc1, acc2):

# 注意,到這一步,表明這個rdd的每條數據都已經被###A和###B捕獲匹配完畢

   (acc1[0]+acc2[0], acc1[1]+acc2[1])

 

 

案例:

 

 

 

 如圖,有個分區,key-value(類別-數量)形式也清楚,我們想知道coffee的平均數量和panda的平均數量。以scala形式寫法如下:

val init_data = Array(("coffee", 1), ("coffee", 2), ("panda", 3), ("coffee", 9))
val data = sc.parallelize(init_data) # 兩個分區
type MVType = (Int, Int) //定義一個元組類型
data.combineByKey(
   score => (1, score), # createCombiner函數
   (c: MVType, newScore) => (c._1 + 1, c._2 + newScore), # mergeValue函數
   (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) # mergeCombiners函數
).map { case (key, value) => (key, value._2/ value._1) }.map(println(_))

 

 

 

 

 

 

 

 

分析:

Partition 1 trace:
(coffee, 1) => new key
accumulators[coffee] = createCombiner(1)
得到:(coffee, (1, 1))
(coffee, 2) => existing key
accumulators[coffee] = mergeValue(accumulators[coffee], 2)
得到:(coffee, (2, 3))
顯然(panda, 3) => new key,調用createCombiner方法。
得到:(panda, (1, 3))

Partition 2 trace:
(coffee, 9) => new key
accumulators[coffee] = createCombiner(9)
得到:(coffee, (1, 9))

接下來,mergeCombiners來合並分區:

Merge Partitions
mergeCombiners(partition1.accumulators[coffee], partition2.accumulators[coffee])
得到:(coffee, (3,12))

 

---------------------------------------------細心看 反復看 不然是假懂--------------------------------

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM