sort-based shuffle的核心:org.apache.spark.util.collection.ExternalSorter


依據Spark 1.4版

在哪里會用到它

ExternalSorter是Spark的sort形式的shuffle實現的關鍵。SortShuffleWriter使用它,把RDD分區中的數據寫入文件。

  override def write(records: Iterator[Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {//根據是否需要mqp-side combine創建不同的sorter
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      sorter = new ExternalSorter[K, V, C](dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records)
    } else {
      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer) //如果不需要map-side combine 就不再需要Aggregator和Ordering
      sorter.insertAll(records)
    }
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)//寫數據文件
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)//寫索引文件

    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }

ExternalSorter的注釋

這個類的注釋提供了關於它的設計的很多信息,先翻譯一下。

這個類用於對一些(K, V)類型的key-value對進行排序,如果需要就進行merge,生的結果是一些(K, C)類型的key-combiner對。combiner就是對同樣keyvalue進行合並的結果。它首先使用一個Partitioner來把key分到不同的partition,然后,如果有必要的話,就把每個partition內部的key按照一個特定的Comparator來進行排序。它可以輸出只一個分區了的文件,其中不同的partition位於這個文件的不同區域(在字節層面上每個分區是連續的),這樣就適用於shuffle時對數據的抓取。

 

如果combining沒有啟用,C和V的類型必須相同 -- 在最后我們會對對象進行強制類型轉換。

 

注意:僅管ExternalSorte是一個比較通用的sorter,但是它的一些配置是和它在基於sortshuffle的用處緊密相連的(比如,它的block壓縮是通過'spark.shuffle.compress'控制的) 如果在非shuffle情況下使用ExternalSorter時我們想要另外的配置,可能就需要重新審視一下它的實現。

 

構造函數參數:

 1. aggregator, 類型為Option[Aggregator], 提供combine函數,用於merge

 2. partitioner, 類型為Optinon[Partitioner], 如果提供了partitioner,就先按partitionID排序,然后再按key排序

 3. ordering, 類型為Option[Ordering], 用來對每個partition內部的key進行排序;必須是一個          4. total ordering(即,所有key必須可以互相比較大小,與partitial ordering不同)

 4. serializer, 類型為Option[Serializer], 用於spill數據到磁盤。

 

注意,如果提供了Ordering 那么我們就總會使用它進行排序(是指對partition內部的key排序),因此,只有在真正需要輸出的數據按照key排列時才提供ordering。例如,在一個沒有map-side combinemap任務中,你應該會需要傳遞None作為ordering,這樣會避免額外的排序。另一方面,如果你的確需要combining 提供一個Ordering會更好。

 

用戶應該這么和這個類型進行交互:

  1. 初始化一個ExternalSorter
  2. 調用insertAll, 提供要排序的數據
  3. 請求一個iterator()來遍歷排序/聚合后的數據。或者,調用writePartitionedFiles來創建一個包含了排序/聚合后數據的文件,這個文件可以用於Sparksort shuffle

 

這個類在內部是這么工作的:

 

  • 我們重復地將數據填滿內存中的buffer,如果我們想要combine,就使用PartitionedAppendOnlyMap作為buffer, 如果不想要combine,就使用PartitionedSerializedPairBuffer或者PartitionedPariBuffer。在這里buffer內部,我們使用partition Id對元素排序,如果需要,就也按key排序(對同樣partition Id的元素)。為了避免重復調用partitioner,我們會把recordpartition ID存儲在一起。
  • buffer達到了容量上限以后,我們把它spill到文件。這個文件首先按partition ID排序,然后如果需要進行聚合,就用key或者key的hashcode作為第二順序。對於每個文件,我們會追蹤在內存時,每個partition里包括多少個對象,所以我們在寫文件 時候就不必要為每個元素記錄partition ID了。
  • 當用戶請求獲取迭代器或者文件時,spill出來的文件就會和內存中的數據一起被merge,並且使用上邊定義的排列順序(除非排序和聚合都沒有開啟)。如果我們需要按照key聚合,我們要不使用Ordering參數進行全排序,要不就讀取有相同hash codekey,並且對它們進行比較來確定相等性,以進行merge
  • 用戶最后應該使用stop()來刪除中間文件。

 

作為一種特殊情況,如果OrderingAggregator都沒有提供,並且partition的數目少於spark.shuffle.sort.bypassMergeThreshold, 我們會繞過merge-sort,每次spill時會為每個partition單獨寫一個文件,就像HashShuffleWriter一樣。我們然后把這些文件連接起來產生一個單獨的排序后的文件,這時就沒有必要為每個元素進行兩次序列化和兩次反序列化(merge中就需要這么做)。這會加快groupBy, sort等沒有部分聚合的操作的map端的效率。

它的功能 

根據注釋所述,這個類的功能包括:

1. 把kv對按partitioner分到不同的分區

2. 如果需要,就對相同key對應的value進行聚合

3. 把輸出的kv對寫到一個文件,在文件內部,kv對按照partition ID排序,如果需要的話,就對每個partition內部的kv排序。

前兩個功能,是hash-based shuffle也會做的,而第3個功能,是sort-based shuffle特有的。

為了實現這些功能,它要解決以下的問題:

  1. 考慮到內存的限制,需要進行外部排序,需要spill到磁盤文件,  需要對這些文件進行merge。那么如何追蹤內存中數據結構的大小,spill到磁盤后的文件應該如何組織其結構?如果進行merge?
  2. 如何實現aggregation?在填充數據到內存里的buffer時,需要進行aggregate, spill出來的文件在merge時,位於不同文件里的相同key對應的value也需要aggregate。
  3. 如何確定最終文件里每個partition以byte為單位的大小。由於壓縮流和序列化流對文件輸出流的包裝,以及中間的buffer的影響,這個大小只能在關閉這些流之后才能獲得。這樣的話,最終寫成的文件會是很多輸出流的輸出追加在一起的結果。

它的實現


它的整個實現比較繁雜,但按照通常的使用方式,大體包括寫入buffer、spill、merge三個部分。

buffer

首先,充分利用內存作為buffer,直接對內存中的對象進行操作可以提高效率,減少序列化、反序列化和IO的開銷。比如在內存中先對部分value進行聚合,會減少要序列化和寫磁盤的數據量;在內存中對kv先按照partition組合在一起,也有利於以后的merge,而且越大的buffer寫到磁盤中的文件越大,這意味着要合並的文件就越少。

所以,就像注釋中提到的,ExternalSorter可能會用到三種類型的buffer,以應對不同的情況,提高效率。這三種buffer是

  • PartitionedAppendOnlyMap
  • PartitionedSerializedPairBuffer
  • PartitionedPairBuffer

下面看一下這三種數據結構的特性以及適用的情境

PartitionedAppendOnlyMap

它的繼承結構是這樣的

下面分別看一下它的父類 

SizeTracker

這是一個trait,把它混入到集合類中用來追蹤這個集合的估計大小。之所以PartitionedAppendOnlyMap需要繼承SizeTracker,是為了確定spill的時機。


調用SizeEstimator的時機

它有一個afterUpdate方法,當被混入的集合的每次update操作以后,需要執行SizeTracker的afterUpdate方法,afterUpdate會判斷這是第幾次更新,需要的話就會使用SizeEstimator的estimate方法來估計下集合的大小。由於SizeEstimator的調用開銷比較大,注釋上說會是數毫秒,所以不能頻繁調用。所以SizeTracker會記錄更新的次數,發生estimate的次數是指數級增長的,基數是1.1,所以調用estimate時更新的次數會是1.1, 1.1 * 1.1, 1.1 * 1.1 *1.1, ....

這是指數的初始增長是很慢的, 1.1的96次方會是1w, 1.1 ^ 144次方是100w,即對於1w次update,它會執行96次estimate,對10w次update執行120次estimate, 對100w次update執行144次estimate,對1000w次update執行169次。

估計集合大小的方法

  1. 每到需要estimate的更新后,就調用SizeEstimator估計一下當前集合的大小。用集合的大小和更新次數組裝成一個Sample對象(一個只有這兩個field的case class),把這個Sample放個一個存放Sample history和隊列。然后取這個隊列里最后兩個Sample,算出來這兩個Sample之間每次update這個集合size增長了多少,記為bytesPerUpdate。方法是這兩個Sample里大小的差值除以它們update次數的差值。

(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)

    2. SizeTracker的estimateSize被調用時,以bytePerUpdate作為最近平均每次更新時的bytePerUpdate,用當前的update次數減去最后一個Sample的update次數,然后乘以bytePerUpdate,結果加上最后一個Sample記錄的大小。

 val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
    (samples.last.size + extrapolatedDelta).toLong

  estimateSize方法之所以這么設計,是為了盡量減少對SizeEstimator的調用。因為集合會在每次update之后調用estimateSize來決定是否需要spill。

感覺SizeTracker有兩個地方不太好

  1. 對update的定義有些寬泛。以SizeTrackingAppendOnlyMap為例, 它會在update和changeValue兩個方法中都調用afterUpdate。其中changeValue在使用中既被當作insert插入新的kv對,也會用於對已有的kv對進行update。對有些update方式來說,明確區分insert和對已有值的更新會使得估計更准確,比如word count中的reduceByKey,它執行對已有值的更新時,不會改變集合的大小,而只有新加入的kv會。
  2. 調用SizeEstimator時的update次數簡單地以指數增長,這種策略過於寬泛。對於一批update,保證它引發的對SizeEstimate的estimate的調用耗費的時間在一定可接受的值即可。SizeTrackingAppendOnly在shuffle中被使用,做為buffer,它的元素不會太多,所以update的次數有限,使得estimate的調用不會間隔太多update。但是如果update的次數太多,后期的estimate次數會特別少,比如在100w和1000w更新次數之間,平均每37.5w次才會調用一次estimate。調用SizeEstimator的時機應考慮到當前集合的大小、集合元素大復雜程度,在這種大小的集合上調用一次SizeEstimator的開銷,當前與上一次調用隔了多少次update等因素。或許應該提供接口或配置項,讓用戶有機會提供關於集合內數據的較准確的信息。或者在SizeTracker的estimateSize調用后,讓用戶可以根據情況強制SizeTracker給出一個更准確的值,比如如果得到的size顯示需要進行spill了。

  希望Spark能在以后以它進行改進。如果對集合的大小估計不准,就不能充分內存,這對於shuffle的效率影響非常大。

AppendOnlyMap

當需要對Value進行聚合時,會使用AppendOnlyMap作為buffer。它是一個只支持追加的map,可以修改某個key對應的value, 但是不能刪除已經存在的key。使用它是因為在shuffle的map端,刪除key不是必須的。那么append only能帶來什么好處呢?

1. 省內存

AppendOnlyMap也是一個hash map, 但它不是像java.util.collection的HashMap一樣在Hash沖突時采用鏈接法,而是采用二次探測法。這樣,它就不需要采用entry這種對kv對的包裝,而是把kv對寫同一個object數組里,減少了entry的對象頭帶來的內存開銷。但是二次探測法有個缺點,就是刪除元素時比較復雜,不能只是簡單地把數組中相應位置的kv都置成null,這樣查找元素時就沒辦法了,通常會把被刪除的元素標記為已被刪除,這就又需要額外的內存。而當這個hash map只支持insert和update時,情況就簡單了,不僅可以減少鏈接法時構造鏈表需要的內存,而且不需要另外的內存做刪除標記。在相同的load factor時,會比HashMap更省內存。

  // Holds keys and values in the same array for memory locality; specifically, the order of
  // elements is key0, value0, key1, value1, key2, value2, etc.
  private var data = new Array[AnyRef](2 * capacity)

2. 省內存,可以用數組排序算法,排序效率高

由於所有元素都在一個數組里,所以在對這個map里的kv對進行排序時,可以直接用數組排序的算法在數組內做,節省了內存,效率也比較高。ExternalSorter的destructiveSortedIterator就是這么做的。它把所有的kv對移動數組的前端,然后進行排序

new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

3. 支持函數式地update操作,適合進行aggregate。

AppendOnlyMap一個changeValue方法,它的簽名是這樣的

def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { ... }

在啟用aggregate時,會把aggregate的邏輯和kv里的value組裝成updateFunc, 來對每個key調用changeValue。要明白這個邏輯首先得看下Aggregator這個類的定義

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) { ... }

shuffle中的aggregate操作實際是把一個KV對的集合,變成一個KC對的map, C是指combiner,是V聚合成的結果。Aggregator的三個類型參數K, V, C即代表Key的類型, Value的類型和Combiner的類型。

  • createCombiner描述了對於原KV對里由一個Value生成Combiner,以作為聚合的起始點。
  • mergeValue描述了如何把一個新的Value(類型為V)合並到之前聚合的結果(類型為C)里
  • mergeCombiner描述了如何把兩個分別聚合好了的Combiner再聚合

這三個函數就描述了aggregate所遇到的各種情況。

先看下reduceByKey

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

它接收的函數參數的類型為 (V, V) => V, 也就是說Value和Combiner的類型是一樣的,所以它會生成一個Aggregator[K, V, V],它的三個構造器參數分別為

(v: V) => v, func, func。 這符合reduceByKey的意義。

與此不同的是aggregateByKey,由於它指定了一個初始值zeroValue,所以初始的Combiner應該是把這個初始值和Value聚合的結果。為此,它是這么做的

  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
  }

首先,createCombiner每次調用時,需要一個屬於自己的zeroValue的拷貝,否則變成共享的就麻煩了,比如當zeroValue是一個集合時。所以aggregateByKey的createCombiner方法每次運行會反序列化一個zeroValue,然后調用mergeValue函數(也就是seqOp函數)創建初始的Combiner。

與此類似的是groupByKey,它的createCombiner函數是構造一個只有Value一個元素的集合,mergeValue函數即是把Value添加到這個集合,而mergeCombiner函數是對集合進行合並。

可見Aggregator的確能描述各種不同的聚合策略。那么Aggregator的這三個函數是如何被用於AppendOnlyMap的呢?

首先,只有在需要對Value進行聚合時,才會使用AppendOnlyMap作為buffer。而此時,在ExternalSorter的insertAll函數中,是這么使用它的

    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
//        Runtime.getRuntime.maxMemory()
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    }

注意它是如何用createCombiner和mergeValue兩個函數組裝成AppendOnlyMap的changeValue函數所需要的update函數。這種直觀地對函數地組合的確是函數式編程的一種優勢。

SizeTrackingAppendOnlyMap

它繼承自AppendOnlyMap和SizeTracker,覆蓋了AppendOnlyMap的三個方法

  • update和changeValue。 在調用AppendOnlyMap的相應方法后,調用SizeTracker的afterUpdate方法
  • growTable。在調用AppendOnlyMap的growTable方法后,調用SizeTracker的resetSamples方法。

實際上SizeTrackingAppendOnlyMap對於SizeTracker的使用有些簡單粗暴。比如在growTable之后,AppendOnlyMap的data數組會增長,所以之前的bytesPerUpdate就不准確了,但這時候直接調用resetSamples會清空之前的采樣,重置update次數。而AppendOnlyMap的data數組額外占據的空間可以根據它的capacity的變化算出來,這使得之前的bytesPerUpdate的值可以繼續使用。對於一個很多的集合調用resetSamples,會使得對它的采樣更密集,並不是一個特別好的做法。

WritablePartitionedPairCollection

這個trait的大部分方法都是未實現的,它描述了一個分區的kv集合應具有的性質。這個集合的特點在於,它的destructiveSortedWritablePartitionedIterator應該返回一個WritablePartitionedIterator對象。WritablePartitionedIterator可以使用BlockObjectWriter來寫入它的元素。

private[spark] trait WritablePartitionedIterator {
  def writeNext(writer: BlockObjectWriter): Unit

  def hasNext(): Boolean

  def nextPartition(): Int
}

此外它的伴生對象會提供兩種Comparator

  • PartitionComparator  按照partition ID排序
  • PartitionKeyComparator 它先按partition ID排序,再按key排序。按key排序時使用的Comparator是作為參數提供的。

此外, WriteablePartitionedIterator的伴生對象有一個fromIterator方法,它接受一個Iterator[((Int, _), _)]類型的迭代器,返回一個特殊的WritablePartitionedIterator對象,此對象的特點在於它的writeNext方法只寫入Key和Value,並不寫入Partition ID。ExternalSorter的三種buffer都是使用這個fromIterator方法,從自身的iterator生成WritablePartitionIterator。所以,它們三個的iterator方法返回的迭代器的KV對中,K的類型就是(Int, Key的類型)。

PartitionedAppendOnlyMap

它繼承自WritablePartitionedPairCollection和SizeTrackingAppendOnlyMap, ExternalSorter在需要進行aggregate元素的情況下,用它做為buffer。

需要注意一下它的類型參數

private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] 

它本身是一個KV集合,Key的類型是K, Value的類型是V。但是它繼承了SizeTrackingAppendOnlyMap[(Int, K), V],這意味着SizeTrackingAppendOnlyMap繼承的AppendOnlyMap的類型是AppendOnlyMap[(Int, K), V]。也就是說PartitionedAppendOnlyMap繼承的AppendOnlyMap的Key的類型為(Int, K), Value的類型為V。

這個Int就是Partition ID。所以PartitionedAppendOnlyMap定義了一個partitioneDestructiveSortedIterator方法,返回一個Iterator[(Int, K), V]。

  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    destructiveSortedIterator(comparator)
  }

這個迭代器的排序方式由keyComparator決定,如果keyComparator是None,就用WritablePartitionedPairCollection的partitionComparator只按partition排序, 如果是Some,就按照WritablePartitionedPairCollection的partitionKeyComparator排序,也就是先按partition ID排序,再使用keyComparator按key排序。

 

PartitionedPairBuffer和PartitionedSerializedPairBuffer

下面看一下另兩種buffer: PartitionedPairBuffer和PartitionedSerializedPairBuffer。它們都不支持aggregation,但是ExternalSorter是如何在二者間選擇的呢?

  private val useSerializedPairBuffer =
    !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
    ser.supportsRelocationOfSerializedObjects

如果useSerializedPairBuffer為true,就會使用PartitionedSerailizedPairBuffer。而它為true必須有三個條件同時滿足:

  • 沒有提供Ordering。即不需要對partition內部的kv再排序。
  • spark.shuffle.sort.searlizedMapOutputs為true。它默認即為true
  • serializer支持relocate序列化以后的對象。即在序列化輸出流寫了兩個對象以后,把這兩個對象對應的字節塊交換位置,序列化輸出流仍然能讀出這兩個對象。一般而言,如果序列化流是無狀態的,並且在序列化流的開始和結束時不記特殊的元數據,就會支持這個性質。這個性質JavaSerializer是不支持的,而KryoSerializer有條件支持
     private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
        newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
      }

 PartitionedPairBuffer

private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
  extends WritablePartitionedPairCollection[K, V] with SizeTracker

它繼承自WritablePartitionedPairCollection以及SizeTracker。底層存儲用一個object數組。它所存儲的pair,即KV對,Key的類型為(Int, K),即Partition ID和key。

同一個kv對的key和value被放在這個數組相鄰的位置,和AppendOnlyMap相同。

PartitionedSerializedPairBuffer

它也是存儲了partitionId, key, value這三種數據。

這個buffer的特點是用字節數組來存儲數據,而不像其它兩種用object數組。這就要求它存儲的數據是序列化以后的。它把key和value依次序列化以后依次寫入同一個字節數組(實際上是寫入一個ChainedBuffer,ChainedBuffer再寫入到它里邊的字節數組),這就要求有另外的元數據來區分key和value的邊界。所以PartitionedSerializedBuffer會另外使用一個meta buffer存儲元數據,這個meta buffer是一個IntBuffer,即一個integer buffer。

這樣它存儲的數據就在兩個buffer里:kvBuffer, 存儲的是序列化后的key和value; metaBuffer存儲的是關於kvBuffer的元數據。

其中metaBuffer里的每個元素是關於一個kv對的元數據,有4個int,依次是

  1. keyStart,這是一個long, 用兩個int存儲。指這個kv對在kvBuffer中的起始位置。
  2. keyValLen, 用一個int存儲。即key和value序列化后,總的長度。
  3. partitionId, 用一個int存儲。存儲partitionId在metaBuffer里,使得在kv排序時,直接對metaBuffer按partitionId排序就行了,而kvBuffer不需要變化。

這個buffer只支持按照partition id排序,因此要選它做buffer, ExternalSorter的Ordering參數必須是None。

這個數組的排序時直接移動底層的字節,所以要求Serializer必須supportRelcationSerializedObjects。

使用這種buffer的好處是

1. 省內存,這由兩原因引起。首先,最主要的因素,它把對象序列化以后存儲,通常會占用更少的內存。其次,它存儲所使用的byte buffer是ChainedBuffer這個類。ChainedBuffer的底層存儲用的是ArrayBuffer[Array[Byte]],這使得它比直接用ArrayBuffer[Byte]更省內存,但是ChainedBuffer的實現其表現的像一個字節數組。ChainedBuffer中的ArrayBuffer里的字節數組是等長的,稱為一個chunk, ExternalSorter使用spark.shuffle.sort.kvChunkSize來做為chunk的大小,默認為4M。這使得它在ArrayBuffer中存儲的引用占的大小與整個集合的大小相比,不會太大,也算是比起AppendOnlyMap和PartitionedPairBuffer用object數組做存儲的一點優勢。

2. 就像PartitionedSerializedPairBuffer所說的。對這個集合排序意味着只需要交換metaBuffer里的元素,而kvBuffer不需要修改。而metaBuffer排序時是按照partitionId排序,partitionId就保存在metaBuffer使用的int buffer里,這意味着獲取partitionId不需要通過引用(而AppendOnlyMap和PartitionedPairBuffer就需要獲得對(partitionId, key)組成的tuple的引用,然后再訪問partitionId),這就最小化了訪問緩存時的未命中。所以,對這個buffer內元素的排序的效率會較高。

3. 它的內存占用可以更准確地估計。

實現PartitionedSerailizedPairBuffer還是挺復雜的。PartitionedSerializedPairBuffer雖然繼承了SizeTracker,但是卻沒有使用SizeTracker的estimateSize方法,相反,由於它是使用的基本類型的數組,因此可以直接計算出自己較准確的大小,所以它覆蓋了SizeTracker的estimateSize方法。

override def estimateSize: Long = metaBuffer.capacity * 4L + kvBuffer.capacity

這明顯比其它兩種buffer對內存占用的估計准確得多。


 

spill

 ExternalSorter繼承了Spillable[WriteablePartitionedPairCollection[K, C]],實現了其spill方法,用來對buffer進行spill。

Spill的時機

為了合理地地在同一個executor的task線程間分配用於shuffle的內存,shuffle時內存buffer的大小向ShuffleMemoryManager申請,以避免過度占用內存,但這個MemoryManager並不實際地控制虛擬機的內存,只是起到限制作用。當buffer擴張需要的內存過多,ShuffleMemoryManager分配不了這么多內存時,buffer就會被spill。

Spill的策略

Spill的的策略必須考慮到以后對spill出來文件的merge。ExternalSorter會寫出唯一一個文件,因此merge是一定會的。但

是如果需要進行aggregate,那么spill出來的文件一定需要按照partition以及key排序,才能用merge sort來對combiner做聚合。但是這樣做的開銷是很大的,首先需要對集合先進行排序,才能寫入文件(這也是為啥WritablePartitionedPairCollection會定義partitionedDestructiveSortedIterator這個方法),其次在merge文件時需要先反序列化,然后再把merge完成后的combiner序列化寫入文件;然后,merge sort本身也會耗費時間。因此,ExternalSorter在某些情況下會按照類似於hash shuffle的方法為每個partition寫一個文件,每次spill,就把buffer里的數據按照partition追加到對應的文件。在需要輸出一整個文件時,把這些文件直接連接在一起,這樣就避免了一次反序列化和一次序列化。而之所以能直接把每個partition的文件相連,而不影響讀取,是因為shuffle的reader一方請求獲得的就是每個partition對應的那部分字節串,所以reader和writer都是在同樣的位置開啟的輸入流和輸出流,因此外層的壓縮流和序列化流也不會因此而混亂。

但是這樣不好的地方在於如果reducer很多,那么中間文件就會非常多,可能會遇到hash shuffle類似的問題(俺並不清楚具體會對性能有多大影響)。所以,有時merge sort的方式還是必須的。這時候,buffer每次spill都寫出一個包括各個partition數據的文件。然后在merge時,對這些文件進行merge,采用merge sort的方式。那么,這兩種spill的方式如何選擇呢?

ExternalSorter用bypassMergeSort這個bool值來做出選擇,如果此值為true,就用第一種方式,否則用第二種方式

  private val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
  private val bypassMergeSort =
    (numPartitions <= bypassMergeThreshold && aggregator.isEmpty && ordering.isEmpty)
  override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    if (bypassMergeSort) {
      spillToPartitionFiles(collection)
    } else {
      spillToMergeableFile(collection)
    }
  }

可見只有在reducer數目不會太多,不需要aggregation 並且不需要進行排序的情況下才會用spillToPartitionFiles。

 spillToPartitionFiles

  private def spillToPartitionFiles(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    spillToPartitionFiles(collection.writablePartitionedIterator())
  }

  private def spillToPartitionFiles(iterator: WritablePartitionedIterator): Unit = {
    assert(bypassMergeSort)

    // Create our file writers if we haven't done so yet
    if (partitionWriters == null) {
       ...  
    }

    // No need to sort stuff, just write each element out
    while (iterator.hasNext) {
      val partitionId = iterator.nextPartition()
      iterator.writeNext(partitionWriters(partitionId))
    }
  }

所有三種buffer都實現了WritablePartitionedPairCollection接口,因此都可以從它們獲取一個WritablePartitionedIterator,這個迭代器前邊提到過,特點在於可以知道下一個元素的partitiionId, 也可以直接調用writeNext把下一個元素寫到BlockObjectWriter里。而spillToPartitionFiles就是這么使用它的,它取出迭代器中下一個元素的partitionId, 就能找到對應於這個partition的writer,然后用它來寫入下一個元素。所以,多次spill出來的結果中同樣的partition里的kv都會被用同樣的writer寫入同一個文件。

spillToMergeableFile

方法名里的是File而不像spillToPartitionFiles中是Files,它只會spill到一個文件。所以這個文件的內容是排序后的。那么寫入同一個文件的問題是需要記錄每個KV屬於哪個partition,否則就需要再用partitioner算一下。由於寫入文件時,每個partition的kv對記在一起,所以實際只需要記錄下每個partition有多少個KV對就行了。spillToMergableFiles把這個信息記錄在elementsPerPartition這個數據結構里

// How many elements we have in each partition
    val elementsPerPartition = new Array[Long](numPartitions)

另外一個問題與序列化流有關。當通過一個序列化流寫入了大量的對象,它內部的數據結構可能會很多,而且在這個內部數據結構增長時,它可能會進行的拷貝,很大的內部數據結構意味着占用過多內存,對大量數據拷貝意味着時間開銷的增長。因此ExternalSorter通過serializerBatchSize這個參數來控制每次序列化流最多寫入的元素個數。在寫入serializerBatchSize這個元素后,這個序列化流會被關閉,確切地說是writer被關閉,然后重新開啟新的writer繼續往同一個文件寫。這樣帶來的問題是,整個文件是多個輸出流的輸出追加在一起的結果,因此需要記錄每個輸出流開始的位置,也就是寫完一個batch的對象后,文件增長的大小。spillToMergableFiles用batchSizes這個數組來記錄每個batch的字節數,在此次spill結束后,這些簿記的信息被組裝成SpilledFile,它被作為元數據使用,記在spills這個ArrayBuffer[SpilledFiles]里。

    // List of batch sizes (bytes) in the order they are written to disk
    val batchSizes = new ArrayBuffer[Long]
  private[this] case class SpilledFile(
    file: File,
    blockId: BlockId,
    serializerBatchSizes: Array[Long],
    elementsPerPartition: Array[Long])

由於這種復雜的寫入方式,對於寫出來的文件,需要一個特殊的reader,即SpillReader。這個reader的特殊之處在於它可以產生一個特殊的迭代器,這個迭代器的每個元素都是某個partition中kv的迭代器。

def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] {
  ...
}

也就是調用readNextPartition返回的迭代器可以迭代這個partition內的所有元素。這樣spillToMergeableFiles的主要邏輯就很清楚了,在源碼中是這樣的

      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
      while (it.hasNext) {
        val partitionId = it.nextPartition()
        it.writeNext(writer)
        elementsPerPartition(partitionId) += 1
        objectsWritten += 1

        if (objectsWritten == serializerBatchSize) {
          flush()
          curWriteMetrics = new ShuffleWriteMetrics()
          writer = blockManager.getDiskWriter(
            blockId, file, serInstance, fileBufferSize, curWriteMetrics)
        }
      }

首先,從buffer中構造一個WritablePartitionedIterator,排序方式使用comparator,

  • 如果在ExternalSorter的構造函數中提供了Ordering,就會按Ordering排序
  • 如果沒有提供Ordering,但是提供了Aggregator,就會按hashCode排序
  • 如果即沒有Ordering,也沒有aggregator,就不排序。

然后把迭代器中的每個元素調用it.writeNext寫入writer,在此過程中根據此元素的partitionId,增長elementsPerPartition中對應的partition中的元素數,如果一個writer寫入的元素數到了serializerBatchSize,就調用flush,關閉writer,記錄這個batch對應的byte總量到batchSizes,然后建立新的writer。

最后,調用

spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition))

來記錄此次spill出來的mergable file的元數據。


 

 Merge

SortShuffleWriter在調用sorter.insertAll(records)把數據寫入sorter之后,會調用

val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

生成最后的輸出文件(這是一整個文件)。ExternalSorter的writePartitionedFile會spill出來的數據進行merge。這分為三種情況

沒有發生spill

由於內存中buffer里的數據已經進行了aggregate(如果需要的話),所以這種情況的處理邏輯比較簡單。

  1. 調用buffer的destructiveSortedWritablePartitionedIteartor,獲取一個按partition排序的SortedWritablePartitionIterator
  2. 按partitionId的順序,把同一個partition的內容用同一個writer寫到最終的輸出文件里,寫一個partition按一個writer。
  3. 記錄每個partition的字節數,簿記到lengths里
else if (spills.isEmpty && partitionWriters == null) {
      //說明只有內存中的數據,並沒有發生spill
      // Case where we only have in-memory data
      val collection = if (aggregator.isDefined) map else buffer
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)//獲取SortedWritablePartitionIterator
      while (it.hasNext) {
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics.shuffleWriteMetrics.get)
        val partitionId = it.nextPartition()//獲取此次while循環開始時的partition id
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(writer) //把與這個partition id相同的數據全寫入
        }
        writer.commitAndClose()//這個writer只用於寫入這個partition的數據,因此當此partition數據寫完后,需要commitAndClose,以使得reader可讀這個文件段。
        val segment = writer.fileSegment()
        lengths(partitionId) = segment.length//把這個partition對應的文件里數據的長度添加到lengths里
      }
    }

發生了spill, 且使用bypassMergeSort

這也意味着Aggregator和Ordering都沒有。所以不需要聚合,也不需要對partition內部的元素排序。所以直接把每個partition內容依次寫入最終的輸出文件就行了。

    if (bypassMergeSort && partitionWriters != null) {
      //byPassMergeSort了,所以會用到partitionWriters。如果partitionWriters不為空,就代表着的確寫了些東西。就需要把這些文件合並。
      spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
      partitionWriters.foreach(_.commitAndClose())//把已有的writer commitAndClose了
      val out = new FileOutputStream(outputFile, true)//把所有文件合並到使用這個文件輸出流輸出的文件
      val writeStartTime = System.nanoTime
      util.Utils.tryWithSafeFinally {
        for (i <- 0 until numPartitions) {
          val in = new FileInputStream(partitionWriters(i).fileSegment().file)//對於每個writer的輸出文件,建立一個文件輸出流
          util.Utils.tryWithSafeFinally {
            //把writer的輸出文件里的數據拷貝到最終的文件里
            lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
          } {
            in.close()
          }
        }
      } {
        out.close()
        context.taskMetrics.shuffleWriteMetrics.foreach(
          _.incShuffleWriteTime(System.nanoTime - writeStartTime))
      }
    }

