簡介
Spark中的OOM問題不外乎以下兩種情況
- map執行中內存溢出
- shuffle后內存溢出
map執行中內存溢出代表了所有map類型的操作。包括:flatMap,filter,mapPatitions等。
shuffle后內存溢出的shuffle操作包括join,reduceByKey,repartition等操作。
后面先總結一下我對Spark內存模型的理解,再總結各種OOM的情況相對應的解決辦法和性能優化方面的總結。如果理解有錯,希望在評論中指出。
Spark 內存模型
Spark在一個Executor中的內存分為三塊,一塊是execution內存,一塊是storage內存,一塊是other內存。
- execution內存是執行內存,文檔中說join,aggregate都在這部分內存中執行,shuffle的數據也會先緩存在這個內存中,滿了再寫入磁盤,能夠減少IO。其實map過程也是在這個內存中執行的。
- storage內存是存儲broadcast,cache,persist數據的地方。
- other內存是程序執行時預留給自己的內存。
execution和storage是Spark Executor中內存的大戶,other占用內存相對少很多,這里就不說了。
在spark-1.6.0以前的版本,execution和storage的內存分配是固定的,使用的參數配置(占Executor總內存大小):
- execution:spark.shuffle.memoryFraction (default 0.2)
- storage: spark.storage.memoryFraction(default 0.6)
因為在1.6.0以前這兩塊內存是互相隔離的,這就導致了Executor的內存利用率不高,而且需要根據Application的具體情況,使用者自己來調節這兩個參數才能優化Spark的內存使用。
在spark-1.6.0以后的版本,execution內存和storage內存可以相互借用,提高了內存的Spark中內存的使用率,同時也減少了OOM的情況。
使用堆外內存有兩種方式,一種是在rdd調用persist的時候傳入參數StorageLevel.OFF_HEAP,這種使用方式需要配合Tachyon一起使用。另外一種是使用Spark自帶的spark.memory.offHeap.enabled 配置為true進行使用,但是這種方式在1.6.0的版本還不支持使用,只是多了這個參數,在以后的版本中會開放。 OOM的問題通常出現在execution這塊內存中,因為storage這塊內存在存放數據滿了之后,會直接丟棄內存中舊的數據,對性能有影響但是不會有OOM的問題。
內存溢出解決方法
1. map過程產生大量對象導致內存溢出
這種溢出的原因是在單個map中產生了大量的對象導致的。
例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),這個操作在rdd中,每個對象都產生了10000個對象,這肯定很容易產生內存溢出的問題。針對這種問題,在不增加內存的情況下,可以通過減少每個Task的大小,以便達到每個Task即使產生大量的對象Executor的內存也能夠裝得下。具體做法可以在會產生大量對象的map操作之前調用repartition方法,分區成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
面對這種問題注意,不能使用rdd.coalesce方法,這個方法只能減少分區,不能增加分區,不會有shuffle的過程。
2.數據不平衡導致內存溢出
數據不平衡除了有可能導致內存溢出外,也有可能導致性能的問題,解決方法和上面說的類似,就是調用repartition重新分區。這里就不再累贅了。
3.coalesce調用導致內存溢出
這是我最近才遇到的一個問題,因為hdfs中不適合存小問題,所以Spark計算后如果產生的文件太小,我們會調用coalesce合並文件再存入hdfs中。但是這會導致一個問題,例如在coalesce之前有100個文件,這也意味着能夠有100個Task,現在調用coalesce(10),最后只產生10個文件,因為coalesce並不是shuffle操作,這意味着coalesce並不是按照我原本想的那樣先執行100個Task,再將Task的執行結果合並成10個,而是從頭到位只有10個Task在執行,原本100個文件是分開執行的,現在每個Task同時一次讀取10個文件,使用的內存是原來的10倍,這導致了OOM。解決這個問題的方法是令程序按照我們想的先執行100個Task再將結果合並成10個文件,這個問題同樣可以通過repartition解決,調用repartition(10),因為這就有一個shuffle的過程,shuffle前后是兩個Stage,一個100個分區,一個是10個分區,就能按照我們的想法執行。
4.shuffle后內存溢出
shuffle內存溢出的情況可以說都是shuffle后,單個文件過大導致的。在Spark中,join,reduceByKey這一類型的過程,都會有shuffle的過程,在shuffle的使用,需要傳入一個partitioner,大部分Spark中的shuffle操作,默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區數,這個參數通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism參數只對HashPartitioner有效,所以如果是別的Partitioner或者自己實現的Partitioner就不能使用spark.default.parallelism這個參數來控制shuffle的並發量了。如果是別的partitioner導致的shuffle內存溢出,就需要從partitioner的代碼增加partitions的數量。
5. standalone模式下資源分配不均勻導致內存溢出
在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 這兩個參數,但是沒有配置–executor-cores這個參數的話,就有可能導致,每個Executor的memory是一樣的,但是cores的數量不同,那么在cores數量多的Executor中,由於能夠同時執行多個Task,就容易導致內存溢出的情況。這種情況的解決方法就是同時配置–executor-cores或者spark.executor.cores參數,確保Executor資源分配均勻。
6.在RDD中,共用對象能夠減少OOM的情況
這個比較特殊,這里說記錄一下,遇到過一種情況,類似這樣rdd.flatMap(x=>for(i <- 1 to 1000) yield (“key”,”value”))導致OOM,但是在同樣的情況下,使用rdd.flatMap(x=>for(i <- 1 to 1000) yield “key”+”value”)就不會有OOM的問題,這是因為每次(“key”,”value”)都產生一個Tuple對象,而”key”+”value”,不管多少個,都只有一個對象,指向常量池。具體測試如下:
這個例子說明(“key”,”value”)和(“key”,”value”)在內存中是存在不同位置的,也就是存了兩份,但是”key”+”value”雖然出現了兩次,但是只存了一份,在同一個地址,這用到了JVM常量池的知識.於是乎,如果RDD中有大量的重復數據,或者Array中需要存大量重復數據的時候我們都可以將重復數據轉化為String,能夠有效的減少內存使用.
優化
這一部分主要記錄一下到spark-1.6.1版本,筆者覺得有優化性能作用的一些參數配置和一些代碼優化技巧,在參數優化部分,如果筆者覺得默認值是最優的了,這里就不再記錄。
1.使用mapPartitions代替大部分map操作,或者連續使用的map操作
這里需要稍微講一下RDD和DataFrame的區別。RDD強調的是不可變對象,每個RDD都是不可變的,當調用RDD的map類型操作的時候,都是產生一個新的對象,這就導致了一個問題,如果對一個RDD調用大量的map類型操作的話,每個map操作會產生一個到多個RDD對象,這雖然不一定會導致內存溢出,但是會產生大量的中間數據,增加了gc操作。另外RDD在調用action操作的時候,會出發Stage的划分,但是在每個Stage內部可優化的部分是不會進行優化的,例如rdd.map(+1).map(+1),這個操作在數值型RDD中是等價於rdd.map(_+2)的,但是RDD內部不會對這個過程進行優化。DataFrame則不同,DataFrame由於有類型信息所以是可變的,並且在可以使用sql的程序中,都有除了解釋器外,都會有一個sql優化器,DataFrame也不例外,有一個優化器Catalyst,具體介紹看后面參考的文章。
上面說到的這些RDD的弊端,有一部分就可以使用mapPartitions進行優化,mapPartitions可以同時替代rdd.map,rdd.filter,rdd.flatMap的作用,所以在長操作中,可以在mapPartitons中將RDD大量的操作寫在一起,避免產生大量的中間rdd對象,另外是mapPartitions在一個partition中可以復用可變類型,這也能夠避免頻繁的創建新對象。使用mapPartitions的弊端就是犧牲了代碼的易讀性。
2.broadcast join和普通join
在大數據分布式系統中,大量數據的移動對性能的影響也是巨大的。基於這個思想,在兩個RDD進行join操作的時候,如果其中一個RDD相對小很多,可以將小的RDD進行collect操作然后設置為broadcast變量,這樣做之后,另一個RDD就可以使用map操作進行join,這樣能夠有效的減少相對大很多的那個RDD的數據移動。
3.先filter在join
這個就是謂詞下推,這個很顯然,filter之后再join,shuffle的數據量會減少,這里提一點是spark-sql的優化器已經對這部分有優化了,不需要用戶顯示的操作,個人實現rdd的計算的時候需要注意這個。
4.partitonBy優化
這一部分在另一篇文章《spark partitioner使用技巧 》有詳細介紹,這里不說了。
5.combineByKey的使用:
這個操作在Map-Reduce中也有,這里舉個例子:rdd.groupByKey().mapValue(_.sum)比rdd.reduceByKey的效率低
上下兩幅圖的區別就是上面那幅有combineByKey的過程減少了shuffle的數據量,下面的沒有。combineByKey是key-value型rdd自帶的API,可以直接使用。
6.內存不足時的優化
在內存不足的使用,使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache():
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價的,在內存不足的時候rdd.cache()的數據會丟失,再次使用的時候會重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在內存不足的時候會存儲在磁盤,避免重算,只是消耗點IO時間。
7.在spark使用hbase的時候,spark和hbase搭建在同一個集群:
在spark結合hbase的使用中,spark和hbase最好搭建在同一個集群上上,或者spark的集群節點能夠覆蓋hbase的所有節點。hbase中的數據存儲在HFile中,通常單個HFile都會比較大,另外Spark在讀取Hbase的數據的時候,不是按照一個HFile對應一個RDD的分區,而是一個region對應一個RDD分區。所以在Spark讀取Hbase的數據時,通常單個RDD都會比較大,如果不是搭建在同一個集群,數據移動會耗費很多的時間。
參數優化部分
8.spark.driver.memory (default 1g)
這個參數用來設置Driver的內存。在Spark程序中,SparkContext,DAGScheduler都是運行在Driver端的。對應rdd的Stage切分也是在Driver端運行,如果用戶自己寫的程序有過多的步驟,切分出過多的Stage,這部分信息消耗的是Driver的內存,這個時候就需要調大Driver的內存。
9.spark.rdd.compress (default false)
這個參數在內存吃緊的時候,又需要persist數據有良好的性能,就可以設置這個參數為true,這樣在使用persist(StorageLevel.MEMORY_ONLY_SER)的時候,就能夠壓縮內存中的rdd數據。減少內存消耗,就是在使用的時候會占用CPU的解壓時間。
10.spark.serializer (default org.apache.spark.serializer.JavaSerializer )
建議設置為 org.apache.spark.serializer.KryoSerializer,因為KryoSerializer比JavaSerializer快,但是有可能會有些Object會序列化失敗,這個時候就需要顯示的對序列化失敗的類進行KryoSerializer的注冊,這個時候要配置spark.kryo.registrator參數或者使用參照如下代碼:
valconf=newSparkConf().setMaster(…).setAppName(…)
conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))
valsc =newSparkContext(conf)
11.spark.memory.storageFraction (default 0.5)
這個參數設置內存表示 Executor內存中 storage/(storage+execution),雖然spark-1.6.0+的版本內存storage和execution的內存已經是可以互相借用的了,但是借用和贖回也是需要消耗性能的,所以如果明知道程序中storage是多是少就可以調節一下這個參數。
12.spark.locality.wait (default 3s)
spark中有4中本地化執行level,PROCESS_LOCAL->NODE_LOCAL->RACK_LOCAL->ANY,一個task執行完,等待spark.locality.wait時間如果,第一次等待PROCESS的Task到達,如果沒有,等待任務的等級下調到NODE再等待spark.locality.wait時間,依次類推,直到ANY。分布式系統是否能夠很好的執行本地文件對性能的影響也是很大的。如果RDD的每個分區數據比較多,每個分區處理時間過長,就應該把 spark.locality.wait 適當調大一點,讓Task能夠有更多的時間等待本地數據。特別是在使用persist或者cache后,這兩個操作過后,在本地機器調用內存中保存的數據效率會很高,但是如果需要跨機器傳輸內存中的數據,效率就會很低。
13.spark.speculation (default false)
一個大的集群中,每個節點的性能會有差異,spark.speculation這個參數表示空閑的資源節點會不會嘗試執行還在運行,並且運行時間過長的Task,避免單個節點運行速度過慢導致整個任務卡在一個節點上。這個參數最好設置為true。與之相配合可以一起設置的參數有spark.speculation.×開頭的參數。參考中有文章詳細說明這個參數。