Spark聚合操作-reduceByKey、groupByKey、combineBykey的使用與比較


  聚合操作是spark運算中特別常見的一種行為。比如分析用戶一天的活着一次登陸的行為,就要把日志按用戶id進行聚合,然后做排序、求和、求平均之類的運算……而spark中對於聚合操作的蒜子也比較豐富,本文主要結合作者個人的經驗和網上資料,對這幾個算子進行整理和比較。
  這里,一般都是對Pair RDD 進行的聚合操作。首先,什么是pair RDD
  Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為Pair RDD【《spark 快速大數據分析》】。
  關於Pair RDD 這篇博客 講的挺詳細的 http://blog.csdn.net/gamer_gyt/article/details/51747783
Pair RDD又叫做鍵值對RDD 鍵值對RDD通常用來進行聚合運算的(書中原話)。spark 快速大數據分析書中有專門一章來介紹pair rdd 但是對幾個聚合運算算子的比較幾乎沒有,以下結合網上的資料進行的整理。
 
 
1.reduceByKey和groupByKey的比較:
貼一段經典的代碼:
val conf = new SparkConf().setAppName("GroupAndReduce").setMaster("local")
    val sc = new SparkContext(conf)
    val words = Array("one", "two", "two", "three", "three", "three")
    val wordsRDD = sc.parallelize(words).map(word => (word, 1))
    val wordsCountWithReduce = wordsRDD.
      reduceByKey(_ + _).
      collect().
      foreach(println)
    val wordsCountWithGroup = wordsRDD.
      groupByKey().
      map(w => (w._1, w._2.sum)).
      collect().
      foreach(println)

  

1.reduceByKey可以傳入自定義函數,而groupByKey不可以,但是groupByKey省事啊(誤)
 
 
官方文檔(spark2.1.1):
groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
當對(K,V)對的數據集進行調用時,返回(K,Iterable <V>)對的數據集。Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 如果要分組以便在每個鍵上執行聚合(如總和或平均值),則使用reduceByKey或aggregateByKey將會產生更好的性能。
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.默認情況下,輸出中的並行級別取決於父RDD的分區數。您可以傳遞一個可選的numTasks參數來設置不同數量的任務。
reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
其中每個密鑰的值使用給定的reduce函數func進行聚合,該函數必須是類型(V,V)=> V。與groupByKey類似,reduce任務的數量可以通過可選的第二個參數進行配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
其中每個鍵的值使用給定的組合函數和中性“零”值進行聚合。允許不同於輸入值類型的聚合值類型,同時避免不必要的分配。像groupByKey一樣,reduce任務的數量可以通過可選的第二個參數進行配置。
 
 
1)當采用reduceByKeyt時,Spark可以在每個分區移動數據之前將待輸出數據與一個共用的key結合。借助下圖可以理解在reduceByKey里究竟發生了什么。 注意在數據對被搬移前同一機器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函數)。然后lamdba函數在每個區上被再次調用來將所有值reduce成一個最終結果。整個過程如下:
(2)當采用groupByKey時,由於它不接收函數,spark只能先將所有的鍵值對(key-value pair)都移動,這樣的后果是集群節點之間的開銷很大,導致傳輸延時。整個過程如下:
 
 
2.在對大數據進行復雜計算時,reduceByKey優於groupByKey,reduceByKey在數據量比較大的時候會遠遠快於groupByKey。
 
另外,如果僅僅是group處理,那么以下函數應該優先於 groupByKey :
  (1)、combineByKey 組合數據,但是組合之后的數據類型與輸入時值的類型不一樣。
  (2)、foldByKey合並每一個 key 的所有值,在級聯函數和“零值”中使用。
 
2.reduceByKey 的使用
1.簡單的例子:
該函數必須是類型(V,V)=> V
val counts = pairs.reduceByKey((a, b) => a + b)
 
2.reduceByKey的作用域是key-value類型的鍵值對,並且是只對每個key的value進行處理,如果含有多個key的話,那么就對多個values進行處理。這里的函數是我們自己傳入的,也就是說是可人為控制的
 
簡單來講,這個V實際上就是value,你要寫一個函數,對同一個key 對兩個value有什么操作?最簡單的,相加!你就寫(a, b) => a + b 或者(pre, after) => pre + after 都可以
3.combineByKey 的使用

combineByKey函數主要接受了三個函數作為參數,分別為createCombiner、mergeValue、mergeCombiners。這三個函數足以說明它究竟做了什么。理解了這三個函數,就可以很好地理解combineByKey。

要理解combineByKey(),要先理解它在處理數據時是如何處理每個元素的。由於combineByKey()會遍歷分區中的所有元素,因此每個元素的鍵要么還沒有遇到過,要么就和之前的鍵相同。combineByKey()的處理流程如下:

  1. 如果是一個新的元素,此時使用createCombiner()來創建那個鍵對應的累加器的初始值。(!注意:這個過程會在每個分區第一次出現各個鍵時發生,而不是在整個RDD中第一次出現一個鍵時發生。)

  2. 如果這是一個在處理當前分區中之前已經遇到鍵,此時combineByKey()使用mergeValue()將該鍵的累加器對應的當前值與這個新值進行合並。

3.由於每個分區都是獨立處理的,因此對於同一個鍵可以有多個累加器。如果有兩個或者更多的分區都有對應同一個鍵的累加器,就需要使用用戶提供的mergeCombiners()將各個分區的結果進行合並。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM