前言
再上一篇文章 spark shuffle的寫操作之准備工作 中,主要介紹了 spark shuffle的准備工作,本篇文章主要介紹spark shuffle使用BypassMergeSortShuffleWriter寫數據詳細細節。
在本篇文章中如果有不了解的術語,也可以參照 spark shuffle的寫操作之准備工作 做進一步了解。
這種shuffle寫數據的方式是最簡單的,spark計划在以后會移除這種shuffle機制。
先上源碼,后解釋:
流程如下:
map數據根據分區函數寫入分區文件
如果沒有數據要寫,那么數據文件為空,索引文件中各個segment的大小為0,返回初始化的MapStatus。
如果有數據要寫到各個reducer的文件中,首先初始化序列化工具實例,遍歷初始化各個partition的partitionWriter數組中的DiskBlockObjectWriter對象,初始化各個partition的FileSegment數組。
然后遍歷每一個要寫入的記錄值,並且取出記錄的key值,根據Partitioner的getPartition函數確定其reduce到的目標分區索引,然后根據計算出的索引確定負責寫數據的DiskBlockObjectWriter對象,然后根據該對象將鍵值對寫入到臨時分區文件。
當每一個要寫入的記錄值遍歷操作完畢,遍歷每一個分區,將該分區對應的partitionWriter執行commitAndGet操作,返回該分區的FileSegment對象。
其依賴方法commitAndGet源碼如下:
至此,大多數情況下,reduce的每一個partition的數據有被寫入到一個單獨的文件。明明是FileSegment,為什么是單獨的文件呢?原因就在於DiskBlockManager返回的臨時ShuffleBlockId是不重復的,org.apache.spark.storage.DiskBlockManager#createTempShuffleBlock源碼如下:
又因為創建臨時文件,只是創建臨時文件的句柄,此時對應的物理文件,並不存在,所以,這個方法不能保證創建的臨時文件不重復。所以多個partition數據寫入到一個臨時文件的概率還是有的,只不過是小概率事件。
最后小的分區文件會被合並為一個文件。
首先調用ShuffleBlockResolver(它是IndexShuffleBlockResolver實例)的getDataFile方法獲取數據文件的句柄File對象,org.apache.spark.util.Utils的tempFileWith獲取臨時文件,org.apache.spark.util.Utils#tempFileWith源碼如下,即獲得一個帶uuid后綴的文件:
合並分區文件
最后調用org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter的writePartitionedFile方法將多個小文件合並為一個大文件並返回包含每一個partition
對應的文件段的大小的數組,源碼如下:
更新索引文件
最后更新索引文件,給數據文件重命名后整個寫過程就徹底結束了,源碼不再做過多解釋,在 spark shuffle的寫操作之准備工作 中 IndexShuffleBlockResolver類中有說明。
總結
BypassMergeSortShuffleWriter是基於文件做的分區,沒有sort操作,最后分區數據被寫入一個完整文件,並且有一個索引文件記錄文件中每一個分區對應的FileSegment的大小。這種設計是比較朴素的,也很簡單,易實現。