記錄一下Spark的存儲相關內容
@
Spark雖說是計算引擎,但存儲也是比較重要的一塊。
在cache和shuffle等地方用到了存儲,存儲介質包括有內存和磁盤。
整體架構
Spark存儲采用主從模式(Master/Slave),模塊間使用RPC進行通信。
Master負責運行期間數據塊元數據的管理和維護。
Slave一方面將本地數據塊的狀態報告給Master;另一方面接收Master傳過來的執行命令,如獲取數據塊狀態、刪除RDD/數據塊等命令。
Slave之間存在數據傳輸通道,可以進行遠程數據的讀取和寫入。
存儲相關類
在看整體架構之前,先看一下Spark存儲模塊相關類,以下是類圖:
可以看到是以BlockManager為核心。
BlockManager:存在於Driver和Executor中,Driver端的BlockManager保存了數據的元數據信息,Executor端的BlockManager根據接收到的消息進行操作。
BlockManagerMaster:Driver端特有的Master類,用來接收處理Executor發送來的請求。
BlockManagerMasterEndpoint:Master的消息終端點,用於與遠程Slave進行消息通信。
BlockManagerSlaveEndpoint:Slave的消息終端點,用於與Master進行通信。
BlockTransferService:在遠程節點間提供數據傳輸服務。
BlockManagerInfo:維護了BlockManager的一些信息。
DiskBlockManager:對數據塊進行磁盤讀寫的管理者。
DiskStore:在磁盤上存儲BlockManager塊。
MemoryStore:將BlockManager存儲在內存中。
MapOutputTracker:跟蹤shuffle map stage輸出位置的類。
ShuffleManager:shuffle的管理器,可以用於獲取shuffle讀寫的組件。
接下來看看Spark存儲的消息通信架構:
以下是架構圖,
圖中根據數據的生命周期描述了四個步驟:
- RegisterBlockManager。應用程序啟動時、初始化相關組件。
- UpdateBlockInfo。增刪改后更新數據塊信息。
- GetLocations、GetMemoryStatus。查詢數據存放的位置,對數據進行讀取。
- RemoveBlock、RemoveRDD。提供了刪除的功能。
依次看看四個步驟的具體過程:
應用啟動時
應用程序啟動時,SparkContext創建Driver端的SparkEnv,在該SparkEnv中實例化BlockManager和BlockManagerMaster,在BlockManagerMaster內部創建消息通信的BlockManagerMasterEndpoint。
Executor啟動時也會創建其SparkEnv,在該SparkEnv中實例化BlockManager和負責網絡數據傳輸服務的BlockTransferService。在BlockManager初始化過程中,一方面會加入BlockManagerMasterEndpoint終端點的引用,另一方面會創建Executor消息通信的BlockManagerSlaveEndpoint終端點,並把終端點的引用注冊到Driver中,Driver和Executor相互持有引用,在應用執行過程中就可以進行通信了。
增刪改后更新元數據
當寫入、更新或刪除數據完畢后,發送數據塊的最新狀態消息UpdateBlockInfo給BlockManagerMasterEndpoint終端點,由其更新數據塊的元數據。該終端點的元數據存放BlockManagerMasterEndpoint的3個HashMap中,如下:
// 該HashMap中存放了BlockManagerId與BLockManagerInfo的對應,其中BlockManagerInfo包含了Executor的內存使用情況、數據塊的使用情況、已被緩存的數據塊和Executor終端點的引用
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
// 該HashMap存放了ExecutorId和BlockManagerId的對應
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
// 該HashMap存放了BlockId和BlockManagerId序列 的對應,原因在於一個數據塊可能存儲有多個副本,保存在多個Executor中
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
獲取數據存放位置
應用數據存儲后,在獲取遠程節點數據、獲取RDD執行的首選位置時需要根據數據塊的編號查詢數據塊所處的位置,通過發送GetLocations或GetLocationsMultipleBlockIds等消息給BlockManagerMasterEndpoint,通過對元數據的查詢獲取數據塊的位置信息。
數據塊的刪除
當數據需要刪除時,提交刪除消息給BlockManagerSlaveEndpoint終端點,在該終端店發起刪除操作。刪除操作一方面需要刪除Driver端的元數據信息,另一方面發送消息通知Executor,刪除對應的物理數據。
RDD存儲調用
RDD和Block的關系:RDD包含了多個Partition,每個Partition對應一個數據塊,那么每個RDD中包含一個或多個數據塊Block。
我們知道RDD是懶執行的,只有在遇到行動操作的時候,才會提交作業、划分階段、執行任務,其真正發生數據操作是調用RDD.iterator()
時發生的。
我們看看RDD的iterator方法:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
// 如果存在存儲級別,則從嘗試從緩存中讀取數據,緩存不存在時再進行計算
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
// 不存在緩存,直接計算或者從checkpoint中讀取
} else {
computeOrReadCheckpoint(split, context)
}
}
iterator中,會判斷是否存在存儲級別(其實就是緩存),如果存在調用getOrCompute(),如果不存在調用computeOrReadCheckpoint()。
先看一下不存在緩存的時候,調用的computeOrReadCheckpoint():
// 如果RDD存在檢查點,則從檢查點讀取它。不存在,則計算
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
// 存在時,調用父RDD的iterator讀取數據
firstParent[T].iterator(split, context)
} else {
// 不存在時,直接調用compute方法對數據進行計算
compute(split, context)
}
}
computeOrReadCheckpoint()會從checkpoint中讀取數據或重新計算數據,進行返回。
再看一下存在緩存時,調用的getOrCompute():
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
// 通過RDD的編號和Partition序號獲取數據塊Block的編號
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
// 根據數據塊編號先讀取數據,然后再更新數據,這里是讀寫數據的入口
SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
readCachedBlock = false
// 如果緩存中不存在數據塊,則嘗試調用computeOrReadCheckpoint()從檢查點讀取或重新計算
computeOrReadCheckpoint(partition, context)
}) match {
// 對返回結果進行處理,該結果表示處理成功
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
// 處理失敗把結果返回調用者
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
}
// 從緩存中獲取數據;如果緩存中不存在,重新計算並寫入緩存
def getOrElseUpdate[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
// 讀數據的入口,嘗試從本地或遠程讀取數據
get[T](blockId)(classTag) match {
case Some(block) =>
return Left(block)
case _ =>
}
// 寫數據入口
doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
case None =>
val blockResult = getLocalValues(blockId).getOrElse {
releaseLock(blockId)
}
releaseLock(blockId)
Left(blockResult)
case Some(iter) =>
Right(iter)
}
}
getOrCompute()中會調用getOrElseUpdate()方法從緩存中讀取數據;如果緩存中不存在數據則重新計算,並寫入緩存。
RDD的計算就是基於對Iterator中數據的不斷轉換,只有需要存儲的時候,才會做對應的存儲操作。
數據讀取
BlockManager的get方法時讀數據的入口。
讀數據時分為本地讀取和遠程節點讀取。
本地讀取時使用getLocalValues方法,在該方法中根據不同的存儲級別調用不同的實現。
遠程讀取時使用getRemoteValues方法,最終調用BlockTransferService的fetchBlockSync進行處理,使用Netty的fetchBlocks方法獲取數據。數據讀取調用圖如下:
數據寫入
BlockManager的doPutIterator方法是寫數據的入口點。
在該方法中,根據數據是否緩存到內存中進行處理。
如果不緩存到內存中,調用BlockManager的putIterator方法直接存儲磁盤;如果緩存到內存中,先判斷數據是否進行了反序列化。
如果設置反序列化,說明數據為值類型,調用putIteratorAsValues把數據存入內存;如果沒有設置反序列化,說明數據為字節類型,調用putIteratorAsBytes把數據寫入內存。
在把數據存入內存過程中,需要判斷在內存中展開該數據是否足夠,當足夠時調用BlockManager的putArray方法寫入內存,否則把數據寫入磁盤。
寫入完成后,一方面把數據塊的元數據發送給Driver端的BlockManagerMasterEndpoint終端點,請求其更新數據元數據;另一方面判斷是否需要創建副本,如果需要則調用replicate方法,把數據寫到遠程節點上。
寫入調用圖如下:
cache & checkpoint
cache:將RDD的數據緩存到內存或磁盤中。
checkpoint:將計算過程中重要的中間數據建立檢查點,類似於快照。
cache的應用主要是對一個RDD的進行復用,避免重復計算。
相對於cache而言,checkpoint將切斷與該RDD之前的依賴關系。設置檢查點對包含寬依賴的長血統RDD是非常重要的,可以避免失敗時重新計算的高成本。
貼兩個緩存和檢查點講的比較清晰的鏈接:
https://github.com/JerryLead/SparkInternals/blob/master/markdown/6-CacheAndCheckpoint.md
https://blog.csdn.net/qq_20641565/article/details/76223002
end. 以上內容來自看書和自己的理解,如果偏差,歡迎指正。
Reference
《圖解Spark核心技術與案例實踐》
個人公眾號:碼農峰,定時推送行業資訊,持續發布原創技術文章,歡迎大家關注。