Spark Shuffle數據處理過程與部分調優(源碼閱讀七)


  shuffle。。。相當重要,為什么咩,因為shuffle的性能優劣直接決定了整個計算引擎的性能和吞吐量。相比於Hadoop的MapReduce,可以看到Spark提供多種計算結果處理方式,對shuffle過程進行了優化。

  那么我們從RDD的iterator方法開始:

  

  我們可以看到,它調用了cacheManager的getOrCompute方法,如果分區任務第一次執行還沒有緩存,那么會調用computeOrReadCheckpoint。如果某個partition任務執行失敗,可以利用DAG重新調度,失敗的partition任務將從檢查點恢復狀態,而那些已經成功執行的partition任務由於其執行結果已經緩存到存儲體系,所以調用CacheManager.getOrCompue方法,不需要再次執行。

  在computeOrReadCheckpoint中,如果存在檢查點時,則進行中間數據的拉取,否則將會重新執行compute,我們知道RDD具有linkage機制,所以可以直接找到其父RDD。

  那么compute方法實現了什么呢?從最底層的HadoopRDD看起,所有類型的RDD都繼承自抽象RDD類。HadoopRDD compute方法如下圖:

  

  它實現了一個NextIterator的一個內部類,你有沒有發現那個"input split:"這個日志很熟悉,沒錯,就是跑任務時在container日志中打印的日志信息,也就是第一次數據獲取。然后這個內部類搞了一些事情,從broadcast中獲取jobConf(hadoop的Configuration)創建inputMetrics用於計算字節讀取的測量信息。隨之RecoredReader讀取數據之前創建bytesReadCallback,是用來獲取當前線程從文件系統讀取的字節數。隨后獲取inputFormat:

    

  隨后加入hadoop的配置信息,再通過 reader:RecordReader讀取數據。最終會new出一個InterruptibleIterator對象。這個對象用於map結束后的SortShuffleWriter的write方法。因為本身mapReduce的過程就是要寫入磁盤的,如圖:

  

  查閱資料,它主要干了如下事情:

  1、創建ExternalSorter,調用insertAll將計算結果寫入緩存。

  2、調用shuffleBlockManager.getDataFile方法獲取當前任務要輸出的文件路徑。

  3、調用shuffleBlockManager.consolidateId創建blockId。

  4、調用ExternalSorter的writePartitionFile中間結果持久化

  5、調用shuffleBlockManager.writeIndexFile方法創建索引文件。

  6、最終創建MapStatus。

  

  這里有個重中之重,也就是Hadoop MapReduce過程的問題所在:

  1、Hadoop在reduce任務獲取到map任務的中間輸出后,會對這些數據在磁盤上進行merge sort,產生更多的磁盤I/O.

  2、當數據量很小,但是map任務和reduce任務數目很多時,會產生很多網絡I/O.

  那么spark的優化在於:

  1、map任務逐條輸出計算結果,而不是一次性輸出到內存,並使用AppendOnlyMap緩存及其聚合算法對中間結果進行聚合,大大減少了中間結果所占內存的大小。

  2、當超出myMemoryThreshold的大小時,將數據寫入磁盤,防止內存溢出。

  3、reduce任務也是逐條拉取,並且也用了AppendOnlyMap緩存,並在內存中進行聚合和排序,也大大減少了數據占用的內存。

  4、reduce任務對將要拉取的Block按照BlockManager划分,然后將同一blockManager地址中的Block累積為少量網絡請求,減少網絡I/O.

  這里有個參數,spark.shuffle.sort.bypassMergeThreshold,修改bypassMergeThreshold的大小,在分區數量小的時候提升計算引擎的性能。這個參數主要在partition的數量小於bypassMergeThreshold的值時,就不再Executor中執行聚合和排序操作,知識將各個partition直接寫入Executor中進行存儲。

  還有一個參數,spark.shuffle.sort.bypassMergeSort,這個參數標記是否傳遞到reduce端再做合並和排序,當沒有定義aggregator、ordering函數,並且partition數量小於等於bypassMergeThreshold時,bypassMergeSort為true.如果bypassMergeSort為true,map中間結果將直接輸出到磁盤,就不會占用內存。

  

  那么 哪些Block從本地獲取、哪些需要遠程拉取,是獲取中間計算結果的關鍵。那么reduce端如何處理多個map任務的中間結果?

  這里有個優化的參數spark.reducer.maxMbInFlight,這是單次航班請求的最大字節數,意思是一批請求,這批請求的字節總數不能超過maxBytesInFlight,而且每個請求的字節數不能超過maxBytesInfFlight的五分之一,這樣做是為了提高請求的並發度,允許5個請求分別從5個節點拉取數據。

  調優方案:

  1、在map端溢出分區文件,在reduce端合並組合

  bypassMergeSort不使用緩存,將數據按照paritition寫入不同文件,最后按partition順序合並寫入同一文件。但沒有指定聚合、排序函數,且partition數量較小時,一般蠶蛹這種方式。它將多個bucket合並到一個文件,減少map輸出的文件數量,節省磁盤I/O,最終提升了性能。

  

  2、在map端簡單排序、排序分組,在reduce端合並並組合

    在緩存中利用指定的排序函數對數據按照partition或者Key進行排序,按partition順序合並寫入同一文件。當沒有指定聚合函數,且partition數量大時,采用這種方式。

      

  3、在map端緩存中聚合、排序分組,在reduce端組合

    在緩存中對數據按照key聚合,並且利用指定的排序函數對數據按照partition或者key進行排序,最后按partition順序合並寫入同一文件。當指定了聚合函數時,采用這種方式。

 

參考文獻:《深入理解Spark:核心思想與源碼分析》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM