1、Spark優化
1) 使用foreachPartitions替代foreach。
原理類似於“使用mapPartitions替代map”,也是一次函數調用處理一個partition的所有數據,而不是一次函數調用處理一條數據。在實踐中發現,foreachPartitions類的算子,對性能的提升還是很有幫助的。比如在foreach函數中,將RDD中所有數據寫MySQL,那么如果是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會創建一個數據庫連接,此時就勢必會頻繁地創建和銷毀數據庫連接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個partition的數據,那么對於每個partition,只要創建一個數據庫連接即可,然后執行批量插入操作,此時性能是比較高的。實踐中發現,對於1萬條左右的數據量寫MySQL,性能可以提升30%以上。
2) 設置num-executors參數
參數說明:該參數用於設置Spark作業總共要用多少個Executor進程來執行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會盡可能按照你的設置來在集群的各個工作節點上,啟動相應數量的Executor進程。這個參數非常之重要,如果不設置的話,默認只會給你啟動少量的Executor進程,此時你的Spark作業的運行速度是非常慢的。
參數調優建議:該參數設置的太少,無法充分利用集群資源;設置的太多的話,大部分隊列可能無法給予充分的資源。針對數據交換的業務場景,建議該參數設置1-5。
3) 設置executor-memory參數
參數說明:該參數用於設置每個Executor進程的內存。Executor內存的大小,很多時候直接決定了Spark作業的性能,而且跟常見的JVM OOM異常也有直接的關聯。
參數調優建議:針對數據交換的業務場景,建議本參數設置在512M及以下。
4) executor-cores
參數說明:該參數用於設置每個Executor進程的CPU core數量。這個參數決定了每個Executor進程並行執行task線程的能力。因為每個CPU core同一時間只能執行一個task線程,因此每個Executor進程的CPU core數量越多,越能夠快速地執行完分配給自己的所有task線程。
參數調優建議:Executor的CPU core數量設置為2~4個較為合適。建議,如果是跟他人共享一個隊列,那么num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,避免影響其他人的作業運行。
5) driver-memory
參數說明:該參數用於設置Driver進程的內存。
參數調優建議:Driver的內存通常來說不設置,或者設置512M以下就夠了。唯一需要注意的一點是,如果需要使用collect算子將RDD的數據全部拉取到Driver上進行處理,那么必須確保Driver的內存足夠大,否則會出現OOM內存溢出的問題。
6) spark.default.parallelism
參數說明:該參數用於設置每個stage的默認task數量。這個參數極為重要,如果不設置可能會直接影響你的Spark作業性能。
參數調優建議:如果不設置這個參數, Spark自己根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。Spark官網建議的設置原則是,設置該參數為num-executors * executor-cores的2~3倍較為合適,此時可以充分地利用Spark集群的資源。針對數據交換的場景,建議此參數設置為1-10。
7) spark.storage.memoryFraction
參數說明:該參數用於設置RDD持久化數據在Executor內存中能占的比例,默認是0.6。也就是說,默認Executor 60%的內存,可以用來保存持久化的RDD數據。根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。
參數調優建議:如果Spark作業中,有較多的RDD持久化操作,該參數的值可以適當提高一些,保證持久化的數據能夠容納在內存中。避免內存不夠緩存所有的數據,導致數據只能寫入磁盤中,降低了性能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那么這個參數的值適當降低一些比較合適。如果發現作業由於頻繁的gc導致運行緩慢(通過spark web ui可以觀察到作業的gc耗時),意味着task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。針對數據交換的場景,建議降低此參數值到0.2-0.4。
8) spark.shuffle.memoryFraction
參數說明:該參數用於設置shuffle過程中一個task拉取到上個stage的task的輸出后,進行聚合操作時能夠使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操作。shuffle操作在進行聚合時,如果發現使用的內存超出了這個20%的限制,那么多余的數據就會溢寫到磁盤文件中去,此時就會極大地降低性能。
參數調優建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內存占比,提高shuffle操作的內存占比比例,避免shuffle過程中數據過多時內存不夠用,必須溢寫到磁盤上,降低了性能。如果發現作業由於頻繁的gc導致運行緩慢,意味着task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。針對數據交換的場景,建議此值設置為0.1或以下。
資源參數參考示例
以下是一份spark-submit命令的示例,可以參考一下,並根據自己的實際情況進行調節:
./bin/spark-submit \
--master yarn-cluster \
--num-executors 1 \
--executor-memory 512M \
--executor-cores 2 \
--driver-memory 512M \
--conf spark.default.parallelism=2 \
--conf spark.storage.memoryFraction=0.2 \
--conf spark.shuffle.memoryFraction=0.1 \
2、Spark對磁盤的要求
1) 設置獨立的日志分區
說明:開源Spark 的Job任務在運行過程中產生大量的臨時日志,導致某個分區磁盤寫滿,主要原因spark運行產生臨時目錄的默認路徑/tmp/下的spark-*日志會把/分區磁盤寫滿。
優化建議:更改日志路徑到獨立的分區。
修改方法:
可以通過在$SPARK_HOME/conf/spark-env.sh中指定配置內容來更改默認的存儲位置。
SPARK_WORK_DIR 指定work目錄,默認是$SPARK_HOME/work子目錄
SPARK_LOCAL_DIRS 指定executor運行生成的臨時文件目錄,默認是/tmp,由於/tmp目錄有可能是采用了tmpfs,建議在實際部署中將其更改到其它目錄
修改配置spark-env.sh增加:
export SPARK_LOCAL_DIRS=spark.local.dir /diskb/sparktmp,/diskc/sparktmp,/diskd/sparktmp,/diske/sparktmp,/diskf/sparktmp,/diskg/sparktmp
---------------------
2) Spark磁盤臨時文件自動清理
(1) SPARK_LOCAL_DIRS下的產生的文件夾,會在應用程序退出的時候自動清理掉,如果觀察仔細的話,還會發現在spark_local_dirs目錄有有諸如*_cache和*_lock的文件, *_cache文件是為了避免同一台機器中多個executor執行同一application時多次下載第三方依賴的問題而引進的patch。
(2) SPARK_WORK_DIR目錄下的形如app-timestamp-seqid的文件夾默認不會自動清除。可同通過在spark-env.sh中加入如下內容來自動清除:
SPARK_WORKER_OPTS=”-Dspark.worker.cleanup.enabled=true –Dspark.workder.cleanup.interval=1200”
停止掉的程序文件夾就會被刪除。
(3) 可以通過配置spark.worker.cleaner.appDataTtl來設置清理的時間。
(4) SPARK_WORKER_OPTS支持以下屬性:
| 屬性名 |
默認值 |
含義 |
| spark.worker.cleanup.enabled |
false |
是否定期清理 worker 和應用的工作目錄。注意,該設置僅在獨立模式下有效,YARN有自己的清理方式;同時,只會清理已經結束的應用對應的目錄。 |
| spark.worker.cleanup.interval |
1800 (30 minutes) |
worker清理本地應用工作目錄的時間間隔(秒) |
| spark.worker.cleanup.appDataTtl |
7 * 24 * 3600 (7 days) |
清理多久以前的應用的工作目錄。這個選項值將取決於你的磁盤總量。spark應用會將日志和jar包都放在其對應的工作目錄下。隨着時間流逝,應用的工作目錄很快會占滿磁盤,尤其是在你的應用提交比較頻繁的情況下。 |
