提出問題
1. shuffle過程的數據是如何傳輸過來的,是按文件來傳輸,還是只傳輸該reduce對應在文件中的那部分數據?
2. shuffle讀過程是否有溢出操作?是如何處理的?
3. shuffle讀過程是否可以排序、聚合?是如何做的?
。。。。。。
概述
在 spark shuffle的寫操作之准備工作 中的 ResultTask 和 ShuffleMapTask 看到了,rdd讀取數據是調用了其 iterator 方法。
計算或者讀取RDD
org.apache.spark.rdd.RDD#iterator源碼如下,它是一個final方法,只在此有實現,子類不允許重實現這個方法:
思路:如果是已經緩存下來了,則調用 org.apache.spark.rdd.RDD#getOrCompute 方法,通過底層的存儲系統或者重新計算來獲取父RDD的map數據。否則調用 org.apache.spark.rdd.RDD#computeOrReadCheckpoint ,從checkpoint中讀取或者是通過計算來來獲取父RDD的map數據。
我們逐一來看其依賴方法:
org.apache.spark.rdd.RDD#getOrCompute 源碼如下:
首先先通過Spark底層的存儲系統獲取 block。如果底層存儲沒有則調用 org.apache.spark.rdd.RDD#computeOrReadCheckpoint,其源碼如下:
主要通過三種途徑獲取數據 -- 通過spark 底層的存儲系統、通過父RDD的checkpoint、直接計算。
處理返回的數據
讀取完畢之后,數據的處理基本上一樣,都使用 org.apache.spark.InterruptibleIterator 以迭代器的形式返回,org.apache.spark.InterruptibleIterator 源碼如下:
比較簡單,使用委托模式,將迭代下一個行為委托給受委托類。
下面我們逐一來看三種獲取數據的實現細節。
通過spark 底層的存儲系統
其核心源碼如下:
思路:首先先從本地或者是遠程executor中的存儲系統中獲取到block,如果是block存在,則直接返回,如果不存在,則調用 computeOrReadCheckpoint計算或者通過讀取父RDD的checkpoint來獲取RDD的分區信息,並且將根據其持久化級別(即StorageLevel)將數據做持久化。 關於持久化的內容 可以參考 Spark 源碼分析系列 中的 Spark存儲部分 做深入了解。
通過父RDD的checkpoint
其核心源碼如下:
通過父RDD的checkpoint也是需要通過spark底層存儲系統或者是直接計算來得出數據的。
不做過多的說明。
下面我們直接進入主題,看shuffle的讀操作是如何進行的。
直接計算
其核心方法如下:
首先,org.apache.spark.rdd.RDD#compute是一個抽象方法。
我們來看shuffle過程reduce的讀map數據的實現。
表示shuffle結果的是 org.apache.spark.rdd.ShuffledRDD。
其compute 方法如下:
整體思路:首先從 shuffleManager中獲取一個 ShuffleReader 對象,並調用該reader對象的read方法將數據讀取出來,最后將讀取結果強轉為Iterator[(K,C)]
該shuffleManager指的是org.apache.spark.shuffle.sort.SortShuffleManager。
其 getReader 源碼如下:
簡單來說明一下參數:
handle:是一個ShuffleHandle的實例,它有三個子類,可以參照 spark shuffle的寫操作之准備工作 做深入了解。
startPartition:表示開始partition的index
endPartition:表示結束的partition的index
context:表示Task執行的上下文對象
其返回的是一個 org.apache.spark.shuffle.BlockStoreShuffleReader 對象,下面直接來看這個對象。
BlockStoreShuffleReader
這個類的繼承關系如下:
其中ShuffleReader的說明如下:
Obtained inside a reduce task to read combined records from the mappers.
ShuffleReader只有一個read方法,其子類BlockStoreShuffleReader也比較簡單,也只有一個實現了的read方法。
下面我們直接來看這個方法的源碼。
在上圖,把整個流程划分為5個步驟 -- 獲取block輸入流、反序列化輸入流、添加監控、數據聚合、數據排序。
下面我們分別來看這5個步驟。這5個流程中輸入流和迭代器都沒有把大數據量的數據一次性全部加載到內存中。並且即使在數據聚合和數據排序階段也沒有,但是會有數據溢出的操作。我們下面具體來看每一步的具體流程是如何進行的。
獲取block輸入流
其核心源碼如下:
我們先來對 ShuffleBlockFetcherIterator 做進一步了解。
使用ShuffleBlockFetcherIterator獲取輸入流
這個類就是用來獲取block的輸入流的。
blockId等相關信息傳入構造方法
其構造方法如下:
它繼承了Iterator trait,是一個 [(BlockId,InputStream)] 的迭代器。
對構造方法參數做進一步說明:
context:TaskContext,是作業執行的上下文對象
shuffleClieent:默認為 NettyBlockTransferService,如果使用外部shuffle系統則使用 ExternalShuffleClient
blockManager:底層存儲系統的核心類
blocksByAddress:需要的block的blockManager的信息以及block的信息。
通過 org.apache.spark.MapOutputTracker#getMapSizesByExecutorId 獲取,其源碼如下:
org.apache.spark.MapOutputTrackerWorker#getStatuses 其源碼如下:
思路:如果有shuffleId對應的MapStatus則返回,否則使用 MapOutputTrackerMasterEndpointRef 請求 driver端的 MapOutputTrackerMaster 返回 對應的MapStatus信息。
org.apache.spark.MapOutputTracker#convertMapStatuses 源碼如下:
思路:將MapStatus轉換為一個可以迭代查看BlockManagerId、BlockId以及對應大小的迭代器。
streamWrapper:輸入流的解密以及解壓縮操作的包裝器,其依賴方法 org.apache.spark.serializer.SerializerManager#wrapStream 源碼如下:
這部分在 spark 源碼分析之十三 -- SerializerManager剖析 部分有相關剖析,不再說明。
maxBytesInFlight: max size (in bytes) of remote blocks to fetch at any given point.
maxReqsInFlight: max number of remote requests to fetch blocks at any given point.
maxBlocksInFlightPerAddress: max number of shuffle blocks being fetched at any given point
maxReqSizeShuffleToMem: max size (in bytes) of a request that can be shuffled to memory.
detectCorrupt: whether to detect any corruption in fetched blocks.
讀取數據
在迭代方法next中不斷去讀取遠程的block以及本地的block輸入流。不做詳細剖析,見 ShuffleBlockFetcherIterator.scala 中next 相關方法的剖析。
反序列化輸入流
核心方法如下:
其依賴方法 scala.collection.Iterator#flatMap 源碼如下:
可見,即使是在這里,數據並沒有全部落到內存中。流跟管道的概念很類似,數據並沒有一次性加載到內存中。它只不過是在使用迭代器的不斷銜接,最終形成了新的處理鏈。在這個鏈中的每一個環節,數據都是懶加載式的被加載到內存中,這在處理大數據量的時候是一個很好的技巧。當然也是責任鏈的一種具體實現方式。
添加監控
其實這一步跟上一步本質上區別並不大,都是在責任鏈上添加了一個新的環節,其核心源碼如下:
其中,核心方法 scala.collection.Iterator#map 源碼如下:
又是一個新的迭代器處理環節被加到責任鏈中。
數據聚合
數據聚合其實也很簡單。
其核心源碼如下:
在聚合的過程中涉及到了數據的溢出操作,如果有溢出操作還涉及 ExternalSorter的溢出合並操作。
其核心源碼不做進一步解釋,有興趣可以看 spark shuffle寫操作三部曲之SortShuffleWriter 做進一步了解。
數據排序
數據排序其實也很簡單。如果使用了排序,則使用ExternalSorter則在分區內部進行排序。
其核心源碼如下:
其內部使用了ExternalSorter進行排序,其中也涉及到了溢出操作的處理。有興趣可以看 spark shuffle寫操作三部曲之SortShuffleWriter 做進一步了解。
總結
主要從實現細節和設計思路上來說。
實現細節
首先在實現細節上,先使用ShuffleBlockFetcherIterator獲取本地或遠程節點上的block並轉化為流,最終返回一小部分數據的迭代器,隨后序列化、解壓縮、解密流操作被放在一個迭代器中該迭代器后執行,然后添加了監控相關的迭代器、數據聚合相關的迭代器、數據排序相關的迭代器等等。這些迭代器保證了處理大量數據的高效性,在數據聚合和排序階段,大數據量被不斷溢出到磁盤中,數據最終還是以迭代器形式返回,確保了內存不會被大數據量占用,提高了數據的吞吐量和處理數據的高效性。
設計思路
在設計上,主要說三點:
- 責任鏈和迭代器的混合使用,即使得程序易擴展,處理環節可插拔,處理流程清晰易懂。
- 關於聚合和排序的使用,在前面文章中shuffle寫操作也提到了,聚合和排序的類是獨立出來的,跟shuffle的處理耦合性很低,這使得在shuffle的讀和寫階段的數據內存排序聚合溢出操作的處理類可以重復使用。
- shuffle數據的設計也很巧妙,shuffle的數據是按reduceId分區的,分區信息被保存在索引文件中,這使得每一個reduce task只需要取得一個文件中屬於它分區的那部分shuffle數據就可以了,極大地減少無用了數據量的網絡傳輸,提高了shuffle的效率。還值得說的是,shuffle數據的格式是一個約定,不管map階段的數據是如何被處理,最終數據形式肯定是約定好的,這使得map和reduce階段的處理類之間的耦合性大大地降低。
至此,spark 的shuffle階段的細節就徹底剖析完畢。
最后,明天周末,玩得開心~