Spark Shuffle原理、Shuffle操作問題解決和參數調優


1 shuffle原理

  1.1 mapreduce的shuffle原理

    1.1.1 map task端操作

    1.1.2 reduce task端操作

   1.2 spark現在的SortShuffleManager

    2 Shuffle操作問題解決

   2.1 數據傾斜原理

       2.2 數據傾斜問題發現與解決

       2.3 數據傾斜解決方案

   3 spark RDD中的shuffle算子

      3.1 去重

      3.2 聚合

      3.3 排序

      3.4 重分區

      3.5 集合操作和表操作

  4 spark shuffle參數調優

內容:

 1 shuffle原理

       概述:Shuffle描述着數據從map task輸出到reduce task輸入的這段過程。在分布式情況下,reduce task需要跨節點去拉取其它節點上的map task結果。這一過程對將會產生網絡資源消耗和內存,磁盤IO的消耗。

   1.1 mapreduce的shuffle原理

    1.1.1 map task端操作

    每個map task都有一個內存緩沖區(默認是100MB),存儲着map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束后再對磁盤中這個map task產生的所有臨時文件做合並,生成最終的正式輸出文件,然后等待reduce task來拉數據。

    Spill過程:這個從內存往磁盤寫數據的過程被稱為Spill,中文可譯為溢寫。整個緩沖區有個溢寫的比例spill.percent(默認是0.8),當達到閥值時map task 可以繼續往剩余的memory寫,同時溢寫線程鎖定已用memory,先對key(序列化的字節)做排序,如果client程序設置了Combiner,那么在溢寫的過程中就會進行局部聚合。

    Merge過程:每次溢寫都會生成一個臨時文件,在map task真正完成時會將這些文件歸並成一個文件,這個過程叫做Merge。

    1.1.2  reduce task端操作

    當某台TaskTracker上的所有map task執行完成,對應節點的reduce task開始啟動,簡單地說,此階段就是不斷地拉取(Fetcher)每個map task所在節點的最終結果,然后不斷地做merge形成reduce task的輸入文件。

    Copy過程:Reduce進程啟動一些數據copy線程(Fetcher)通過HTTP協議拉取TaskTracker的map階段輸出文件

    Merge過程:Copy過來的數據會先放入內存緩沖區(基於JVM的heap size設置),如果內存緩沖區不足也會發生map task的spill(sort 默認,combine 可選),多個溢寫文件時會發生map task的merge

    下面總結下mapreduce的關鍵詞:

      存儲相關的有:內存緩沖區,默認大小,溢寫閥值

      主要過程:溢寫(spill),排序,合並(combine),歸並(Merge),Copy或Fetch

      相關參數:內存緩沖區默認大小,JVM heao size,spill.percent

      詳細

    1.2 spark現在的SortShuffleManager  

作者:美團點評技術團隊
鏈接:https://zhuanlan.zhihu.com/p/22024169
來源:知乎
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

SortShuffleManager運行原理

SortShuffleManager的運行機制主要分成兩種,一種是普通運行機制,另一種是bypass運行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為200),就會啟用bypass機制。

普通運行機制

下圖說明了普通的SortShuffleManager的原理。在該模式下,數據會先寫入一個內存數據結構中,此時根據不同的shuffle算子,可能選用不同的數據結構。如果是reduceByKey這種聚合類的shuffle算子,那么會選用Map數據結構,一邊通過Map進行聚合,一邊寫入內存;如果是join這種普通的shuffle算子,那么會選用Array數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。

在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序。排序過后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是通過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數,提升性能。

一個task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也就會產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合並,這就是merge過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,然后依次寫入最終的磁盤文件之中。此外,由於一個task就只對應一個磁盤文件,也就意味着該task為下游stage的task准備的數據都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。

SortShuffleManager由於有一個磁盤文件merge的過程,因此大大減少了文件數量。比如第一個stage有50個task,總共有10個Executor,每個Executor執行5個task,而第二個stage有100個task。由於每個task最終只有一個磁盤文件,因此此時每個Executor上只有5個磁盤文件,所有Executor只有50個磁盤文件。

bypass運行機制

下圖說明了bypass SortShuffleManager的原理。bypass運行機制的觸發條件如下:

  • shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值(默認為200)。
  • 不是聚合類的shuffle算子(比如reduceByKey)。

此時task會為每個下游task都創建一個臨時磁盤文件,並將數據按key進行hash然后根據key的hash值,將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合並成一個磁盤文件,並創建一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要創建數量驚人的磁盤文件,只是在最后會做一個磁盤文件的合並而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來說,shuffle read的性能會更好。

而該機制與普通SortShuffleManager運行機制的不同在於:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。

 

    2 Shuffle操作問題解決

   2.1 數據傾斜原理

    在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,此時如果某個key對應的數據量特別大的話,就會發生數據傾斜

       2.2 數據傾斜問題發現與定位

   通過Spark Web UI來查看當前運行的stage各個task分配的數據量,從而進一步確定是不是task分配的數據不均勻導致了數據傾斜。

       知道數據傾斜發生在哪一個stage之后,接着我們就需要根據stage划分原理,推算出來發生傾斜的那個stage對應代碼中的哪一部分,這部分代碼中肯定會有一個shuffle類算子。通過countByKey查看各個key的分布。
 

       2.3 數據傾斜解決方案

    2.3.1 過濾少數導致傾斜的key

    2.3.2 提高shuffle操作的並行度

    2.3.3 局部聚合和全局聚合

        方案實現思路:這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個隨機數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數后的數據,執行reduceByKey等聚合操作,進行局部聚合,那么局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如(hello, 4)。

       代碼:  

        

 

  2.3.4 將reduce join轉為map join((小表幾百M或者一兩G))   

  方案實現思路:     
      不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然后對其創建一個Broadcast變量;      
  接着對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來。       
 
  代碼:

  

 2.3.5 采樣傾斜key並分拆join操作(join的兩表都很大,但僅一個RDD的幾個key的數據量過大)
  

  方案實現思路:

  • 對包含少數幾個數據量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統計一下每個key的數量,計算出來數據量最大的是哪幾個key。
  • 然后將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key都打上n以內的隨機數作為前綴,而不會導致傾斜的大部分key形成另外一個RDD。
  • 接着將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據並形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD。
  • 再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。
  • 而另外兩個普通的RDD就照常join即可。
  • 最后將兩次join的結果使用union算子合並起來即可,就是最終的join結果。

  代碼:
  

 

   2.3.4 使用隨機前綴和擴容RDD進行join(RDD中有大量的key導致數據傾斜)

   方案實現思路:

   將含有較多傾斜key的RDD擴大多倍,與相對分布均勻的RDD配一個隨機數。

  

 

   3 spark RDD中的shuffle算子

      3.1 去重:

   def distinct()

   def distinct(numPartitions: Int)

   

      3.2 聚合

   def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

   def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]

   def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]

   def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]

   def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]

   def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

   def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

   def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

 

      3.3 排序

   def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

     def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

 

      3.4 重分區

   def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

   def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)

 

      3.5集合或者表操作

   def intersection(other: RDD[T]): RDD[T]

   def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

     def intersection(other: RDD[T], numPartitions: Int): RDD[T]

   def subtract(other: RDD[T], numPartitions: Int): RDD[T]

   def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

   def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]

   def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]

   def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

   def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

   def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

   def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

     

   

 

  4 spark shuffle參數調優 

spark.shuffle.file.buffer

  • 默認值:32k
  • 參數說明:該參數用於設置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數據寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。
  • 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數,也就可以減少磁盤IO次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默認值:48m
  • 參數說明:該參數用於設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數據。
  • 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。

spark.shuffle.io.maxRetries

  • 默認值:3
  • 參數說明:shuffle read task從shuffle write task所在節點拉取屬於自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。
  • 調優建議:對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由於JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

spark.shuffle.io.retryWait

  • 默認值:5s
  • 參數說明:具體解釋同上,該參數代表了每次重試拉取數據的等待間隔,默認是5s。
  • 調優建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩定性。

spark.shuffle.memoryFraction

  • 默認值:0.2
  • 參數說明:該參數代表了Executor內存中,分配給shuffle read task進行聚合操作的內存比例,默認是20%。
  • 調優建議:在資源參數調優中講解過這個參數。如果內存充足,而且很少使用持久化操作,建議調高這個比例,給shuffle read的聚合操作更多內存,以避免由於內存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發現,合理調節該參數可以將性能提升10%左右。

spark.shuffle.manager

  • 默認值:sort
  • 參數說明:該參數用於設置ShuffleManager的類型。Spark 1.5以后,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默認選項,但是Spark 1.2以及之后的版本默認都是SortShuffleManager了。tungsten-sort與sort類似,但是使用了tungsten計划中的堆外內存管理機制,內存使用效率更高。
  • 調優建議:由於SortShuffleManager默認會對數據進行排序,因此如果你的業務邏輯中需要該排序機制的話,則使用默認的SortShuffleManager就可以;而如果你的業務邏輯不需要對數據進行排序,那么建議參考后面的幾個參數調優,通過bypass機制或優化的HashShuffleManager來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort要慎用,因為之前發現了一些相應的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默認值:200
  • 參數說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合並成一個文件,並會創建單獨的索引文件。
  • 調優建議:當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些,大於shuffle read task的數量。那么此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

    • 默認值:false
    • 參數說明:如果使用HashShuffleManager,該參數有效。如果設置為true,那么就會開啟consolidate機制,會大幅度合並shuffle write的輸出文件,對於shuffle read task數量特別多的情況下,這種方法可以極大地減少磁盤IO開銷,提升性能。
    • 調優建議:如果的確不需要SortShuffleManager的排序機制,那么除了使用bypass機制,還可以嘗試將spark.shffle.manager參數手動指定為hash,使用HashShuffleManager,同時開啟consolidate機制。在實踐中嘗試過,發現其性能比開啟了bypass機制的SortShuffleManager要高出10%~30%。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM