spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter


前言

再上一篇文章 spark shuffle的寫操作之准備工作 中,主要介紹了 spark shuffle的准備工作,本篇文章主要介紹spark shuffle使用BypassMergeSortShuffleWriter寫數據詳細細節。

在本篇文章中如果有不了解的術語,也可以參照 spark shuffle的寫操作之准備工作  做進一步了解。

這種shuffle寫數據的方式是最簡單的,spark計划在以后會移除這種shuffle機制。

 

先上源碼,后解釋:

流程如下:

map數據根據分區函數寫入分區文件

如果沒有數據要寫,那么數據文件為空,索引文件中各個segment的大小為0,返回初始化的MapStatus。

如果有數據要寫到各個reducer的文件中,首先初始化序列化工具實例,遍歷初始化各個partition的partitionWriter數組中的DiskBlockObjectWriter對象,初始化各個partition的FileSegment數組。

然后遍歷每一個要寫入的記錄值,並且取出記錄的key值,根據Partitioner的getPartition函數確定其reduce到的目標分區索引,然后根據計算出的索引確定負責寫數據的DiskBlockObjectWriter對象,然后根據該對象將鍵值對寫入到臨時分區文件。

當每一個要寫入的記錄值遍歷操作完畢,遍歷每一個分區,將該分區對應的partitionWriter執行commitAndGet操作,返回該分區的FileSegment對象。

其依賴方法commitAndGet源碼如下:

至此,大多數情況下,reduce的每一個partition的數據有被寫入到一個單獨的文件。明明是FileSegment,為什么是單獨的文件呢?原因就在於DiskBlockManager返回的臨時ShuffleBlockId是不重復的,org.apache.spark.storage.DiskBlockManager#createTempShuffleBlock源碼如下:

又因為創建臨時文件,只是創建臨時文件的句柄,此時對應的物理文件,並不存在,所以,這個方法不能保證創建的臨時文件不重復。所以多個partition數據寫入到一個臨時文件的概率還是有的,只不過是小概率事件。

最后小的分區文件會被合並為一個文件。

首先調用ShuffleBlockResolver(它是IndexShuffleBlockResolver實例)的getDataFile方法獲取數據文件的句柄File對象,org.apache.spark.util.Utils的tempFileWith獲取臨時文件,org.apache.spark.util.Utils#tempFileWith源碼如下,即獲得一個帶uuid后綴的文件:

合並分區文件

最后調用org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter的writePartitionedFile方法將多個小文件合並為一個大文件並返回包含每一個partition

對應的文件段的大小的數組,源碼如下:

更新索引文件

最后更新索引文件,給數據文件重命名后整個寫過程就徹底結束了,源碼不再做過多解釋,在  spark shuffle的寫操作之准備工作 中 IndexShuffleBlockResolver類中有說明。

總結

BypassMergeSortShuffleWriter是基於文件做的分區,沒有sort操作,最后分區數據被寫入一個完整文件,並且有一個索引文件記錄文件中每一個分區對應的FileSegment的大小。這種設計是比較朴素的,也很簡單,易實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM