一段程序只能完成功能是沒有用的,只能能夠穩定、高效率地運行才是生成環境所需要的。
本篇記錄了Spark各個角度的調優技巧,以備不時之需。
一、配置參數的方式和觀察性能的方式
額。。。從最基本的開始講,可能一些剛接觸Spark的人不是很清楚Spark的一些參數變量到底要配置在哪里。
可以通過三種方式配置參數,任選其一皆可。
- spark-env.sh文件中配置:最近常使用的配置方式,格式可以參考其中的一些官方保留的配置。
- 程序中通過SparkConf配置:通過SparkConf對象set方法設置鍵值對,比較直觀。
- 程序中通過System.setProperty配置:和方法二差不多。
值得一提的是一個略顯詭異的現象,有些參數在spark-env.sh中配置並不起作用,反而要在程序中設置才有效果。
Spark的參數很多,一些默認的設置可以參考官網推薦的配置參數:/docs/latest/configuration.html
可以通過以下幾種方式來觀察Spark集群的狀態和相關性能問題:
- Web UI:即8088端口進入的UI界面。
- Driver程序日志:根據程序提交方式的不同到指定的節點上觀察Driver程序日志。
- logs文件夾下的日志:Spark集群的大部分信息都會記錄在這里。
- works文件夾下的日志:主要記錄Work節點的信息。
- Profiler工具:沒有使用過。
前景交代完畢,下面進入正題:
二、調度與分區優化
1、小分區合並的問題
由於程序中過度使用filter算子或者使用不當,都會造成大量的小分區出現。
因為每次過濾得到的結果只有原來數據集的一小部分,而這些量很小的數據同樣會以一定的分區數並行化分配到各個節點中執行。
帶來的問題就是:任務處理的數據量很小,反復地切換任務所消耗的資源反而會帶來很大的系統開銷。
解決方案:使用重分區函數coalesce進行數據緊縮、減少分區數並設置shuffle=true保證任務是並行計算的
減少分區數,雖然意味着並行度降低,但是相對比之前的大量小任務過度切換的消耗,卻是比較值得的。
這里也可以直接使用repartition重分區函數進行操作,因為其底層使用的是coalesce並設置Shuffle=true
2、數據傾斜問題
這是一個生產環境中經常遇到的問題,典型的場景是:大量的數據被分配到小部分節點計算,而其他大部分節點卻只計算小部分數據。
問題產生的原因有很多,可能且不全部包括:
- key的數據分布不均勻
- 業務數據本身原因
- 結構化表設計問題
- 某些SQL語句會造成數據傾斜
可選的解決方案有:
- 增大任務數,減少分區數量:這種方法和解決小分區問題類似。
- 對特殊的key進行處理,如空值等:直接過濾掉空值的key以免對任務產生干擾。
- 使用廣播:小數據量直接廣播,大數據量先拆分之后再進行廣播。
還有一種場景是任務執行速度傾斜問題:集群中其他節點都計算完畢了,但是只有少數幾個節點死活運行不完。(其實這和上面的那個場景是差不多的)
解決方案:
- 設置spark.speculation=true將執行事件過長的節點去掉,重新分配任務
- spark.speculation.interval用來設置執行間隔
3、並行度調整
官方推薦每個CPU CORE分配2-3個任務。
- 任務數太多:並行度太高,產生大量的任務啟動和切換開銷。
- 任務數太低:並行度過低,無法發揮集群並行計算能力,任務執行慢
Spark會根據文件大小默認配置Map階段的任務數,所以我們能夠自行調整的就是Reduce階段的分區數了。
- reduceByKey等操作時通過numPartitions參數進行分區數量配置。
- 通過spark.default.parallelism進行默認分區數配置。
4、DAG調度執行優化
DAG圖是Spark計算的基本依賴,所以建議:
- 同一個Stage盡量容納更多地算子,防止多余的Shuffle。
- 復用已經cache的數據。
盡可能地在Transformation算子中完成對數據的計算,因為過多的Action算子會產生很多多余的Shuffle,在划分DAG圖時會形成眾多Stage。
三、網絡傳輸優化
1、大任務分發問題
Spark采用Akka的Actor模型來進行消息傳遞,包括數據、jar包和相關文件等。
而Akka消息通信傳遞默認的容量最大為10M,一旦傳遞的消息超過這個限制就會出現這樣的錯誤:
Worker任務失敗后Master上會打印“Lost TID:”
根據這個信息找到對應的Worker節點后查看SparkHome/work/目錄下的日志,查看Serialized size of result是否超過10M,就可以知道是不是Akka這邊的問題了。
一旦確認是Akka通信容量限制之后,就可以通過配置spark.akka.frameSize控制Akka通信消息的最大容量。
2、Broadcast在調優場景的使用
Broadcast廣播,主要是用於共享Spark每個Task都會用到的一些只讀變量。
對於那些每個Task都會用到的變量來說,如果每個Task都為這些變量分配內存空間顯然會使用很多多余的資源,使用廣播可以有效的避免這個問題,廣播之后,這些變量僅僅會在每台機器上保存一份,有Task需要使用時就到自己的機器上讀取就ok。
官方推薦,Task大於20k時可以使用,可以在控制台上看Task的大小。
3、Collect結果過大的問題
大量數據時將數據存儲在HDFS上或者其他,不是大量數據,但是超出Akka傳輸的Buffer大小,通過配置spark.akka.frameSize調整。
四、序列化與壓縮
1、通過序列化手段優化
序列化之前說過,好處多多,所以是推薦能用就用,Spark上的序列化方式有幾種,具體的可以參考官方文檔。
這里只簡單介紹一下Kryo。
配置參數的時候使用spark.serializer=”org.apache.spark.serializer.KryoSerializer”配置
自定義定義可以被Kryo序列化的類的步驟:
- 自定義類extends KryoRegistrator
- 設置序列化方式conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)
- conf.set(“spark.kyro.registrator”,”自定義的class”)
- 如果對象占用空間大,需要增加Kryo的緩沖區則配置spark.kryoserializer.buffer.mb上值默認為2M
2、通過壓縮手段優化
Spark的Job大致可以分為兩種:
- I/O密集型:即存在大量讀取磁盤的操作。
- CPU密集型:即存在大量的數據計算,使用CPU資源較多。
對於I/O密集型的Job,能壓縮就壓縮,因為讀磁盤的時候數據壓縮了,占用空間小了,讀取速度不就快了。
對於CPU密集型的Job,看具體CPU使用情況再做決定,因為使用壓縮是需要消耗一些CPU資源的,如果當前CPU已經超負荷了,再使用壓縮反而適得其反。
Spark支持兩種壓縮算法:
- LZF:高壓縮比
- Snappy:高速度
一些壓縮相關的參數配置:
- spark.broadcast.compress:推薦為true
- spark.rdd.compress:默認為false,看情況配置,壓縮花費一些時間,但是可以節省大量內存空間
- spark.io.compression.codec:org.apache.spark.io.LZFCompressionCodec根據情況選擇壓縮算法
- spark.io.compressions.snappy.block.size:設置Snappy壓縮的塊大小
五、其他優化方式
1、對外部資源的批處理操作
如操作數據庫時,每個分區的數據應該一起執行一次批處理,而不是一條數據寫一次,即map=>mapPartition。
2、reduce和reduceByKey
reduce:內部調用了runJob方法,是一個action操作。
reduceByKey:內部只是調用了combineBykey,是Transformation操作。
大量的數據操作時,reduce匯總所有數據到主節點會有性能瓶頸,將數據轉換為Key-Value的形式使用reduceByKey實現邏輯,會做類似mr程序中的Combiner的操作,Transformation操作分布式進行。
3、Shuffle操作符的內存使用
使用會觸發Shuffle過程的操作符時,操作的數據集合太大造成OOM,每個任務執行過程中會在各自的內存創建Hash表來進行數據分組。
可以解決的方案可能有:
- 增加並行度即分區數可以適當解決問題
- 可以將任務數量擴展到超過集群整體的CPU core數
