- --num-executors
設置任務executor個數,默認值為4,一般調整此參數需要同時調整並行度(參考4)。任務設置executor個數的依據是業務期望任務運行時間,可以先設置一個較小值,通過調整此參數及並行度直到任務運行時間達到期望。
- --executor-cores
設置單個executor的core數,默認為1,建議不要超過2。任務申請的總core數為executor個數*單個executor的core數,如:申請4個executor,每個executor申請2個core,則此任務申請的總core數為8,故此任務會向YARN申請8個vcore。另外,建議單個executor申請的core數與內存的比例要保持在1:4,如:單個executor申請2個core,那么內存相應需要申請到8g。
-
--executor-memory
設置executor內存,默認值為4g。每個executor申請的core數與內存比例要保持在1:4,如:單個executor申請2個core,那么內存相應需要申請8g。另外,單個executor內存溢出,請不要簡單地調大executor內存,這樣會導致任務平均內存利用率較低,如果是數據傾斜導致的內存溢出,請解決數據傾斜;如果是任務並行度設置較小導致的內存溢出,可以通過調大並行度解決(參考4)。
-
spark.sql.shuffle.partitions
調整stage的並行度,也就是每個stage的task個數,默認值為40。此參數一般設置為任務申請的總core數的2-4倍,如:申請100個executor,每個executor申請2個core,那么總core數為200,此參數設置的合理范圍是400-800。注意,此參數不能調整某些讀外部數據stage的並行度,如:讀hdfs的stage,絕大多數情況它的並行度取決於需要讀取的文件數。
-
spark.shuffle.service.enabled、spark.dynamicAllocation.enabled
以上兩個參數同時設置為true可開啟動態資源分配,開啟后可防止資源浪費情況。Spark-2.1.0默認關閉動態資源分配,Spark-2.3.3默認打開。動態資源分配開啟后,當executor閑置超過60s將被回收,但executor不會低於spark.dynamicAllocation.minExecutors配置個數。當任務資源不足時,任務會自動向YARN申請資源,但executor不會超過spark.dynamicAllocation.maxExecutors配置個數。
-
spark.dynamicAllocation.minExecutors
此參數可以調整動態資源分配任務executor的下限,默認為0。
-
spark.dynamicAllocation.maxExecutors
此參數可以調整動態資源分配任務executor的上限,默認為--num-executors個數。
-
spark.sql.autoBroadcastJoinThreshold
在Spark關聯中,小表數據量小於等於此參數,可將關聯轉化為BroadcastHashJoin,類似Hive中的mapjoin,此參數默認值是10485760(10M),設置為-1可以禁用BroadcastHashJoin。
-
spark.sql.broadcastTimeout
用來控制broadcast超時時間,默認值為5min。當廣播小表時,如果廣播時間超過此參數設置值,會導致任務失敗。
-
spark.sql.join.preferSortMergeJoin
用來控制是否優先使用SortMergeJoin,默認指為true。SortMergeJoin是比較慢的Join實現方式,在shuffle過后,所有relation數據都要sort,然后再關聯。此參數設置為false時,spark將優先使用ShuffleHashJoin,ShuffleHashJoin是在shuffle過后,將數據量較小的relation構建為hash表,提高關聯速度。
-
spark.sql.adaptive.enabled(spark-2.3.3)
用來控制是否開啟adaptive execution,默認為false。一直以來,Spark只能設置固定的並行度(參考4),在大促期間,數據量激增,每個task處理的數量增加,很容易出現oom的情況。在開啟此項參數后,Spark將會按照spark.sql.ataptive.shuffle.targetPostShuffleInputSize設置的每個task的目標處理數據量自動調整stage並行度,減少task出現oom的情況。
-
spark.sql.ataptive.shuffle.targetPostShuffleInputSize(spark-2.3.3)
在開啟adaptive execution時,用來控制每個task處理的目標數據量,Spark將會根據此參數值動態調整task個數,默認值為64M。
-
spark.sql.ataptive.skewedJoin.enabled(spark-2.3.3)
在開啟adaptive execution時,用來控制是否開啟自動處理join時的數據傾斜,默認為false。
-
spark.sql.ataptive.skewedPartitionMaxSplits(spark-2.3.3)
在開啟adaptive execution時,控制處理一個傾斜 Partition 的 Task 個數上限,默認值為 5。
-
spark.sql.ataptive.skewedPartitionRowCountThreshold(spark-2.3.3)
在開啟adaptive execution時,設置一個 Partition 被視為傾斜 Partition 的行數下限,也即行數低於該值的 Partition 不會被當作傾斜 Partition 處理。其默認值為 10L * 1000 * 1000 即一千萬。
-
spark.sql.ataptive.skewedPartitionSizeThreshold(spark-2.3.3)
在開啟adaptive execution時,設置一個 Partition 被視為傾斜 Partition 的大小下限,也即大小小於該值的 Partition 不會被視作傾斜 Partition。其默認值為 64 * 1024 * 1024 也即 64MB。
-
spark.sql.ataptive.skewedPartitionFactor(spark-2.3.3)
在開啟adaptive execution時,設置傾斜因子。如果一個 Partition 的大小大於
spark.sql.adaptive.skewedPartitionSizeThreshold
的同時大於各 Partition 大小中位數與該因子的乘積,或者行數大於spark.sql.adaptive.skewedPartitionRowCountThreshold
的同時大於各 Partition 行數中位數與該因子的乘積,則它會被視為傾斜的 Partition。默認為10。