Spark源碼分析之Sort-Based Shuffle讀寫流程


一 、概述

我們知道Spark Shuffle機制總共有三種:

1.未優化的Hash Shuffle:每一個ShuffleMapTask都會為每一個ReducerTask創建一個單獨的文件,總的文件數是S * R,不僅文件數量很多,造成頻繁的磁盤和網絡I/O,而且內存負擔也很大,GC頻繁,經常出現OOM。

2.優化后Hash Shuffle:改進后的Shuffle,啟用consolidation機制,Executor每一個core上的ShuffleMapTask共享文件,減少文件數目,比如Executor有2個core,總共有20個ShuffleMapTask,ReducerTask任務為4個,那么這里總共只有2 * 4 = 8個文件,和未優化之前相比較20 * 4 = 80個文件比較,改進較大。但是如果數據很大的情況下,優化后的Hash Shuffle依然會存在各種問題。比如數據量很大的時候groupByKey操作,必須保證每一個partition的數據內存可以存放。

3.Sort-Based Shuffle: 為了緩解Shuffle過程產生文件數過多和Writer緩存開銷過大的問題,spark引入了類似於hadoop Map-Reduce的shuffle機制。該機制每一個ShuffleMapTask不會為后續的任務創建單獨的文件,而是會將所有的Task結果寫入同一個文件,並且對應生成一個索引文件。以前的數據是放在內存緩存中,等到數據完了再刷到磁盤,現在為了減少內存的使用,在內存不夠用的時候,可以將輸出溢寫到磁盤,結束的時候,再將這些不同的文件聯合內存的數據一起進行歸並,從而減少內存的使用量。一方面文件數量顯著減少,另一方面減少Writer緩存所占用的內存大小,而且同時避免GC的風險和頻率。

二、Sort-BasedShuffle寫機制

2.1 ShuffleMapTask獲取ShuffleManager

Spark1.6之后,取消hash機制的shuffle, 只剩下基於sort的shuffle機制。我們可以在配置文件指定spark.shuffle.manager,如果沒有指定默認就是sort,但是tungsten-sort也是基於SortShuffleManager的

valshortShuffleMgrNames = Map( "sort"-> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort"->classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName= conf.get("spark.shuffle.manager","sort") val shuffleMgrClass= shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase,shuffleMgrName) val shuffleManager= instantiateClass[ShuffleManager](shuffleMgrClass)

2.2 根據ShuffleManager獲取writer

ShuffleManager會根據注冊的handle來決定實例化哪一個writer.如果注冊的是SerializedShuffleHandle,就獲取UnsafeShuffleWriter;如果注冊的是BypassMergeSortShuffleHandle,就獲取BypassMergeSortShuffleWriter;如果注冊的是BaseShuffleHandle,就獲取SortShuffleWriter

首先:我們看一下ShuffleManager如何注冊ShuffleHandle的?

override def registerShuffle[K, V, C](shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { // 如果滿足使用BypassMergeSort,就優先使用BypassMergeSortShuffleHandle
  if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { new BypassMergeSortShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { // 如果支持序列化模式,則使用SerializedShuffleHandle
    new SerializedShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else { // 否則使用BaseShuffleHandle
    new BaseShuffleHandle(shuffleId, numMaps, dependency) } }

然后:看一下shouldBypassMergeSort這個方法,判斷是否應該使用BypassMergeSort

使用這個模式需要滿足的條件:

# 不能指定aggregator,即不能聚合

# 不能指定ordering,即分區內數據不能排序

# 分區的數目 < spark.shuffle.sort.bypassMergeThrshold指定的閥值

def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = { // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") false } else { val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) dep.partitioner.numPartitions <= bypassMergeThreshold } }

最后:我們分析一下canUseSerializedShuffle函數,來確定是否使用Tungsten-Sort支持的序列化模式SerializedShuffleHandle

滿足條件:

# shuffle依賴不帶有聚合操作

# 支持序列化值的重新定位

# 分區數量少於16777216個

 def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = { val shufId = dependency.shuffleId // 獲取分區數
    val numPartitions = dependency.partitioner.numPartitions // 如果不支持序列化值的重新定位
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " + s"${dependency.serializer.getClass.getName}, does not support object relocation") false } // 如果定義聚合器
    else if (dependency.aggregator.isDefined) { log.debug( s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined") false } // 如果分區數量大於16777216個
    else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " + s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions") false } else { log.debug(s"Can use serialized shuffle for shuffle $shufId") true } } } override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] = { numMapsForShuffle.putIfAbsent( handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) val env = SparkEnv.get handle match { // 如果使用SerializedShuffleHandle則獲取UnsafeShuffleWriter
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf) // 如果使用BypassMergeSortShuffleHandle則獲取BypassMergeSortShuffleWriter
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], bypassMergeSortHandle, mapId, context, env.conf) // 如果使用BaseShuffleHandle則獲取SortShuffleWriter
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) } }
2.3 BypassMergeSortShuffleWriter的寫機制分析

