spark調優篇-oom 優化(匯總)


spark 之所以需要調優,一是代碼執行效率低,二是經常 OOM 

 

內存溢出

內存溢出無非兩點

1. Driver 內存不夠

2. Executor 內存不夠

Driver 內存不夠無非兩點

1. 讀取數據太大

2. 數據回傳

Executor 內存不夠無非兩點

1. map 類操作產生大量數據,包括 map、flatMap、filter、mapPartitions 等

2. shuffle 后產生數據傾斜

 

Executor 內存不夠

有個通用的解決辦法就是增加 Executor 內存

--executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

但這並不一定是最好的辦法

 

map 過程產生大量對象

造成 Executor 內存溢出

解決思路是減少每個 task 的大小,從而減少每個 task 的輸出;

具體做法是在 會產生大量對象的 map 操作前 添加 repartition(重新分區) 方法,分區成更小的塊傳入 map

rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count()      # 100 * 100000000 個對象,內存溢出
rdd.flatMap(lambda x: len(['%d'%x*50 for _ in range(100000000)])).sum()     # 內存溢出

rdd.repartition(1000000).flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count()   # 可執行

 

數據傾斜

參考我的博客 數據傾斜

 

standalone 模式資源分配不均

該模式下配置了 

--total-executor-cores NUM  (Total cores for all executors.)   集群 executor 核數

--executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).  每個 executor 內存

而沒有配置

--executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
or all available cores on the worker in standalone mode)  每個 executor 核數

 

假如各個 executor 核數不一樣,核數多的 executor 執行的 task 就多,內存就容易溢出

解決方法是配置參數 --executor-cores,或者是在 spark 中配置 spark.executor.cores

 

在 RDD 中共用對象

rdd = sc.parallelize(range(100))
def myfunc(x): return x
rdd.flatMap(lambda x: [('k', 'v') for _ in range(200000000)]).foreach(myfunc)     # 每次生成一個 tuple 對象,內存溢出
rdd.flatMap(lambda x: ['k'+'v' for _ in range(2000000)]).count()        # 無需生成新的 string 對象,可執行

tuple 為不可變對象,不過字符串也是可變對象

此條方法有待進一步驗證

 

Driver 中需要讀取大量數據

造成 Driver 內存溢出

解決思路是增加 Driver 內存,具體做法為設置參數

--driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M). 

示例

from pyspark import SparkContext
sc = SparkContext(master='yarn')
rdd = sc.parallelize(range(300000000))
# spark-submit --master yarn-client  --driver-memory 512M  driver_oom.py    內存溢出
# spark-submit --master yarn-client  --driver-memory 3G  driver_oom.py  可以執行

 

collect

大量數據回傳 Driver,造成內存溢出

解決思路是分區輸出,具體做法是 foreach

rdd = sc.parallelize(range(100))
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).collect()     # 內存溢出

def func(x): print(x)
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).foreach(func) # 分區輸出

或者增加 Driver 內存

 

代碼優化

mapPartitions

1. 批處理

2. 減少中間輸出

用 mapPartitions 替代多個 map,減少 Executor 內存壓力

from pyspark import SparkContext
sc = SparkContext(master='yarn')
data = range(10)
rdd = sc.parallelize(data, 2)

##### map
rdd.map(lambda x: x % 3).filter(lambda x : x>1 ).countByValue().values()        # [3]

##### mapPartitions
# 避免了中間 RDD 的產生,節約內存,防止 oom
def myfunc(datas):
    # datas type is itertools.chain
    for data in datas:
        value = data % 3
        if value > 1:
            yield value

print rdd.mapPartitions(myfunc).countByValue().values()     # [3]
# spark-submit --master yarn-client mapVSmapPartitions.py   python 只支持 client 模式

 

DataFrame 代替 RDD

任務被划分成多個 stage,在每個 stage 內部,RDD 是無法自動優化的,

rdd.map(lambda x: x+1).map(lambda x: x+1)   ==  rdd.map(lambda x: x+2)

如上面兩個操作是等價的,但是 RDD 並不會自動優化,

而 DataFrame 使用 sql 查詢,自帶 sql 優化器,可自動找到最優方案

 

broadcast join

在分布式計算中,數據跨節點移動是非常影響性能的,網絡傳輸耗時,多次傳輸消耗內存,broadcast 在某些場景可以減少數據移動;

如 一個 小RDD 要和 一個 大RDD 進行 join 操作,常規情況下要互傳 RDD,由於多個 task,故需多次傳輸,    【注意必須是有個小 RDD,否則這種做法意義不大,因為后面要遍歷這個廣播變量】

如果把 小RDD 變成 broadcast 變量,就不用傳輸 大RDD,把 broadcast(小RDD) 緩存在對應 Executor 上即可

對 大RDD 進行 map 操作,在 map 函數中調用 小RDD 的 value,遍歷 小RDD

map(lambda x: i for i in smallRDD.value if x == i)

 

filter 之后再 join

就是所謂的謂詞下推,在 sparkSQL 中會自動這么操作;

如果是自己操作 RDD,可以減少 shuffle 的數據量 

 

cache and persist

緩存 RDD 既可以節省內存,也可以提高性能;

cahce 是緩存到內存,等同於 persist(Storage.MEMORY_ONLY),在內存不足時,這種緩存方式會丟失數據,再次使用時會重新計算;

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 在內存不足時會寫到磁盤,避免重復,只是耗費一點 IO 時間

 

combineByKey

在 hadoop 中也有 combine,有 combine 比 沒有combine 效率高;

比如 reduceByKey (combine操作) 就比 groupyByKey (非combine操作) 效率高

import time
from pyspark import SparkContext

sc = SparkContext(master='yarn')

strs = list('abcd')*10000000
rdd = sc.parallelize(strs)

time.clock()
print rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y).collect()     # combinByKey 操作耗時少3.2s
# print rdd.map(lambda x: (x, 1)).groupByKey().mapValues(sum).collect()        # 非 combinByKey 操作耗時3.6s
# 二者結果一樣
print(time.clock())

strs = list('abcd')*10000000
for i in strs:i = (i, 1)    # 6s,單機for循環做更少的事情,耗時更多

圖解如下

 

 

 

 

參考資料:

https://blog.csdn.net/yhb315279058/article/details/51035631


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM