http://spark.apache.org/docs/1.6.1/tuning.html
1) 代碼優化
a. 對於多次使用的RDD,進行數據持久化操作(eg: cache、persist)
b. 如果對同一個份數據進行操作,那么盡量公用一個RDD
c. 優先使用reduceByKey和aggregateByKey取代groupByKey
原因:前兩個API存在combiner,可以降低數據量;groupByKey可能存在OOM異常
d. 對於Executor使用到Driver中的變量的情況,使用廣播變量進行數據傳遞, 可以減少網絡傳輸量,原理是:使用廣播變量后,原來Driver傳遞給Task的數據,變成只需要傳遞給Executor即可。
e. 當大表join小表,而且存在shuffle的時候,可以考慮使用map join來進行替換<使用廣播變量將小表的數據廣播出去,前提:Driver和單個的Executor的內存可以存儲下小表的數據>;
h. 啟動kyro序列化機制
2) 資源優化
a. spark-submit腳本相關參數
driver的內存:--driver-memory
driver的cpu:
standalone(cluster):--driver-cores
yarn(cluster): --driver-cores
executor的數量:
yarn: --num-executors
總的executor的CPU數量:
standalone/mesos:--total-executor-cores
單個executor的內存:--executor-memory
單個executor的cpu:
standalone/yarn:--executor-cores
b. 資源相關參數
spark.driver.cores:1
spark.driver.memory:1g
spark.executor.cores:1(yarn)/all(standalone)
spark.executor.memory:1g
spark.memory.fraction:0.75
spark.memory.storageFraction:0.5
Spark中執行和緩存的內存是公用的,執行可以爭奪緩存的內存,就是可以將部分緩存自動清楚,用於執行過程中使用內存;這兩個參數的含義分別是:spark.memory.fraction指定總內存占比((1g-300M)*0.75),spark.memory.storageFraction指定緩存部分最少占比內存((1g-300M)*0.75*0.5);當沒有執行的情況下,緩存可以使用全部的公用內存,即緩存最多使用((1g-300M)*0.75),最少可占用((1g-300M)*0.75*0.5)
1.5版本以前的采用固定內存設置:spark.storage.memoryFraction(0.6)以及spark.shuffle.memoryFraction(0.2)
spark.default.parallelism: 默認的分區數量,默認兩個,一般比較小;在實際環境中一般需要改大。
spark.scheduler.mode:FIFO(默認,先進先出)/FAIR(公平調度)
spark.task.cpus:每個Task執行需要的CPU數量(默認值1)
spark.task.maxFailures:每個Task允許的最大失敗次數(默認值4)
spark.dynamicAllocation.enabled: false; 是否啟動動態分配資源,默認為不啟動
spark.shuffle.service.enabled:false,當啟動動態資源分配的時候,該參數必須設置為true,表示允許額外的shuffle服務管理
spark.dynamicAllocation.initialExecutors:動態資源初始executor數量
spark.dynamicAllocation.maxExecutors:動態資源設置最大允許分配資源
spark.dynamicAllocation.minExecutors:動態資源設置最小允許分配資源,默認(0)
Spark on Yarn:
spark.yarn.am.memory:512m; 運行在Yarn上的時候ApplicationMaster運行的內存大小(client模式下)
spark.yarn.am.cores:1; ApplicationMaster運行的CPU核數(client模式下)
spark.executor.instances: Executor的數量,默認2個;該參數和動態參數參數互斥,當兩者都存在的時候,動態參數設置無效。
3) 數據傾斜優化
a. 兩階段聚合
b. 使用MAP JOIN替代REDUCE JOIN
c. 數據重分區(更改分區數量)
e. 擴容RDD及隨機前綴JOIN方式
4) shuffle過程優化(兩種ShuffleManager,四種模式一定要懂)
a. spark.shuffle.file.buffer:32k; 數據溢出磁盤的緩沖區內存大小
b. spark.shuffle.manager: sort; 給定數據
shuffle的管理器,sort(基於排序規則)或者hash(基於Hash值)
c. spark.shuffle.sort.bypassMergeThreshold: 200; 當分區數量小於該值的時候,啟動SortShuffleManager中的bypass模式
d. spark.shuffle.consolidateFiles: false; 當該參數為true的時候,使用hash shuffle的時候,可以提高shuffle速度,原理是:合並shuffle過程中的數據輸出文件