BypassMergeSortShuffleWriter:實現帶Hash風格的基於Sort的Shuffle機制。在Reducer端任務數比較少的情況下,基於Hash的Shuffle實現機制明顯比Sort的Shuffle實現快。所以基於Sort的Shuffle實現機制提供一個方案,當Reducer任務數少於配置的屬性spark.shuffle.sort.bypassMergeThreshold設置的個數的時候,則使用此種方案。

特點:

# 主要用於處理不需要排序和聚合的Shuffle操作,所以數據是直接寫入文件,數據量較大的時候,網絡I/O和內存負擔較重

# 主要適合處理Reducer任務數量比較少的情況下

# 將每一個分區寫入一個單獨的文件,最后將這些文件合並,減少文件數量;但是這種方式需要並發打開多個文件,對內存消耗比較大
public void write(Iterator<Product2<K,V>> records)throws IOException{ assert (partitionWriters== null); if (!records.hasNext()) { partitionLengths= new long[numPartitions]; shuffleBlockResolver.writeIndexFileAndCommit(shuffleId,mapId, partitionLengths,null); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(),partitionLengths); return; } final SerializerInstanceserInstance = serializer.newInstance(); final long openStartTime= System.nanoTime(); // 構建一個對於task結果進行分區的數量的writer數組,即每一個分區對應着一個writer // 這種寫入方式,會同時打開numPartition個writer,所以分區數不宜設置過大 // 避免帶來過重的內存開銷。現在默認writer的緩存大小是32k,比起以前100k小太多了
  partitionWriters= new DiskBlockObjectWriter[numPartitions]; // 構建一個對於task結果進行分區的數量的FileSegment數組,寄一個分區的writer對應着一組FileSegment
  partitionWriterSegments= new FileSegment[numPartitions]; for (int i = 0; i < numPartitions; i++) { // 創建臨時的shuffle block,返回一個(shuffle blockid,file)的元組
    final Tuple2<TempShuffleBlockId,File> tempShuffleBlockIdPlusFile= blockManager.diskBlockManager().createTempShuffleBlock(); // 獲取該分區對應的文件
    final Filefile = tempShuffleBlockIdPlusFile._2(); // 獲取該分區對應的blockId
    final BlockIdblockId = tempShuffleBlockIdPlusFile._1(); // 構造每一個分區的writer
    partitionWriters[i] = blockManager.getDiskWriter(blockId,file, serInstance,fileBufferSize, writeMetrics); } writeMetrics.incWriteTime(System.nanoTime() -openStartTime); // 如果有數據,獲取數據,對key進行分區,然后將<key,value>寫入該分區對應的文件
  while (records.hasNext()) { final Product2<K,V> record =records.next(); final K key = record._1(); partitionWriters[partitioner.getPartition(key)].write(key,record._2()); } // 遍歷所有分區的writer列表,刷新數據到文件,構建FileSegment數組
  for (inti = 0; i < numPartitions; i++) { final DiskBlockObjectWriterwriter = partitionWriters[i]; // 把數據刷到磁盤,構建一個FileSegment
    partitionWriterSegments[i] =writer.commitAndGet(); writer.close(); } // 根據shuffleId和mapId,構建ShuffleDataBlockId,創建文件,文件格式為: //shuffle_{shuffleId}_{mapId}_{reduceId}.data
  File output= shuffleBlockResolver.getDataFile(shuffleId,mapId); // 創建臨時文件
  File tmp= Utils.tempFileWith(output); try { // 合並前面的生成的各個中間臨時文件,並獲取分區對應的數據大小,然后就可以計算偏移量
    partitionLengths= writePartitionedFile(tmp); // 創建索引文件,將每一個分區的起始位置、結束位置和偏移量寫入索引, // 且將合並的data文件臨時文件重命名,索引文件的臨時文件重命名
 shuffleBlockResolver.writeIndexFileAndCommit(shuffleId,mapId, partitionLengths,tmp); } finally { if (tmp.exists() && !tmp.delete()) { logger.error("Error while deleting tempfile {}",tmp.getAbsolutePath()); } } // 封裝並返回任何結果
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(),partitionLengths); } private long[]writePartitionedFile(FileoutputFile) throws IOException { // 構建一個分區數量的數組
  final long[] lengths = new long[numPartitions]; if (partitionWriters== null) { // We werepassed an empty iterator
    return lengths; } // 創建合並文件的臨時文件輸出流
  final FileOutputStreamout = new FileOutputStream(outputFile,true); final long writeStartTime= System.nanoTime(); boolean threwException= true; try { // 進行分區文件的合並,返回每一個分區文件長度
    for (inti = 0; i < numPartitions; i++) { // 獲取該分區對應的FileSegment對應的文件
      final Filefile = partitionWriterSegments[i].file(); // 如果文件存在
      if (file.exists()) { final FileInputStreamin = new FileInputStream(file); boolean copyThrewException= true; try { // 把該文件拷貝到合並文件的臨時文件,並返回文件長度
          lengths[i] =Utils.copyStream(in,out, false,transferToEnabled); copyThrewException= false; } finally { Closeables.close(in,copyThrewException); } if (!file.delete()) { logger.error("Unable to delete file forpartition {}",i); } } } threwException= false; } finally { Closeables.close(out,threwException); writeMetrics.incWriteTime(System.nanoTime() -writeStartTime); } partitionWriters= null; return lengths; } defwriteIndexFileAndCommit( shuffleId: Int, mapId: Int, lengths: Array[Long], dataTmp: File): Unit = { // 獲取索引文件
  val indexFile= getIndexFile(shuffleId,mapId) // 生成臨時的索引文件
  val indexTmp= Utils.tempFileWith(indexFile) try { val out = new DataOutputStream(newBufferedOutputStream(newFileOutputStream(indexTmp))) Utils.tryWithSafeFinally{ // 將offset寫入索引文件寫入臨時的索引文件
      var offset= 0L out.writeLong(offset) for (length <- lengths) { offset += length out.writeLong(offset) } } { out.close() } // 獲取數據文件
    val dataFile= getDataFile(shuffleId,mapId) // There isonly one IndexShuffleBlockResolver per executor, this synchronization make sure // the following check and rename areatomic.
    synchronized { // 傳遞進去的索引、數據文件以及每一個分區的文件的長度
      val existingLengths= checkIndexAndDataFile(indexFile,dataFile, lengths.length) if (existingLengths!= null) { // Anotherattempt for the same task has already written our map outputs successfully, // so just use the existingpartition lengths and delete our temporary map outputs.
        System.arraycopy(existingLengths,0, lengths,0, lengths.length) if (dataTmp!= null && dataTmp.exists()) { dataTmp.delete() } indexTmp.delete() } else { // This is thefirst successful attempt in writing the map outputs for this task, // so override any existing indexand data files with the ones we wrote.
        if (indexFile.exists()) { indexFile.delete() } if (dataFile.exists()) { dataFile.delete() } if (!indexTmp.renameTo(indexFile)) { throw new IOException("fail to rename file"+ indexTmp + " to " + indexFile) } if (dataTmp!= null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) { throw new IOException("fail to rename file"+ dataTmp + " to " + dataFile) } } } } finally { if (indexTmp.exists() && !indexTmp.delete()) { logError(s"Failed to delete temporary index file at${indexTmp.getAbsolutePath}") } } }

總結:

基於BypassMergeSortShuffleWriter的機制:

# 首先確定ShuffleMapTask的結果應該分為幾個分區,並且為每一個分區創建一個DiskBlockObjectWriter和臨時文件

# 將每一個ShuffleMapTask的結果通過Partitioner進行分區,寫入對應分區的臨時文件

# 將分區刷到磁盤文件, 並且創建每一個分區文件對應的FileSegment數組

# 根據shuffleId和mapId,構建ShuffleDataBlockId,創建合並文件data和合並文件的臨時文件,文件格式為:

shuffle_{shuffleId}_{mapId}_{reduceId}.data

# 將每一個分區對應的文件的數據合並到合並文件的臨時文件,並且返回一個每一個分區對應的文件長度的數組

# 創建索引文件index和索引臨時文件,每一個分區的長度和offset寫入索引文件等;並且重命名臨時data文件和臨時index文件

# 將一些信息封裝到MapStatus返回

2.4 SortShuffleWriter的寫機制分析

SortShuffleWriter它主要是判斷在Map端是否需要本地進行combine操作。如果需要聚合,則使用PartitionedAppendOnlyMap;如果不進行combine操作,則使用PartitionedPairBuffer添加數據存放於內存中。然后無論哪一種情況都需要判斷內存是否足夠,如果內存不夠而且又申請不到內存,則需要進行本地磁盤溢寫操作,把相關的數據寫入溢寫到臨時文件。最后把內存里的數據和磁盤溢寫的臨時文件的數據進行合並,如果需要則進行一次歸並排序,如果沒有發生溢寫則是不需要歸並排序,因為都在內存里。最后生成合並后的data文件和index文件。

2.4.1 遍歷數據,將task的輸出寫入文件

# 創建外部排序器ExternalSorter, 只是根據是否需要本地combine與否從而決定是否傳入aggregator和keyOrdering參數

# 將寫入數據全部放入外部排序器ExternalSorter,並且根據是否需要spill進行spill操作

# 創建data文件和臨時的data文件,文件格式為'shuffle_{shuffleId}_{mapId}_{reducerId}.data' 先將數據寫入臨時data文件

# 創建index索引文件和臨時index文件,寫入每一個分區的offset以及length信息等,並且重命名data臨時文件和index臨時文件

# 把部分信息封裝到MapStatus返回

