Spark算子選擇策略


摘要

   1.使用reduceByKey/aggregateByKey替代groupByKey

  2.使用mapPartitions替代普通map

  3.使用foreachPartitions替代foreach

  4.使用filter之后進行coalesce操作

  5.使用repartitionAndSortWithinPartitions替代repartition與sort類操作

  6.使用broadcast使各task共享同一Executor的集合替代算子函數中各task傳送一份集合

  7.使用相同分區方式的join可以避免Shuffle

   8.map和flatMap選擇

   9.cache和persist選擇

   10.zipWithIndex和zipWithUniqueId選擇

  

內容

1.使用reduceByKey/aggregateByKey替代groupByKey

 reduceByKey/aggregateByKey底層使用combinerByKey實現,會在map端進行局部聚合;groupByKey不會

2.使用mapPartitions替代普通map

mapPartitions類的算子,一次函數調用會處理一個partition所有的數據,而不是一次函數調用處理一條,性能相對來說會高一些。但是有的時候,使用mapPartitions會出現OOM(內存溢出)的問題。因為單次函數調用就要處理掉一個partition所有的數據,如果內存不夠,垃圾回收時是無法回收掉太多對象的,很可能出現OOM異常。所以使用這類操作時要慎重!

3.使用foreachPartitions替代foreach

原理類似於“使用mapPartitions替代map”,也是一次函數調用處理一個partition的所有數據,而不是一次函數調用處理一條數據。在實踐中發現,foreachPartitions類的算子,對性能的提升還是很有幫助的。比如在foreach函數中,將RDD中所有數據寫MySQL,那么如果是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會創建一個數據庫連接,此時就勢必會頻繁地創建和銷毀數據庫連接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個partition的數據,那么對於每個partition,只要創建一個數據庫連接即可,然后執行批量插入操作,此時性能是比較高的。實踐中發現,對於1萬條左右的數據量寫MySQL,性能可以提升30%以上。

4.使用filter之后進行coalesce操作

通常對一個RDD執行filter算子過濾掉RDD中較多數據后(比如30%以上的數據),建議使用coalesce算子,手動減少RDD的partition數量,將RDD中的數據壓縮到更少的partition中去。因為filter之后,RDD的每個partition中都會有很多數據被過濾掉,此時如果照常進行后續的計算,其實每個task處理的partition中的數據量並不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。因此用coalesce減少partition數量,將RDD中的數據壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場景下,對於性能的提升會有一定的幫助。

5.使用repartitionAndSortWithinPartitions替代repartition與sort類操作

repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,如果需要在repartition重分區之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。因為該算子可以一邊進行重分區的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。

6.使用broadcast使各task共享同一Executor的集合替代算子函數中各task傳送一份集合

在算子函數中使用到外部變量時,默認情況下,Spark會將該變量復制多個副本,通過網絡傳輸到task中,此時每個task都有一個變量副本。如果變量本身比較大的話(比如100M,甚至1G),那么大量的變量副本在網絡中傳輸的性能開銷,以及在各個節點的Executor中占用過多內存導致的頻繁GC,都會極大地影響性能。

因此對於上述情況,如果使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進行廣播。廣播后的變量,會保證每個Executor的內存中,只駐留一份變量副本,而Executor中的task執行時共享該Executor中的那份變量副本。這樣的話,可以大大減少變量副本的數量,從而減少網絡傳輸的性能開銷,並減少對Executor內存的占用開銷,降低GC的頻率。

7.使用相同分區方式的join可以避免Shuffle

Spark知道當前面的轉換已經根據相同的partitioner分區器分好區的時候如何避免shuffle。如果RDD有相同數目的分區,join操作不需要額外的shuffle操作。因為RDD是相同分區的,rdd1中任何一個分區的key集合都只能出現在rdd2中的單個分區中。因此rdd3中任何一個輸出分區的內容僅僅依賴rdd1和rdd2中的單個分區,第三次shuffle就沒有必要了。

rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)

那如果rdd1和rdd2使用不同的分區器,或者使用默認的hash分區器但配置不同的分區數呢?那樣的話,僅僅只有一個rdd(較少分區的RDD)需要重新shuffle后再join。(參考自

8.map和flatMap選擇

def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U])RDD[U] //Return a new RDD by applying a function to all elements of this RDD.

def flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U])RDD[U]  //Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
前者的輸入是一個單一數據,后者的輸入數據是一個可迭代的集合。同樣是執行某種映射函數,后者最終會把元素打平,即map的輸入輸出是一對一的,而flatMap的輸出是一對多的

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM