spark中groupByKey與reducByKey


【譯】避免使用GroupByKey

by:leotse

原文:Avoid GroupByKey

譯文

讓我們來看兩個wordcount的例子,一個使用了reduceByKey,而另一個使用groupByKey:

1
2
3
4
5
6
7
8
9
10
11
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
.reduceByKey(_ + _)
.collect()

val wordCountsWithGroup = wordPairsRDD
.groupByKey()
.map(t => (t._1, t._2.sum))
.collect()

 

上面兩個函數所得到的結果都是正確的,但是當數據集很大時,使用了reduceByKey的例子表現更佳。這是因為在shuffle輸出的數據前,Spark會Combine每一個partition上具有相同key的輸出結果。

看下圖我們就能理解reduceByKey的工作流程。我們注意到同一台機器上數據shuffle之前,相同key的數據(通過調用傳入reduceByKey的lambda函數)Combine在一起的,然后再一次調用這個lambda函數去reduce來自各個partition的全部值,從而得到最終的結果。

另一方面,當調用groupByKey的時候,所有的鍵值對都會進行shuffle,這將增加很多無謂的數據進行網絡傳輸。

為了確定哪台機器將接受Shuffle后的鍵值對,Spark會針對該鍵值對數據的key調用一個分區函數。當某一台executor機器上的內存不足以保存過多的Shuffle后數據時,Spark就會溢寫數據到磁盤上。然而,這種溢寫磁盤會一次性將一個key的全部鍵值對數據寫入磁盤,因此如果一個key擁有過多鍵值對數據——多到內存放不下時,將會拋出Out Of Memory異常。在之后發布的Spark中將會更加優雅地處理這種情況,使得這個job仍會繼續運行,但是我們仍然需要避免(使用groupByKey)。當Spark需要溢寫磁盤的時候,它的性能將受到嚴重影響

如果你有一個非常大的數據集,那么reduceByKeygroupByKey進行shuffle的數據量之間的差異將會更加誇張。

下面是一些你可以用來替代groupByKey的函數:
1)當你在combine數據但是返回的數據類型因輸入值的類型而異時,你可以使用combineByKey
2)如果key使用到結合函數和“零值”,你可以用foldByKey函數合並value;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM