This article analyzes the implementation mechanism of Spark's broadcast.
BroadcastManager initialization
The source of the BroadcastManager initialization method is as follows:
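In place of the original snippet, here is a rough, self-contained sketch of the pattern: a synchronized, one-time creation of the broadcast factory. All names below (Factory, TorrentFactory, Manager) are illustrative stand-ins, not the actual Spark API.

```scala
// Minimal model of BroadcastManager's one-time initialization.
// Only the pattern mirrors Spark; the names are hypothetical.
trait Factory { def initialize(isDriver: Boolean): Unit }

class TorrentFactory extends Factory {
  // Stand-in; the real factory wires up TorrentBroadcast machinery.
  def initialize(isDriver: Boolean): Unit = ()
}

class Manager(isDriver: Boolean) {
  private var initialized = false
  private var factory: Factory = null

  // Guarded so the factory is created at most once.
  private def initialize(): Unit = synchronized {
    if (!initialized) {
      factory = new TorrentFactory
      factory.initialize(isDriver)
      initialized = true
    }
  }
  initialize() // Spark likewise calls initialize() from the constructor

  def isInitialized: Boolean = initialized
}
```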
The inheritance hierarchy of TorrentBroadcastFactory is as follows:
BroadcastFactory
An interface for all the broadcast implementations in Spark (to allow multiple broadcast implementations). SparkContext uses a BroadcastFactory implementation to instantiate a particular broadcast for the entire Spark job.
That is, it is the interface for all broadcast implementations in Spark (allowing multiple implementations to coexist). SparkContext uses a BroadcastFactory implementation to instantiate a particular broadcast for the entire Spark job. It has a single subclass, TorrentBroadcastFactory.
It has two fairly important methods:
The newBroadcast method creates a broadcast variable.
TorrentBroadcastFactory
Its main methods are as follows:
newBroadcast instantiates the TorrentBroadcast class.
The unbroadcast method calls the unpersist method of TorrentBroadcast.
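The shape of these two methods can be shown with a toy, self-contained model. The real trait's newBroadcast takes more parameters (such as isLocal) and works against the BlockManager; here a plain map stands in for storage, and every Toy* name is hypothetical.

```scala
import scala.collection.mutable

// Toy model of the BroadcastFactory contract: create a broadcast
// for a value, and remove ("unbroadcast") it by id.
trait ToyBroadcast[T] { def id: Long; def value: T }
case class SimpleBroadcast[T](id: Long, value: T) extends ToyBroadcast[T]

trait ToyBroadcastFactory {
  def newBroadcast[T](value: T, id: Long): ToyBroadcast[T]
  def unbroadcast(id: Long, removeFromDriver: Boolean): Unit
}

class ToyTorrentFactory extends ToyBroadcastFactory {
  // Stands in for the BlockManager used by the real factory.
  private val store = mutable.Map.empty[Long, Any]

  def newBroadcast[T](value: T, id: Long): ToyBroadcast[T] = {
    store(id) = value
    SimpleBroadcast(id, value)
  }
  def unbroadcast(id: Long, removeFromDriver: Boolean): Unit =
    store.remove(id)
  def contains(id: Long): Boolean = store.contains(id)
}
```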
Broadcast, the parent class of TorrentBroadcast
The official documentation says:
A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. Broadcast variables are created from a variable v by calling org.apache.spark.SparkContext.broadcast. The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method.
The interpreter session below shows this:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
That is, broadcast variables allow the programmer to cache a read-only variable on each machine rather than shipping a copy of it with every task. They can be used, for example, to give every node a copy of a large input dataset in an efficient way. Spark also tries to use efficient broadcast algorithms to reduce communication cost. A broadcast variable is created by calling SparkContext's broadcast method; it is a wrapper around the real variable, and the real object can be retrieved through the broadcast object's value method. Once the real object has been broadcast it must not be modified, so that the data stays consistent across all nodes.
The inheritance hierarchy of TorrentBroadcast is as follows:
TorrentBroadcast is the only subclass of Broadcast.
TorrentBroadcast
Its documentation says:
A BitTorrent-like implementation of org.apache.spark.broadcast.Broadcast.
The mechanism is as follows:
The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.
On each executor, the executor first attempts to fetch the object from its BlockManager.
If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available.
Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from.
This prevents the driver from being the bottleneck in sending out multiple copies of the broadcast data (one per executor).
When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
Implementation mechanism:
The driver splits the serialized object into small chunks and stores them in the driver's BlockManager. On each executor, the executor first tries to fetch the object from its own BlockManager; if it is not there, it uses remote fetches to pull the chunks from the driver and/or from other executors, if available. Once it has the chunks, it puts them into its own BlockManager, ready to be fetched by other executors. This keeps the driver from becoming the bottleneck of sending out one full copy of the broadcast data per executor.
Writing data on the driver side
Broadcast data is stored in two forms:
1. The value itself is stored twice: one copy in the MemoryStore, kept as the deserialized object, and one copy on disk, which is first serialized into a byte array by the SerializerManager and then written out.
2. The object is written to an output stream according to blockSize (default 4 MB, configurable via spark.broadcast.blockSize) and compressCodec (compression is enabled by default and can be disabled via spark.broadcast.compress; the default codec is lz4 and can be changed via spark.io.compression.codec), which splits it into several small chunks. These chunks are then persisted in the BlockManager, again with one copy in the MemoryStore (already serialized, so no deserialization is needed) and one on disk.
The blockifyObject method of TorrentBroadcast is as follows:
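The chunking step can be sketched end to end with plain JDK serialization. Spark actually serializes through its SerializerManager into a ChunkedByteBufferOutputStream; here ObjectOutputStream stands in, and the 8-byte block size is only for the demo (the real default is 4 MB).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
  ObjectInputStream, ObjectOutputStream}

// Sketch of "blockify": serialize a value, then slice the resulting
// bytes into fixed-size chunks.
def blockify(value: java.io.Serializable, blockSize: Int): Array[Array[Byte]] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(value)
  oos.close()
  bos.toByteArray.grouped(blockSize).toArray
}

// Sketch of the inverse "unblockify": concatenate the chunks and
// deserialize the original value back out.
def unblockify[T](chunks: Array[Array[Byte]]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(chunks.flatten))
  try ois.readObject().asInstanceOf[T] finally ois.close()
}
```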
The compressing OutputStream decorates the ChunkedByteBufferOutputStream.
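The decoration itself can be illustrated with a JDK codec. GZIP stands in here for Spark's default lz4 codec (so the sketch has no external dependency); the point is only that the compression stream wraps the underlying byte sink.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

def compress(data: Array[Byte]): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  // The codec stream decorates the underlying byte sink, just as the
  // compression codec decorates ChunkedByteBufferOutputStream in Spark.
  val out = new GZIPOutputStream(bos)
  out.write(data)
  out.close() // finishes the gzip trailer and flushes into bos
  bos.toByteArray
}

def decompress(data: Array[Byte]): Array[Byte] = {
  val in = new GZIPInputStream(new ByteArrayInputStream(data))
  try in.readAllBytes() finally in.close()
}
```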
Reading data on the driver or executor
When the value method of a broadcast variable is called, it invokes TorrentBroadcast's getValue method, as follows:
The _value field is declared as follows:
private lazy val _value: T = readBroadcastBlock()
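The lazy modifier matters here: readBroadcastBlock() is deferred until the first access and runs at most once per instance, so a task that never reads the value never fetches the blocks. A minimal illustration (LazyHolder is a hypothetical stand-in, not a Spark class):

```scala
// The read function is invoked only on first access to `value`;
// later accesses reuse the memoized result.
class LazyHolder(read: () => String) {
  var reads = 0
  lazy val value: String = { reads += 1; read() }
}
```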
Next, look at the readBroadcastBlock method:
 1 private def readBroadcastBlock(): T = Utils.tryOrIOException {
 2   TorrentBroadcast.synchronized {
 3     val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
 4
 5     Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
 6       setConf(SparkEnv.get.conf)
 7       val blockManager = SparkEnv.get.blockManager
 8       blockManager.getLocalValues(broadcastId) match {
 9         case Some(blockResult) =>
10           if (blockResult.data.hasNext) {
11             val x = blockResult.data.next().asInstanceOf[T]
12             releaseLock(broadcastId)
13
14             if (x != null) {
15               broadcastCache.put(broadcastId, x)
16             }
17
18             x
19           } else {
20             throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
21           }
22         case None =>
23           logInfo("Started reading broadcast variable " + id)
24           val startTimeMs = System.currentTimeMillis()
25           val blocks = readBlocks()
26           logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
27
28           try {
29             val obj = TorrentBroadcast.unBlockifyObject[T](
30               blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
31             // Store the merged copy in BlockManager so other tasks on this executor don't
32             // need to re-fetch it.
33             val storageLevel = StorageLevel.MEMORY_AND_DISK
34             if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
35               throw new SparkException(s"Failed to store $broadcastId in BlockManager")
36             }
37
38             if (obj != null) {
39               broadcastCache.put(broadcastId, obj)
40             }
41
42             obj
43           } finally {
44             blocks.foreach(_.dispose())
45           }
46       }
47     }
48   }
49 }
The source can be explained as follows:
Line 3: broadcastManager.cachedValues holds the values of all broadcasts. It is a Map whose keys are strong references and whose values are weak references (cleared by the garbage collector when nothing else holds the value).
Line 5: look the value up in cachedValues by broadcastId; if it is absent, run the default branch of getOrElse.
Line 8: try to get the broadcast value from the local BlockManager (from the MemoryStore or DiskStore; what is fetched here is the complete value, not the split chunks). If it is found, release the BlockManager's lock and put the value into cachedValues. If not, call readBlocks to fetch the chunks, rebuild the broadcast value from them, store the merged copy in the BlockManager, and put the rebuilt value into cachedValues.
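This three-level lookup order (process-wide cache, then local BlockManager, then remote fetch) can be modeled with three maps. ReadPath and all names below are illustrative, not Spark's API; on a remote hit the value is written back locally, just as putSingle keeps the merged copy so later reads avoid re-fetching.

```scala
import scala.collection.mutable

class ReadPath[T](remote: mutable.Map[Long, T]) {
  val cache = mutable.Map.empty[Long, T]       // ~ broadcastManager.cachedValues
  val localStore = mutable.Map.empty[Long, T]  // ~ this executor's BlockManager

  def read(id: Long): T =
    cache.getOrElse(id, {
      localStore.get(id) match {
        case Some(v) =>
          cache(id) = v // found locally: cache and return
          v
        case None =>
          val v = remote(id)   // ~ readBlocks + unBlockifyObject
          localStore(id) = v   // ~ putSingle: keep the merged copy locally
          cache(id) = v
          v
      }
    })
}
```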
The readBlocks method is as follows:
 1 /** Fetch torrent blocks from the driver and/or other executors. */
 2 private def readBlocks(): Array[BlockData] = {
 3   // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
 4   // to the driver, so other executors can pull these chunks from this executor as well.
 5   val blocks = new Array[BlockData](numBlocks)
 6   val bm = SparkEnv.get.blockManager
 7
 8   for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
 9     val pieceId = BroadcastBlockId(id, "piece" + pid)
10     logDebug(s"Reading piece $pieceId of $broadcastId")
11     // First try getLocalBytes because there is a chance that previous attempts to fetch the
12     // broadcast blocks have already fetched some of the blocks. In that case, some blocks
13     // would be available locally (on this executor).
14     bm.getLocalBytes(pieceId) match {
15       case Some(block) =>
16         blocks(pid) = block
17         releaseLock(pieceId)
18       case None =>
19         bm.getRemoteBytes(pieceId) match {
20           case Some(b) =>
21             if (checksumEnabled) {
22               val sum = calcChecksum(b.chunks(0))
23               if (sum != checksums(pid)) {
24                 throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
25                   s" $sum != ${checksums(pid)}")
26               }
27             }
28             // We found the block from remote executors/driver's BlockManager, so put the block
29             // in this executor's BlockManager.
30             if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
31               throw new SparkException(
32                 s"Failed to store $pieceId of $broadcastId in local BlockManager")
33             }
34             blocks(pid) = new ByteBufferBlockData(b, true)
35           case None =>
36             throw new SparkException(s"Failed to get $pieceId of $broadcastId")
37         }
38     }
39   }
40   blocks
41 }
The source can be explained as follows:
Line 14: try to get the chunk from the local BlockManager by pieceId.
Line 15: if the chunk is found locally, record it and release the lock.
Line 18: if the chunk is not available locally, fetch it from a remote BlockManager by pieceId, verify its checksum, and then store it in the local BlockManager (with tellMaster = true, so that other executors can in turn pull this chunk from this executor).
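Two details of readBlocks can be sketched in isolation: the random fetch order, which spreads load so executors do not all hit the same source in the same sequence, and the checksum verification of remote pieces (Spark's calcChecksum is based on java.util.zip.Adler32). The helper names below are illustrative.

```scala
import java.util.zip.Adler32
import scala.util.Random

// Pieces are fetched in a shuffled order, mirroring line 8 of readBlocks.
def fetchOrder(numBlocks: Int): Seq[Int] =
  Random.shuffle(Seq.range(0, numBlocks))

// Adler32 checksum of a block, as in Spark's calcChecksum.
def calcChecksum(block: Array[Byte]): Int = {
  val adler = new Adler32()
  adler.update(block, 0, block.length)
  adler.getValue.toInt
}

// Reject a remotely fetched piece whose checksum does not match.
def verified(block: Array[Byte], expected: Int): Array[Byte] =
  if (calcChecksum(block) == expected) block
  else throw new RuntimeException("corrupt remote block: checksum mismatch")
```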
Note: this article does not go into further detail on the BlockManager operations that broadcast relies on; the next article will analyze Spark's storage subsystem.