spark作業性能調優
優化的目標
- 保證大數據量下任務運行成功
- 降低資源消耗
- 提高計算性能
一、開發調優:
(1)避免創建重復的RDD
RDD lineage,也就是“RDD的血緣關系鏈”
開發RDD lineage極其冗長的Spark作業時,創建多個代表相同數據的RDD,進而增加了作業的性能開銷。
(2)盡可能復用同一個RDD
比如說,有一個RDD的數據格式是key-value類型的,另一個是單value類型的,這兩個RDD的value數據是完全一樣的。那么此時我們可以只使用key-value類型的那個RDD,因為其中已經包含了另一個的數據。對於類似這種多個RDD的數據有重疊或者包含的情況,我們應該盡量復用一個RDD,這樣可以盡可能地減少RDD的數量,從而盡可能減少算子執行的次數。
(3)對多次使用的RDD進行持久化
Spark中對於一個RDD執行多次算子的默認原理是這樣的:每次你對一個RDD執行一個算子操作時,都會重新從源頭處計算一遍,這種方式的性能是很差的。
因此對於這種情況,我們的建議是:對多次使用的RDD進行持久化。此時Spark就會根據你的持久化策略,將RDD中的數據保存到內存或者磁盤中。以后每次對這個RDD進行算子操作時,都會直接從內存或磁盤中提取持久化的RDD數據,然后執行算子,而不會從源頭處重新計算一遍這個RDD,再執行算子操作。
持久化級別:后綴為_2的級別,必須將所有數據都復制一份副本,並發送到其他節點上,數據復制以及網絡傳輸會導致較大的性能開銷
(4)盡量避免使用shuffle類算子
因為Spark作業運行過程中,最消耗性能的地方就是shuffle過程。shuffle過程,簡單來說,就是將分布在集群中多個節點上的同一個key,拉取到同一個節點上,進行聚合或join等操作。比如reduceByKey、join等算子,都會觸發shuffle操作。
shuffle過程中,各個節點上的相同key都會先寫入本地磁盤文件中,然后其他節點需要通過網絡傳輸拉取各個節點上的磁盤文件中的相同key。而且相同key都拉取到同一個節點進行聚合操作時,還有可能會因為一個節點上處理的key過多,導致內存不夠存放,進而溢寫到磁盤文件中。因此在shuffle過程中,可能會發生大量的磁盤文件讀寫的IO操作,以及數據的網絡傳輸操作。磁盤IO和網絡數據傳輸也是shuffle性能較差的主要原因。
因此在我們的開發過程中,能避免則盡可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業,可以大大減少性能開銷。
(5)使用map-side預聚合功能的shuffle類算子
所謂的map-side預聚合,說的是在每個節點本地對相同的key進行一次聚合操作,類似於MapReduce中的本地combiner。map-side預聚合之后,每個節點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節點在拉取所有節點上的相同key時,就會大大減少需要拉取的數據數量,從而也就減少了磁盤IO以及網絡傳輸開銷。通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子都會使用用戶自定義的函數對每個節點本地的相同key進行預聚合。而groupByKey算子是不會進行預聚合的,全量的數據會在集群的各個節點之間分發和傳輸,性能相對來說比較差。
(6)使用高性能的算子
使用mapPartitions替代普通map
mapPartitions類的算子,一次函數調用會處理一個partition所有的數據,而不是一次函數調用處理一條,性能相對來說會高一些。但是有的時候,使用mapPartitions會出現OOM(內存溢出)的問題。因為單次函數調用就要處理掉一個partition所有的數據,如果內存不夠,垃圾回收時是無法回收掉太多對象的,很可能出現OOM異常。所以使用這類操作時要慎重!
使用foreachPartitions替代foreach
原理類似於“使用mapPartitions替代map”,也是一次函數調用處理一個partition的所有數據,而不是一次函數調用處理一條數據。在實踐中發現,foreachPartitions類的算子,對性能的提升還是很有幫助的。比如在foreach函數中,將RDD中所有數據寫MySQL,那么如果是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會創建一個數據庫連接,此時就勢必會頻繁地創建和銷毀數據庫連接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個partition的數據,那么對於每個partition,只要創建一個數據庫連接即可,然后執行批量插入操作,此時性能是比較高的。實踐中發現,對於1萬條左右的數據量寫MySQL,性能可以提升30%以上。
使用filter之后進行coalesce操作
通常對一個RDD執行filter算子過濾掉RDD中較多數據后(比如30%以上的數據),建議使用coalesce算子,手動減少RDD的partition數量,將RDD中的數據壓縮到更少的partition中去。因為filter之后,RDD的每個partition中都會有很多數據被過濾掉,此時如果照常進行后續的計算,其實每個task處理的partition中的數據量並不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。因此用coalesce減少partition數量,將RDD中的數據壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場景下,對於性能的提升會有一定的幫助。
使用repartitionAndSortWithinPartitions替代repartition與sort類操作
repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,如果需要在repartition重分區之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。因為該算子可以一邊進行重分區的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。
(7)廣播大變量
有時在開發過程中,會遇到需要在算子函數中使用外部變量的場景(尤其是大變量,比如100M以上的大集合),那么此時就應該使用Spark的廣播(Broadcast)功能來提升性能。
在算子函數中使用到外部變量時,默認情況下,Spark會將該變量復制多個副本,通過網絡傳輸到task中,此時每個task都有一個變量副本。如果變量本身比較大的話(比如100M,甚至1G),那么大量的變量副本在網絡中傳輸的性能開銷,以及在各個節點的Executor中占用過多內存導致的頻繁GC,都會極大地影響性能。
因此對於上述情況,如果使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進行廣播。廣播后的變量,會保證每個Executor的內存中,只駐留一份變量副本,而Executor中的task執行時共享該Executor中的那份變量副本。這樣的話,可以大大減少變量副本的數量,從而減少網絡傳輸的性能開銷,並減少對Executor內存的占用開銷,降低GC的頻率
廣播變量的好處,不是每個task一份變量副本,而是變成每個節點的executor才一份副本。這樣的話,就可以讓變量產生的副本大大減少。
廣播變量,初始的時候,就在Drvier上有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變量副本;如果本地沒有,BlockManager,也許會從遠程的Driver上面去獲取變量副本;也有可能從距離比較近的其他節點的Executor的BlockManager上去獲取,並保存在本地的BlockManager中;BlockManager負責管理某個Executor對應的內存和磁盤上的數據,此后這個executor上的task,都會直接使用本地的BlockManager中的副本。
(8)使用Kryo優化序列化性能
在Spark中,主要有三個地方涉及到了序列化:
在算子函數中使用到外部變量時,該變量會被序列化后進行網絡傳輸(見“原則七:廣播大變量”中的講解)。
將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable接口。
使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節數組。
對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要注冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。
以下是使用Kryo的代碼示例,我們只要設置序列化類,再注冊要序列化的自定義類型即可(比如算子函數中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):
Kryo之所以沒有被作為默認的序列化類庫的原因,就要出現了:主要是因為Kryo要求,如果要達到它的最佳性能的話,那么就一定要注冊你自定義的類(比如,你的算子函數中使用到了外部自定義類型的對象變量,這時,就要求必須注冊你的類,否則Kryo達不到最佳性能)。
當使用了序列化的持久化級別時,在將每個RDD partition序列化成一個大的字節數組時,就會使用Kryo進一步優化序列化的效率和性能。默認情況下,Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,對象輸入輸出流機制,來進行序列化。
這種默認序列化機制的好處在於,處理起來比較方便;也不需要我們手動去做什么事情,只是,你在算子里面使用的變量,必須是實現Serializable接口的,可序列化即可。但是缺點在於,默認的序列化機制的效率不高,序列化的速度比較慢;序列化以后的數據,占用的內存空間相對還是比較大。
Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化后的數據要更小,大概是Java序列化機制的1/10
Kryo序列化機制,一旦啟用以后,會生效的幾個地方:
1、算子函數中使用到的外部變量,使用Kryo以后:優化網絡傳輸的性能,可以優化集群中內存的占用和消耗
2、持久化RDD,StorageLevel.MEMORY_ONLY_SER優化內存的占用和消耗;持久化RDD占用的內存越少,task執行的時候,創建的對象,就
不至於頻繁的占滿內存,頻繁發生GC。
3、shuffle:可以優化網絡傳輸的性能。
(9)優化數據結構
Java中,有三種類型比較耗費內存:
對象,每個Java對象都有對象頭、引用等額外的信息,因此比較占用內存空間。
字符串,每個字符串內部都有一個字符數組以及長度等額外信息。
集合類型,比如HashMap、LinkedList等,因為集合類型內部通常會使用一些內部類來封裝集合元素,比如Map.Entry。
因此Spark官方建議,在Spark編碼實現中,特別是對於算子函數中的代碼,盡量不要使用上述三種數據結構,盡量使用字符串替代對象,使用原始類型(比如Int、Long)替代字符串,使用數組替代集合類型,這樣盡可能地減少內存占用,從而降低GC頻率,提升性能。
但是在筆者的編碼實踐中發現,要做到該原則其實並不容易。因為我們同時要考慮到代碼的可維護性,如果一個代碼中,完全沒有任何對象抽象,全部是字符串拼接的方式,那么對於后續的代碼維護和修改,無疑是一場巨大的災難。同理,如果所有操作都基於數組實現,而不使用HashMap、LinkedList等集合類型,那么對於我們的編碼難度以及代碼可維護性,也是一個極大的挑戰。因此筆者建議,在可能以及合適的情況下,使用占用內存較少的數據結構,但是前提是要保證代碼的可維護性。
(10)數據本地化
task和數據在一個節點上,直接從本地executor的BlockManager中獲取數據,純內存,或者帶一點磁盤IO;如果要通過網絡傳輸數據的話,那么實在是,性能肯定會下降的,大量網絡傳輸,以及磁盤IO,都是性能的殺手。
調節步驟:
a、觀察日志,spark作業的運行日志,推薦大家在測試的時候,先用client模式,在本地就直接可以看到比較全的日志。
日志里面會顯示,starting task。。。,PROCESS LOCAL、NODE LOCAL,觀察大部分task的數據本地化級別。
B、如果大多都是PROCESS_LOCAL,那就不用調節了
如果是發現,好多的級別都是NODE_LOCAL、ANY,那么最好就去調節一下數據本地化的等待時長
調節完,應該是要反復調節,每次調節完以后,再來運行,觀察日志
看看大部分的task的本地化級別有沒有提升;看看,整個spark作業的運行時間有沒有縮短
但是注意別本末倒置,本地化級別倒是提升了,但是因為大量的等待時長,spark作業的運行時間反而增加了,那就還是不要調節了。
spark.locality.wait,默認是3s;可以改成6s,10s
二、運行資源調優:
在開發完Spark作業之后,就該為作業配置合適的資源了。Spark的資源參數,基本都可以在spark-submit命令中作為參數設置。資源參數設置的不合理,可能會導致沒有充分利用集群資源,作業運行會極其緩慢;或者設置的資源過大,隊列沒有足夠的資源來提供,進而導致各種異常。總之,無論是哪種情況,都會導致Spark作業的運行效率低下,甚至根本無法運行。因此我們必須對Spark作業的資源使用原理有一個清晰的認識,並知道在Spark作業運行過程中,有哪些資源參數是可以設置的,以及如何設置合適的參數值。
Executor的內存主要分為三塊:第一塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。
task的執行速度是跟每個Executor進程的CPU core數量有直接關系的。一個CPU core同一時間只能執行一個線程。而每個Executor進程上分配到的多個task,都是以每個task一條線程的方式,多線程並發運行的。如果CPU core數量比較充足,而且分配到的task數量比較合理,那么通常來說,可以比較快速和高效地執行完這些task線程。
num-executors
參數說明:該參數用於設置Spark作業總共要用多少個Executor進程來執行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會盡可能按照你的設置來在集群的各個工作節點上,啟動相應數量的Executor進程。這個參數非常之重要,如果不設置的話,默認只會給你啟動少量的Executor進程,此時你的Spark作業的運行速度是非常慢的。
參數調優建議:每個Spark作業的運行一般設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都不好。設置的太少,無法充分利用集群資源;設置的太多的話,大部分隊列可能無法給予充分的資源。
executor-memory
參數說明:該參數用於設置每個Executor進程的內存。Executor內存的大小,很多時候直接決定了Spark作業的性能,而且跟常見的JVM OOM異常,也有直接的關聯。
參數調優建議:每個Executor進程的內存設置4G~8G較為合適。但是這只是一個參考值,具體的設置還是得根據不同部門的資源隊列來定。可以看看自己團隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作業申請到的總內存量(也就是所有Executor進程的內存總和),這個量是不能超過隊列的最大內存量的。此外,如果你是跟團隊里其他人共享這個資源隊列,那么申請的總內存量最好不要超過資源隊列最大總內存的1/3~1/2,避免你自己的Spark作業占用了隊列所有的資源,導致別的同學的作業無法運行。
executor-cores
參數說明:該參數用於設置每個Executor進程的CPU core數量。這個參數決定了每個Executor進程並行執行task線程的能力。因為每個CPU core同一時間只能執行一個task線程,因此每個Executor進程的CPU core數量越多,越能夠快速地執行完分配給自己的所有task線程。
參數調優建議:Executor的CPU core數量設置為2~4個較為合適。同樣得根據不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,來決定每個Executor進程可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那么num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學的作業運行。
driver-memory
參數說明:該參數用於設置Driver進程的內存。
參數調優建議:Driver的內存通常來說不設置,或者設置1G左右應該就夠了。唯一需要注意的一點是,如果需要使用collect算子將RDD的數據全部拉取到Driver上進行處理,那么必須確保Driver的內存足夠大,否則會出現OOM內存溢出的問題。
spark.default.parallelism
參數說明:該參數用於設置每個stage的默認task數量。這個參數極為重要,如果不設置可能會直接影響你的Spark作業性能。
參數調優建議:Spark作業的默認task數量為500~1000個較為合適。很多同學常犯的一個錯誤就是不去設置這個參數,那么此時就會導致Spark自己根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。通常來說,Spark默認設置的數量是偏少的(比如就幾十個task),如果task數量偏少的話,就會導致你前面設置好的Executor的參數都前功盡棄。試想一下,無論你的Executor進程有多少個,內存和CPU有多大,但是task只有1個或者10個,那么90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!因此Spark官網建議的設置原則是,設置該參數為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數量為300個,那么設置1000個task是可以的,此時可以充分地利用Spark集群的資源。
spark.storage.memoryFraction
參數說明:該參數用於設置RDD持久化數據在Executor內存中能占的比例,默認是0.6。也就是說,默認Executor 60%的內存,可以用來保存持久化的RDD數據。根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。
參數調優建議:如果Spark作業中,有較多的RDD持久化操作,該參數的值可以適當提高一些,保證持久化的數據能夠容納在內存中。避免內存不夠緩存所有的數據,導致數據只能寫入磁盤中,降低了性能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那么這個參數的值適當降低一些比較合適。此外,如果發現作業由於頻繁的gc導致運行緩慢(通過spark web ui可以觀察到作業的gc耗時),意味着task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。
spark.shuffle.memoryFraction
參數說明:該參數用於設置shuffle過程中一個task拉取到上個stage的task的輸出后,進行聚合操作時能夠使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操作。shuffle操作在進行聚合時,如果發現使用的內存超出了這個20%的限制,那么多余的數據就會溢寫到磁盤文件中去,此時就會極大地降低性能。
參數調優建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內存占比,提高shuffle操作的內存占比比例,避免shuffle過程中數據過多時內存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發現作業由於頻繁的gc導致運行緩慢,意味着task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。
資源參數的調優,沒有一個固定的值,需要同學們根據自己的實際情況(包括Spark作業中的shuffle操作數量、RDD持久化操作數量以及spark web ui中顯示的作業gc情況),同時參考本篇文章中給出的原理以及調優建議,合理地設置上述參數。
3、資源參數參考示例
以下是一份spark-submit命令的示例,大家可以參考一下,並根據自己的實際情況進行調節:
- ./bin/spark-submit \
- --master yarn-cluster \
- --num-executors 100 \
- --executor-memory 6G \
- --executor-cores 4 \
- --driver-memory 1G \
- --conf spark.default.parallelism=1000 \
- --conf spark.storage.memoryFraction=0.5 \
- --conf spark.shuffle.memoryFraction=0.3 \
三、數據傾斜調優:
數據傾斜發生時的現象
- 絕大多數task執行得都非常快,但個別task執行極慢。比如,總共有1000個task,997個task都在1分鍾之內執行完了,但是剩余兩三個task卻要一兩個小時。這種情況很常見。
- 原本能夠正常執行的Spark作業,某天突然報出OOM(內存溢出)異常,觀察異常棧,是我們寫的業務代碼造成的。這種情況比較少見。
數據傾斜發生的原理
數據傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的數據量特別大的話,就會發生數據傾斜。比如大部分key對應10條數據,但是個別key卻對應了100萬條數據,那么大部分task可能就只會分配到10條數據,然后1秒鍾就運行完了;但是個別task可能分配到了100萬數據,要運行一兩個小時。因此,整個Spark作業的運行進度是由運行時間最長的那個task決定的。
因此出現數據傾斜的時候,Spark作業看起來會運行得非常緩慢,甚至可能因為某個task處理的數據量過大導致內存溢出。
數據傾斜只會發生在shuffle過程中。
找到數據傾斜對應的stage:
首先要看的,就是數據傾斜發生在第幾個stage中。
如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到當前運行到了第幾個stage;如果是用yarn-cluster模式提交,則可以通過Spark Web UI來查看當前運行到了第幾個stage。此外,無論是使用yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI上深入看一下當前這個stage各個task分配的數據量,從而進一步確定是不是task分配的數據不均勻導致了數據傾斜。
知道數據傾斜發生在哪一個stage之后,接着我們就需要根據stage划分原理,推算出來發生傾斜的那個stage對應代碼中的哪一部分,這部分代碼中肯定會有一個shuffle類算子。精准推算stage與代碼的對應關系,需要對Spark的源碼有深入的理解,這里我們可以介紹一個相對簡單實用的推算方法:只要看到Spark代碼中出現了一個shuffle類算子或者是Spark SQL的SQL語句中出現了會導致shuffle的語句(比如group by語句),那么就可以判定,以那個地方為界限划分出了前后兩個stage。
但是大家要注意的是,不能單純靠偶然的內存溢出就判定發生了數據傾斜。因為自己編寫的代碼的bug,以及偶然出現的數據異常,也可能會導致內存溢出。因此還是要按照上面所講的方法,通過Spark Web UI查看報錯的那個stage的各個task的運行時間以及分配的數據量,才能確定是否是由於數據傾斜才導致了這次內存溢出。
查看數據傾斜key的分布情況:
(1)如果是Spark SQL中的group by、join語句導致的數據傾斜,那么就查詢一下SQL中使用的表的key分布情況。
(2)如果是對Spark RDD執行shuffle算子導致的數據傾斜,那么可以在Spark作業中加入查看key分布的代碼,比如RDD.countByKey()。然后對統計出來的各個key出現的次數,collect/take到客戶端打印一下,就可以看到key的分布情況。
解決方案:
(1)過濾少數導致傾斜的key
方案適用場景:如果發現導致傾斜的key就少數幾個,而且對計算本身的影響並不大的話,那么很適合使用這種方案。比如99%的key就對應10條數據,但是只有一個key對應了100萬數據,從而導致了數據傾斜。
方案實現思路:如果我們判斷那少數幾個數據量特別多的key,對作業的執行和計算結果不是特別重要的話,那么干脆就直接過濾掉那少數幾個key。比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter算子過濾掉這些key。如果需要每次作業執行時,動態判定哪些key的數據量最多然后再進行過濾,那么可以使用sample算子對RDD進行采樣,然后計算出每個key的數量,取數據量最多的key過濾掉即可。
(2)提高shuffle操作的並行度
方案適用場景:如果我們必須要對數據傾斜迎難而上,那么建議優先使用這種方案,因為這是處理數據傾斜最簡單的一種方案。
方案實現思路:在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,比如reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task的數量。對於Spark SQL中的shuffle類語句,比如group by、join等,需要設置一個參數,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的並行度,該值默認是200,對於很多場景來說都有點過小。
方案實現原理:增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。舉例來說,如果原本有5個key,每個key對應10條數據,這5個key都是分配給一個task的,那么這個task就要處理50條數據。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數據,那么自然每個task的執行時間都會變短了。
(3)兩階段聚合(局部聚合+全局聚合)--針對聚合類算子
方案適用場景:對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
方案實現思路:這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個隨機數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數后的數據,執行reduceByKey等聚合操作,進行局部聚合,那么局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如(hello, 4)。
方案實現原理:將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題。接着去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結果。具體原理見下圖。
方案優點:對於聚合類的shuffle操作導致的數據傾斜,效果是非常不錯的。通常都可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將Spark作業的性能提升數倍以上。
(4)使用Broadcast變量進行map join
這個方案只適用於一個大表和一個小表的情況。畢竟我們需要將小表進行廣播,此時會比較消耗內存資源,driver和每個Executor內存中都會駐留一份小RDD的全量數據。如果我們廣播出去的RDD數據比較大,比如10G以上,那么就可能發生內存溢出了。因此並不適合兩個都是大表的情況。
(5) 采樣傾斜key並分拆join操作
方案實現思路:
A、對包含少數幾個數據量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統計一下每個key的數量,計算出來數據量最大的是哪幾個key。
B、然后將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key都打上n以內的隨機數作為前綴,而不會導致傾斜的大部分key形成另外一個RDD。
C、接着將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據並形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD。
D、再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。
E、而另外兩個普通的RDD就照常join即可。
F、最后將兩次join的結果使用union算子合並起來即可,就是最終的join結果。
(6)使用隨機前綴和擴容RDD進行join
如果在進行join操作時,RDD中有大量的key導致數據傾斜,那么進行分拆key也沒什么意義,此時就只能使用這種方法了。
方案實現思路:
- 找到那個造成數據傾斜的RDD/Hive表,比如有多個key都對應了超過1萬條數據。
- 然后將該RDD的每條數據都打上一個n以內的隨機前綴。
- 同時對另外一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的前綴。
- 最后將兩個處理后的RDD進行join即可。
四、Shuffle調優:
大多數Spark作業的性能主要就是消耗在了shuffle環節,因為該環節包含了大量的磁盤IO、序列化、網絡數據傳輸等操作。因此,如果要讓作業的性能更上一層樓,就有必要對shuffle過程進行調優。但是也必須提醒大家的是,影響一個Spark作業性能的因素,主要還是代碼開發、資源參數以及數據傾斜,shuffle調優只能在整個Spark的性能調優中占到一小部分而已。因此大家務必把握住調優的基本原則,千萬不要舍本逐末。
ShuffleManager概述
在Spark的源碼中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。
在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有着一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。
因此在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。SortShuffleManager相較於HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最后會將所有的臨時文件合並(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可
(1)未經優化的HashShuffleManager
根據Key進行Hash,就是對key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩沖中,當內存緩沖填滿之后,才會溢寫到磁盤文件中去。下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。
shuffle read的過程中,每個task只要從上游stage的所有task所在節點上,拉取屬於自己的那一個磁盤文件即可。
shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數據,然后通過內存中的一個Map進行聚合等操作。聚合完一批數據后,再拉取下一批數據,並放到buffer緩沖中進行聚合操作。以此類推,直到最后將所有數據到拉取完,並得到最終的結果。
(2)優化后的HashShuffleManager
這里說的優化,是指我們可以設置一個參數,spark.shuffle.consolidateFiles。該參數默認值為false,將其設置為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項。
開啟consolidate機制之后,在shuffle write過程中,task就不是為下游stage的每個task創建一個磁盤文件了。此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就可以並行執行多少個task。而第一批並行執行的每個task都會創建一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。
當Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會復用之前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate機制允許不同的task復用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進行一定程度上的合並,從而大幅度減少磁盤文件的數量,進而提升shuffle write的性能。
(3)SortShuffleManager普通運行機制
在該模式下,數據會先寫入一個內存數據結構中,此時根據不同的shuffle算子,可能選用不同的數據結構。如果是reduceByKey這種聚合類的shuffle算子,那么會選用Map數據結構,一邊通過Map進行聚合,一邊寫入內存;如果是join這種普通的shuffle算子,那么會選用Array數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。
在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序。排序過后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是通過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數,提升性能。
一個task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也就會產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合並,這就是merge過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,然后依次寫入最終的磁盤文件之中。此外,由於一個task就只對應一個磁盤文件,也就意味着該task為下游stage的task准備的數據都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。
(4)SortShuffleManager--bypass運行機制
bypass運行機制的觸發條件如下:
- shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值。
- 不是聚合類的shuffle算子(比如reduceByKey)。
此時task會為每個下游task都創建一個臨時磁盤文件,並將數據按key進行hash然后根據key的hash值,將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合並成一個磁盤文件,並創建一個單獨的索引文件。
該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要創建數量驚人的磁盤文件,只是在最后會做一個磁盤文件的合並而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來說,shuffle read的性能會更好。
而該機制與普通SortShuffleManager運行機制的不同在於:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。
shuffle相關參數調優
spark.shuffle.file.buffer
- 默認值:32k
- 參數說明:該參數用於設置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數據寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。
- 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數,也就可以減少磁盤IO次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。
- 默認值:48m
- 參數說明:該參數用於設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數據。
- 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。
- 默認值:3
- 參數說明:shuffle read task從shuffle write task所在節點拉取屬於自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。
- 調優建議:對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由於JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。
- 默認值:5s
- 參數說明:具體解釋同上,該參數代表了每次重試拉取數據的等待間隔,默認是5s。
- 調優建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩定性。
- 默認值:0.2
- 參數說明:該參數代表了Executor內存中,分配給shuffle read task進行聚合操作的內存比例,默認是20%。
- 調優建議:在資源參數調優中講解過這個參數。如果內存充足,而且很少使用持久化操作,建議調高這個比例,給shuffle read的聚合操作更多內存,以避免由於內存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發現,合理調節該參數可以將性能提升10%左右。
- 默認值:sort
- 參數說明:該參數用於設置ShuffleManager的類型。Spark 1.5以后,有三個可選項:hash、sort和tungsten-sort(鎢絲)。HashShuffleManager是Spark 1.2以前的默認選項,但是Spark 1.2以及之后的版本默認都是SortShuffleManager了。tungsten-sort與sort類似,但是使用了tungsten計划中的堆外內存管理機制,內存使用效率更高。
- 調優建議:由於SortShuffleManager默認會對數據進行排序,因此如果你的業務邏輯中需要該排序機制的話,則使用默認的SortShuffleManager就可以;而如果你的業務邏輯不需要對數據進行排序,那么建議參考后面的幾個參數調優,通過bypass機制或優化的HashShuffleManager來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort要慎用,因為之前發現了一些相應的bug。
- 默認值:200
- 參數說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合並成一個文件,並會創建單獨的索引文件。
- 調優建議:當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些,大於shuffle read task的數量。那么此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。
- 默認值:false
- 參數說明:如果使用HashShuffleManager,該參數有效。如果設置為true,那么就會開啟consolidate機制,會大幅度合並shuffle write的輸出文件,對於shuffle read task數量特別多的情況下,這種方法可以極大地減少磁盤IO開銷,提升性能。
- 調優建議:如果的確不需要SortShuffleManager的排序機制,那么除了使用bypass機制,還可以嘗試將spark.shffle.manager參數手動指定為hash,使用HashShuffleManager,同時開啟consolidate機制。在實踐中嘗試過,發現其性能比開啟了bypass機制的SortShuffleManager要高出10%~30%。
