[Spark] 關於函數 combineByKey


combineByKey:

Generic function to combine the elements for each key using a custom set of aggregation functions.

概述

.combineByKey 方法是基於鍵進行聚合的函數(大多數基於鍵聚合的函數都是用它實現的),所以這個方法還是挺重要的。

我們設聚合前Pair RDD的鍵值對格式為:鍵為K,鍵值格式為V;而聚合后,鍵格式不便,鍵值格式為C。

combineByKey函數的定義為:

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc35dbc8e60>)

該函數的參數主要為前三個:

  • createCombiner
  • mergeValue
  • mergeCombiners

示意圖如下:

combineByKey-illustrated

一個例子

還是先看一個例子,暫時看不懂可以先看下面再回來。

>>> test = sc.parallelize([('panda', (1,2)), ('pink',(7,2)), ('pirate',(3,1))])
>>> xx = test.combineByKey((lambda x : (x,1)),\
...                     (lambda x,y: (x[0] + y, x[1]+ 1)),\
...                     (lambda x,y : (x[0] + y[0], x[1] + y[1])) )
>>> xx.collect()
[('coffee', (3, 2)), ('panda', (3, 1))]

這里,三個參數分別用了3個lambda表達式代替,分別為:

  • createCombiner : lambda x : (x,1)
  • mergeValue : lambda x , y : (x[0] + y , x[1] + 1 )
  • mergeCombiners : lambda x , y : (x[0] + y[0], x[1] + y[1])

下面解釋這三個參數。

createCombiner

由於聚合操作會遍分區中所有的元素,因此每個元素(這里指的是鍵值對)的鍵只有兩種情況:

  • 以前沒出現過
  • 以前出現過

如果以前沒出現過,則執行的是createCombiner方法;否則執行mergeValue方法,即:

Key-Value-Pair

.createCombiner()會在新遇到的鍵對應的累加器中賦予初始值。

該函數在格式上是由 V -> C 的,在上面的例子里面,是由 整數類型 -> 二元元組類型,這個二元元組第二個元素為1。

mergeValue

對於已經出現過的鍵(key),調用mergeValue來進行聚合操作,對該鍵的累加器對應的當前值(C格式)於這個新的值(V格式)進行合並。

mergeCombiners

如果有兩個或者更多的分區(這里的例子里沒提到)都有對應同一個鍵的累加器,就需要使用用戶提供的mergeCombiners()方法將各個分區的結果(全是C格式)進行合並。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM