一.JVM調優之原理概述以及降低cache操作的內存占比
1、常規性能調優:分配資源、並行度。。。等
2、JVM調優(Java虛擬機):JVM相關的參數,通常情況下,如果你的硬件配置、基礎的JVM的配置,都ok的話,JVM通常不會造成太嚴重的性能問題;反而更多的是,在troubleshooting中,JVM占了很重要的地位;JVM造成線上的spark作業的運行報錯,甚至失敗(比如OOM)。
3、shuffle調優(相當重要):spark在執行groupByKey、reduceByKey等操作時的,shuffle環節的調優。這個很重要。shuffle調優,其實對spark作業的性能的影響,是相當之高!!!經驗:在spark作業的運行過程中,只要一牽扯到有shuffle的操作,基本上shuffle操作的性能消耗,要占到整個spark作業的50%~90%。10%用來運行map等操作,90%耗費在兩個shuffle操作。groupByKey、countByKey。
4、spark操作調優(spark算子調優,比較重要):groupByKey,countByKey或aggregateByKey來重構實現。有些算子的性能,是比其他一些算子的性能要高的。foreachPartition替代foreach。如果一旦遇到合適的情況,效果還是不錯的。
理論基礎:
spark是用scala開發的。大家不要以為scala就跟java一點關系都沒有了,這是一個很常見的錯誤。
spark的scala代碼調用了很多java api。scala也是運行在java虛擬機中的。spark是運行在java虛擬機中的。
java虛擬機可能會產生什么樣的問題:內存不足??!!
我們的RDD的緩存、task運行定義的算子函數,可能會創建很多對象。都可能會占用大量內存,沒搞好的話,可能導致JVM出問題。
每一次放對象的時候,都是放入eden區域,和其中一個survivor區域;另外一個survivor區域是空閑的。
當eden區域和一個survivor區域放滿了以后(spark運行過程中,產生的對象實在太多了),就會觸發minor gc,小型垃圾回收。把不再使用的對象,從內存中清空,給后面新創建的對象騰出來點兒地方。
清理掉了不再使用的對象之后,那么也會將存活下來的對象(還要繼續使用的),放入之前空閑的那一個survivor區域中。這里可能會出現一個問題。默認eden、survior1和survivor2的內存占比是8:1:1。問題是,如果存活下來的對象是1.5,一個survivor區域放不下。此時就可能通過JVM的擔保機制(不同JVM版本可能對應的行為),將多余的對象,直接放入老年代了。
如果你的JVM內存不夠大的話,可能導致頻繁的年輕代內存滿溢,頻繁的進行minor gc。頻繁的minor gc會導致短時間內,有些存活的對象,多次垃圾回收都沒有回收掉。會導致這種短聲明周期(其實不一定是要長期使用的)對象,年齡過大,垃圾回收次數太多還沒有回收到,跑到老年代。
老年代中,可能會因為內存不足,囤積一大堆,短生命周期的,本來應該在年輕代中的,可能馬上就要被回收掉的對象。此時,可能導致老年代頻繁滿溢。頻繁進行full gc(全局/全面垃圾回收)。full gc就會去回收老年代中的對象。full gc由於這個算法的設計,是針對的是,老年代中的對象數量很少,滿溢進行full gc的頻率應該很少,因此采取了不太復雜,但是耗費性能和時間的垃圾回收算法。full gc很慢。
full gc / minor gc,無論是快,還是慢,都會導致jvm的工作線程停止工作,stop the world。簡而言之,就是說,gc的時候,spark停止工作了。等着垃圾回收結束。
內存不充足的時候,問題:
1、頻繁minor gc,也會導致頻繁spark停止工作
2、老年代囤積大量活躍對象(短生命周期的對象),導致頻繁full gc,full gc時間很長,短則數十秒,長則數分鍾,甚至數小時。可能導致spark長時間停止工作。
3、嚴重影響咱們的spark的性能和運行的速度。
JVM調優的第一個點:降低cache操作的內存占比
spark中,堆內存又被划分成了兩塊兒,一塊兒是專門用來給RDD的cache、persist操作進行RDD數據緩存用的;另外一塊兒,就是我們剛才所說的,用來給spark算子函數的運行使用的,存放函數中自己創建的對象。
默認情況下,給RDD cache操作的內存占比,是0.6,60%的內存都給了cache操作了。但是問題是,如果某些情況下,cache不是那么的緊張,問題在於task算子函數中創建的對象過多,然后內存又不太大,導致了頻繁的minor gc,甚至頻繁full gc,導致spark頻繁的停止工作。性能影響會很大。
針對上述這種情況,大家可以在之前我們講過的那個spark ui。yarn去運行的話,那么就通過yarn的界面,去查看你的spark作業的運行統計,很簡單,大家一層一層點擊進去就好。可以看到每個stage的運行情況,包括每個task的運行時間、gc時間等等。如果發現gc太頻繁,時間太長。此時就可以適當調價這個比例。
降低cache操作的內存占比,大不了用persist操作,選擇將一部分緩存的RDD數據寫入磁盤,或者序列化方式,配合Kryo序列化類,減少RDD緩存的內存占用;降低cache操作內存占比;對應的,算子函數的內存占比就提升了。這個時候,可能,就可以減少minor gc的頻率,同時減少full gc的頻率。對性能的提升是有一定的幫助的。
一句話,讓task執行算子函數時,有更多的內存可以使用。
spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
二.JVM調優之調節executor堆外內存與連接等待時長
executor堆外內存
有時候,如果你的spark作業處理的數據量特別特別大,幾億數據量;然后spark作業一運行,時不時的報錯,shuffle file cannot find,executor、task lost,out of memory(內存溢出);
可能是說executor的堆外內存不太夠用,導致executor在運行的過程中,可能會內存溢出;然后可能導致后續的stage的task在運行的時候,可能要從一些executor中去拉取shuffle map output文件,但是executor可能已經掛掉了,關聯的block manager也沒有了;所以可能會報shuffle output file not found;resubmitting task;executor lost;spark作業徹底崩潰。
上述情況下,就可以去考慮調節一下executor的堆外內存。也許就可以避免報錯;此外,有時,堆外內存調節的比較大的時候,對於性能來說,也會帶來一定的提升。
--conf spark.yarn.executor.memoryOverhead=2048
spark-submit腳本里面,去用--conf的方式,去添加配置;一定要注意!!!切記,不是在你的spark作業代碼中,用new SparkConf().set()這種方式去設置,不要這樣去設置,是沒有用的!一定要在spark-submit腳本中去設置。
spark.yarn.executor.memoryOverhead(看名字,顧名思義,針對的是基於yarn的提交模式)
默認情況下,這個堆外內存上限大概是300多M;后來我們通常項目中,真正處理大數據的時候,這里都會出現問題,導致spark作業反復崩潰,無法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G
通常這個參數調節上去以后,就會避免掉某些JVM OOM的異常問題,同時呢,會讓整體spark作業的性能,得到較大的提升。
三.Shuffle調優之原理概述
什么樣的情況下,會發生shuffle?
在spark中,主要是以下幾個算子:groupByKey、reduceByKey、countByKey、join,等等。
什么是shuffle?
groupByKey,要把分布在集群各個節點上的數據中的同一個key,對應的values,都給集中到一塊兒,集中到集群中同一個節點上,更嚴密一點說,就是集中到一個節點的一個executor的一個task中。
然后呢,集中一個key對應的values之后,才能交給我們來進行處理,<key, Iterable<value>>;reduceByKey,算子函數去對values集合進行reduce操作,最后變成一個value;countByKey,需要在一個task中,獲取到一個key對應的所有的value,然后進行計數,統計總共有多少個value;join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應的2個value,都能到一個節點的executor的task中,給我們進行處理。
reduceByKey(_+_)
問題在於,同一個單詞,比如說(hello, 1),可能散落在不同的節點上;對每個單詞進行累加計數,就必須讓所有單詞都跑到同一個節點的一個task中,給一個task來進行處理。
每一個shuffle的前半部分stage的task,每個task都會創建下一個stage的task數量相同的文件,比如下一個stage會有100個task,那么當前stage每個task都會創建100份文件;會將同一個key對應的values,一定是寫入同一個文件中的;不同節點上的task,也一定會將同一個key對應的values,寫入下一個stage,同一個task對應的文件中。
shuffle的后半部分stage的task,每個task都會從各個節點上的task寫的屬於自己的那一份文件中,拉取key, value對;然后task會有一個內存緩沖區,然后會用HashMap,進行key, values的匯聚;(key ,values);
task會用我們自己定義的聚合函數,比如reduceByKey(_+_),把所有values進行一對一的累加;聚合出來最終的值。就完成了shuffle。
shuffle,一定是分為兩個stage來完成的。因為這其實是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。
reduceByKey(_+_),在某個action觸發job的時候,DAGScheduler,會負責划分job為多個stage。划分的依據,就是,如果發現有會觸發shuffle操作的算子,比如reduceByKey,就將這個操作的前半部分,以及之前所有的RDD和transformation操作,划分為一個stage;shuffle操作的后半部分,以及后面的,直到action為止的RDD和transformation操作,划分為另外一個stage。
shuffle前半部分的task在寫入數據到磁盤文件之前,都會先寫入一個一個的內存緩沖,內存緩沖滿溢之后,再spill溢寫到磁盤文件中。
四.Shuffle調優之合並map端輸出文件
問題來了:默認的這種shuffle行為,對性能有什么樣的惡劣影響呢?
實際生產環境的條件:
100個節點(每個節點一個executor):100個executor
每個executor:2個cpu core
總共1000個task:每個executor平均10個task
每個節點,10個task,每個節點會輸出多少份map端文件?10 * 1000=1萬個文件
總共有多少份map端輸出文件?100 * 10000 = 100萬。
第一個stage,每個task,都會給第二個stage的每個task創建一份map端的輸出文件
第二個stage,每個task,會到各個節點上面去,拉取第一個stage每個task輸出的,屬於自己的那一份文件。
shuffle中的寫磁盤的操作,基本上就是shuffle中性能消耗最為嚴重的部分。
通過上面的分析,一個普通的生產環境的spark job的一個shuffle環節,會寫入磁盤100萬個文件。
磁盤IO對性能和spark作業執行速度的影響,是極其驚人和嚇人的。
基本上,spark作業的性能,都消耗在shuffle中了,雖然不只是shuffle的map端輸出文件這一個部分,但是這里也是非常大的一個性能消耗點。
開啟了map端輸出文件的合並機制之后:
第一個stage,同時就運行cpu core個task,比如cpu core是2個,並行運行2個task;每個task都創建下一個stage的task數量個文件;
第一個stage,並行運行的2個task執行完以后;就會執行另外兩個task;另外2個task不會再重新創建輸出文件;而是復用之前的task創建的map端輸出文件,將數據寫入上一批task的輸出文件中。
第二個stage,task在拉取數據的時候,就不會去拉取上一個stage每一個task為自己創建的那份輸出文件了;而是拉取少量的輸出文件,每個輸出文件中,可能包含了多個task給自己的map端輸出。
提醒一下(map端輸出文件合並):
只有並行執行的task會去創建新的輸出文件;下一批並行執行的task,就會去復用之前已有的輸出文件;但是有一個例外,比如2個task並行在執行,但是此時又啟動要執行2個task;那么這個時候的話,就無法去復用剛才的2個task創建的輸出文件了;而是還是只能去創建新的輸出文件。
要實現輸出文件的合並的效果,必須是一批task先執行,然后下一批task再執行,才能復用之前的輸出文件;負責多批task同時起來執行,還是做不到復用的。
開啟了map端輸出文件合並機制之后,生產環境上的例子,會有什么樣的變化?
實際生產環境的條件:
100個節點(每個節點一個executor):100個executor
每個executor:2個cpu core
總共1000個task:每個executor平均10個task
每個節點,2個cpu core,有多少份輸出文件呢?2 * 1000 = 2000個
總共100個節點,總共創建多少份輸出文件呢?100 * 2000 = 20萬個文件
相比較開啟合並機制之前的情況,100萬個
map端輸出文件,在生產環境中,立減5倍!
合並map端輸出文件,對咱們的spark的性能有哪些方面的影響呢?
1、map task寫入磁盤文件的IO,減少:100萬文件 -> 20萬文件
2、第二個stage,原本要拉取第一個stage的task數量份文件,1000個task,第二個stage的每個task,都要拉取1000份文件,走網絡傳輸;合並以后,100個節點,每個節點2個cpu core,第二個stage的每個task,主要拉取100 * 2 = 200個文件即可;網絡傳輸的性能消耗是不是也大大減少
分享一下,實際在生產環境中,使用了spark.shuffle.consolidateFiles機制以后,實際的性能調優的效果:對於上述的這種生產環境的配置,性能的提升,還是相當的客觀的。spark作業,5個小時 -> 2~3個小時。
大家不要小看這個map端輸出文件合並機制。實際上,在數據量比較大,你自己本身做了前面的性能調優,executor上去->cpu core上去->並行度(task數量)上去,shuffle沒調優,shuffle就很糟糕了;大量的map端輸出文件的產生。對性能有比較惡劣的影響。
這個時候,去開啟這個機制,可以很有效的提升性能。
五.Shuffle調優之調節map端內存緩沖與reduce端內存占比
spark.shuffle.file.buffer,默認32k
spark.shuffle.memoryFraction,0.2
map端內存緩沖,reduce端內存占比;很多資料、網上視頻,都會說,這兩個參數,是調節shuffle性能的不二選擇,很有效果的樣子,實際上,不是這樣的。
以實際的生產經驗來說,這兩個參數沒有那么重要,往往來說,shuffle的性能不是因為這方面的原因導致的
但是,有一點點效果的,broadcast,數據本地化等待時長;這兩個shuffle調優的小點,其實也是需要跟其他的大量的小點配合起來使用,一點一點的提升性能,最終很多個性能調優的小點的效果,匯集在一起之后,那么就會有可以看見的還算不錯的性能調優的效果。
reduce端task,在拉取到數據之后,會用hashmap的數據格式,來對各個key對應的values進行匯聚。
針對每個key對應的values,執行我們自定義的聚合函數的代碼,比如_ + _(把所有values累加起來)
reduce task,在進行匯聚、聚合等操作的時候,實際上,使用的就是自己對應的executor的內存,executor(jvm進程,堆),默認executor內存中划分給reduce task進行聚合的比例,是0.2。
問題來了,因為比例是0.2,所以,理論上,很有可能會出現,拉取過來的數據很多,那么在內存中,放不下;這個時候,默認的行為,就是說,將在內存放不下的數據,都spill(溢寫)到磁盤文件中去。
原理說完之后,來看一下,默認情況下,不調優,可能會出現什么樣的問題?
默認,map端內存緩沖是每個task,32kb。
默認,reduce端聚合內存比例,是0.2,也就是20%。
如果map端的task,處理的數據量比較大,但是呢,你的內存緩沖大小是固定的。可能會出現什么樣的情況?
每個task就處理320kb,32kb,總共會向磁盤溢寫320 / 32 = 10次。
每個task處理32000kb,32kb,總共會向磁盤溢寫32000 / 32 = 1000次。
在map task處理的數據量比較大的情況下,而你的task的內存緩沖默認是比較小的,32kb。可能會造成多次的map端往磁盤文件的spill溢寫操作,發生大量的磁盤IO,從而降低性能。
reduce端聚合內存,占比。默認是0.2。如果數據量比較大,reduce task拉取過來的數據很多,那么就會頻繁發生reduce端聚合內存不夠用,頻繁發生spill操作,溢寫到磁盤上去。而且最要命的是,磁盤上溢寫的數據量越大,后面在進行聚合操作的時候,很可能會多次讀取磁盤中的數據,進行聚合。
默認不調優,在數據量比較大的情況下,可能頻繁地發生reduce端的磁盤文件的讀寫。
這兩個點之所以放在一起講,是因為他們倆是有關聯的。數據量變大,map端肯定會出點問題;reduce端肯定也會出點問題;出的問題是一樣的,都是磁盤IO頻繁,變多,影響性能。
調優:
調節map task內存緩沖:spark.shuffle.file.buffer,默認32k(spark 1.3.x不是這個參數,后面還有一個后綴,kb;spark 1.5.x以后,變了,就是現在這個參數)
調節reduce端聚合內存占比:spark.shuffle.memoryFraction,0.2
在實際生產環境中,我們在什么時候來調節兩個參數?
看Spark UI,如果你的公司是決定采用standalone模式,那么狠簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的端口,進去看,依次點擊進去,可以看到,你的每個stage的詳情,有哪些executor,有哪些task,每個task的shuffle write和shuffle read的量,shuffle的磁盤和內存,讀寫的數據量;如果是用的yarn模式來提交,課程最前面,從yarn的界面進去,點擊對應的application,進入Spark UI,查看詳情。
如果發現shuffle 磁盤的write和read,很大。這個時候,就意味着最好調節一些shuffle的參數。進行調優。首先當然是考慮開啟map端輸出文件合並機制。
調節上面說的那兩個參數。調節的時候的原則。spark.shuffle.file.buffer,每次擴大一倍,然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高0.1,看看效果。
不能調節的太大,太大了以后過猶不及,因為內存資源是有限的,你這里調節的太大了,其他環節的內存使用就會有問題了。
調節了以后,效果?map task內存緩沖變大了,減少spill到磁盤文件的次數;reduce端聚合內存變大了,減少spill到磁盤的次數,而且減少了后面聚合讀取磁盤文件的數量。
六.Shuffle調優之HashShuffleManager與SortShuffleManager
spark.shuffle.manager:hash、sort、tungsten-sort(自己實現內存管理)
spark.shuffle.sort.bypassMergeThreshold:200
首先先聲明一點:
之前我們所講的,其實都是已經屬於Spark中,比較老舊的一種shuffle manager,HashShuffleManager;這種manager,實際上,從spark 1.2.x版本以后,就不再是默認的選擇了。
HashShuffleManager的原理,以及對應的一些性能調優的點,基本上,之前幾講,咱們就都講過了。
spark 1.2.x版本以后,默認的shuffle manager,是什么呢?SortShuffleManager。
在spark 1.5.x以后,對於shuffle manager又出來了一種新的manager,tungsten-sort(鎢絲),鎢絲sort shuffle manager。官網上一般說,鎢絲sort shuffle manager,效果跟sort shuffle manager是差不多的。
但是,唯一的不同之處在於,鎢絲manager,是使用了自己實現的一套內存管理機制,性能上有很大的提升, 而且可以避免shuffle過程中產生的大量的OOM,GC,等等內存相關的異常。
來一個總結,現在相當於把spark的shuffle的東西又多講了一些。大家理解的更加深入了。hash、sort、tungsten-sort。如何來選擇?
1、需不需要數據默認就讓spark給你進行排序?就好像mapreduce,默認就是有按照key的排序。如果不需要的話,其實還是建議搭建就使用最基本的HashShuffleManager,因為最開始就是考慮的是不排序,換取高性能;
2、什么時候需要用sort shuffle manager?如果你需要你的那些數據按key排序了,那么就選擇這種吧,而且要注意,reduce task的數量應該是超過200的,這樣sort、merge(多個文件合並成一個)的機制,才能生效把。但是這里要注意,你一定要自己考量一下,有沒有必要在shuffle的過程中,就做這個事情,畢竟對性能是有影響的。
3、如果你不需要排序,而且你希望你的每個task輸出的文件最終是會合並成一份的,你自己認為可以減少性能開銷;可以去調節bypassMergeThreshold這個閾值,比如你的reduce task數量是500,默認閾值是200,所以默認還是會進行sort和直接merge的;可以將閾值調節成550,不會進行sort,按照hash的做法,每個reduce task創建一份輸出文件,最后合並成一份文件。(一定要提醒大家,這個參數,其實我們通常不會在生產環境里去使用,也沒有經過驗證說,這樣的方式,到底有多少性能的提升)
4、如果你想選用sort based shuffle manager,而且你們公司的spark版本比較高,是1.5.x版本的,那么可以考慮去嘗試使用tungsten-sort shuffle manager。看看性能的提升與穩定性怎么樣。
總結:
1、在生產環境中,不建議大家貿然使用第三點和第四點:
2、如果你不想要你的數據在shuffle時排序,那么就自己設置一下,用hash shuffle manager。
3、如果你的確是需要你的數據在shuffle時進行排序的,那么就默認不用動,默認就是sort shuffle manager;
或者是什么?如果你壓根兒不care是否排序這個事兒,那么就默認讓他就是sort的。調節一些其他的參數(consolidation機制)。(80%,都是用這種)
spark.shuffle.manager:hash、sort、tungsten-sort
new SparkConf().set("spark.shuffle.manager", "hash")
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")
// 默認就是,new SparkConf().set("spark.shuffle.manager", "sort")
new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")