發生了spill,並且沒有bypassMergeSort

這時候就需要對spill出來的mergable files以及內存中的數據進行merge。ExternalSorter使用partitionedIterator來完成merge,得到一個按partition組合出來的迭代器,它的每個元素都是(partitionId, 這個partition內容的迭代器)這樣的二元組。然后把這個partitionedIterator按partition依次寫到輸出文件里就行了。

  for ((id, elements) <- this.partitionedIterator) {//merge過程在partitionedIterator方法中
        if (elements.hasNext) {//對於這個partition的所有數據,用一個writer寫入
          val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
            context.taskMetrics.shuffleWriteMetrics.get)
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          writer.commitAndClose()//寫完了
          val segment = writer.fileSegment()
          lengths(id) = segment.length//簿記
        }
      }

實際的merge過程發生在this.partitionedIterator這一步,partitionedIterator是ExternalSorter的一個方法

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]

partitionedIterator的實現和writePartitionedFile一樣,需要考慮同樣的三種情況,所以只有第三種情況才會被調用。partitionedIterator處理的三種情況為:

沒有發生spill

由於buffer中的數據是已經aggregate以后的(如果需要的話),所以直接把buffer里的數據按排序,同樣partition的數據就會到一起,此時簡單地按partition組合一下就行了。只是排序的時候需要考慮是否需要按Ordering來排序。

    if (spills.isEmpty && partitionWriters == null) {
      //只有內存中的數據,就按是否有ordering采用不同的排序方式得到迭代器,然后按partition對迭代器中的數據進行組合。
      if (!ordering.isDefined) {
        // The user hasn't requested sorted keys, so only sort by partition ID, not key
        groupByPartition(collection.partitionedDestructiveSortedIterator(None))
      } else {
        groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator)))
      }
    }

發生了spill, 並且bypassMergeSort

這種情況也很簡單,因為每個分區都對應不同的文件,就直接把這些分區在文件里的內容和在內存中的內容組合起來就行了。

else if (bypassMergeSort) {
      //否則就代表spill出來文件了,如果bypassMergeSort就代表着寫出來了一些文件,每個partition對應一個
      // Read data from each partition file and merge it together with the data in memory;
      // note that there's no ordering or aggregator in this case -- we just partition objects
      val collIter = groupByPartition(collection.partitionedDestructiveSortedIterator(None))//獲得內存中數據的迭代器
        //取得spill出來的那些文件里為這個partition所寫的文件,然后和內存里的這個partition的迭代器組合在一起。
      collIter.map { case (partitionId, values) =>
        (partitionId, values ++ readPartitionFile(partitionWriters(partitionId)))
      }
    }

發生了spill,並且沒有bypassMergeSort

這是最復雜的一種情況,因為此時spill出來的每個文件里都有各個分區的內容,所以需要進行merge sort,而在merge sort的過程中,可能需要進行aggregation。

ExternalSorter專門有一個merge方法來完成這個工作,所以第三種情況會直接調用merge方法。

else {
      //此時沒有bypassMergeSort,並且spill出來一些文件。因此需要把它們和內存中數據merge在一起,這是最復雜的一種情況。
      merge(spills, collection.partitionedDestructiveSortedIterator(comparator))
    }

這個merge方法首先為每個spill出來的文件創建一個reader,然后按partition id的順序,依次從各個reader和內存中的迭代器中獲取這個partition對應的那部分迭代器。這樣對於每個partition,都獲得了一組迭代器。merge方法對每個partition對應的那些迭代器進行merge。

又根據ExternalSorter是否有Aggregator和Ordering的情況,分成三種處理邏輯

1. 需要聚合,此時會調用mergeWithAggregation方法來邊merge邊做aggregate

2. 不需要聚合,並且提供了Ordering。這時候直接mergeSort就行了

3. 不需要聚合,並且沒有提供Ordering,這時候就直接把每個partition對應的那組迭代器里的元素組合在一起就行了,會直接用Iterator的flatten方法

  private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    val readers = spills.map(new SpillReader(_))//為每個spill出來的文件生成一個reader
    val inMemBuffered = inMemory.buffered//內存中的迭代器進行buffered,以方便查看其head的信息
    (0 until numPartitions).iterator.map { p => //對每一個partition
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)//對內存中的數據獲取這個partition對應的iterator
      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)//把文件數據的迭代器和內存數據的迭代器都放在一個seq里
      if (aggregator.isDefined) {//如果需要聚合的話
        // Perform partial aggregation across partitions 對這個partition對應的那些iterator進行merge,並且聚合數據
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      } else if (ordering.isDefined) {
        // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
        // sort the elements without trying to merge them
        (p, mergeSort(iterators, ordering.get))
      } else {
        (p, iterators.iterator.flatten)
      }
    }
  }

merge方法 

mergeWithAggregation

這會邊merge,邊做aggregation。根據傳進去的iterators是否是按照Ordering排序的,分為兩種:

1. 非totalOrder

這是最復雜的一種情況。非totalOrder,說明了這些迭代器中一個partition內部的元素實際是按照hash code排序的。所以即使key1==key2,但是key1和key2之間可能有key3, 它只是與key1和key2有相同的哈希碼,但==號並不成立。它所要處理的問題和hash shuffle的ExternalAppendOnlyMap是類似的,但算法並不相同。ExternalSorter里的算法復雜度更低一些,但實際運行時的情況跟互不相等的key的hash code的沖突程度有關。ExternalAppendOnlyMap是基於PriorityQueue做的,而ExternalSorter里的算法是使用兩個buffer完成的,后者充分利用了“==不成立的元素不可能有相同的hash code”這個條件,把相同hash code的元素都取出來,對這些元素用兩個buffer做聚合。

    if (!totalOrder) {
      // We only have a partial ordering, e.g. comparing the keys by hash code, which means that
      // multiple distinct keys might be treated as equal by the ordering. To deal with this, we
      // need to read all keys considered equal by the ordering at once and compare them.
      new Iterator[Iterator[Product2[K, C]]] {
        val sorted = mergeSort(iterators, comparator).buffered//先按comparator進行merge sort,不aggregate

        // Buffers reused across elements to decrease memory allocation
        val keys = new ArrayBuffer[K] //存放compare為0,但又相到不==的所有key
        val combiners = new ArrayBuffer[C]//存放keys中對應位置的key對應的所有combiner聚合后的結果

        override def hasNext: Boolean = sorted.hasNext

        override def next(): Iterator[Product2[K, C]] = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          keys.clear()
          combiners.clear()
          val firstPair = sorted.next()//獲取排序后iterator的第一個pair
          keys += firstPair._1//第一個pair的key放在keys里
          combiners += firstPair._2 //第一個pair的combiner放在combiners里
          val key = firstPair._1//第一個pair的key
          while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) {
            //獲取sorted中跟前key compare以后為0的下一個kv。注意,compare為0不一定 ==號成立
            val pair = sorted.next()
            var i = 0
            var foundKey = false
            while (i < keys.size && !foundKey) {
              if (keys(i) == pair._1) {//用當前取出的這個kc的key與keys中key依次比較,找到一個==的,就對combiner進行aggregate,然后結果放在combiners
              // 里,並且結束循環
                combiners(i) = mergeCombiners(combiners(i), pair._2)
                foundKey = true
              }
              i += 1
            }
            //如果這個kc里的key與keys里所有key都不==,意味着它與它當前緩存的所有keycompare為0但不==,所以它是一個新的key,就放在keys里,它的combiner放在combiners里
            if (!foundKey) {
              keys += pair._1
              combiners += pair._2
            }
          }

          // Note that we return an iterator of elements since we could've had many keys marked
          // equal by the partial order; we flatten this below to get a flat iterator of (K, C).
          keys.iterator.zip(combiners.iterator) //把keys和combiners 進行zip,得到iterator of (K, C)
        }
      }.flatMap(i => i) //flatMap之前是Iteator[compare為0的所有kc聚合而成的Iteator[K, C]], 所以直接flatMap(i => i)就成了
    }

 2.totalOrder

此時對這些迭代器先用comparator進行merge sort, 得到的merge后的迭代器里所有==號成立的key就都挨在一起了。所以接下來只需要直接按==號把迭代器划分,然后進行aggregate就行了。

else {
      //因為是total ordering的,意味着用Ordering排序,所以==的key是挨在一起的
      // We have a total ordering, so the objects with the same key are sequential.
      new Iterator[Product2[K, C]] {
        val sorted = mergeSort(iterators, comparator).buffered

        override def hasNext: Boolean = sorted.hasNext

        override def next(): Product2[K, C] = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          val elem = sorted.next()
          val k = elem._1
          var c = elem._2
          while (sorted.hasNext && sorted.head._1 == k) { //取出所有==的kc,進行merge
            val pair = sorted.next()
            c = mergeCombiners(c, pair._2)
          }
          (k, c)
        }
      }
    }

總結

ExternalSorter就Spark的sort-based shuffle的核心,它整個文件有800多行,雖然其算法不太復雜,還是由於要處理各種情況,以及進行相關的優化,其實現還是很繁瑣的。它的復雜性主要來源於以下幾個方面:

  1. 需要控制內存消耗,所以需要spill以及merge
  2. 在spill和merge過程中需要考慮到Aggregator和Ordering的不同情況
  3. 需要為每個partition使用一個輸出流,因此有一些輸出流的切換和簿記工作。

此外,為了提高效率,它根據特殊情況使用了PartitonedSerializedPairBuffer、byPassMergeSort等優化手段。

在它的實現中,大量使用了迭代器。

  1. 使用了大量迭代器的基本操作,如map、flatmap、flatten、filter、zip。

  2. 為各種集合生成了特殊的迭代器。主要是WritableParitionedPariCollection中定義的三種獲取特殊迭代器的方法:partitionedDestructiveSortedIterator,     destructiveSortedWritablePartitionedIterator, writablePartitionedIterator

  3. 大量使用了迭代器的包裝。比如Scala的BufferedIterator, mergeSort和mergeWithAggregation中的包裝了另一個迭代器的匿名迭代器(new Iterator{...}),     IteratorForPartition。

而且它所使用的三種buffer的設計,以及merge的算法也是值得看一下的。

不過shuffle絕對是Spark程序的性能殺手。每個元素都要經過如此復雜的處理,所以shuffle的總的性能開銷還是挺大的,但這也意味着對shuffle的過程進行優化可以對性能有較大的提升。俺認為,一方面,可以優化自己的程序,包括盡量避免shuffle、減少需要shuffle中需要IO的數據量(各種使kv、kc序列化后變得更小的方法,使用map-side combiner)、選擇合適的shuffle配置參數等;另一方面,Spark框架本身的shuffle的實現也還有優化的空間,比如對內存占用更准確地估計,根據被shuffle的數據的特點區分不同情況以采用更細致的策略(比如實現shuffle專用的各種特殊集合, 考慮到shuffle特點的序列化方法等)。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM