Spark源碼剖析 - 計算引擎


本章導讀

RDD作為Spark對各種數據計算模型的統一抽象,被用於迭代計算過程以及任務輸出結果的緩存讀寫。在所有MapReduce框架中,shuffle是連接map任務和reduce任務的橋梁。map任務的中間輸出要作為reduce任務的輸入,就必須經過shuffle,shuffle的性能優劣直接決定了整個計算引擎的性能和吞吐量。相比於Hadoop的MapReduce,我們可以看到Spark提供多種計算結果處理的方式,對shuffle過程進行了優化。

本章將繼續以word count為例講解。

1. 迭代計算

MapPartitionsRDD的iterator方法實際是父類RDD的iterator方法,見如下代碼。如果分區任務初次執行,此時還沒有緩存,所以會調用computeOrCheckpoint方法。

這里需要說一下iterator方法的容錯處理過程:如果某個分區任務執行失敗,但是其他分區任務執行成功,可以利用DAG重新調度。失敗的分區任務將從檢查點恢復狀態,而那些執行成功的分區任務由於其執行結果已經緩存到存儲體系,所以調用CacheManager的getOrCompute方法獲取即可,不需要再次執行。

這里我們主要分析computeOrReadCheckpoint方法。computeOrReadCheckpoint在存在檢查點時直接獲取中間結果,否則需要調用compute繼續計算,代碼如下:

MapPartitionsRDD的compute方法實現如下:

MapPartitionsRDD的compute方法首先調用firstParent找到其父RDD,本例中MapPartitionsRDD的父RDD為RDD。

經過RDD管道中對iterator和computeOrReadCheckpoint的層層調用,最終到達HadoopRDD。查看此時的線程棧更直觀,如圖所示:

HadoopRDD的compute方法用來創建NextIterator的匿名內部類,然后將其封裝為InterruptibleIterator,見代碼如下:

 

構造NextIterator的過程如下:

1) 從broadcast中獲取jobConf,此處的jobConf是hadoopConfiguration。

2) 創建InputMetrics用於計算字節讀取的測量信息,然后在RecordReader正式讀取數據之前創建bytesReadCallback。byteReadCallback用於獲取當前線程從文件系統讀取的字節數。

3) 獲取inputFormat,此處的inputFormat是TextInputFormat。

4) 使用addLocalConfiguration給JobConf添加Hadoop任務相關配置。addLocalConfiguration的實現見如下代碼:

5) 創建RecordReader,調用reader.createKey()和reader.createValue()得到的是LongWritable和Text。NextIterator的getNext實際是代理了RecordReader的next方法並且每讀取一些記錄后使用bytesReadCallback更新InputMetrics的bytesRead字段。

6) 將NextIterator封裝為InterruptibleIterator。

InterruptibleIterator只是對NextIterator的代理,見代碼如下:

根據5.5.3節的內容,我們知道整個rdd.iterator調用結束,最后返回InterruptibleIterator對象后,會調用SortShuffleWriter的write方法,其功能如下:

1) 創建ExternalSorter,然后調用insertAll將計算結果寫入緩存。

2) 調用shuffleBlockManager.getDataFile方法獲取當前任務要輸出的文件路徑。

3) 調用shuffleBlockManager.consolidateId創建blockId。

4) 調用ExternalSorter的writePartitionedFile將中間結果持久化。

5) 調用shuffleBlockManager.writeIndexFile方法創建索引文件。

6) 創建MapStatus。

2. 什么是shuffle

shuffle是所有MapReduce計算框架所必須經過的階段,shuffle用於打通map任務的輸出與reduce任務的輸入,map任務的中間輸出結果按照key值哈希后分配給某一個reduce任務,這個過程如圖所示:

在具體分析源碼之前,我們先看看Spark早期版本的shuffle是怎樣的,如圖所示:

這里對圖6-3做一些解釋:

1) map任務會為每一個reduce任務創建一個bucket。假設有M個map任務,R個reduce任務,則map階段一共會創建M x R 個bucket;

2) map任務會將產生的中間結果按照partition寫入不同 的bucket中;

3) reduce任務從本地或者遠端的map任務所在的BlockManager獲取相應的bucket作為輸入。

Spark早期的shuffle過程存在以下問題:

1) map任務的中間結果首先存入內存,然后才寫入磁盤。這對於內存的開銷很大,當一個節點上map任務的輸出結果集很大時,很容易導致內存緊張,進而發生內存溢出(out of memory ,  OOM);

2) 每個map任務都會輸出R(reduce任務數量)個bucket。假如M等於1000,R也等於1000,那么共計生成100萬個bucket,在bucket本身不大,但是shuffle很頻繁的情況下,磁盤I/O將稱為性能瓶頸。

熟悉Hadoop的讀者應該知道,Hadoop MapReduce的shuffle過程存在以下問題:

1) reduce任務獲取到map任務的中間輸出后,會對這些數據在磁盤上進行merge sort,雖然不怎么占用內存,但是卻產生了更多的磁盤I/O;

2) 當數據量很小,但是map任務和reduce任務數目很多時,會產生很多網絡I/O。

 為了解決以上Hadoop MapReduce和早期Spark在shuffle過程中的性能問題,目前Spark的shuffle已經做了多種性能優化,主要解決方法包括:

1) 將map任務給每個partition的reduce任務輸出的bucket合並到同一個文件中,這解決了bucket數量很多,但是本身數據體積不大時,造成shuffle很頻繁,磁盤I/O成為性能瓶頸的問題;

2) map任務逐條輸出計算結果,而不是一次性輸出到內存,並使用AppendOnlyMap緩存及其聚合算法對中間結果進行聚合,這大大減小了中間結果所占的內存大小;

3) 對SizeTrackingAppendOnlyMap和SizeTrackingPairBuffer等緩存進行溢出判斷,當超出myMemoryThreshold的大小時,將數據寫入磁盤,防止內存溢出;

4) reduce任務對拉取到的map任務中間結果逐條讀取,而不是一次性讀入內存,並在內存中進行聚合和排序(其本質上也使用了AppendOnlyMap緩存),這也大大減小了數據占用的內存;

5) reduce任務將要拉取的Block按照BlockManager地址划分,然后將同一BlockMananger地址中的Block累積為少量網絡請求,減少網絡I/O。

3. map端計算結果緩存處理

在詳細介紹map端對中間計算結果的細節之前,先理解兩個概念:

  • bypassMergeThreshold:傳遞到reduce端再做合並(merge)操作的閥值。如果partition的數量小於bypassMergeThreshold的值,則不需要在Executor執行聚合和排序操作,只需要將各個partition直接寫到Executor的存儲文件,最后在reduce端再做串聯。通過配置spark.shuffle.sort.bypassMergeThreshold可以修改bypassMergeThreshold的大小,在分區數量小的時候提升計算引擎的性能。bypassMergeThreshold的默認值是200。
  • bypassMergeSort:標記是否傳遞到reduce端再做合並和排序,即是否直接將各個partition直接寫到Executor的存儲文件。當沒有定義aggregator、ordering函數,並且partition的數量小於等於bypassMergeThreshold時,bypassMergeSort為true。如果bypassMergeSort為true,map中間結果將直接輸出到磁盤,此時不會占用太多內存,避免了內存撐爆問題。

map端計算結果緩存有三種處理方式(見如下代碼):

  • map端對計算結果在緩存中執行聚合和排序。
  • map不使用緩存,也不執行聚合和排序,直接調用spillToPartitionFiles將各個partition直接寫到自己的存儲文件(即bypassMergeSort為true的情況),最后由reduce端對計算結果執行合並和排序。
  • map端對計算結果簡單緩存。

我們先來分析兩種需要緩存的方式。

3.1 map端計算結果緩存聚合

 一個任務的分區數量通常很多,如果只是簡單地將數據存儲到Executor上。在執行reduce任務時會存在大量的網絡I/O操作,這時網絡I/O將成為系統性能的瓶頸,reduce任務讀取map任務的計算結果變慢,導致其他想要分配到被這些map任務占用的節點的任務不得不等待或者降低本地化選擇分配到更遠的節點上。對於更遠節點的I/O本身會更慢,因此還會導致更多的任務得不到分配或者無法高效本地化。經過這樣的惡性循環,整個集群將變得遲鈍,新的任務長時間得不到執行或者執行變慢。

通過在map端對計算結果在緩存中執行聚合和排序,能夠節省I/O操作,進而提升系統性能。這種情況下,必須要定義聚合器(aggregator)函數,以便於對計算結果按照按照partitionID和key聚合后進行排序。

ExternalSorter的insertAll方法中,如果定義了aggregator,則shouldCombine為true。此分支執行過程如下:

1) 由於設置了聚合函數aggregator,則從聚合函數獲取mergeValue(word count例子中為Function2)、createCombiner(word count例子中為PairFunction)等函數。

2) 定義update偏函數,此函數用於操作mergeValue和createCombiner。

3) 迭代之前創建的iterator,每讀取一條Product2[K, V],將每行字符串按照空格划分,並且給每個文本設置1,比如(#,1)、(Apache,1)、(Spark,1)...。

4) 以(分區索引,Produce2[K,V]._1)為參數調用SizeTrackingAppendOnlyMap的changeValue函數,與update函數配合,按照key值疊加value。

5) 調用mapbeSpillCollection方法,來處理SizeTrackingAppendOnlyMap溢出(當SizeTrackingAppendOnlyMap的大小超過myMemoryThreshold時,將集合中的數據寫入磁盤並新建SizeTrackingAppendOnlyMap)。這樣做是為了防止內存溢出,解決了Spark早期版本shuffle的內存撐爆問題。

SizeTrackingAppendOnlyMap的changeValue方法的處理包括三步:

1) 調用父類AppendOnlyMap的changeValue函數,應用緩存聚合算法。

2) 調用繼承特質SizeTracker的afterUpdate函數,增加對AppendOnlyMap大小的采樣。

3) 返回第1)步計算的結果。

1. AppendOnlyMap的緩存聚合算法

SizeTrackingAppendOnlyMap的父類AppendOnlyMap的changeValue函數用於回調update函數進行聚合操作。其實現可以說明,AppendOnlyMap支持null值的緩存,而Java的map默認是不支持的。changeValue方法利用一種使用數據緩存的算法完成聚合。在介紹此算法前先弄清一些定義:

  • LOAD_FACTOR:負載因子,常量值等於0.7。
  • initialCapacity:初始容量值64。
  • capacity:容量,初始時等於initialCapacity。
  • curSize:記錄當前已經放入data的key與聚合值的數量。
  • data:數組,初始大小為2*capacity,data數組的實際大小之所以是capacity的2倍,是因為key和聚合值各占一位。
  • growThreshold:data數組容量增加的閥值,表達式為growThreshold = LOAD_FACTOR * capacity。
  • mask:計算數據存放位置的掩碼值,表達式為capacity - 1。
  • k:要放入data的key。
  • pos:k將要放入data的索引值。索引值等於k的哈希值再次計算哈希值的結果與mask按位&運算的值。表達式為pos = rehash (k.hashCode) & mask。
  • curKey:data(2 * pos)位置的當前key。
  • newValue:key的聚合值。

在掌握以上概念的前提下,給出以下算法描述:

條件1:如果curKey等於null,那么newValue等於1;

條件2:如果curKey不等於null並且不等於k,那么從pos當前位置向后找,直到此位置的索引值與mask按位&運算后的新位置的key符合條件1或者條件3;

條件3:如果curKey不等於null並且等於k,那么newValue等於data(2 * pos + 1) 與k對應的值按照mergeValue定義的函數運算。

2.AppendOnlyMap的容量增長

incrementSize方法用於擴充AppendOnlyMap的容量。當curSize>growThreshold時,調用growTable方法將capacity容量擴大一倍,即newCapacity = capacity * 2。

growTable方法先創建newCapacity的兩倍大小的新數組,將老數組中的元素復制到新數組中,新數組索引位置采用新的mask重新使用rehash(k.hashCode)&mask計算。

經過以上算法的運算,word count例子的數據集合中間計算結果會變為((0, site), 1)、((0, which), 2)、((0, hadoop), 4)的樣子,證明確實發生了聚合。

3.AppendOnlyMap大小采樣

上一節growTable代碼列出了AppendOnlyMap的容量增長實現方法growTable,那是不是意味着AppendOnlyMap的容量可以無限制增長呢?當然不是,我們需要對AppendOnlyMap大小進行限制。很明顯我們可以在AppendOnlyMap的每次更新操作之后計算它的大小,這應該沒有什么問題。Spark為大數據平台需要提供實時計算能力,無論是數據量還是對CPU的開銷都很大,每當發生update或者insert操作就計算一次大小會嚴重影響Spark的性能,因此Spark實際采用了采樣並利用這些采樣對AppendOnlyMap未來的大小進行估算或推測的方式。

SizeTrackingAppendOnlyMap繼承了特質SizeTracker,其afterUpdate,其處理步驟如下:

1) 將AppendOnlyMap所占的內存進行估算並且與當前編號(numUpdates)一起作為樣本數據更新到samples = new mutable.Queue[Sample]中。

2) 如果當前采樣數量大於2,則使samples執行一次出隊操作,保證樣本總數等於2。

3) 計算每次更新增加的大小,公式如下:

4) 計算下次采樣的間隔nextSampleNum。

AppendOnlyMap的大小采樣數據用於推測AppendOnlyMap未來的大小。推測的實現見如下代碼。由於PartitionedPairBuffer也繼承了SizeTracker,所以estimateSize方法不但對AppendOnlyMap也對PartitionedPairBuffer在內存中的容量進行限制以防內存溢出時發揮其作用。

3.2 map端計算結果簡單緩存

 ExternalSorter的insertAll方法中,如果沒有定義aggregator,那么shouldCombine為false,見代碼如下。這時會調用PartitionedPairBuffer的insert方法,從其實現可以知道,它只不過是把計算結果簡單緩存到數組中了。

下面我們來介紹PartitionedPairBuffer的容量增長。

PartitionedPairBuffer的容量增長是通過growArray方法實現的。growArray實現增長data數組容量的方式非常簡單,只是新建2倍大小的新數組,然后簡單復制而已,見代碼如下:

spark使用PartitionedPairBuffer的過程中,也會調用maybeSpillCollection方法,來處理PartitionedPairBuffer溢出(當PartitionedPairBuffer的大小超過myMemoryThreshold時,將集合中的數據寫入磁盤並新建PartitionedPairBuffer)。這樣做是為了防止內存溢出,解決了Spark早期版本shuffle的內存撐爆問題。

3.3 容量限制

既然AppendOnlyMap和PartitionedPairBuffer的容量都可以增長,那么數據量不大的時候不會有問題。但由於大數據處理的數據量往往都很大,全部都放入內存會將系統的內存撐爆。Spark為了防止這個問題的發生,提供了函數maybeSpillCollection,見代碼如下:

1.集合溢出判定

maybeSpillCollection判定集合是否溢出主要由maybeSpill函數來決定,見代碼如下。maybeSpill函數的處理步驟如下:

1) 為當前線程嘗試獲取amountToRequest大小的內存(amountToRequest = 2 * currentMemory - myMemoryThreshold)。

2) 如果獲得的內存依然不足(myMemoryThreshold <= currentMemory),則調用spill,執行溢出操作。內存不足可能是申請到的內存為0或者已經申請得到的內存大小超過了myMemoryThreshold。

3) 溢出后續處理,如elementsRead歸零,已溢出內存字節數(memoryBytesSpilled)增加線程當前內存大小(currentMemory),釋放當前線程占用的內存。

2. 溢出

spill方法的實現,見如下代碼。如果bypassMergeSort為真,則調用spillToPartitiionFiles將內存中的數據溢出到分區文件。如果bypassMergeSort不為真,則調用spillToMergeableFile。

spillToMergeableFile方法的處理步驟如下:

1) 調用createTempShuffleBlock創建臨時文件。

2) 新建ShuffleWriteMetrics用於測量。

3) 調用getDiskWriter方法創建DiskBlockObjectWriter。

4) 調用destructiveSortedIterator方法對集合元素排序。

5) 將集合內容寫入臨時文件。寫入的時機有兩個:

  • 集合遍歷完的時候,執行flush。
  • 遍歷過程中,每當寫入DiskBlockObjectWriter的元素個數(objectsWritten)達到批量序列化尺寸(serializerBatchSize)時,也會執行flush,然后重新創建DiskBlockObjectWriter。

4.map端計算結果持久化

writPartitionedFile用於持久化計算結果。此方法有兩個分支:

  • 溢出到分區文件后合並:將內存中緩存的多個partition的計算結果分別寫入多個臨時Block文件,然后將這些Block文件的內容全部寫入正式的Block輸出文件中。
  • 內存中排序合並:將緩存的中間計算結果按照partition分組后寫入Block輸出文件。此種方式還需要更新此任務與內存、磁盤有關的測量數據。

無論哪種排序方式,每個partition都會最終寫入一個正式的Block文件,所以每個map任務實際上最后只會生成一個磁盤文件,最終解決了Spark早期版本中一個map任務輸出的bucket文件過多和磁盤I/O成為性能瓶頸的問題。此外,無論哪種排序方式,每輸出完一個partition的中間結果時,都會記錄當前partition的長度,此長度將記錄在索引文件中,以便下游任務的讀取。

writePartitionedFile中有關DiskBlockObjectWriter的實現。

4.1 溢出分區文件

spillToPartitionFiles用於將內存中的集合數據按照每個partition創建一個臨時Block文件,為每個臨時Block文件生成一個DiskBlockObjectWriter,並且用DiskBlockObjectWriter將計算結果分別寫入這些臨時Block文件中。createTempShuffleBlock方法創建臨時的Block。getDiskWriter方法創建DiskBlockObjectWriter。

4.2 排序與分區分組

 partitionedIterator通過對集合按照指定的比較器進行排序,並且按照partition id分組,生成迭代器。

1.比較器

下述代碼列出了目前的三種比較器:

  • keyComparator:按照指定的key進行排序;
  • partitionComparator:按照partition id進行比較;
  • partitionKeyComparator:先按照partition id進行比較,再按照指定的key進行第二級排序。當沒有指定排序字段並且沒有指定聚合函數時會退化為partitionComparator。

由於partitionedIterator方法實際是通過調用destructiveSortedIterator和groupByPartition來實現,下面詳細分析這兩個方法。

2.排序

destructiveSortedIterator方法的處理步驟如下:

1) 將data數組向左整理排序。

2) 利用Sorter、KVArraySortDataFormat以及指定的比較器進行排序。這其中用到了TimSort,也就是優化版的歸並排序。

3) 生成新的迭代器。

3.分區分組

groupByPartition主要用於對destructiveSortedIterator生成的迭代器按照partition id分組。

IteratorForPartition如何區分partition呢?見如下代碼。可見其hasNext會判斷數據的partitionId。

4.3 分區索引文件

無論采用哪種緩存處理,在持久化的時候都會被寫入同一文件。那么reduce任務如何從此文件中按照分區讀取數據呢?IndexShuffleBlockManager的writeIndexFile方法生成的分區索引文件。此文件使用偏移量來區分各個分區的計算結果,偏移量來自於合並排序過程中記錄的各個partition的長度。

這里用下圖展示內存中排序、分組后生成分區索引文件的全過程。

5. reduce端讀取中間結算結果

先簡單說下,當map任務相關Stage的任務都執行完畢后,會喚起下游Stage的提交及任務的執行。上游任務的執行結果必然是下游任務的輸入,我們就下游 任務如何讀取上游任務計算結果來展開。

ResultTask的計算是由RDD的iterator方法驅動,這在介紹ShuffleMapTask的時候已經介紹過。其計算過程最終會落實到ShuffleRDD的compute方法。ShuffleRDD的compute方法首先會調用SortShuffleManager的getReader方法創建BlockStoreShuffleReader,然后執行BlockStoreShuffleReader的read方法讀取依賴任務的中間計算結果。

SortShuffleManager的getReader方法的實現如下:

BlockStoreShuffleReader用來讀取上游任務計算結果,它的read方法的處理步驟如下:

1) 從遠端節點或者本地讀取中間計算結果。

2) 對InterruptibleIterator執行聚合。

3) 對InterruptibleIterator排序,由於使用ExternalSorter的insertAll,不再贅述。

從遠端節點或者本地讀取中間計算結果通過調用BlockStoreShuffleFetcher的fetch方法實現,它的處理步驟如下:

1) 獲取map任務執行的狀態信息。

2) 按照中間結果所在節點划分各個Block。

3) 創建ShuffleBlockFetcherIterator(即從遠端節點或者本地讀取中間計算結果)。

4) 將ShuffleBlockFetcherIterator封裝為InterruptibleIterator。

5.1 獲取map任務狀態

Spark通過調用MapOutputTracker的getServerStatues來獲取map任務執行的狀態信息,其處理步驟如下:

1) 從當前BlockManager的MapOutputTracker中獲取MapStatus,若沒有就進入第2)步,否則直接到第4)步。

2) 如果獲取列表(fetching)中已經存在要取的shuffleId,那么就等待其他線程獲取。如果獲取列表中不存在要取的shuffleId,那么就將shuffleId放入獲取列表。

3) 調用askTracker方法向MapOutputTrackerMasterEndpoint發送GetMapOutputStatuses消息獲取map任務的狀態信息。MapOutputTrackerMasterEndpoint接收到GetMapOutputStatuses消息后,將請求的map任務狀態信息序列化發送給請求方。請求方接收到map任務狀態信息后進行反序列化操作,然后放入本地的mapStatuses中。

4) 調用MapOutputTracker的convertMapStatuses方法將獲得MapStatus轉換為map任務所在的地址(即BlockManagerId)和map任務輸出中分配給當前reduce任務的Block大小。

5.2 划分本地與遠程Block

無論從本地還是從MapOutputTrackerMasterEndpoint獲取的狀態信息,都需要按照地址划分並且轉換為BlockId。ShuffleBlockFetcherIterator是讀取中間結果的關鍵。構造ShuffleBlockFetcherIterator的時候會調用到initialize方法,它的初始化過程如下:

1) 使用splitLocalRemoteBlocks方法划分本地讀取和遠程讀取的Block的請求。

2) 將FetchRequest隨機排序后存入fetchRequests:newQueue[FetchRequest]。

3) 遍歷fetchRequests中的所有FetchRequest,遠程請求Block中間結果。

4) 調用fetchLocalBlocks獲取本地Block。

 splitLocalRemoteBlocks方法用於划分哪些Block從本地獲取,哪些需要遠程拉取,是獲取中間計算結果的關鍵。為便於描述,先解釋以下定義:

  • targetRequestSize:每個遠程請求的最大尺寸。
  • totalBlocks:統計Block總數。
  • localBlocks:ArrayBuffer[BlockId],緩存可以在本地獲取的Block的blockId。
  • remoteBlocks:HashSet[BlockId]j,緩存需要遠程獲取的Block的blockId。
  • curBlocks:ArrayBuffer[(BlockId, Long)],遠程獲取的累加緩存,用於保證每個遠程請求的尺寸不超過targetRequestSize。為什么要累加緩存?如果向一個機器節點頻繁地請求字節數很小的Block,那么勢必造成網絡擁塞並增加節點負擔。將多個小數據量的請求合並為一個大的請求將避免這些問題,提高系統性能。
  • curRequestSize:當前累加到curBlocks中的所有Block的大小之和,用於保證每個遠程請求的尺寸不超過targetRequestSize。
  • remoteRequests:new ArrayBuffer[FetchRequest],緩存需要遠程請求的FetchRequest對象。
  • numBlocksToFetch:一共要獲取的Block數量。
  • maxBytesInFlight:單次航班請求的最大字節數。什么叫航班?其實就是一批請求,這批請求的字節總數不能超過maxBytesInFlight,而且每個請求的字節數不能超過maxBytesInFilght的五分之一。可以通過參數spark.reducer.maxMbInFlight來控制大小。為什么每個請求的字節數不能超過maxBytesInFlight的五分之一?這樣做是為了提高請求的並發度,允許5個請求分別從5個節點獲取數據,最大限度利用各節點的資源。

明白了這些定義,我們一起來看看splitLocalRemoteBlocks的處理邏輯吧。

1)  遍歷已經按照BlockManagerId分組的blockInfo,如果blockInfo所在的Executor與當前Executor相同,則將它的BlockId存入localBlocks;否則,將blockInfo的BlockId和size累加到curBlocks,將blockId存入remoteBlocks,curRequestSize增加size的大小,每當curRequestSize >= targetRequestSize,則新建FetchRequest放入remoteRequests,並且為生成下一個FetchRequest做一些准備(如新建curBlocks,curRequestSize置為0)。

2) 遍歷結束,curBlocks中如果仍然有緩存的(BlockId,Long),新建FetchRequest放入remoteRequests。此次請求不受maxBytesInFlight和targetRequestSize的影響。

5.3 獲取遠程Block

sendRequest方法用於遠程請求中間結果。sendRequest利用FetchRequest里封裝的blockId、size、address等信息,調用shuffleClient的fetchBlocks方法獲取其他節點上的中間計算結果。

5.4 獲取本地Block

fetchLocalBlocks用於對本地中間計算結果的獲取。fetchLocalBlocks方法很簡單,利用熟悉的BlockManager的getBlockData方法獲取本地Block,最后將取到的中間結果存入results = new LinkedBlockingQueue[FetchResult]中。

6.reduce端計算

6.1 如何同時處理多個map任務的中間結果

reduce任務的上游map任務可能有多個,根據之前的分析,知道這些中間結果的Block及數據緩存在ShuffleBlockFetcherIterator的results:new LinkedBlockingQueue[FetchResult]中。ShuffleBlockFetcherIterator作為迭代器,它的實現見如下代碼。從其實現可知,每次迭代ShuffleBlockFetcherIterator,會先從results:new LinkedBlockingQueue[FetchResult]中取出一個FetchResult,並構造此FetchResult的迭代器。

6.2 reduce端再緩存中對中間計算結果執行聚合和排序

reduce端獲取map端任務計算中間結果后,將ShuffleBlockFetcherIterator封裝為InterruptibleIterator並聚合。聚合操作主要依賴aggregator的combineCombinersByKey方法,見如下代碼。如果isSpillEnabled為false,會再次使用AppendOnlyMap的changeValue方法。isSpillEnabled默認是true,此時會使用ExternalAppendOnlyMap完成聚合。

ExternalAppendOnlyMap的insert方法的實際工作是由insertAll完成的,見如下代碼。從代碼實現可以看到其實質也是使用SizeTrackingAppendOnlyMap。

經過以上處理,數據結果為類似(##, 8), (N, 1), (set, 2), (use, 3), (Hadoop-supported, 1)的樣子。

7.map端與reduce端組合分析

這一節主要對計算引擎部分的內容進行串聯,用圖來展示最常見的幾種組合,以便大家對計算引擎有個宏觀的認識。

7.1 在map端溢出分區文件,在reduce端合並組合

bypassMergeSort標記是否傳遞到reduce端再做合並和排序,此種情況不使用緩存,而是先將數據按照partition寫入不同文件,最后按partition順序合並寫入同一文件。當沒有指定聚合、排序函數,且partition數量較小時,一般采用這種方式。此種方式將多個bucket合並到同一個文件,通過減少map輸出的文件數量,節省了磁盤I/O,最終提升了性能,見如下圖:

7.2 在map端簡單緩存、排序分組,在reduce端合並組合

此種情況在緩存中利用指定的排序函數對數據按照partition或者key進行排序,最后按partition順序合並寫入同一文件。當沒有指定聚合函數,且partition數量大時,一般采用這種方式,見如下圖。此種方式將多個bucket合並到同一個文件,通過減少map輸出的文件數量,節省了磁盤I/O,提升了性能;對SizeTrackingPairBuffer的緩存進行溢出判斷,當超過myMemoryThreshold的大小時,將數據寫入磁盤,防止內存溢出。

7.3在map端緩存中聚合、排序分組,在reduce端組合

 此種情況在緩存中對數據按照key聚合,並且利用指定的排序函數對數據按照partition或者key進行排序,最后按partition順序合並寫入同一文件。當指定了聚合函數時,一般采用這種方式,見如下圖。此種方式將多個bucket合並到同一個文件,通過減少map輸出的文件數量,節省了磁盤I/O,提升了性能;對中間輸出數據不是一次性讀取,而是逐條放入AppendOnlyMap的緩存,並對數據進行聚合,減少了中間結果占用的內存大小;對AppendOnlyMap的緩存進行溢出判斷,當超出myMemoryThreshold的大小時,將數據寫入磁盤,防止內存溢出。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM