spark block讀寫流程分析


之前分析了spark任務提交以及計算的流程,本文將分析在計算過程中數據的讀寫過程。我們知道:spark抽象出了RDD,在物理上RDD通常由多個Partition組成,一個partition對應一個block。在driver和每個executor端,都有一個Blockmanager。Blockmanager是spark在計算過程中對block進行讀寫的入口,它屏蔽了在讀取數據時涉及到的內存分配,從其他executor端遠程獲取等具體細節。接下來,本文將以讀寫block為主線,分析spark在計算過程中讀寫實際數據的流程。

 

1,計算數據寫流程

1.1,從計算上來說, RDD中的一個Partition對應一個Task。在Task在taskRunner的run方法中調用task.run方法,然后根據計算結果的大小,以不同形式(直接發送或者通過blockManager)將數據發送給driver。

val value = try {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
}

...

// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
if (maxResultSize > 0 && resultSize > maxResultSize) {
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId,
new ChunkedByteBuffer(serializedDirectResult.duplicate()),
StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
} else {
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
serializedDirectResult
}
}

1.2,在使用block形式的時候,可以看到調用了blockManager的putBytes方法,在核心實現doPutBytes中,根據存儲級別和是否序列化使用memstore和diskstore中不同的不方法進行存儲。顧名思義,memstore和diskstore主要就是根據存儲級別將對應的block儲存到內存或者磁盤。

1.3,memstore的putBytes實現如下。可以看到首先需要通過memoryManager申請存儲保存當前block的內存,申請到內存后改block的數據,會以BlockId和SerializeMemoryEntry的鍵值對,保存在memstore的entries的對象中。關於memoryManager,當前有StaticMemoryManager和UnifiedMemoryManager兩種實現。StaticMemoryManager是之前老的實現,將spark計算過程使用的存儲內存和計算內存按照總大小的固定比例進行分配。UnifiedMemoryManager是2.x的默認實現,相對StaticMemoryManager,UnifiedMemoryManager中存儲和計算的內存是可以動態調整的。也就是說,當計算內存緊張,儲存內存空閑的時候,計算內存可以借用存儲內存。反之類似。

1.4,在1.3完成當前executor機器完成當前block存儲以后,當需要告訴driver時(tellMaster參數),會將該block的狀態匯報給driver(reportBlockStatus),通過向dirver發送UpdateBlockInfo消息。driver接收到UpdateBlockInfo消息后,將匯報過來的相關信息保存在BlockManagerMasterEndpoint的blockManagerInfo和blockLocations中。

至此,計算過程寫數據的流程完成。

2,計算數據讀流程

2.1,話說在TaskRunner運行結束以后,會調用execBackend.statusUpdate,會將該任務的結束的狀態通過StatusUpdate的信息發送給driver。

2.2,driver端接收到StatusUpdate消息后,最終將調用TaskResultGetter的enqueueSuccessfulTask方法。在該方法中,對於使用block(即IndirectTaskResult),最終將調用blockManager的getRemoteBytes獲取該blockId對應的數據。

2.3,在blockManager的getRemoteBytes方法中,主要邏輯是獲取該blockId對應的存儲該blockId數據的所有機器位置,通過調用blockTransferService的fetchBlockSync獲取具體數據,一旦從一個指定的位置獲取到數據,則立即返回。

2.4,fetchBlockSync接着會調用具體實現NettyBlockTransferService中的fetchBlocks方法,在該方法中,將通過OneForOneBlockFetcher發送OpenBlocks消息給指定目標的blockManager,從而對應的streamId等信息,然后通過

client.fetchChunk一次獲取每塊的數據。

2.5,在提供數據的blockManager端(即server端),接受到消息OpenBlocks消息后,首先根據blockId通過blockManager的getBlockData方法獲取對應的數據,然后將該數據和一個streamId獎勵對應關系(通過streamManager調用進行)

2.6,在2.4中獲取到對應的streamId后,將通過ChunkFetchRequest分塊獲取數據。server端接受到該消息以后,streamManager將根據streamId和chunkIndex獲取對應數據,然后返回給客戶端。

至此,計算過程獲取block數據的流程結束。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM