spark 之所以需要調優,一是代碼執行效率低,二是經常 OOM
內存溢出
內存溢出無非兩點:
1. Driver 內存不夠
2. Executor 內存不夠
Driver 內存不夠無非兩點:
1. 讀取數據太大
2. 數據回傳
Executor 內存不夠無非兩點:
1. map 類操作產生大量數據,包括 map、flatMap、filter、mapPartitions 等
2. shuffle 后產生數據傾斜
Executor 內存不夠
有個通用的解決辦法就是增加 Executor 內存
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
但這並不一定是最好的辦法
map 過程產生大量對象
造成 Executor 內存溢出
解決思路是減少每個 task 的大小,從而減少每個 task 的輸出;
具體做法是在 會產生大量對象的 map 操作前 添加 repartition(重新分區) 方法,分區成更小的塊傳入 map
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count() # 100 * 100000000 個對象,內存溢出 rdd.flatMap(lambda x: len(['%d'%x*50 for _ in range(100000000)])).sum() # 內存溢出 rdd.repartition(1000000).flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count() # 可執行
數據傾斜
參考我的博客 數據傾斜
standalone 模式資源分配不均
該模式下配置了
--total-executor-cores NUM (Total cores for all executors.) 集群 executor 核數
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). 每個 executor 內存
而沒有配置
--executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
or all available cores on the worker in standalone mode) 每個 executor 核數
假如各個 executor 核數不一樣,核數多的 executor 執行的 task 就多,內存就容易溢出
解決方法是配置參數 --executor-cores,或者是在 spark 中配置 spark.executor.cores
在 RDD 中共用對象
rdd = sc.parallelize(range(100)) def myfunc(x): return x rdd.flatMap(lambda x: [('k', 'v') for _ in range(200000000)]).foreach(myfunc) # 每次生成一個 tuple 對象,內存溢出 rdd.flatMap(lambda x: ['k'+'v' for _ in range(2000000)]).count() # 無需生成新的 string 對象,可執行
tuple 為不可變對象,不過字符串也是可變對象
此條方法有待進一步驗證
Driver 中需要讀取大量數據
造成 Driver 內存溢出
解決思路是增加 Driver 內存,具體做法為設置參數
--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
示例
from pyspark import SparkContext sc = SparkContext(master='yarn') rdd = sc.parallelize(range(300000000)) # spark-submit --master yarn-client --driver-memory 512M driver_oom.py 內存溢出 # spark-submit --master yarn-client --driver-memory 3G driver_oom.py 可以執行
collect
大量數據回傳 Driver,造成內存溢出
解決思路是分區輸出,具體做法是 foreach
rdd = sc.parallelize(range(100)) rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).collect() # 內存溢出 def func(x): print(x) rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).foreach(func) # 分區輸出
或者增加 Driver 內存
代碼優化
mapPartitions
1. 批處理
2. 減少中間輸出
用 mapPartitions 替代多個 map,減少 Executor 內存壓力
from pyspark import SparkContext sc = SparkContext(master='yarn') data = range(10) rdd = sc.parallelize(data, 2) ##### map rdd.map(lambda x: x % 3).filter(lambda x : x>1 ).countByValue().values() # [3] ##### mapPartitions # 避免了中間 RDD 的產生,節約內存,防止 oom def myfunc(datas): # datas type is itertools.chain for data in datas: value = data % 3 if value > 1: yield value print rdd.mapPartitions(myfunc).countByValue().values() # [3] # spark-submit --master yarn-client mapVSmapPartitions.py python 只支持 client 模式
DataFrame 代替 RDD
任務被划分成多個 stage,在每個 stage 內部,RDD 是無法自動優化的,
rdd.map(lambda x: x+1).map(lambda x: x+1) == rdd.map(lambda x: x+2)
如上面兩個操作是等價的,但是 RDD 並不會自動優化,
而 DataFrame 使用 sql 查詢,自帶 sql 優化器,可自動找到最優方案
broadcast join
在分布式計算中,數據跨節點移動是非常影響性能的,網絡傳輸耗時,多次傳輸消耗內存,broadcast 在某些場景可以減少數據移動;
如 一個 小RDD 要和 一個 大RDD 進行 join 操作,常規情況下要互傳 RDD,由於多個 task,故需多次傳輸, 【注意必須是有個小 RDD,否則這種做法意義不大,因為后面要遍歷這個廣播變量】
如果把 小RDD 變成 broadcast 變量,就不用傳輸 大RDD,把 broadcast(小RDD) 緩存在對應 Executor 上即可

對 大RDD 進行 map 操作,在 map 函數中調用 小RDD 的 value,遍歷 小RDD
map(lambda x: i for i in smallRDD.value if x == i)
filter 之后再 join
就是所謂的謂詞下推,在 sparkSQL 中會自動這么操作;
如果是自己操作 RDD,可以減少 shuffle 的數據量
cache and persist
緩存 RDD 既可以節省內存,也可以提高性能;
cahce 是緩存到內存,等同於 persist(Storage.MEMORY_ONLY),在內存不足時,這種緩存方式會丟失數據,再次使用時會重新計算;
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 在內存不足時會寫到磁盤,避免重復,只是耗費一點 IO 時間
combineByKey
在 hadoop 中也有 combine,有 combine 比 沒有combine 效率高;
比如 reduceByKey (combine操作) 就比 groupyByKey (非combine操作) 效率高
import time from pyspark import SparkContext sc = SparkContext(master='yarn') strs = list('abcd')*10000000 rdd = sc.parallelize(strs) time.clock() print rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y).collect() # combinByKey 操作耗時少3.2s # print rdd.map(lambda x: (x, 1)).groupByKey().mapValues(sum).collect() # 非 combinByKey 操作耗時3.6s # 二者結果一樣 print(time.clock()) strs = list('abcd')*10000000 for i in strs:i = (i, 1) # 6s,單機for循環做更少的事情,耗時更多
圖解如下


參考資料:
https://blog.csdn.net/yhb315279058/article/details/51035631
