sparksql參數


全局參數:

1. --master yarn-cluster (or yarn-client)

參數說明:
制定yarn的執行模式,分集群模式和客戶端模式,一般使用集群模式

2. --num-executors 50

參數說明:
  該參數用於設置Spark作業總共要用多少個Executor進程來執行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會盡可能按照你的設置來在集群的各個工作節點上,啟動相應數量的Executor進程。這個參數非常之重要,如果不設置的話,默認只會給你啟動少量的Executor進程,此時你的Spark作業的運行速度是非常慢的。 參數調優建議:
  每個Spark作業的運行一般設置20
~50個左右的Executor進程比較合適,設置太少或太多的Executor進程都不好。設置的太少,無法充分利用集群資源;設置的太多的話,大部分隊列可能無法給予充分的資源。

3.--executor-memory 6G

參數說明:
  該參數用於設置每個Executor進程的內存。Executor內存的大小,很多時候直接決定了Spark作業的性能,而且跟常見的JVM OOM異常,也有直接的關聯。
參數調優建議:
  每個Executor進程的內存設置4G~8G較為合適,最大不超過 20G,否則會導致 GC 代價過高,或資源浪費嚴重。但是這只是一個參考值,具體的設置還是得根據不同部門的資源隊列來定。可以看看自己團隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,是不能超過隊列的最大內存量的。此外,如果你是跟團隊里其他人共享這個資源隊列,那么申請的內存量最好不要超過資源隊列最大總內存的1/3~1/2,避免你自己的Spark作業占用了隊列所有的資源,導致別的同學的作業無法運行

4.--conf spark.executor.cores=4

參數說明:
    該參數用於設置每個Executor進程的CPU core數量。這個參數決定了每個Executor進程並行執行task線程的能力。因為每個CPU core同一時間只能執行一個task線程,因此每個Executor進程的CPU core數量越多,越能夠快速地執行完分配給自己的所有task線程。
參數調優建議:
    Executor的CPU executor_cores 不宜為1!否則 work 進程中線程數過少,一般 2~4 為宜。。同樣得根據不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,來決定每個Executor進程可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那么num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學的作業運行。

5.--conf spark.yarn.executor.memoryOverhead=2048

Ececutor堆外內存 
當Spark處理超大數據量時(數十億,百億級別),executor的堆外內存可能會不夠用,出現shuffle file can’t find, task lost,OOM等情況 
默認情況下,這個堆外內存是300M,當運行超大數據量時,通常會出現問題,因此需要調節到1G,2G,4G等大小 
調節方法必須在spark-submit提交腳本中設置而不能在程序中設置

6.--driver-memory 2G

參數說明:
    該參數用於設置Driver進程的內存。
參數調優建議:
    Driver的內存通常來說不設置,或者設置2G左右應該就夠了。唯一需要注意的一點是,如果需要使用collect算子將RDD的數據全部拉取到Driver上進行處理,那么必須確保Driver的內存足夠大,否則會出現OOM內存溢出、GC FULL等問題。

7.--conf spark.default.parallelism=150

參數說明:
  spark_parallelism一般為executor_cores*num_executors 的 1~4 倍,系統默認值 64,不設置的話會導致 task 很多的時候被分批串行執行,或大量 cores 空閑,資源浪費嚴重

8.動態executor    --避免使用

--conf spark.dynamicAllocation.enable=true //打開動態executor模式
--conf spark.shuffle.service.enabled=true //動態executor需要的服務,需要和上面的spark.dynamicAllocation.enable同時打開或關閉

9.--conf spark.storage.memoryFraction=0.2

參數說明:
  該參數用於設置RDD持久化數據在Executor內存中能占的比例,默認是0.6。也就是說,默認Executor 60%的內存,可以用來保存持久化的RDD數據。

10.exector、storage內存分配

 
         

當Spark一個JOB被提交,就會開辟內存空間來存儲和計算
Spark中執行計算和數據存儲都是共享同一個內存區域(M),spark.memory.fraction 表示M的大小,其值為相對於JVM堆內存的比例(默認0.6)。剩余的40%是為其他用戶數據結構、Spark內部元數據以及避免OOM錯誤的安全預留空間(大量稀疏數據和異常大的數據記錄)。

 
         

spark.memory.storageFraction 表示數據存儲比例(R)的大小,其值為相對於M的一個比例(默認0.5)。R是M中專門用於緩存數據塊,且這部分數據塊永遠不會因執行計算任務而逐出內存。
所以當發生FULL GC之后,有兩種辦法:
第一就是增大M區域,也就是增加--executor-memory 10G
這樣相當於增大了new generation區和old generation區,能放得下大數據塊

 
         

第二就是減小R區域,也就是減小-- spark.memory.storageFraction
這樣相當於增大了內存中用於計算的區域,從而避免FULL GC的問題

 
         

spark.memory.fraction這個參數建議保持默認值,非特殊情況不要修改。

 

 
        

shuffle參數:

 1.--conf spark.shuffle.memoryFraction=0.5

參數說明:
    該參數用於設置shuffle過程中一個task拉取到上個stage的task的輸出后,進行聚合操作時能夠使用的Executor內存的比例,默認是0.2

    也就是說,Executor默認只有20%的內存用來進行該操作。shuffle操作在進行聚合時,如果發現使用的內存超出了這個20%的限制,那么多余的數據就會溢寫到磁盤文件中去,此時就會極大地降低性能。
參數調優建議:

  如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內存占比,提高shuffle操作的內存占比比例,避免shuffle過程中數據過多時內存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發現作業由於頻繁的gc導致運行緩慢,意味着task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。

按實際情況設置這個參數,但是不推薦超過0.6 

2.--conf spark.sql.shuffle.partitions=20

默認值:300
spark中有partition的概念,每個partition都會對應一個task,task越多,在處理大規模數據的時候,就會越有效率。不過task並不是越多越好,如果發現數據量沒有那么大,則沒有必要task數量太多。
其實這個參數相當於Hive參數mapred.reduce.tasks,那種大促期間數據量翻好幾倍的任務不推薦寫死這個參數,否則會造成單個task處理的數據量激增導致任務失敗或者阻塞。

3.--conf spark.shuffle.compress=true   //shuffle過程是否壓縮

4.--conf spark.shuffle.file.buffer=512

默認值:32k
參數說明:
  該參數用於設置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數據寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。 調優建議:
  如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數,也就可以減少磁盤IO次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1
%~5%的提升。

5.--conf spark.reducer.maxSizeInFlight=256m

默認值:48m
參數說明:
  該參數用於設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數據。 調優建議:
  如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1
%~5%的提升。

6.--conf spark.shuffle.io.maxRetries=20

默認值:3
參數說明:
  shuffle read task從shuffle write task所在節點拉取屬於自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。 調優建議:
  對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由於JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億
~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

7.--spark.shuffle.io.retryWait=5s

默認值:5s
參數說明:
  具體解釋同上,該參數代表了每次重試拉取數據的等待間隔,默認是5s。調優建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩定性。

動態分配

 1.reducer的可伸縮化

--spark.sql.adaptive.enabled=true
--spark.sql.adaptive.shuffle.targetPostShuffleInputSize=102400000

2.JOIN過程動態廣播

--spark.sql.autoBroadcastJoinThreshold=10485760  //(10 MB)

類似Hive中的mapjoin,在join的過程中把小於10M的小表廣播到所有節點,從而進行Hashjoin,提升join的效率。

目前動態分配在處理幾十億以上的數據量時還是有很多未知bug缺陷,使用需謹慎

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM