歡迎轉載,轉載請注明出處,徽滬一郎,謝謝。
在流數據的處理過程中,為了保證處理結果的可信度(不能多算,也不能漏算),需要做到對所有的輸入數據有且僅有一次處理。在Spark Streaming的處理機制中,不能多算,比較容易理解。那么它又是如何作到即使數據處理結點被重啟,在重啟之后這些數據也會被再次處理呢?
環境搭建
為了有一個感性的認識,先運行一下簡單的Spark Streaming示例。首先確認已經安裝了openbsd-netcat。
運行netcat
nc -lk 9999
運行spark-shell
SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=10000 MASTER=local-cluster[2,2,1024] bin/spark-shell
在spark-shell中輸入如下內容
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap( _.split(" "))
val pairs = words.map(word => (word,1))
val wordCount = pairs.reduceByKey(_ + _)
wordCount.print()
ssc.start()
ssc.awaitTermination()
當ssc.start()執行之后,在nc一側輸入一些內容並回車,spark-shell上就會顯示出統計的結果。
數據接收過程
來看一下代碼實現層面,從兩個角度來說,一是控制層面(control panel),另一是數據層面(data panel)。
Spark Streaming的數據接收過程的控制層面大致如下圖所示。
簡要講解一下上圖的意思,
- 數據真正接收到是發生在SocketReceiver.receive函數中,將接收到的數據放入到BlockGenerator.currentBuffer
- 在BlockGenerator中有一個重復定時器,處理函數為updateCurrentBuffer, updateCurrentBuffer將當前buffer中的數據封裝為一個新的Block,放入到blocksForPush隊列中
- 同樣是在BlockGenerator中有一個BlockPushingThread,其職責就是不停的將blocksForPush隊列中的成員通過pushArrayBuffer函數傳遞給blockmanager,讓BlockManager將數據存儲到MemoryStore中
- pushArrayBuffer還會將已經由BlockManager存儲的Block的id號傳遞給ReceiverTracker,ReceiverTracker會將存儲的blockId放到對應StreamId的隊列中
socket.receive->receiver.store->pushSingle->blockgenerator.updateCurrentBuffer->blockgenerator.keepPushBlocks->pushArrayBufer
->ReceiverTracker.addBlocks
pushArrayBuffer函數的定義如下
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}
數據結構的變化過程
Spark Streaming數據處理高效的原因之一就是批量的進行數據分析,那么這些批量的數據是如何聚集起來的呢?換種方式來表述這個問題,在某一時刻,接收到的數據是單一的,也就是我們最多只能組成<t,data>這種數據元組,而在runJob的時候是批量的提取和分析數據的,這個批量數據的組成是在什么時候完成的呢?
下圖大到勾勒出一條新的message被socketreceiver接收之后,是如何通過一系列的處理而放入到BlockManager中,並同時由ReceiverTracker記錄下相應的元數據的。
- 首先new message被放入到blockManager.currentBuffer
- 定時器超時處理過程,將整個currentBuffer中的數據打包成一條Block,放入到ArrayBlockingQueue,該數據結構支持FIFO
- keepPushingBlocks將每一條Block(block中包含時間戳,接收到的原始數據)讓BlockManager進行保存,同時通知ReceiverTracker已經將哪些block存儲到了blockmanager中
- ReceiverTracker將每一個stream接收到但還沒有進行處理的block放入到receiverBlockInfo,其為一Hashmap. 在后面的generateJobs中會從receiverBlockInfo提取數據以生成相應的RDD
數據處理過程
數據處理中最重要的函數就是generateJobs, generateJobs會引發下述的函數調用過程,具體的代碼就不一一羅列了。
- jobgenerator.generateJobs->dstreamgraph.generateJobs->dstream.generateJob->getOrCompute->compute 生成RDD
- job調用job.func
JobGenerator.generateJobs函數定義如下
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}
我們先來談一談數據處理階段是如何與上述的接收階段中存儲下來的數據掛上鈎的。
假設上一次進行RDD處理發生在時間點t1,現在是時間點t2,那么在<t2,t1>之間有哪些blocks沒有被處理呢?
想必你已經知道答案了,沒有被處理的blocks全部保存在ReceiverTracker的receiverBlockInfo之中
在generateJob時,每一個DStream都會調用getReceivedBlockInfo,你說沒有跟ReceiverTracker中的receivedBlockInfo連起來啊,別急!且看數據輸入的源頭ReceiverInputDStream中的getReceivedBlockInfo是如何定義的。代碼列舉如下。
private[streaming] def getReceivedBlockInfo(time: Time) = {
receivedBlockInfo(time)
}
那么此處的receivedBlockInfo(time)是從何而來的呢,這個要看ReceivedInputDStream中的compute函數實現
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}
至此終於看到了receiverTracker中的getReceivedBlockInfo被調用,也就是說將接收階段的數據和目前處理階段的輸入通道打通了
函數調用路徑,從generateJobs到sparkcontext.submitJobs. 這個時候要注意注冊為DStreamGraph中的OutputStream上的操作會引發SparkContext.runJobs被調用。我們以print函數為例看一下調用過程。
def print() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
val first11 = rdd.take(11)
println ("-------------------------------------------")
println ("Time: " + time)
println ("-------------------------------------------")
first11.take(10).foreach(println)
if (first11.size > 10) println("...")
println()
}
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}
注意rdd.take,這個會引發runJob調用,不信的話,我們可以看一看其定義中調用runJob的片段。
val left = num - buf.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
res.foreach(buf ++= _.take(num - buf.size))
partsScanned += numPartsToTry
}
小結一下數據處理過程
- 用time為關鍵字去取出在此時間之前加入的所有blockIds
- 真正提交運行的時候,rdd中的blockfetcher以blockId為關鍵字去blockmanagermaster獲取真正的數據,即從socket上接收到的原始數據
容錯處理
JobGenerator.generateJobs函數的最后會發出DoCheckpoint通知,該通知會讓相應的actor將DStreamCheckpointData寫入到hdfs文件中,我們來看一看為什么需要寫入checkpointdata以及哪些東西是包含在checkpointdata之中。
在數據處理一節,我們已經分析到在generateJobs的時候會生成多個jobs,它們會通過sparkcontext.runJob接口而發送到cluster中被真正執行。
假設在t2,worker掛掉了,掛掉的worker直到t3才完全恢復。由於掛掉的原因,上一次generateJobs生成的job不一定被完全處理了(也許有些已經處理了,有些還沒有處理),所以需要重新再提交一次。這里有一個問題,那就是可能導致針對同一批數據有重復處理的情況發生,從而無法達到exactly-once的語義效果。
問題2: 在<t2,t3>這一段掛掉的時間之內,沒有新的數據被接收,所以Spark Streaming的SocketReceiver適合用來充當client側而不是server側。SocketReceiver讀取到的數據應該存在一個具有冗余備份機制的內存數據庫或緩存隊列里,如kafaka. 對問題2, Spark Streaming本身是解決不了的。當然這里再往下細究的話,會牽出負載均衡的問題。
checkpointData
checkpoint的成員變量有哪些呢,我們看一看其結構定義就清楚了。
val master = ssc.sc.master
val framework = ssc.sc.appName
val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll
generatedRDDs是被包含在graph里面。所以不要突然之間驚惶失措,發覺沒有將generatedRDDs保存起來。
checkpoint的數據是通過CheckpointwriteHandler真正的寫入到hdfs,通過CheckPiontReader而讀入。CheckpointReade在重啟的時候會被使用到,判斷是第一次干凈的啟動還是因錯誤而重啟,判斷的依據全部在cp這個變量。
為了達到重啟之后而自動的檢查並載入相應的checkpoint數據,那么在創建StreamingContext的時候就不能簡單的通過調用new StreamingContext來完成,而是利用getOrCreate函數,代碼示例如下。
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreaminContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
小結
本文中講述數據接收過程中所使用的兩幅圖使用tikz完成,里面包含的信息很豐富,有志於了解清楚Spark Streaming內部處理機制的同仁,不妨以此為參考進行詳細的代碼走讀。
如果有任何不對或錯誤之處,歡迎批評指正。
參考資料
- Spark Streaming源碼分析 checkpoint http://www.cnblogs.com/fxjwind/p/3596451.html
- Spark Streaming Introduction http://jerryshao.me/architecture/2013/04/02/spark-streaming-introduction/
- deep dive with Spark Streaming http://www.meetup.com/spark-users/events/122694912/