Spark性能優化(2)——廣播變量、本地緩存目錄、RDD操作、數據傾斜


廣播變量

背景

一般Task大小超過10K時(Spark官方建議是20K),需要考慮使用廣播變量進行優化。
大表小表Join,小表使用廣播的方式,減少Join操作。

參考:Spark廣播變量與累加器

Local Dir

背景

shuffle過程中,臨時數據需要寫入本地磁盤。本地磁盤的臨時目錄通過參數spark.local.dir配置。

性能優化點

spark.local.dir支持配置多個目錄。配置spark.local.dir有多個目錄,每個目錄對應不同的磁盤,這樣可以提升IO效率。另外,可以采用IO性能較高的磁盤作為local dir的磁盤。

注意:

  • 如果使用YARN、Mesos等資源框架,此參數應該通過相應資源框架的參數來設置。
  • 如果只有一個磁盤,配置了多個目錄,性能提升不大。

RDD操作:使用MapPartitions替代Map

性能優化點

map方法是對RDD的每一條記錄逐一操作。mapPartitons是對整個RDD,以迭代器的方式逐一操作。比如對條記錄的開銷較大,比如需要連接、斷開數據庫。使用map方法需要對每一條記錄都連接、斷開數據庫,效率差。此時,可以改用mapPartitons操作,只需要整個Partition連接、斷開一次數據庫即可。

1
rdd.map{x => conn=getDBConn;conn.write(x.toString);conn.close} 

改為:

1
rdd.mapPartitions(records => conn.getDBConn;for(item <- records) write(item.toString); conn.close) 

RDD操作:使用coalesce減小空運行的任務數量

性能優化點

  • 場景一

當對RDD進行多次過濾時,可能會形成很多空的、無數據的Partition。通過調用coalesce方法,可以減小Task個數。讓有的Task可以同時管理多個Partition。

  • 場景二

當任務數過多的時候,Shuffle壓力太大導致程序掛住不動,或者出現linux資源受限的問題。此時,可以通過調用coalesce方法,可以減小Task個數,讓程序得以繼續運行。

coalesce()方法接受一個參數,為減小后的目標Partition個數。

RDD操作:collect

Collect操作會將Executor的數據發送的Driver端。需要確保Driver有足夠的內存。Driver的內存通過參數spark.driver.memory參數進行配置。

RDD操作:使用reduceByKey替代groupByKey

reduceByKey會在Map端做本地聚合,而groupByKey等Shuffle操作不會再Map端做聚合。 能使用reduceByKey的地方盡量使用該方式,避免出現.groupByKey().map(x=>(x.1,x.2.size))

  • 舉例

對於數據

2015-05-01 13:00:00,B101,MEILIN
2015-05-01 10:04:20,B101,GUANLAN
2015-05-01 09:18:00,F301,MEILIN
2015-05-01 12:00:00,B107,WUHE
2015-05-01 18:20:00,F301,WUHE
2015-05-02 12:00:02,T442,GUANLAN
2015-05-01 07:00:00,B101,GUANLAN
2015-05-01 21:31:00,M721,WUHE
2015-05-01 09:00:00,Z007,MEILIN

現在要統計各個車牌(第二列)出現的次數,則應使用:

1
2
3
4
5
var dataRDD = sc.textFile("file:///tmp/data.txt") var data2RDD = dataRDD.map(s => s.split(",")) var data3RDD = data2RDD.map( a => (a(1),1) ) var data4RDD = data3RDD.reduceByKey(_ + _) data4RDD.collect 

而不是:

1
2
3
4
5
6
var dataRDD = sc.textFile("file:///tmp/data.txt") var data2RDD = dataRDD.map(s => s.split(",")) var data3RDD = data2RDD.map( a => (a(1),Array(a(0),a(2))) ) var data4RDD = data3RDD.groupByKey() var data5RDD = data4RDD.map(x => (x._1,x._2.size)) data5RDD.collect 

避免數據傾斜

如何檢測數據傾斜?

現象:沒有GC,各Task執行時間嚴重不一致。

性能優化點

  • 重新設計key,以更小粒度的key使得Task大小合理化。
  • 有時提升並行度,有助於解決數據傾斜


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM