spark存儲模塊之內存存儲--MemeoryStore


MemeoryStore

上一節,我們對BlockManager的主要寫入方法做了一個整理,知道了BlockMananger的主要寫入邏輯,以及對於塊信息的管理。但是,由於spark的整個存儲模塊是在是很龐大,而且很多細節的邏輯錯綜復雜,如果對於每個細節都刨根問底,一來精力有限,二來感覺也沒有太大的必要,當然如果時間允許肯定是越詳細越好,在這里,我的分析的主要目的是理清存儲模塊的重點邏輯,希望能夠提綱契領地把各個模塊的脈絡領出來,建立起對spark-core中各模塊的整體認知,這樣我們在遇到一些問題的時候就能夠很快地知道應該從何處下手,從哪個具體的模塊去找問題。
好了廢話不多說,本節接着上一節。上一篇,我們分析了BlockManager的幾個主要的存儲方法,發現BlockManager主要依靠內部的兩個組件MemoryStore和DiskStore來進行實際的數據寫入和塊的管理。
本節,我們就來看一下MemoryStore這個組件。

不過,我還是延續我一貫的風格,從外部對一個類的方法調用為切入點分析這個類的作用和邏輯。
所以,我們先來看一下上一節對於MemoryStore的主要的方法調用的總結:

memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
memoryStore.putBytes

memoryStore.putIteratorAsValues

這個方法主要是用於存儲級別是非序列化的情況,即直接以java對象的形式將數據存放在jvm堆內存上。我們都知道,在jvm堆內存上存放大量的對象並不是什么好事,gc壓力大,擠占內存,可能引起頻繁的gc,但是也有明顯的好處,就是省去了序列化和反序列化耗時,而且直接從堆內存取數據顯然比任何其他方式(磁盤和直接內存)都要快很多,所以對於內存充足且要緩存的數據量本省不是很大的情況,這種方式也不失為一種不錯的選擇。

private[storage] def putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

// 用於存儲java對象的容器
val valuesHolder = new DeserializedValuesHolder[T](classTag)

putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
    // 存儲成功
  case Right(storedSize) => Right(storedSize)
    // 存儲失敗的情況
  case Left(unrollMemoryUsedByThisBlock) =>
    // ValuesHolder內部的數組和vector會相互轉換
    // 數據寫入完成后會將vector中的數據轉移到數組中
    val unrolledIterator = if (valuesHolder.vector != null) {
      valuesHolder.vector.iterator
    } else {
      valuesHolder.arrayValues.toIterator
    }

    // 返回寫入一半的迭代器、
    // 外部調用者一半會選擇關閉這個迭代器以釋放被使用的內存
    Left(new PartiallyUnrolledIterator(
      this,
      MemoryMode.ON_HEAP,
      unrollMemoryUsedByThisBlock,
      unrolled = unrolledIterator,
      rest = values))
}
}

這個方法的邏輯很簡單,作用也比較單一,主要是對實際存儲方法putIterator的返回結果做處理,如果失敗的話,就封裝一個PartiallyUnrolledIterator返回給外部調用這個,調用這個一般需要將這個寫入一半的迭代器關閉。

MemoryStore.putIterator

這個方法看似很長,其實邏輯相對簡單,主要做的事就是把數據一條一條往ValuesHolder中寫,並周期性地檢查內存,如果內存不夠就通過內存管理器MemoryManager申請內存,每次申請當前內存量的1.5倍。
最后,將ValuesHolder中的數據轉移到一個數組中(其實數據在SizeTrackingVector中也是以數組的形式存儲,只不過SizeTrackingVector對象內部處理數組還有一些其他的簿記量,更為關鍵的是我們需要將存儲的數據以同一的接口進行包裝,以利於MemoryStore進行同一管理)。最后還有關鍵的一步,就是釋放展開內存,重新申請存儲內存。
此外,這個過程中有使用到memoryManager,具體的方法調用是:

memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)

------------------------------分割線------------------------------

private def putIterator[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode,
  valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

// Number of elements unrolled so far
var elementsUnrolled = 0
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
// 用於數據在內存展開的初始的內存使用量
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
// 檢查內存的頻率,每寫這么多條數據就會檢查一次是否需要申請額外的內存
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory currently reserved by this task for this particular unrolling operation
// 內存閾值,開始時等於初始閾值
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
// 內存增長因子,每次申請的內存是當前內存的這個倍數
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
// 當前的塊使用的內存大小
var unrollMemoryUsedByThisBlock = 0L

// Request enough memory to begin unrolling
// 首先進行初始的內存申請,向MemoryManager申請內存
keepUnrolling =
  reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

if (!keepUnrolling) {
  logWarning(s"Failed to reserve initial memory threshold of " +
    s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
  // 如果成功申請到內存,則累加記錄
  unrollMemoryUsedByThisBlock += initialMemoryThreshold
}

// Unroll this block safely, checking whether we have exceeded our threshold periodically
// 循環將每條數據寫入容器中valuesHolder
while (values.hasNext && keepUnrolling) {
  valuesHolder.storeValue(values.next())
  // 如果寫入數據的條數達到一個周期,那么就檢查一下是否需要申請額外的內存
  if (elementsUnrolled % memoryCheckPeriod == 0) {
    // 通過valuesHolder獲取已經寫入的數據的評估大小
    // 注意,這里的數據大小只是估計值,並不是十分准確
    // 具體如何進行估算的可以看valuesHolder內部實現
    val currentSize = valuesHolder.estimatedSize()
    // If our vector's size has exceeded the threshold, request more memory
    // 如果已寫入的數據大小超過了當前閾值
    if (currentSize >= memoryThreshold) {
      // 這里每次申請的內存量都是不一樣的
      // 每次申請的內存是當前已使用內存的1.5倍(默認)
      val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
      keepUnrolling =
        reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
      if (keepUnrolling) {
        // 記錄累積申請的內存量
        unrollMemoryUsedByThisBlock += amountToRequest
      }
      // New threshold is currentSize * memoryGrowthFactor
      // 目前已經向內存管理器申請的內存量
      memoryThreshold += amountToRequest
    }
  }
  // 記錄插入的數據條數
  elementsUnrolled += 1
}

// Make sure that we have enough memory to store the block. By this point, it is possible that
// the block's actual memory usage has exceeded the unroll memory by a small amount, so we
// perform one final call to attempt to allocate additional memory if necessary.
// 如果keepUnrolling為true,說明順利地將所有數據插入,
// 並未遇到申請內存失敗的情況
if (keepUnrolling) {
  // 將內部的數據轉移到一個數組中
  val entryBuilder = valuesHolder.getBuilder()
  // 數據在內存中的精確大小
  val size = entryBuilder.preciseSize
  // 實際的大小可能大於申請的內存量
  // 因此根據實際大小還要再申請額外的內存
  if (size > unrollMemoryUsedByThisBlock) {
    val amountToRequest = size - unrollMemoryUsedByThisBlock
    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
    if (keepUnrolling) {
      unrollMemoryUsedByThisBlock += amountToRequest
    }
  }

  if (keepUnrolling) {
    // 獲取MemoryEntry對象,該對象是對插入數據的包裝
    val entry = entryBuilder.build()
    // Synchronize so that transfer is atomic
    memoryManager.synchronized {
      // 這一步主要是釋放申請的展開內存
      // 然后申請存儲內存
      // 這里需要弄清楚展開內存的概念
      // 展開狀態指的是對象在內存中處於一種比較松散的狀態,這樣的狀態方便做一些管理如統計大小等
      // 而隨后將對象轉移到數組中,處於一種比較緊實的狀態,數組相對來說占用的額外內存是比較小的
      // 一個數組只是一個對象,只有一個對象頭,可以用來管理大量的對象
      releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
      // 申請存儲內存
      val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
      assert(success, "transferring unroll memory to storage memory failed")
    }

    // 放入map中管理起來
    entries.synchronized {
      entries.put(blockId, entry)
    }

    logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
      Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
    Right(entry.size)
  } else {
    // We ran out of space while unrolling the values for this block
    logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
    // 如果失敗,返回已經申請的展開內存
    Left(unrollMemoryUsedByThisBlock)
  }
} else {
  // We ran out of space while unrolling the values for this block
  logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
  Left(unrollMemoryUsedByThisBlock)
}
}

memoryStore.putIteratorAsBytes

我們再看另一個方法。套路基本和putIteratorAsValues是一樣一樣的。
最大的區別在於ValuesHolder類型不同。非序列化形式存儲使用的是DeserializedMemoryEntry,而序列化形式存儲使用的是SerializedMemoryEntry。

private[storage] def putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {

require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// 字節數組的塊大小,默認是1m
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
  logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
    s"is too large to be set as chunk size. Chunk size has been capped to " +
    s"${Utils.bytesToString(Int.MaxValue)}")
  Int.MaxValue
} else {
  initialMemoryThreshold.toInt
}

// 字節數組的容器
val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
  memoryMode, serializerManager)

putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
  case Right(storedSize) => Right(storedSize)
  case Left(unrollMemoryUsedByThisBlock) =>
    // 部分展開,部分以序列化形式存儲的block
    Left(new PartiallySerializedBlock(
      this,
      serializerManager,
      blockId,
      valuesHolder.serializationStream,
      valuesHolder.redirectableStream,
      unrollMemoryUsedByThisBlock,
      memoryMode,
      valuesHolder.bbos,
      values,
      classTag))
}
}

memoryStore.putBytes

我們再來看另一個被外部調用用來插入數據的方法。很簡單,不說了。

def putBytes[T: ClassTag](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// 首先向內存管理器申請內存
// 這里申請的是存儲內存,因為要插入的字節數組,
// 所以不需要再展開,也就不需要申請展開內存
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
  // We acquired enough memory for the block, so go ahead and put it
  val bytes = _bytes()
  assert(bytes.size == size)
  // 這里直接構建了一個SerializedMemoryEntry
  // 並放到map中管理起來
  val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
  entries.synchronized {
    entries.put(blockId, entry)
  }
  logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
    blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
  true
} else {
  false
}
}

小結

通過對上面的三個方法,其實主要是前兩個方法的分析,我們發現,除了對內存進行簿記管理之外,以及通過內存管理器申請內存之外,插入數據最主要的工作其實都是有ValuesHolder對象來完成的。
ValuesHolder特質有兩個實現類:DeserializedValuesHolder和SerializedValuesHolder。

DeserializedValuesHolder

DeserializedValuesHolder對象內部有兩個成員:vector,是一個SizeTrackingVector;arrayValues,是一個存放值的數組,用於在所有數據插入后,將主句轉移到一個數組中,方便包裝成一個MemoryEntry對象。大部分工作是有SizeTrackingVector完成的。

private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
  // Underlying vector for unrolling the block
  var vector = new SizeTrackingVector[T]()(classTag)
  var arrayValues: Array[T] = null

  override def storeValue(value: T): Unit = {
    vector += value
  }

  override def estimatedSize(): Long = {
    vector.estimateSize()
  }

  override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
    // We successfully unrolled the entirety of this block
    arrayValues = vector.toArray
    vector = null

    override val preciseSize: Long = SizeEstimator.estimate(arrayValues)

    override def build(): MemoryEntry[T] =
      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
  }
}

SizeTracker

上面提到的SizeTrackingVector繼承了這個特質,除了這個特質,還集成了PrimitiveVector類,但是PrimitiveVector類基本上就是對一個數組的簡單包裝。
SizeTrackingVector最重要的功能:追蹤對象的大小,就是在SizeTracker特之中實現的。

我大致說一下這個特質是如何實現對象大小跟蹤和估算的,代碼實現也並不復雜,感興趣的可以看一看,限於篇幅這里就不貼了。

  • 每插入一定數量的數據(姑且稱之為周期),就會對當前的對象進行一次取樣,而這個取樣的周期會越來越長,以1.1倍的速率增長;
  • 取樣就是計算對象大小,並與前一次取樣作比較,而且只會保留最近兩次的取樣數據;
  • 每次取樣其實就是獲取兩個數據,當前對象大小,當前插入的數據條數;
  • 這樣與上一次取樣一比較,就能夠計算出每條數據的大小了;
  • 最后,在返回整個對象大小時,是拿最近一次取樣時記錄下的對象大小,以及根據最近的情況估算的每條數據的大小乘以自從上次取樣以來新插入的數據量,二者相加作為對象大小的估算值,

可見這么做並不是什么精確,但是由於是抽樣,而且抽樣周期越往后面越長,所以對於數據插入的效率影響很小,而且這種不精確性其實在后續的內存檢查過程中是有考慮到的。在所有數據插入完的收尾工作中,會對對象大小做一次精確計算。此外,熟悉spark內存管理的同學應該知道,其實spark一般會配置一個安全因子(一般是0.9),也就是說只是用配置的內存大小的90%,就是為了盡可能地減少這種不精確的內存估算造成OOM的可能性。

SerializedValuesHolder

private class SerializedValuesHolder[T](
    blockId: BlockId,
    chunkSize: Int,
    classTag: ClassTag[T],
    memoryMode: MemoryMode,
    serializerManager: SerializerManager) extends ValuesHolder[T] {
  val allocator = memoryMode match {
    case MemoryMode.ON_HEAP => ByteBuffer.allocate _
      // 調用unsafe的本地方法申請直接內存
      // 這個方法之所以沒有調用ByteBuffer.allocateDirect方法
      // 是因為這個方法分配的直接內存大小收到參數MaxDirectMemorySize限制
      // 所以這里繞過ByteBuffer.allocateDirect方法,通過反射和unsafe類創建直接內存對象
    case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
  }

  val redirectableStream = new RedirectableOutputStream
  val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
  redirectableStream.setOutputStream(bbos)
  val serializationStream: SerializationStream = {
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
    // 包裝壓縮流和序列化流
    ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
  }

  // 寫入方法,寫入的對象經過序列化,壓縮,
  // 然后經過ChunkedByteBufferOutputStream被分割成一個個的字節數組塊
  override def storeValue(value: T): Unit = {
    serializationStream.writeObject(value)(classTag)
  }

  override def estimatedSize(): Long = {
    bbos.size
  }

  override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
    // We successfully unrolled the entirety of this block
    serializationStream.close()

    override def preciseSize(): Long = bbos.size

    override def build(): MemoryEntry[T] =
      SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
  }
}

大概看一下,主要的邏輯很簡單,這里面有幾個注意點:

  • 對於直接內存分配,spark並沒有使用jdk的高級api,而是反射配合unsafe類分配直接內存,這樣可以繞過jvm參數MaxDirectMemorySize的限制,這也體現了spark的作者盡可能的降低用戶使用難度
  • 另外,我們看到序列化流其實經過了層層包裝(典型的裝飾器模式),序列化和壓縮以及分塊是比較重要的幾個點,感興趣的話可以深究,序列化和壓縮如果深入了解都是很大的課題,所以這里也僅僅是蜻蜓點水,不深究了。

總結

MemoryStore.scala這個文件中乍看代碼有八百多行,但是其實很大部分代碼是一些輔助類,比較核心的寫入邏輯也就是前面提到的幾個方法,再加上核心的兩個類DeserializedValuesHolder和SerializedValuesHolder實現了以對象或字節數組的形式存儲數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM