Spark存儲介紹


記錄一下Spark的存儲相關內容

@


Spark雖說是計算引擎,但存儲也是比較重要的一塊。

在cache和shuffle等地方用到了存儲,存儲介質包括有內存和磁盤。

整體架構

Spark存儲采用主從模式(Master/Slave),模塊間使用RPC進行通信。

Master負責運行期間數據塊元數據的管理和維護。

Slave一方面將本地數據塊的狀態報告給Master;另一方面接收Master傳過來的執行命令,如獲取數據塊狀態、刪除RDD/數據塊等命令。

Slave之間存在數據傳輸通道,可以進行遠程數據的讀取和寫入。


存儲相關類

在看整體架構之前,先看一下Spark存儲模塊相關類,以下是類圖:
存儲模塊相關類

可以看到是以BlockManager為核心。

BlockManager:存在於Driver和Executor中,Driver端的BlockManager保存了數據的元數據信息,Executor端的BlockManager根據接收到的消息進行操作。

BlockManagerMaster:Driver端特有的Master類,用來接收處理Executor發送來的請求。

BlockManagerMasterEndpoint:Master的消息終端點,用於與遠程Slave進行消息通信。

BlockManagerSlaveEndpoint:Slave的消息終端點,用於與Master進行通信。

BlockTransferService:在遠程節點間提供數據傳輸服務。

BlockManagerInfo:維護了BlockManager的一些信息。

DiskBlockManager:對數據塊進行磁盤讀寫的管理者。

DiskStore:在磁盤上存儲BlockManager塊。

MemoryStore:將BlockManager存儲在內存中。

MapOutputTracker:跟蹤shuffle map stage輸出位置的類。

ShuffleManager:shuffle的管理器,可以用於獲取shuffle讀寫的組件。


接下來看看Spark存儲的消息通信架構:

以下是架構圖,

Spark存儲的消息通信架構

圖中根據數據的生命周期描述了四個步驟:

  1. RegisterBlockManager。應用程序啟動時、初始化相關組件。
  2. UpdateBlockInfo。增刪改后更新數據塊信息。
  3. GetLocations、GetMemoryStatus。查詢數據存放的位置,對數據進行讀取。
  4. RemoveBlock、RemoveRDD。提供了刪除的功能。

依次看看四個步驟的具體過程:

應用啟動時

應用程序啟動時,SparkContext創建Driver端的SparkEnv,在該SparkEnv中實例化BlockManager和BlockManagerMaster,在BlockManagerMaster內部創建消息通信的BlockManagerMasterEndpoint。

Executor啟動時也會創建其SparkEnv,在該SparkEnv中實例化BlockManager和負責網絡數據傳輸服務的BlockTransferService。在BlockManager初始化過程中,一方面會加入BlockManagerMasterEndpoint終端點的引用,另一方面會創建Executor消息通信的BlockManagerSlaveEndpoint終端點,並把終端點的引用注冊到Driver中,Driver和Executor相互持有引用,在應用執行過程中就可以進行通信了。

增刪改后更新元數據

當寫入、更新或刪除數據完畢后,發送數據塊的最新狀態消息UpdateBlockInfo給BlockManagerMasterEndpoint終端點,由其更新數據塊的元數據。該終端點的元數據存放BlockManagerMasterEndpoint的3個HashMap中,如下:

// 該HashMap中存放了BlockManagerId與BLockManagerInfo的對應,其中BlockManagerInfo包含了Executor的內存使用情況、數據塊的使用情況、已被緩存的數據塊和Executor終端點的引用
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

// 該HashMap存放了ExecutorId和BlockManagerId的對應
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

// 該HashMap存放了BlockId和BlockManagerId序列 的對應,原因在於一個數據塊可能存儲有多個副本,保存在多個Executor中
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

獲取數據存放位置

應用數據存儲后,在獲取遠程節點數據、獲取RDD執行的首選位置時需要根據數據塊的編號查詢數據塊所處的位置,通過發送GetLocations或GetLocationsMultipleBlockIds等消息給BlockManagerMasterEndpoint,通過對元數據的查詢獲取數據塊的位置信息。

數據塊的刪除

當數據需要刪除時,提交刪除消息給BlockManagerSlaveEndpoint終端點,在該終端店發起刪除操作。刪除操作一方面需要刪除Driver端的元數據信息,另一方面發送消息通知Executor,刪除對應的物理數據。

RDD存儲調用

RDD和Block的關系:RDD包含了多個Partition,每個Partition對應一個數據塊,那么每個RDD中包含一個或多個數據塊Block。

我們知道RDD是懶執行的,只有在遇到行動操作的時候,才會提交作業、划分階段、執行任務,其真正發生數據操作是調用RDD.iterator()時發生的。

我們看看RDD的iterator方法:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    // 如果存在存儲級別,則從嘗試從緩存中讀取數據,緩存不存在時再進行計算
    if (storageLevel != StorageLevel.NONE) {
        getOrCompute(split, context)
    // 不存在緩存,直接計算或者從checkpoint中讀取
    } else {
        computeOrReadCheckpoint(split, context)
    }
}

iterator中,會判斷是否存在存儲級別(其實就是緩存),如果存在調用getOrCompute(),如果不存在調用computeOrReadCheckpoint()

先看一下不存在緩存的時候,調用的computeOrReadCheckpoint()

// 如果RDD存在檢查點,則從檢查點讀取它。不存在,則計算
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{	
    if (isCheckpointedAndMaterialized) {
    	// 存在時,調用父RDD的iterator讀取數據   
        firstParent[T].iterator(split, context)
    } else {
        // 不存在時,直接調用compute方法對數據進行計算
        compute(split, context)
    }
}

computeOrReadCheckpoint()會從checkpoint中讀取數據或重新計算數據,進行返回。

再看一下存在緩存時,調用的getOrCompute()

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
    // 通過RDD的編號和Partition序號獲取數據塊Block的編號
    val blockId = RDDBlockId(id, partition.index)
    var readCachedBlock = true
    // 根據數據塊編號先讀取數據,然后再更新數據,這里是讀寫數據的入口
    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
        readCachedBlock = false
        // 如果緩存中不存在數據塊,則嘗試調用computeOrReadCheckpoint()從檢查點讀取或重新計算
        computeOrReadCheckpoint(partition, context)
    }) match {
        // 對返回結果進行處理,該結果表示處理成功
        case Left(blockResult) =>
        if (readCachedBlock) {
            val existingMetrics = context.taskMetrics().inputMetrics
            existingMetrics.incBytesRead(blockResult.bytes)
            new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
                override def next(): T = {
                    existingMetrics.incRecordsRead(1)
                    delegate.next()
                }
            }
        } else {
            new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
        }
        // 處理失敗把結果返回調用者
        case Right(iter) =>
        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
    }
}

// 從緩存中獲取數據;如果緩存中不存在,重新計算並寫入緩存
def getOrElseUpdate[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[T],
    makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    // 讀數據的入口,嘗試從本地或遠程讀取數據
    get[T](blockId)(classTag) match {
        case Some(block) =>
        	return Left(block)
        case _ =>
    }
    // 寫數據入口
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
        case None =>
            val blockResult = getLocalValues(blockId).getOrElse {
                releaseLock(blockId)
            }
            releaseLock(blockId)
            Left(blockResult)
        case Some(iter) =>
        	Right(iter)
    }
}

getOrCompute()中會調用getOrElseUpdate()方法從緩存中讀取數據;如果緩存中不存在數據則重新計算,並寫入緩存。


RDD的計算就是基於對Iterator中數據的不斷轉換,只有需要存儲的時候,才會做對應的存儲操作。

數據讀取

BlockManager的get方法時讀數據的入口。

讀數據時分為本地讀取和遠程節點讀取。

本地讀取時使用getLocalValues方法,在該方法中根據不同的存儲級別調用不同的實現。

遠程讀取時使用getRemoteValues方法,最終調用BlockTransferService的fetchBlockSync進行處理,使用Netty的fetchBlocks方法獲取數據。數據讀取調用圖如下:
數據讀取調用圖

數據寫入

BlockManager的doPutIterator方法是寫數據的入口點。

在該方法中,根據數據是否緩存到內存中進行處理。

如果不緩存到內存中,調用BlockManager的putIterator方法直接存儲磁盤;如果緩存到內存中,先判斷數據是否進行了反序列化。

如果設置反序列化,說明數據為值類型,調用putIteratorAsValues把數據存入內存;如果沒有設置反序列化,說明數據為字節類型,調用putIteratorAsBytes把數據寫入內存。

在把數據存入內存過程中,需要判斷在內存中展開該數據是否足夠,當足夠時調用BlockManager的putArray方法寫入內存,否則把數據寫入磁盤。


寫入完成后,一方面把數據塊的元數據發送給Driver端的BlockManagerMasterEndpoint終端點,請求其更新數據元數據;另一方面判斷是否需要創建副本,如果需要則調用replicate方法,把數據寫到遠程節點上。

寫入調用圖如下:
數據讀取調用圖

cache & checkpoint

cache:將RDD的數據緩存到內存或磁盤中。

checkpoint:將計算過程中重要的中間數據建立檢查點,類似於快照。

cache的應用主要是對一個RDD的進行復用,避免重復計算。

相對於cache而言,checkpoint將切斷與該RDD之前的依賴關系。設置檢查點對包含寬依賴的長血統RDD是非常重要的,可以避免失敗時重新計算的高成本。

貼兩個緩存和檢查點講的比較清晰的鏈接:

https://github.com/JerryLead/SparkInternals/blob/master/markdown/6-CacheAndCheckpoint.md

https://blog.csdn.net/qq_20641565/article/details/76223002


end. 以上內容來自看書和自己的理解,如果偏差,歡迎指正。

Reference

《圖解Spark核心技術與案例實踐》



個人公眾號:碼農峰,定時推送行業資訊,持續發布原創技術文章,歡迎大家關注。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM