As a distributed application, Spark stores its data across different machines, which brings data transfer and metadata management into play. And because Spark can use both memory and disk as storage media, it also has to manage data both in memory and on disk.
Spark storage architecture
Spark storage (implemented mainly by BlockManager) covers: writing blocks, and writing a block to other nodes when a replica is required; reading blocks, and fetching a block from other nodes when the current node does not hold it; and registering the node's own BlockManager with the Driver and reporting the blocks it manages.
Spark uses BlockInfoManager to manage the metadata of the blocks managed by the current node, maintaining the mapping from BlockId (a block's unique identifier) to BlockInfo (the block's metadata). The blocks themselves are stored in memory (MemoryStore) and on disk (DiskStore).
Spark uses BlockManagerMaster to let an Executor's BlockManager communicate with the Driver, registering itself with the Driver and reporting block information. The Driver, in turn, issues commands to delete blocks/RDD/Shuffle/Broadcast data to an Executor through that Executor BlockManager's BlockManagerSlaveEndpoint.
Spark uses ShuffleClient for communication between the BlockManagers of different Executors. A ShuffleClient contains a TransportServer and a TransportClientFactory used to create clients; acting as server and client respectively, they give Executor BlockManagers bidirectional communication.
Spark provides several storage levels:
| Storage level | Description |
| --- | --- |
| DISK_ONLY | Store on disk only |
| DISK_ONLY_2 | Store on disk only, with one replica |
| MEMORY_ONLY | Store in memory only |
| MEMORY_ONLY_2 | Store in memory only, with one replica |
| MEMORY_ONLY_SER | Store serialized data in memory only |
| MEMORY_ONLY_SER_2 | Store serialized data in memory only, with one replica |
| MEMORY_AND_DISK | Prefer memory; use disk when memory is insufficient |
| MEMORY_AND_DISK_2 | Prefer memory, use disk when memory is insufficient, with one replica |
| MEMORY_AND_DISK_SER | Prefer memory for serialized data; use disk when memory is insufficient |
| MEMORY_AND_DISK_SER_2 | Prefer memory for serialized data, use disk when memory is insufficient, with one replica |
| OFF_HEAP | Store off-heap (same as MEMORY_AND_DISK_SER, but using off-heap memory) |
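A storage level is typically chosen when caching an RDD. A minimal sketch, assuming an existing SparkContext `sc` and a placeholder input path:

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///data/input") // sc: an existing SparkContext; path is a placeholder
// Keep serialized data in memory and spill to disk when memory runs short.
// rdd.cache() would instead be shorthand for persist(StorageLevel.MEMORY_ONLY).
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
```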
put block (writing a block): behavior depends on the storage level. If the level requires serialized/deserialized data but the input block is in the opposite form, the block is first serialized/deserialized. If the level allows memory storage, the block is saved to memory; with a level such as MEMORY_AND_DISK, the block is written to disk when memory is insufficient. If the level supports only disk storage, the block is written directly to disk. If a replica is required, the block is also written synchronously to other nodes (a simplified sketch follows this list).
get block (reading a block): if serialized data is requested but the storage level is deserialized, disk is tried first; if the block is not on disk, memory is tried according to the storage level and the data is serialized. If the storage level already stores serialized data, memory is tried first, then disk according to the storage level. If deserialized data is requested and the storage level includes memory, memory is tried first, then disk (deserializing the data); if the level includes only disk, the block is read directly from disk and deserialized.
register & report block (registering the BlockManager & reporting block information): an Executor's BlockManager must register with the Driver during initialization, and it periodically reports the blocks it manages.
remove (deleting blocks): the Driver issues commands to an Executor's BlockManager to delete blocks/RDD/Shuffle/Broadcast data; the Executor's BlockManager receives the command and performs the deletion locally.
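The put path above condenses into the following toy sketch. This is illustrative pseudocode under stated assumptions, not Spark's actual BlockManager code; every name in it is a hypothetical stub:

```scala
// Toy model of the put-block decision flow (illustrative only; all types and
// callbacks here are hypothetical stubs, not Spark's real API).
object PutBlockSketch {
  case class Level(useMemory: Boolean, useDisk: Boolean, serialized: Boolean, replication: Int)

  def putBlock(
      blockId: String,
      data: Array[Byte],
      isSerialized: Boolean,
      level: Level,
      tryPutMemory: (String, Array[Byte]) => Boolean, // returns false when memory is full
      putDisk: (String, Array[Byte]) => Unit,
      replicate: (String, Array[Byte]) => Unit,
      serialize: Array[Byte] => Array[Byte],
      deserialize: Array[Byte] => Array[Byte]): Unit = {
    // 1. Convert the payload into the form the storage level expects.
    val payload =
      if (level.serialized && !isSerialized) serialize(data)
      else if (!level.serialized && isSerialized) deserialize(data)
      else data
    // 2. Prefer memory when the level allows it; fall back to disk when the
    //    memory put fails (e.g. MEMORY_AND_DISK under memory pressure).
    val inMemory = level.useMemory && tryPutMemory(blockId, payload)
    if (!inMemory && level.useDisk) putDisk(blockId, payload)
    // 3. Synchronously write the block to other nodes when a replica is required.
    if (level.replication > 1) replicate(blockId, payload)
  }
}
```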
BlockManager
Spark uses the BlockManager to manage the blocks kept in memory and on disk on the current node; both the Driver and every Executor create one. The block manager serves read, write, and delete requests for blocks, registers itself with the Driver, and reports the metadata of the blocks it manages (for the Driver's own block manager, registration involves no network communication; an Executor's block manager registers with the Driver over the network). When a block needed by a node is not available locally, the block manager first asks the Driver which nodes hold the block and then communicates with such a node directly to transfer the data. All of the communication involved, whether over the network or local, goes through Spark's RPC framework (see the Spark RPC notes).
The main responsibilities of BlockManager are (a short usage sketch follows the list):
- Registering the current BlockManager with the Driver
- Reporting the managed block information to the Driver
- Reading serialized/deserialized blocks from the local node
- Saving blocks to the local node
- Asking the Driver which nodes in the cluster hold a given block
- Fetching blocks from other nodes
- Registering tasks and acquiring/releasing locks on blocks
- Replicating the blocks it holds to other nodes
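A minimal usage sketch, assuming roughly Spark 2.0-era APIs and code running inside a Spark application (the block id and value are made up for illustration):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{StorageLevel, TestBlockId}

// Grab the BlockManager of the current node (Driver or Executor).
val blockManager = SparkEnv.get.blockManager
val blockId = TestBlockId("demo-block") // hypothetical block id for illustration

// Save a single value locally; tellMaster = true also reports the block to the Driver.
blockManager.putSingle(blockId, "hello spark", StorageLevel.MEMORY_ONLY, tellMaster = true)

// Read the block back from the local store (None if it is not held locally).
val value = blockManager.getLocalValues(blockId).map(_.data.next())
```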
BlockManagerMaster
BlockManagerMaster is a component of BlockManager through which the BlockManager communicates with the Driver: it registers the BlockManager with the Driver, obtains block location information from the Driver, periodically syncs the BlockManager's block information to the Driver, and responds to block-deletion requests sent by the Driver.
When a BlockManagerMaster is created, it is passed a driverEndpoint of type RpcEndpointRef, a reference to the RpcEndpoint through which BlockManagerMaster talks to the Driver. Via Spark's RPC framework (see the Spark RPC notes), Netty ultimately uses the driver address held in this RpcEndpointRef to carry out the communication between the BlockManagerMaster and the Driver.
Both the Driver and the Executors create their own BlockManagerMaster. The difference is that when the BlockManagerMaster inside the Driver's own BlockManager talks to the Driver, no network communication occurs, whereas a BlockManagerMaster on an Executor communicates with the Driver over the network.
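As a reference for the call pattern, here is a condensed sketch of how BlockManagerMaster wraps driverEndpoint, abridged from roughly Spark 2.0-era sources (the message case classes live in Spark's private BlockManagerMessages object, so this compiles only inside the org.apache.spark namespace; it is an outline, not the complete class):

```scala
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.spark.storage.BlockManagerMessages.{GetLocations, RegisterBlockManager}

class BlockManagerMasterSketch(driverEndpoint: RpcEndpointRef) {
  // Register the local BlockManager with the Driver and wait for the ack.
  def registerBlockManager(
      id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
    driverEndpoint.askWithRetry[Boolean](RegisterBlockManager(id, maxMemSize, slaveEndpoint))
  }

  // Ask the Driver which BlockManagers currently hold the given block.
  def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
    driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
  }
}
```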
BlockManagerMasterEndpoint
BlockManagerMasterEndpoint is a concrete implementation of RpcEndpoint, the class that the RpcEndpointRef held by BlockManagerMaster (see above) actually points to. It implements the logic with which the Driver's BlockManager handles requests from Executors and sends requests to them. These requests mirror the ones BlockManagerMaster sends and responds to: registering an Executor's BlockManager, returning the locations of stored blocks, receiving and applying the block updates sent by Executors, sending block-deletion requests to Executors, and so on.
BlockManagerMasterEndpoint maintains the registered BlockManagers, the mapping between BlockManagers and Executors, the locations of blocks, and similar information. All of this can be viewed as storage metadata; by holding it, the Driver manages the storage of the entire cluster.
Source code analysis
- Metadata stored by BlockManagerMasterEndpoint
```scala
// Mapping from block manager id to the block manager's information.
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

// Mapping from executor ID to block manager ID.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
```
- Dispatching the different requests from Executors to the relevant handler functions and replying through the request's callback
```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
    register(blockManagerId, maxMemSize, slaveEndpoint)
    context.reply(true)

  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))

  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))

  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))

  case GetExecutorEndpointRef(executorId) =>
    context.reply(getExecutorEndpointRef(executorId))

  case GetMemoryStatus =>
    context.reply(memoryStatus)

  case GetStorageStatus =>
    context.reply(storageStatus)

  case GetBlockStatus(blockId, askSlaves) =>
    context.reply(blockStatus(blockId, askSlaves))

  case GetMatchingBlockIds(filter, askSlaves) =>
    context.reply(getMatchingBlockIds(filter, askSlaves))

  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))

  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))

  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))

  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)

  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    context.reply(true)

  case StopBlockManagerMaster =>
    context.reply(true)
    stop()

  case BlockManagerHeartbeat(blockManagerId) =>
    context.reply(heartbeatReceived(blockManagerId))

  case HasCachedBlocks(executorId) =>
    blockManagerIdByExecutor.get(executorId) match {
      case Some(bm) =>
        if (blockManagerInfo.contains(bm)) {
          val bmInfo = blockManagerInfo(bm)
          context.reply(bmInfo.cachedBlocks.nonEmpty)
        } else {
          context.reply(false)
        }
      case None => context.reply(false)
    }
}
```
- Sending various delete requests to Executor nodes, e.g.
```scala
private def removeBlockManager(blockManagerId: BlockManagerId) {
  val info = blockManagerInfo(blockManagerId)

  // Remove the block manager from blockManagerIdByExecutor.
  blockManagerIdByExecutor -= blockManagerId.executorId

  // Remove it from blockManagerInfo and remove all the blocks.
  blockManagerInfo.remove(blockManagerId)
  val iterator = info.blocks.keySet.iterator
  while (iterator.hasNext) {
    val blockId = iterator.next
    val locations = blockLocations.get(blockId)
    locations -= blockManagerId
    if (locations.size == 0) {
      blockLocations.remove(blockId)
    }
  }
  listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
  logInfo(s"Removing block manager $blockManagerId")
}
```
BlockManagerSlaveEndpoint
BlockManagerSlaveEndpoint is a concrete implementation of RpcEndpoint and a component of BlockManager. When a BlockManager registers with the Driver, the registration carries its BlockManagerSlaveEndpoint. The BlockManagerSlaveEndpoint holds the Executor's address, so the Driver can send delete commands to a BlockManager by invoking that BlockManager's BlockManagerSlaveEndpoint.
Source code analysis
- Logic for handling delete requests from the Driver and returning the result
```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RemoveBlock(blockId) =>
    doAsync[Boolean]("removing block " + blockId, context) {
      blockManager.removeBlock(blockId)
      true
    }

  case RemoveRdd(rddId) =>
    doAsync[Int]("removing RDD " + rddId, context) {
      blockManager.removeRdd(rddId)
    }

  case RemoveShuffle(shuffleId) =>
    doAsync[Boolean]("removing shuffle " + shuffleId, context) {
      if (mapOutputTracker != null) {
        mapOutputTracker.unregisterShuffle(shuffleId)
      }
      SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
    }

  case RemoveBroadcast(broadcastId, _) =>
    doAsync[Int]("removing broadcast " + broadcastId, context) {
      blockManager.removeBroadcast(broadcastId, tellMaster = true)
    }

  case GetBlockStatus(blockId, _) =>
    context.reply(blockManager.getStatus(blockId))

  case GetMatchingBlockIds(filter, _) =>
    context.reply(blockManager.getMatchingBlockIds(filter))

  case TriggerThreadDump =>
    context.reply(Utils.getThreadDump())
}

private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
  val future = Future {
    logDebug(actionMessage)
    body
  }
  future.onSuccess { case response =>
    logDebug("Done " + actionMessage + ", response is " + response)
    context.reply(response)
    logDebug("Sent response: " + response + " to " + context.senderAddress)
  }
  future.onFailure { case t: Throwable =>
    logError("Error in " + actionMessage, t)
    context.sendFailure(t)
  }
}
```
BlockTransferService
BlockTransferService implements the communication between a BlockManager and the BlockManagers of other Executors, used to fetch and upload blocks. BlockTransferService is an abstract class; NettyBlockTransferService is its concrete implementation, which uses Netty for network communication.
After a NettyBlockTransferService is created, its init method must be called before any of its other methods. init creates the Netty transport components: TransportServer, TransportClientFactory, and NettyBlockRpcServer. The TransportServer receives requests from other Executors' BlockManagers, the TransportClientFactory creates clients used to send requests to other Executors' BlockManagers, and the NettyBlockRpcServer implements the actual logic for serving those requests.
Source code analysis
- Initializing NettyBlockTransferService
```scala
// Create the rpcHandler, server, and client factory used to send and receive blocks.
override def init(blockDataManager: BlockDataManager): Unit = {
  // Create the rpcHandler that actually handles incoming requests.
  val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
  var serverBootstrap: Option[TransportServerBootstrap] = None
  var clientBootstrap: Option[TransportClientBootstrap] = None
  // If authentication is enabled, the bootstrap step must run before the
  // TransportServer and TransportClientFactory are actually initialized.
  if (authEnabled) {
    serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
    clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId,
      securityManager, securityManager.isSaslEncryptionEnabled()))
  }
  transportContext = new TransportContext(transportConf, rpcHandler)
  clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
  server = createServer(serverBootstrap.toList)
  appId = conf.getAppId
  logInfo(s"Server created on ${hostName}:${server.getPort}")
}
```
- Fetching blocks from another Executor
```scala
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    // Use the fetcher from the shuffle package to retrieve the data from the remote node.
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        val client = clientFactory.createClient(host, port)
        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
      }
    }

    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}
```
- Uploading a block to another Executor (e.g., when writing a replica)
```scala
override def uploadBlock(
    hostname: String,
    port: Int,
    execId: String,
    blockId: BlockId,
    blockData: ManagedBuffer,
    level: StorageLevel,
    classTag: ClassTag[_]): Future[Unit] = {
  val result = Promise[Unit]()
  val client = clientFactory.createClient(hostname, port)

  // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
  // Everything else is encoded using our binary protocol.
  val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))

  // Convert or copy nio buffer into array in order to serialize it.
  val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())

  client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,
    new RpcResponseCallback {
      override def onSuccess(response: ByteBuffer): Unit = {
        logTrace(s"Successfully uploaded block $blockId")
        result.success((): Unit)
      }
      override def onFailure(e: Throwable): Unit = {
        logError(s"Error while uploading block $blockId", e)
        result.failure(e)
      }
    })

  result.future
}
```
- NettyBlockRpcServer
// TODO
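Pending the TODO above, a condensed outline of the request-serving logic, abridged from roughly Spark 2.0-era sources (details simplified, so treat it as an outline rather than the exact code): NettyBlockRpcServer matches on the decoded message type. OpenBlocks registers a stream for the requested blocks and replies with a StreamHandle; UploadBlock deserializes the metadata and stores the uploaded block through the BlockDataManager.

```scala
// Condensed outline of NettyBlockRpcServer.receive (simplified; abridged from
// roughly Spark 2.0-era sources, so details may differ across versions).
override def receive(
    client: TransportClient,
    rpcMessage: ByteBuffer,
    responseContext: RpcResponseCallback): Unit = {
  val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
  message match {
    case openBlocks: OpenBlocks =>
      // Look up the requested blocks and expose them as a stream the client
      // can fetch chunk by chunk; reply with the stream's handle.
      val blocks = openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
      val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
      responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)

    case uploadBlock: UploadBlock =>
      // Deserialize the (StorageLevel, ClassTag) metadata, then store the
      // uploaded bytes through the BlockDataManager.
      val (level: StorageLevel, classTag: ClassTag[_]) = serializer.newInstance()
        .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
        .asInstanceOf[(StorageLevel, ClassTag[_])]
      val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
      blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level, classTag)
      responseContext.onSuccess(ByteBuffer.allocate(0))
  }
}
```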
BlockInfoManager
BlockInfoManager maintains the metadata of all blocks managed by the BlockManager and hands out block read/write locks to the tasks that request them. Besides the block that actually stores the data, Spark has two further data structures: BlockId, which gives a block its globally unique identifier, and BlockInfo, which records the block's metadata.
BlockId
BlockId globally identifies a block; for a block stored in a file, the file name is usually the BlockId's name. Each type of block (Shuffle Block, Broadcast Block, TaskResult Block, and so on) has a dedicated BlockId type; the different BlockId types differ only in the prefix of their names.
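For example, the naming convention of the BlockId subclasses in org.apache.spark.storage looks like this:

```scala
import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId}

// Each BlockId subtype derives the block's globally unique name from its own prefix.
RDDBlockId(rddId = 0, splitIndex = 3).name                   // "rdd_0_3"
ShuffleBlockId(shuffleId = 1, mapId = 2, reduceId = 0).name  // "shuffle_1_2_0"
BroadcastBlockId(broadcastId = 5).name                       // "broadcast_5"
```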
BlockInfo
BlockInfo holds a block's metadata, such as the block's size, how many read locks are currently held on the block, and whether some task currently holds the block's write lock.
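A condensed view of the fields, abridged from roughly Spark 2.0-era sources (the real class wraps these fields in getters/setters with invariant checks; this sketch keeps only the shape):

```scala
// Condensed sketch of BlockInfo (simplified, not the exact implementation).
private[storage] class BlockInfo(
    val level: StorageLevel,   // the block's storage level
    val classTag: ClassTag[_], // class tag used to select a serializer
    val tellMaster: Boolean) { // whether block updates should be reported to the Driver
  var size: Long = 0           // block size in bytes
  var readerCount: Int = 0     // number of read locks currently held
  var writerTask: Long = BlockInfo.NO_WRITER // task attempt holding the write lock, if any
}
```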
Source code analysis
- Data structures maintained by BlockInfoManager
```scala
/**
 * blockId -> BlockInfo: maps each block id to the block's metadata.
 * The write lock must be held before entries are added to or removed from infos.
 */
@GuardedBy("this")
private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]

/**
 * taskAttemptId -> blockIds
 * Tracks the blocks for which each task already holds the write lock.
 */
@GuardedBy("this")
private[this] val writeLocksByTask =
  new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
    with mutable.MultiMap[TaskAttemptId, BlockId]

/**
 * taskAttemptId -> blockIds
 * Tracks the blocks for which each task holds read locks, including how many
 * times each block has been locked (read locks are reentrant).
 */
@GuardedBy("this")
private[this] val readLocksByTask =
  new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
```
- Acquiring a block's write lock
```scala
/**
 * Acquire the write lock on a block and return the block's metadata.
 *
 * If another task already holds the block's read or write lock, the current task
 * blocks until the lock is released, or returns immediately if blocking = false.
 */
def lockForWriting(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
  do {
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        // The block has neither a write lock nor any read locks.
        if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
          // Acquire the write lock on this block.
          info.writerTask = currentTaskAttemptId
          // Record the block in the set of blocks write-locked by this task.
          writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
          logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
          return Some(info)
        }
    }
    // For a blocking call, keep waiting until the write lock is acquired.
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}
```
- Acquiring a block's read lock
```scala
/**
 * Acquire the read lock on a block and return the block's metadata.
 *
 * If other tasks already hold read locks on the block, the read lock is granted
 * to the calling task immediately and the reader count is incremented.
 *
 * If another task holds the write lock, the calling task blocks until the write
 * lock is released, or returns immediately if blocking = false.
 *
 * A task may acquire the read lock on the same block multiple times; each
 * acquisition must be released separately.
 */
def lockForReading(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
  do {
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        // If the block is not write-locked, increment its reader count and add it
        // to the set of blocks read-locked by this task.
        if (info.writerTask == BlockInfo.NO_WRITER) {
          info.readerCount += 1
          readLocksByTask(currentTaskAttemptId).add(blockId)
          logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
          return Some(info)
        }
    }
    // Reaching here means the block is write-locked; if blocking = true, wait
    // until the read lock can be acquired.
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}
```
- Releasing a lock held on a block
```scala
/**
 * Release a lock held on the given block.
 */
def unlock(blockId: BlockId): Unit = synchronized {
  logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
  val info = get(blockId).getOrElse {
    throw new IllegalStateException(s"Block $blockId not found")
  }
  if (info.writerTask != BlockInfo.NO_WRITER) {
    // Release the write lock held by this task.
    info.writerTask = BlockInfo.NO_WRITER
    // Remove the block from the set of blocks write-locked by this task.
    writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
  } else {
    // Release one read lock.
    assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
    // Decrement the block's reader count.
    info.readerCount -= 1
    // Remove one occurrence of the block from this task's read-locked multiset.
    val countsForTask = readLocksByTask(currentTaskAttemptId)
    val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
    assert(newPinCountForTask >= 0,
      s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it")
  }
  notifyAll()
}
```
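Putting the methods together, a typical guarded read looks like this sketch (blockInfoManager and blockId are assumed to be an existing BlockInfoManager instance and an existing BlockId):

```scala
// Illustrative use of the locking API shown above.
blockInfoManager.lockForReading(blockId) match {
  case Some(info) =>
    try {
      // The block's metadata (and data) may be read safely while the read lock is held.
      println(s"block size = ${info.size} bytes")
    } finally {
      // Every acquired read lock must be released individually.
      blockInfoManager.unlock(blockId)
    }
  case None =>
    println(s"block $blockId is not registered")
}
```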
MemoryStore
MemoryStore saves blocks to memory in serialized or deserialized form and reads blocks back from memory. When memory is insufficient, the MemoryStore migrates blocks from memory to disk.
For details, see the Spark memory management notes.
DiskStore
DiskStore stores blocks on disk and reads them back from disk. If a block's storage level includes disk, Spark moves the block from memory to disk when memory is insufficient.
When a DiskStore is created, it is passed a DiskBlockManager instance. The DiskBlockManager creates and maintains the mapping between logical blocks and the physical locations on disk where they are actually stored.
Source code analysis
- Saving a block to disk
```scala
/**
 * Write a block to disk.
 * First obtain the block's on-disk path from diskManager, then write the block
 * by invoking the caller-supplied writeFunc.
 */
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
  if (contains(blockId)) {
    throw new IllegalStateException(s"Block $blockId is already present in the disk store")
  }
  logDebug(s"Attempting to put block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val fileOutputStream = new FileOutputStream(file)
  var threwException: Boolean = true
  try {
    writeFunc(fileOutputStream)
    threwException = false
  } finally {
    try {
      Closeables.close(fileOutputStream, threwException)
    } finally {
      if (threwException) {
        remove(blockId)
      }
    }
  }
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName,
    Utils.bytesToString(file.length()),
    finishTime - startTime))
}
```
- Reading a block from disk
```scala
def getBytes(blockId: BlockId): ChunkedByteBuffer = {
  val file = diskManager.getFile(blockId.name)
  val channel = new RandomAccessFile(file, "r").getChannel
  Utils.tryWithSafeFinally {
    // Check whether the file is smaller than minMemoryMapBytes (the minimum file
    // size for using mmap, 2 MB by default). Files below the threshold are read
    // directly through NIO; larger files are read via memory mapping.
    // Memory-mapping files close to or below the OS page size carries relatively
    // high overhead, so reads of small files are optimized here.
    if (file.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(file.length.toInt)
      channel.position(0)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      new ChunkedByteBuffer(buf)
    } else {
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    }
  } {
    channel.close()
  }
}
```
DiskBlockManager
DiskBlockManager creates and maintains the mapping between logical blocks and the physical on-disk locations where they are stored. A block is saved in a file named after its BlockId; block files are hashed across the directories configured by spark.local.dir.
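A condensed sketch of the hashing, abridged from DiskBlockManager.getFile in roughly Spark 2.0-era sources (localDirs holds the spark.local.dir directories and subDirsPerLocalDir comes from spark.diskStore.subDirectories, 64 by default; the real method also caches created subdirectories under a lock):

```scala
import java.io.File
import org.apache.spark.util.Utils

def getFile(filename: String): File = {
  // Hash the file name to pick a local dir, then a subdirectory inside it.
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  // Create the subdirectory if needed and place the block file inside it.
  val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
  if (!subDir.exists()) subDir.mkdirs()
  new File(subDir, filename)
}
```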