【譯】避免使用GroupByKey
by:leotse
譯文
讓我們來看兩個wordcount的例子,一個使用了reduceByKey
,而另一個使用groupByKey
:
1 |
val words = Array("one", "two", "two", "three", "three", "three") |
上面兩個函數所得到的結果都是正確的,但是當數據集很大時,使用了reduceByKey
的例子表現更佳。這是因為在shuffle輸出的數據前,Spark會Combine每一個partition上具有相同key的輸出結果。
看下圖我們就能理解reduceByKey
的工作流程。我們注意到同一台機器上數據shuffle之前,相同key的數據(通過調用傳入reduceByKey
的lambda函數)Combine在一起的,然后再一次調用這個lambda函數去reduce來自各個partition的全部值,從而得到最終的結果。
另一方面,當調用groupByKey
的時候,所有的鍵值對都會進行shuffle,這將增加很多無謂的數據進行網絡傳輸。
為了確定哪台機器將接受Shuffle后的鍵值對,Spark會針對該鍵值對數據的key調用一個分區函數。當某一台executor機器上的內存不足以保存過多的Shuffle后數據時,Spark就會溢寫數據到磁盤上。然而,這種溢寫磁盤會一次性將一個key的全部鍵值對數據寫入磁盤,因此如果一個key擁有過多鍵值對數據——多到內存放不下時,將會拋出Out Of Memory異常。在之后發布的Spark中將會更加優雅地處理這種情況,使得這個job仍會繼續運行,但是我們仍然需要避免(使用groupByKey
)。當Spark需要溢寫磁盤的時候,它的性能將受到嚴重影響。
如果你有一個非常大的數據集,那么reduceByKey
和groupByKey
進行shuffle的數據量之間的差異將會更加誇張。
下面是一些你可以用來替代groupByKey
的函數:
1)當你在combine數據但是返回的數據類型因輸入值的類型而異時,你可以使用combineByKey
;
2)如果key使用到結合函數和“零值”,你可以用foldByKey
函數合並value;