SparkSQL執行時參數優化


  • 具體現象

    1. 內存CPU比例失調 一個Spark任務消耗 120(executor)*4G = 480G內存僅僅使用120個 core.幾個SprakSQL任務就將整個系統資源吃光.
    2. 設置超過40個executor,但未指定分區數,導致多數executor空閑.
  • 原因分析

  • SparkSQL配置時Core與內存比例不恰當

  • 沒有指定executor核心數

  • 未進行其他配置參數優化

  • 解決辦法

    1. 在配置SparkSQL任務時指定executor核心數 建議為4
      (同一executor[進程]內內存共享,當數據傾斜時,使用相同核心數與內存量的兩個任務,executor總量少的任務不容易OOM,因為單核心最大可用內存大.但是並非越大越好,因為單個exector最大core受服務器剩余core數量限制,過大的core數量可能導致資源分配不足)  
    2. 設置spark.default.parallelism=600 每個stage的默認task數量
      (計算公式為num-executors * executor-cores 系統默認值分區為40,這是導致executor並行度上不去的罪魁禍首,之所以這樣計算是為了盡量避免計算最慢的task決定整個stage的時間,將其設置為總核心的2-3倍,讓運行快的task可以繼續領取任務計算直至全部任務計算完畢)
    3. 開啟spark.sql.auto.repartition=true 自動重新分區
      (每個stage[階段]運行時分區並不盡相同,使用此配置可優化計算后分區數,避免分區數過大導致單個分區數據量過少,每個task運算分區數據時時間過短,從而導致task頻繁調度消耗過多時間)
    4. 設置spark.sql.shuffle.partitions=400 提高shuffle並行度
      (shuffle read task的並行度)
    5. 設置spark.shuffle.service.enabled=true 提升shuffle效率 --!並未測試
      (Executor 進程除了運行task 也要進行寫shuffle 數據,當Executor進程任務過重時,導致GC不能為其他Executor提供shuffle數據時將會影響效率.此服務開啟時代替Executor來抓取shuffle數據)


以下為SparkSQL調優相關設置
 

以下列表中動態資源分配相關不建議使用

//1.下列Hive參數對Spark同樣起作用。
set hive.exec.dynamic.partition=true; // 是否允許動態生成分區
set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分區全部動態生成
set hive.exec.max.dynamic.partitions = 100; // 動態生成的最多分區數

//2.運行行為
set spark.sql.autoBroadcastJoinThreshold; // 大表 JOIN 小表,小表做廣播的閾值
set spark.dynamicAllocation.enabled; // 開啟動態資源分配
set spark.dynamicAllocation.maxExecutors; //開啟動態資源分配后,最多可分配的Executor數
set spark.dynamicAllocation.minExecutors; //開啟動態資源分配后,最少可分配的Executor數
set spark.sql.shuffle.partitions; // 需要shuffle是mapper端寫出的partition個數
set spark.sql.adaptive.enabled; // 是否開啟調整partition功能,如果開啟,spark.sql.shuffle.partitions設置的partition可能會被合並到一個reducer里運行
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize; //開啟spark.sql.adaptive.enabled后,兩個partition的和低於該閾值會合並到一個reducer
set spark.sql.adaptive.minNumPostShufflePartitions; // 開啟spark.sql.adaptive.enabled后,最小的分區數
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize; //當幾個stripe的大小大於該值時,會合並到一個task中處理

//3.executor能力
set spark.executor.memory; // executor用於緩存數據、代碼執行的堆內存以及JVM運行時需要的內存
set spark.yarn.executor.memoryOverhead; //Spark運行還需要一些堆外內存,直接向系統申請,如數據傳輸時的netty等。
set spark.sql.windowExec.buffer.spill.threshold; //當用戶的SQL中包含窗口函數時,並不會把一個窗口中的所有數據全部讀進內存,而是維護一個緩存池,當池中的數據條數大於該參數表示的閾值時,spark將數據寫到磁盤
set spark.executor.cores; //單個executor上可以同時運行的task數

  

 

 

當任務數據量過大時,還可以根據具體資源情況,增大以下參數:

--conf spark.sql.shuffle.partitions

--conf spark.executor.instances

--conf spark.dynamicAllocation.maxExecutors

 

另外,對於Spark SQL 來說,提高並行度時設置 spark.default.parallelism 這個參數時無用的,spark.sql.shuffle.partitions才是決定Spark sql執行並行度的關鍵。

原因在於:

spark.default.parallelism這個參數是在處理RDD時才會起作用的,對Spark sql來說是無效的。

針對Spark sql任務,則是專門提供了spark.sql.shuffle.partitions這樣的參數來控制並行度。

 

 

 

轉自:

https://www.cnblogs.com/-zzz/p/10629226.html

https://blog.csdn.net/Jack_Roy/article/details/89639784

https://blog.csdn.net/Jack_Roy  (博主寫的很細致)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM