1. 寫在前面
之前零散的寫了一些spark在某一塊的性能優化,比如sparkstreaming的性能優化,參數優化,sparkSQL的優化。本篇博文針對spark一些基本的核心優化做一個介紹分享,當然這里的介紹適合rdd,sparkstreaming,sparkSQL等。當然個人認為不管什么樣的優化方案和方式都只是為了提供一個優化參考。具體實際的業務中,優化還是得看具體的實際的情況。還是引用某位大神的一句話,盲目暴力的優化行為都是在耍流氓。
2. 常用參數說明
--driver-memory 4g:driver內存大小,一般沒有廣播變量(broadcast)時,設置4g足夠,
如果有廣播變量,視情況而定,可設置6G,8G,12G等均可
--executor-memory 4g:每個executor的內存,正常情況下是4g足夠,但有時處理大批量數據時容易內存不足,再多申請一點,如6G
--num-executors 15:總共申請的executor數目,普通任務十幾個或者幾十個足夠了,
若是處理海量數據如百G上T的數據時可以申請多一些,100,200等
--executor-cores 2:每個executor內的核數,即每個executor中的任務task數目,此處設置為2,即2個task共享上面設置的6g內存,
每個map或reduce任務的並行度是executor數目*executor中的任務數yarn集群中一般有資源申請上限,
如,executor-memory*num-executors<400G 等,所以調試參數時要注意這一點
--spark.default.parallelism 200:Spark作業的默認為500~1000個比較合適,如果不設置,
spark會根據底層HDFS的block數量設置task的數量,這樣會導致並行度偏少,資源利用不充分。該參數設為num-executors * executor-cores的2~3倍比較合適。
--spark.storage.memoryFraction 0.6:設置RDD持久化數據在Executor內存中能占的最大比例。默認值是0.6
--spark.shuffle.memoryFraction 0.2:設置shuffle過程中一個task拉取到上個stage的task的輸出后,
進行聚合操作時能夠使用的Executor內存的比例,默認是0.2,如果shuffle聚合時使用的內存超出了這個20%的限制,多余數據會
被溢寫到磁盤文件中去,降低shuffle性能
--spark.yarn.executor.memoryOverhead 1G:executor執行的時候,用的內存可能會超過executor-memory,
所以會為executor額外預留一部分內存,spark.yarn.executor.memoryOverhead即代表這部分內存
3. Spark常用編程建議
1> 避免創建重復的RDD,盡量復用同一份數據。
2> 盡量避免使用shuffle類算子,因為shuffle操作是spark中最消耗性能的地方,reduceByKey、join、distinct、repartition等算子都會觸發shuffle操作,盡量使用map類的非shuffle算子
3> 用aggregateByKey和reduceByKey替代groupByKey,因為前兩個是預聚合操作,會在每個節點本地對相同的key做聚合,等其他節點拉取所有節點上相同的key時,會大大減少磁盤IO以及網絡開銷。
4> repartition適用於RDD[V], partitionBy適用於RDD[K, V]
5> mapPartitions操作替代普通map,foreachPartitions替代foreach
6> filter操作之后進行coalesce操作,可以減少RDD的partition數量
7> 如果有RDD復用,尤其是該RDD需要花費比較長的時間,建議對該RDD做cache,若該RDD每個partition需要消耗很多內存,建議開啟Kryo序列化機制(據說可節省2到5倍空間),若還是有比較大的內存開銷,可將storage_level設置為MEMORY_AND_DISK_SER
8> 盡量避免在一個Transformation中處理所有的邏輯,盡量分解成map、filter之類的操作
9> 多個RDD進行union操作時,避免使用rdd.union(rdd).union(rdd).union(rdd)這種多重union,rdd.union只適合2個RDD合並,合並多個時采用SparkContext.union(Array(RDD)),避免union嵌套層數太多,導致的調用鏈路太長,耗時太久,且容易引發StackOverFlow
10> spark中的Group/join/XXXByKey等操作,都可以指定partition的個數,不需要額外使用repartition和partitionBy函數
11> 盡量保證每輪Stage里每個task處理的數據量>128M
12> 如果2個RDD做join,其中一個數據量很小,可以采用Broadcast Join,將小的RDD數據collect到driver內存中,將其BroadCast到另外以RDD中,其他場景想優化后面會講
13> 2個RDD做笛卡爾積時,把小的RDD作為參數傳入,如BigRDD.certesian(smallRDD)
14> 若需要Broadcast一個大的對象到遠端作為字典查詢,可使用多executor-cores,大executor-memory。若將該占用內存較大的對象存儲到外部系統,executor-cores=1, executor-memory=m(默認值2g),可以正常運行,那么當大字典占用空間為size(g)時,executor-memory為2*size
,executor-cores=size/m
(向上取整)
15> 如果對象太大無法BroadCast到遠端,且需求是根據大的RDD中的key去索引小RDD中的key,可使用zipPartitions以hash join的方式實現,具體原理參考下一節的shuffle過程
16> 如果需要在repartition重分區之后還要進行排序,可直接使用repartitionAndSortWithinPartitions,比分解操作效率高,因為它可以一邊shuffle一邊排序
4. shuffle性能優化
1> 什么是shuffle操作
spark中的shuffle操作功能:將分布在集群中多個節點上的同一個key,拉取到同一個節點上,進行聚合或join操作,類似洗牌的操作。這些分布在各個存儲節點上的數據重新打亂然后匯聚到不同節點的過程就是shuffle過程。
2> 哪些操作中包含shuffle操作
RDD的特性是不可變的帶分區的記錄集合,Spark提供了Transformation和Action兩種操作RDD的方式。Transformation是生成新的RDD,包括map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues等;Action只是返回一個結果,包括collect,reduce,count,save,lookupKey等
Spark所有的算子操作中是否使用shuffle過程要看計算后對應多少分區:
若一個操作執行過程中,結果RDD的每個分區只依賴上一個RDD的同一個分區,即屬於窄依賴,如map、filter、union等操作,這種情況是不需要進行shuffle的,同時還可以按照pipeline的方式,把一個分區上的多個操作放在同一個Task中進行
若結果RDD的每個分區需要依賴上一個RDD的全部分區,即屬於寬依賴,如repartition相關操作(repartition,coalesce)、 * ByKey操作(groupByKey,ReduceByKey,combineByKey、aggregateByKey等)、join相關操作(cogroup,join)、distinct操作,這種依賴是需要進行shuffle操作的
3> shuffle操作過程
shuffle過程分為shuffle write和shuffle read兩部分
shuffle write:分區數由上一階段的RDD分區數控制,shuffle write過程主要是將計算的中間結果按某種規則臨時放到各個executor所在的本地磁盤上(當前stage結束之后,每個task處理的數據按key進行分類,數據先寫入內存緩沖區,緩沖區滿,溢寫spill到磁盤文件,最終相同key被寫入同一個磁盤文件)創建的磁盤文件數量=當前stage中task數量 * 下一個stage的task數量
shuffle read:從上游stage的所有task節點上拉取屬於自己的磁盤文件,每個read task會有自己的buffer緩沖,每次只能拉取與buffer緩沖相同大小的數據,然后聚合,聚合完一批后拉取下一批,邊拉取邊聚合。分區數由Spark提供的一些參數控制,如果這個參數值設置的很小,同時shuffle read的數據量很大,會導致一個task需要處理的數據非常大,容易發生JVM crash,從而導致shuffle數據失敗,同時executor也丟失了,就會看到Failed to connect to host 的錯誤(即executor lost)。
shuffle過程中,各個節點會通過shuffle write過程將相同key都會先寫入本地磁盤文件中,然后其他節點的shuffle read過程通過網絡傳輸拉取各個節點上的磁盤文件中的相同key。這其中大量數據交換涉及到的網絡傳輸和文件讀寫操作是shuffle操作十分耗時的根本原因
4> spark的shuffle類型
參數spark.shuffle.manager用於設置ShuffleManager的類型。Spark1.5以后,該參數有三個可選項:hash、sort和tungsten-sort。
HashShuffleManager是Spark1.2以前的默認值,Spark1.2之后的默認值都是SortShuffleManager。tungsten-sort與sort類似,但是使用了tungsten計划中的堆外內存管理機制,內存使用效率更高。
由於SortShuffleManager默認會對數據進行排序,因此如果業務需求中需要排序的話,使用默認的SortShuffleManager就可以;但如果不需要排序,可以通過bypass機制或設置HashShuffleManager避免排序,同時也能提供較好的磁盤讀寫性能。
HashShuffleManager流程:
SortShuffleManager流程:
5> 如何開啟bypass機制
bypass機制通過參數spark.shuffle.sort.bypassMergeThreshold設置,默認值是200,表示當ShuffleManager是SortShuffleManager時,若shuffle read task的數量小於這個閾值(默認200)時,則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式寫數據,但最后會將每個task產生的所有臨時磁盤文件合並成一個文件,並創建索引文件。
這里給出的調優建議是,當使用SortShuffleManager時,如果的確不需要排序,可以將這個參數值調大一些,大於shuffle read task的數量。那么此時就會自動開啟bypass機制,map-side就不會進行排序了,減少排序的性能開銷,提升shuffle操作效率。但這種方式並沒有減少shuffle write過程產生的磁盤文件數量,所以寫的性能沒有改變。
6> HashShuffleManager優化建議
如果使用HashShuffleManager,可以設置spark.shuffle.consolidateFiles參數。該參數默認為false,只有當使用HashShuffleManager且該參數設置為True時,才會開啟consolidate機制,大幅度合並shuffle write過程產生的輸出文件,對於shuffle read task 數量特別多的情況下,可以極大地減少磁盤IO開銷,提升shuffle性能。參考社區同學給出的數據,consolidate性能比開啟bypass機制的SortShuffleManager高出10% ~ 30%。
7>shuffle調優建議
除了上述的幾個參數調優,shuffle過程還有一些參數可以提高性能
--spark.shuffle.file.buffer : 默認32M,shuffle Write階段寫文件
時的buffer大小,若內存資源比較充足,可適當將其值調大一些(如64M),
減少executor的IO讀寫次數,提高shuffle性能
--spark.shuffle.io.maxRetries :默認3次,Shuffle Read階段取數據
的重試次數,若shuffle處理的數據量很大,可適當將該參數調大。
9>處理shuffle類操作的注意事項
減少shuffle數據量:在shuffle前過濾掉不必要的數據,只選取需要的字段處理
針對SparkSQL和DataFrame的join、group by等操作:可以通過 spark.sql.shuffle.partitions控制分區數,默認設置為200,可根據shuffle的量以及計算的復雜度提高這個值,如2000等
RDD的join、group by、reduceByKey
等操作:通過spark.default.parallelism
控制shuffle
read與reduce處理的分區數,默認為運行任務的core總數,官方建議為設置成運行任務的core的2~3
倍
提高executor的內存:即spark.executor.memory的值
分析數據驗證是否存在數據傾斜的問題:如空值如何處理,異常數據(某個key對應的數據量特別大)時是否可以單獨處理,可以考慮自定義數據分區規則,如何自定義可以參考下面的join優化環節
5. join性能優化
Spark所有的操作中,join操作是最復雜、代價最大的操作,也是大部分業務場景的性能瓶頸所在。所以針對join操作的優化是使用spark必須要學會的技能。
spark的join操作也分為Spark SQL的join和Spark RDD的join
5.1 Spark SQL的join操作
5.1.1 Hash Join
Hash Join的執行方式是先將小表映射成Hash Table的方式,再將大表使用相同方式映射到Hash Table,在同一個hash分區內做join匹配。
hash join又分為broadcast hash join和shuffle hash join兩種。其中Broadcast hash join,顧名思義,就是把小表廣播到每一個節點上的內存中,大表按Key保存到各個分區中,小表和每個分區的大表做join匹配。這種情況適合一個小表和一個大表做join且小表能夠在內存中保存的情況。如下圖所示:
當Hash Join不能適用的場景就需要Shuffle Hash Join了,Shuffle Hash Join的原理是按照join Key分區,key相同的數據必然分配到同一分區中,將大表join分而治之,變成小表的join,可以提高並行度。執行過程也分為兩個階段:
shuffle階段:分別將兩個表按照join key進行分區,將相同的join key數據重分區到同一節點
hash join階段:每個分區節點上的數據單獨執行單機hash join算法
Shuffle Hash Join的過程如下圖所示:
5.1.2 Sort-Merge Join
SparkSQL針對兩張大表join的情況提供了全新的算法——Sort-merge join,整個過程分為三個步驟:
Shuffle階段:將兩張大表根據join key進行重新分區,兩張表數據會分布到整個集群,以便分布式進行處理
sort階段:對單個分區節點的兩表數據,分別進行排序
merge階段:對排好序的兩張分區表數據執行join操作。分別遍歷兩個有序序列,遇到相同的join key就merge輸出,否則繼續取更小一邊的key,即合並兩個有序列表的方式。
sort-merge join流程如下圖所示。
5.2 Spark RDD的join操作
Spark的RDD join沒有上面這么多的分類,但是面臨的業務需求是一樣的。如果是大表join小表的情況,則可以將小表聲明為broadcast變量,使用map操作快速實現join功能,但又不必執行Spark core中的join操作。
如果是兩個大表join,則必須依賴Spark Core中的join操作了。Spark RDD Join的過程可以自行閱讀源碼了解,這里只做一個大概的講解。
spark的join過程中最核心的函數是cogroup方法,這個方法中會判斷join的兩個RDD所使用的partitioner是否一樣,如果分區相同,即存在OneToOneDependency依賴,不用進行hash分區,可直接join;如果要關聯的RDD和當前RDD的分區不一致時,就要對RDD進行重新hash分區,分到正確的分區中,即存在ShuffleDependency,需要先進行shuffle操作再join。因此提升join效率的一個思路就是使得兩個RDD具有相同的partitioners。
所以針對Spark RDD的join操作的優化建議是:
如果需要join的其中一個RDD比較小,可以直接將其存入內存,使用broadcast hash join
在對兩個RDD進行join操作之前,使其使用同一個partitioners,避免join操作的shuffle過程
如果兩個RDD其一存在重復的key也會導致join操作性能變低,因此最好先進行key值的去重處理
5.3 數據傾斜優化
均勻數據分布的情況下,前面所說的優化建議就足夠了。但存在數據傾斜時,仍然會有性能問題。主要體現在絕大多數task執行得都非常快,個別task執行很慢,拖慢整個任務的執行進程,甚至可能因為某個task處理的數據量過大而爆出OOM錯誤。
5.4 分析數據分布
如果是Spark SQL中的group by、join語句導致的數據傾斜,可以使用SQL分析執行SQL中的表的key分布情況;如果是Spark RDD執行shuffle算子導致的數據傾斜,可以在Spark作業中加入分析Key分布的代碼,使用countByKey()統計各個key對應的記錄數