override def write(records: Iterator[Product2[K, V]]): Unit = { // 是否map端需要在本地進行combine操作,如果需要,則需要傳入aggregator和keyOrdering,創建ExternalSorter // aggregator用於指示進行combiner的操作( keyOrdering用於傳遞key的排序規則);
  sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else { // 如果不需要在本地進行combine操作, 就不需要aggregator和keyOrdering // 那么本地每個分區的數據不會做聚合和排序
    new ExternalSorter[K, V, V]( context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) } // 將寫入數據全部放入外部排序器ExternalSorter,並且根據是否需要spill進行spill操作
 sorter.insertAll(records) // 創建data文件,文件格式為'shuffle_{shuffleId}_{mapId}_{reducerId}.data'
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) // 為data文件創建臨時的文件
  val tmp = Utils.tempFileWith(output) try { // 創建Shuffle Block Id:shuffle_{shuffleId}_{mapId}_{reducerId}
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, tmp) // 創建index索引文件,寫入每一個分區的offset以及length信息等,並且重命名data臨時文件和index臨時文件
 shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) // 把部分信息封裝到MapStatus返回
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") } } }

2.4.2 將寫入數據全部放入外部排序器ExternalSorter,並且根據是否需要spill進行spill操作

# 判斷aggregator是否為空,如果不為空,表示需要在本地combine

# 如果需要本地combine,則使用PartitionedAppendOnlyMap,先在內存進行聚合,如果需要一些磁盤,則開始溢寫磁盤

# 如果不進行combine操作,則使用PartitionedPairBuffer添加數據存放於內存中,如果需要一些磁盤,則開始溢寫磁盤

def insertAll(records: Iterator[Product2[K, V]]): Unit = { // 判斷aggregator是否為空,如果不為空,表示需要在本地combine
  val shouldCombine = aggregator.isDefined // 如果需要本地combine
  if (shouldCombine) { // 使用AppendOnlyMap優先在內存中進行combine // 獲取aggregator的merge函數,用於merge新的值到聚合記錄
    val mergeValue = aggregator.get.mergeValue // 獲取aggregator的createCombiner函數,用於創建聚合的初始值
    val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null
    // 創建update函數,如果有值進行mergeValue,如果沒有則createCombiner
    val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { // 處理一個元素,就更新一次結果
 addElementsRead() // 取出一個(key,value)
      kv = records.next() // 對key計算分區,然后開始進行merge
 map.changeValue((getPartition(kv._1), kv._1), update) // 如果需要溢寫內存數據到磁盤
      maybeSpillCollection(usingMap = true) } } else { // 不需要進行本地combine
    while (records.hasNext) { // 處理一個元素,就更新一次結果
 addElementsRead() // 取出一個(key,value)
      val kv = records.next() // 往PartitionedPairBuffer添加數據
 buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) // 如果需要溢寫內存數據到磁盤
      maybeSpillCollection(usingMap = false) } } }

2.4.3 根據是否需要本地combine,從而決定初始化哪一個數據結構

private def maybeSpillCollection(usingMap: Boolean): Unit = { var estimatedSize = 0L
  // 如果使用PartitionedAppendOnlyMap存放數據,主要方便進行聚合
  if (usingMap) { // 首先估計一下該map的大小
    estimatedSize = map.estimateSize() // 然后會根據預估的map大小決定是否需要進行spill
    if (maybeSpill(map, estimatedSize)) { map = new PartitionedAppendOnlyMap[K, C] } } else {//否則使用PartitionedPairBuffer,以用於本地不需要進行聚合的情況 // 首先估計一下該map的大小
    estimatedSize = buffer.estimateSize() // 然后會根據預估的map大小決定是否需要進行spill
    if (maybeSpill(buffer, estimatedSize)) { buffer = new PartitionedPairBuffer[K, C] } } if (estimatedSize > _peakMemoryUsedBytes) { _peakMemoryUsedBytes = estimatedSize } }

2.4.4 判斷是否需要溢寫磁盤,如果需要則開始溢寫

# 如果已經讀取的數據是32的倍數且預計的當前需要的內存大於閥值的時候,准備申請內存

# 申請不成功或者申請完畢之后還是當前需要的內存還是不夠,則表示需要進行spill

# 如果需要spill,則調用spill方法開始溢寫磁盤,溢寫完畢之后釋放內存

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { var shouldSpill = false
  // 如果讀取的數據是32的倍數,而且當前內存大於內存閥值,默認是5M // 會先嘗試向MemoryManager申請(2 * currentMemory - myMemoryThreshold)大小的內存 // 如果能夠申請到,則不進行Spill操作,而是繼續向Buffer中存儲數據, // 否則就會調用spill()方法將Buffer中數據輸出到磁盤文件
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // 向MemoryManager申請內存的大小
    val amountToRequest = 2 * currentMemory - myMemoryThreshold // 分配內存,並更新已經使用的內存
    val granted = acquireMemory(amountToRequest) // 更新現在內存閥值
    myMemoryThreshold += granted // 再次判斷當前內存是否大於閥值,如果還是大於閥值則繼續spill
    shouldSpill = currentMemory >= myMemoryThreshold } shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold // 如果需要進行spill,則開始進行spill操作
  if (shouldSpill) { _spillCount += 1 logSpillage(currentMemory) // 開始spill
 spill(collection) _elementsRead = 0 _memoryBytesSpilled += currentMemory // 釋放內存
 releaseMemory() } shouldSpill }

2.4.5 溢寫磁盤

# 返回一個根據指定的比較器排序的迭代器

# 溢寫內存里的數據到磁盤一個臨時文件

# 更新溢寫的臨時磁盤文件

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { // 返回一個根據指定的比較器排序的迭代器
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator) // 溢寫內存里的數據到磁盤一個臨時文件
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator) // 更新溢寫的臨時磁盤文件
  spills += spillFile } 

2.4.6 溢寫內存里的數據到磁盤一個臨時文件

# 創建臨時的blockId和文件

# 針對臨時文件創建DiskBlockObjectWriter

# 循環讀取內存里的數據

# 內存里的數據數據寫入文件

# 將數據刷到磁盤

# 創建SpilledFile然后返回

private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator) : SpilledFile = { // 因為這些文件在shuffle期間可能被讀取,他們壓縮應該被spark.shuffle.spill.compress控制而不是 // spark.shuffle.compress,所以我們需要創建臨時的shuffle block
  val (blockId, file) = diskBlockManager.createTempShuffleBlock() // These variables are reset after each flush
  var objectsWritten: Long = 0 val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics // 創建針對臨時文件的writer
  val writer: DiskBlockObjectWriter = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) // 批量寫入磁盤的列表
  val batchSizes = new ArrayBuffer[Long] // 每一個分區有多少數據
  val elementsPerPartition = new Array[Long](numPartitions) // 刷新數據到磁盤
  def flush(): Unit = { // 每一個分區對應文件刷新到磁盤,並返回對應的FileSegment
    val segment = writer.commitAndGet() // 獲取該FileSegment對應的文件的長度,並且更新batchSizes
    batchSizes += segment.length _diskBytesSpilled += segment.length objectsWritten = 0 } var success = false
  try { // 循環讀取內存里的數據
    while (inMemoryIterator.hasNext) { // 獲取partitionId
      val partitionId = inMemoryIterator.nextPartition() require(partitionId >= 0 && partitionId < numPartitions, s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})") // 內存里的數據數據寫入文件
 inMemoryIterator.writeNext(writer) elementsPerPartition(partitionId) += 1 objectsWritten += 1
      // 將數據刷到磁盤
      if (objectsWritten == serializerBatchSize) { flush() } } // 遍歷完了之后,刷新到磁盤
    if (objectsWritten > 0) { flush() } else { writer.revertPartialWritesAndClose() } success = true } finally { if (success) { writer.close() } else { // This code path only happens if an exception was thrown above before we set success; // close our stuff and let the exception be thrown further
 writer.revertPartialWritesAndClose() if (file.exists()) { if (!file.delete()) { logWarning(s"Error deleting ${file}") } } } } // 創建SpilledFile然后返回
 SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition) }

2.4.7 對結果排序,合並文件

# 溢寫文件為空,則內存足夠,不需要溢寫結果到磁盤, 返回一個對結果排序的迭代器, 遍歷數據寫入data臨時文件;再將數據刷到磁盤文件,返回FileSegment對象;構造一個分區文件長度的數組

# 溢寫文件不為空,則需要將溢寫的文件和內存數據合並,合並之后則需要進行歸並排序(merge-sort);數據寫入data臨時文件,再將數據刷到磁盤文件,返回FileSegment對象;構造一個分區文件長度的數組

# 返回分區文件長度的數組

def writePartitionedFile(blockId: BlockId, outputFile: File): Array[Long] = { // Track location of each range in the output file // 臨時的data文件跟蹤每一個分區的位置 // 創建每一個分區對應的文件長度的數組
  val lengths = new Array[Long](numPartitions) // 創建DiskBlockObjectWriter對象
  val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, context.taskMetrics().shuffleWriteMetrics) // 判斷是否有進行spill的文件
  if (spills.isEmpty) { // 如果是空的表示我們只有內存數據,內存足夠,不需要溢寫結果到磁盤 // 如果指定aggregator,就返回PartitionedAppendOnlyMap里的數據,否則返回 // PartitionedPairBuffer里的數據
    val collection = if (aggregator.isDefined) map else buffer // 返回一個對結果排序的迭代器
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator) while (it.hasNext) { // 獲取partitionId
      val partitionId = it.nextPartition() // 通過writer將內存數據寫入臨時文件
      while (it.hasNext && it.nextPartition() == partitionId) { it.writeNext(writer) } // 數據刷到磁盤,並且創建FileSegment數組
      val segment = writer.commitAndGet() // 構造一個分區文件長度的數組
      lengths(partitionId) = segment.length } } else { // 否則,表示有溢寫文件,則需要進行歸並排序(merge-sort) // We must perform merge-sort; get an iterator by partition and write everything directly. // 每一個分區的數據都寫入到data文件的臨時文件
    for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { for (elem <- elements) { writer.write(elem._1, elem._2) } // 數據刷到磁盤,並且創建FileSegment數組
        val segment = writer.commitAndGet() // 構造一個分區文件長度的數組
        lengths(id) = segment.length } } } writer.close() context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes) lengths }

2.4.8 返回遍歷所有數據的迭代器

# 沒有溢寫,則判斷是否需要對key排序,如果不需要則只是將數據按照partitionId排序,否則首先按照partitionId排序,然后partition內部再按照key排序

# 如果發生溢寫,則需要將磁盤上溢寫文件和內存里的數據進行合並

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  // 是否需要本地combine
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  // 如果沒有發生磁盤溢寫
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    // 而且不需要排序
    if (!ordering.isDefined) {
      // 數據只是按照partitionId排序,並不會對key進行排序
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
    } else {
      // 否則我們需要先按照partitionId排序,然后分區內部對key進行排序
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // 如果發生了溢寫操作,則需要將磁盤上溢寫文件和內存里的數據進行合並
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
 

三 Sort-Based Shuffle讀機制

假設我們執行了reduceByKey算子,那么生成的RDD的就是ShuffleRDD,下游在運行任務的時候,則需要獲取上游ShuffleRDD的數據,所以ShuffleRDD的compute方法是Shuffle讀的起點。

下游的ReducerTask,可能是ShuffleMapTask也有可能是ResultTask,首先會去Driver獲取parent stage中ShuffleMapTask輸出的位置信息,根據位置信息獲取index文件,然后解析index文件,從index文件中獲取相關的位置等信息,然后讀data文件獲取屬於自己那部分內容。

那什么ReducerTask什么時候去獲取數據呢?當parent stage的所有ShuffleMapTask結束后再去fetch,然后一邊fetch一邊計算。

3.1ShuffleRDD的compute方法

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { // ResultTask或者ShuffleMapTask在執行到ShuffleRDD時,肯定會調用ShuffleRDD的compute方法 // 來計算當前這個RDD的partition數據
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] // 獲取ShuffleManager的reader去拉取ShuffleMapTask,需要聚合的數據
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }
三 Sort-Based Shuffle讀機制 假設我們執行了reduceByKey算子,那么生成的RDD的就是ShuffleRDD,下游在運行任務的時候,則需要獲取上游ShuffleRDD的數據,所以ShuffleRDD的compute方法是Shuffle讀的起點。 下游的ReducerTask,可能是ShuffleMapTask也有可能是ResultTask,首先會去Driver獲取parent stage中ShuffleMapTask輸出的位置信息,根據位置信息獲取index文件,然后解析index文件,從index文件中獲取相關的位置等信息,然后讀data文件獲取屬於自己那部分內容。 那什么ReducerTask什么時候去獲取數據呢?當parent stage的所有ShuffleMapTask結束后再去fetch,然后一邊fetch一邊計算。 3.1 ShuffleRDD的compute方法 
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { // ResultTask或者ShuffleMapTask在執行到ShuffleRDD時,肯定會調用ShuffleRDD的compute方法 // 來計算當前這個RDD的partition數據
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] // 獲取ShuffleManager的reader去拉取ShuffleMapTask,需要聚合的數據
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }
3.2 調用BlockStoreShuffleReader的read方法開始讀取數據 # 創建ShuffleBlockFetcherIterator,一個迭代器,它獲取多個塊,對於本地塊,從本地讀取對於遠程塊,通過遠程方法讀取 # 如果reduce端需要聚合:如果map端已經聚合過了,則對讀取到的聚合結果進行聚合; 如果map端沒有聚合,則針對未合並的<k,v>進行聚合 # 如果需要對key排序,則進行排序。基於sort的shuffle實現過程中,默認只是按照partitionId排序。在每一個partition內部並沒有排序,因此添加了keyOrdering變量,提供是否需要對分區內部的key排序
override def read(): Iterator[Product2[K, C]] = { // 構造ShuffleBlockFetcherIterator,一個迭代器,它獲取多個塊,對於本地塊,從本地讀取 // 對於遠程塊,通過遠程方法讀取
  val blockFetcherItr = new ShuffleBlockFetcherIterator( context, blockManager.shuffleClient, blockManager, //MapOutputTracker在SparkEnv啟動的時候實例化
 mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024, SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)) // 基於配置文件對於流進行包裝
  val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) => serializerManager.wrapStream(blockId, inputStream) } // 獲取序列化實例
  val serializerInstance = dep.serializer.newInstance() // 對於每一個流創建一個<key,value>迭代器
  val recordIter = wrappedStreams.flatMap { wrappedStream => serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator } // Update the context task metrics for each record read.
  val readMetrics = context.taskMetrics.createTempShuffleReadMetrics() val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( recordIter.map { record => readMetrics.incRecordsRead(1) record }, context.taskMetrics().mergeShuffleReadMetrics()) // An interruptible iterator must be used here in order to support task cancellation
  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter) // 如果reduce端需要聚合
  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { // 如果map端已經聚合過了
    if (dep.mapSideCombine) { //則對讀取到的聚合結果進行聚合
      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]] // 針對map端各個partition對key進行聚合后的結果再次聚合
 dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context) } else { // 如果map端沒有聚合,則針對未合並的<k,v>進行聚合
      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]] dep.aggregator.get.combineValuesByKey(keyValuesIterator, context) } } else { require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]] } // 如果需要對key排序,則進行排序。基於sort的shuffle實現過程中,默認只是按照partitionId排序 // 在每一個partition內部並沒有排序,因此添加了keyOrdering變量,提供是否需要對分區內部的key排序
 dep.keyOrdering match { case Some(keyOrd: Ordering[K]) =>
      // 為了減少內存壓力和避免GC開銷,引入了外部排序器,當內存不足時會根據配置文件 // spark.shuffle.spill決定是否進行spill操作
      val sorter =
        new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer) sorter.insertAll(aggregatedIter) context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop()) case None =>
      // 不需要排序直接返回
 aggregatedIter } } def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { // 是否需要本地combine
  val usingMap = aggregator.isDefined val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer // 如果沒有發生磁盤溢寫
  if (spills.isEmpty) { // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps // we don't even need to sort by anything other than partition ID // 而且不需要排序
    if (!ordering.isDefined) { // 數據只是按照partitionId排序,並不會對key進行排序
 groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None))) } else { // 否則我們需要先按照partitionId排序,然后分區內部對key進行排序
 groupByPartition(destructiveIterator( collection.partitionedDestructiveSortedIterator(Some(keyComparator)))) } } else { // 如果發生了溢寫操作,則需要將磁盤上溢寫文件和內存里的數據進行合並
 merge(spills, destructiveIterator( collection.partitionedDestructiveSortedIterator(comparator))) } }

3.2  調用BlockStoreShuffleReader的read方法開始讀取數據

# 創建ShuffleBlockFetcherIterator,一個迭代器,它獲取多個塊,對於本地塊,從本地讀取對於遠程塊,通過遠程方法讀取

# 如果reduce端需要聚合:如果map端已經聚合過了,則對讀取到的聚合結果進行聚合; 如果map端沒有聚合,則針對未合並的<k,v>進行聚合

# 如果需要對key排序,則進行排序。基於sort的shuffle實現過程中,默認只是按照partitionId排序。在每一個partition內部並沒有排序,因此添加了keyOrdering變量,提供是否需要對分區內部的key排序

override def read(): Iterator[Product2[K, C]] = { // 構造ShuffleBlockFetcherIterator,一個迭代器,它獲取多個塊,對於本地塊,從本地讀取 // 對於遠程塊,通過遠程方法讀取
  val blockFetcherItr = new ShuffleBlockFetcherIterator( context, blockManager.shuffleClient, blockManager, //MapOutputTracker在SparkEnv啟動的時候實例化
 mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024, SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)) // 基於配置文件對於流進行包裝
  val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) => serializerManager.wrapStream(blockId, inputStream) } // 獲取序列化實例
  val serializerInstance = dep.serializer.newInstance() // 對於每一個流創建一個<key,value>迭代器
  val recordIter = wrappedStreams.flatMap { wrappedStream => serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator } // Update the context task metrics for each record read.
  val readMetrics = context.taskMetrics.createTempShuffleReadMetrics() val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( recordIter.map { record => readMetrics.incRecordsRead(1) record }, context.taskMetrics().mergeShuffleReadMetrics()) // An interruptible iterator must be used here in order to support task cancellation
  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter) // 如果reduce端需要聚合
  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { // 如果map端已經聚合過了
    if (dep.mapSideCombine) { //則對讀取到的聚合結果進行聚合
      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]] // 針對map端各個partition對key進行聚合后的結果再次聚合
 dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context) } else { // 如果map端沒有聚合,則針對未合並的<k,v>進行聚合
      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]] dep.aggregator.get.combineValuesByKey(keyValuesIterator, context) } } else { require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]] } // 如果需要對key排序,則進行排序。基於sort的shuffle實現過程中,默認只是按照partitionId排序 // 在每一個partition內部並沒有排序,因此添加了keyOrdering變量,提供是否需要對分區內部的key排序
 dep.keyOrdering match { case Some(keyOrd: Ordering[K]) =>
      // 為了減少內存壓力和避免GC開銷,引入了外部排序器,當內存不足時會根據配置文件 // spark.shuffle.spill決定是否進行spill操作
      val sorter =
        new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer) sorter.insertAll(aggregatedIter) context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop()) case None =>
      // 不需要排序直接返回
 aggregatedIter } }

3.3 通過MapOutputTracker的getMapSizesByExecutorId去獲取MapStatus

要去讀取數據,我們就需要知道從哪兒讀取,讀取哪一些數據,這些信息在上游shuffle會存封裝在MapStatus中

def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") // 根據shuffleId獲取MapStatus
  val statuses = getStatuses(shuffleId) // 將得到MapStatus數組進行轉化
  statuses.synchronized { return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses) } }

# 向MapOutputTrackerMasterEndpoint發送GetMapOutputStatuses消息, MapOutputTrackerMasterEndpoint收到消息之后,MapOutputTrackerMaster會添加這個請求到隊列,並且它有一個后台線程一直不斷從該隊列獲取請求,獲取請求之后返回。

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // 如果接收的是GetMapOutputStatuses消息,則表示獲取MapOutput狀態
  case GetMapOutputStatuses(shuffleId: Int) => val hostPort = context.senderAddress.hostPort logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort) // 調用MapOutputTrackerMaster的post方法獲取
    val mapOutputStatuses = tracker.post(new GetMapOutputMessage(shuffleId, context)) case StopMapOutputTracker => logInfo("MapOutputTrackerMasterEndpoint stopped!") context.reply(true) stop() } def post(message: GetMapOutputMessage): Unit = { // 往這個隊列插入一個請求
 mapOutputRequests.offer(message) } private class MessageLoop extends Runnable { override def run(): Unit = { try { while (true) { try { // 從隊列中取出一個GetMapOutputMessage消息
          val data = mapOutputRequests.take() if (data == PoisonPill) { // Put PoisonPill back so that other MessageLoops can see it.
 mapOutputRequests.offer(PoisonPill) return } val context = data.context val shuffleId = data.shuffleId val hostPort = context.senderAddress.hostPort logDebug("Handling request to send map output locations for shuffle " + shuffleId +
            " to " + hostPort) // 根據shuffleId獲取序列化的MapOutputStatuses
          val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId) context.reply(mapOutputStatuses) } catch { case NonFatal(e) => logError(e.getMessage, e) } } } catch { case ie: InterruptedException => // exit
 } } }

3.4 創建ShuffleBlockFetcherIterator,在其內部會調用初始化方法initialize方法

# 切分本地和遠程的block,並且將遠程block隨機排序

# 發送請求到遠程獲取block數據

# 拉取本地block的數據

private[this] def initialize(): Unit = { // Add a task completion callback (called in both success case and failure case) to cleanup.
  context.addTaskCompletionListener(_ => cleanup()) // 切分本地和遠程的block
  val remoteRequests = splitLocalRemoteBlocks() // 然后進行隨機排序
  fetchRequests ++= Utils.randomize(remoteRequests) assert ((0 == reqsInFlight) == (0 == bytesInFlight), "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
    ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight) // 發送請求到遠程獲取數據
 fetchUpToMaxBytes() val numFetches = remoteRequests.size - fetchRequests.size logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime)) // 拉取本地的數據
 fetchLocalBlocks() logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime)) }

3.5 切分本地和遠程的block

private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = { // 遠端請求從最多5個node去獲取數據,每一個節點拉取的數據取決於spark.reducer.maxMbInFlight即maxBytesInFlight參數 // 加入整個集群只允許每次在5台拉取5G的數據,那么每一節點只允許拉取1G數據,這樣就可以允許他們並行從5個節點獲取, // 而不是主動從一個節點獲取
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L) logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize) // 創建FetchRequest隊列,用於存放拉取的數據的請求,每一個請求可能包含多個block, // 具體多少取決於總的請求block大小是否超過目標閥值
  val remoteRequests = new ArrayBuffer[FetchRequest] var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) { // 獲取block的大小,並更新總的block數量信息
    totalBlocks += blockInfos.size // 要獲取的數據在本地
    if (address.executorId == blockManager.blockManagerId.executorId) { // 更新要從本地block拉取的集合
      localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1) // 更新要拉取的block數量
      numBlocksToFetch += localBlocks.size } else {//數據不在本地時
      val iterator = blockInfos.iterator var curRequestSize = 0L // 當前請求的大小 // 存放當前的遠端請求
      var curBlocks = new ArrayBuffer[(BlockId, Long)] // 遍歷每一個block
      while (iterator.hasNext) { val (blockId, size) = iterator.next() // 過濾掉空的block
        if (size > 0) { curBlocks += ((blockId, size)) // 更新要拉取的遠端的blockId的集合列表
          remoteBlocks += blockId // 更新要拉取的block數量
          numBlocksToFetch += 1 curRequestSize += size } else if (size < 0) { throw new BlockException(blockId, "Negative block size " + size) } // 如果當前請求的大小已經超過了閥值
        if (curRequestSize >= targetRequestSize) { // 創建一個新的FetchRequest,放到請求隊列
          remoteRequests += new FetchRequest(address, curBlocks) // 重置當前block列表
          curBlocks = new ArrayBuffer[(BlockId, Long)] logDebug(s"Creating fetch request of $curRequestSize at $address") // 重置當前請求數量為0
          curRequestSize = 0 } } // 添加最終的請求
      if (curBlocks.nonEmpty) { remoteRequests += new FetchRequest(address, curBlocks) } } } logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks") remoteRequests }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM