版本號:
spark 2.3
structured streaming代碼
異常信息
KafkaSource[Subscribe[test]]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 1470
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:339)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
一.異常表象原因
1.異常源碼:
currentBatchId,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId. " +
s"Metadata ${offsetSeqMetadata.toString}")
這是一個斷言,assert,其中
currentBatchId,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))
這個函數返回一個布爾值,如果返回為false將拋出目前這樣的異常。
那么,這個函數是干嘛的?
require(metadata != null, "'null' metadata cannot written to a metadata log")
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written
writeBatch(batchId, metadata)
true
}
}
從注釋中可以看出,當一個structured streaming程序啟動時,會去判斷當前batch批次是否已經store。如果當前批次的元存儲信息已經被存儲,那么將返回false,程序將異常退出。
/**
- Store the metadata for the specified batchId and return
true
if successful. If the batchId's - metadata has already been stored, this method will return
false
.
*/
2.打個斷點
通過斷點可以看到:
1.當前batchId = 1470
2.當前kafka里的最新offset是41
然后查看本地checkpoint里的消息
通過checkpoint可以看到
1.在offset文件里,確實存在1470的批次。表面當前批次的元信息已經被存儲了。
2.打開1470文件可以看到,里面的最新消息offset為36
二.解決方案
最簡單粗暴的就是直接刪除checkpoint文件夾。但這樣的話,會丟失中間部份數據。即36到41這向條數據。
Current Committed Offsets: {KafkaSource[Subscribe[test]]: {"test":{"0":36}}}
Current Available Offsets: {KafkaSource[Subscribe[test]]: {"test":{"0":41}}}
然后可以指定開始offset.
1.可以通過代碼指定各分區的開始offset
.option("startingOffsets", """{"test":{"0":36,"1":-2},"topic2":{"0":-2}}""")
這種方式需要改代碼,不推薦。
2.不刪除而是更改checkpoint offset下的批次文件
如本例中,刪除1470,並將1470里的信息復制到1469.用1470替代前一個批次1469的信息。
三.異常背后的原因
目前只是解決了這個問題。但背后的原因呢?
首先是什么情況導致的?
根據字面意思【Concurrent update to the log. Multiple streaming jobs detected】,在寫checkpoint日志時,系統認為有多個streaming程序在寫。系統認為不行。不管是不是有多個streaming任務在執行,既然系統有這么判斷,那就去看checkpoint日志。
在這之前,先弄清structured streaming的checkpoint機制。
1). StreamExecution 通過 Source.getOffset() 獲取最新的 offsets,即最新的數據進度;
2). StreamExecution 將 offsets 等寫入到 offsetLog 里, 這里的 offsetLog 是一個持久化的 WAL (Write-Ahead-Log),是將來可用作故障恢復;
3). StreamExecution 構造本次執行的 LogicalPlan
(3a) 將預先定義好的邏輯(即 StreamExecution 里的 logicalPlan 成員變量)制作一個副本出來
(3b) 給定剛剛取到的 offsets,通過 Source.getBatch(offsets) 獲取本執行新收到的數據的 Dataset/DataFrame 表示,並替換到 (3a) 中的副本里
經過 (3a), (3b) 兩步,構造完成的 LogicalPlan 就是針對本執行新收到的數據的 Dataset/DataFrame 變換(即整個處理邏輯)了
4). 觸發對本次執行的 LogicalPlan 的優化,得到 IncrementalExecution
邏輯計划的優化:通過 Catalyst 優化器完成
物理計划的生成與選擇:結果是可以直接用於執行的 RDD DAG
邏輯計划、優化的邏輯計划、物理計划、及最后結果 RDD DAG,合並起來就是 IncrementalExecution
5). 將表示計算結果的 Dataset/DataFrame (包含 IncrementalExecution) 交給 Sink,即調用 Sink.add(ds/df)
6). 計算完成后的 commit
(6a) 通過 Source.commit() 告知 Source 數據已經完整處理結束;Source 可按需完成數據的 garbage-collection
(6b) 將本次執行的批次 id 寫入到 batchCommitLog 里
注意2和6。前面我們知道,checkpoint下面的有三個文件commit,offset,source。在2的步驟,接收到kafka消息時,會將當前Batch的信息寫入offset。直到當前batch處理完畢,再通過6的步驟commit到檢查點。
如果2->6正常執行,這是一種正常情況,offset.maxBatchid>commit.maxBatchid。不做討論。
如果2和6的步驟之間出現異常,執行了2,而沒有執行6呢?
這種情況下又分為二種情況:
1.offset.maxBatchid>commit.maxBatchid 這種情況最為常見。表示接收到了kafka消息,但處理過程中異常退出。那么下次重啟時,首先從offset.maxBatchId作為開始offset.
2.offset.maxBatchid>commit.maxBatchid 如果僅有一個strutured streaming任務在寫checkpoint目錄的話,永遠不可能出現這種情況。因為2->6順序執行的。如果出現了這種情況,那么系統就認為有多個任務使用了同一個檢查點。這種情況是不被允許的,直接拋出異常。
所以,之所以出現這種異常,就是上述第2種情況。
四驗證
反向驗證上述猜測。
在檢查點目錄下,手動保留一個commit/1679的文件,手動刪除offset/1679文件,這樣offset下最大的batchid將小於commit下的最大batchid.
具體驗證程序就不放上來了。有興趣的可以自行測試一下。
結果是成功復現此異常。
spark官方為什么這樣設計?
首先這肯定不是一個BUG。spark問題鏈接
我的猜測,這是官方的一種保護機制。多個任務使用同一個檢查點,相當於多個spark streaming設置了同一個group.id,將會造成kafka數據源混亂。
五后續
同樣的代碼,保留同一樣的checkpoint結構,即commit.maxBatchid>offset.maxBatchid,將系統升級到2.4版本,然后異常消失了!
所以這是打臉了嗎?
加了斷點,看了一下。
關鍵代碼在
MicroBatchExecution.class line 547
commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
在這里和2.3版本一樣,調用了這段代碼,檢查了commit offset目錄。
/**
* Store the metadata for the specified batchId and return `true` if successful. If the batchId's
* metadata has already been stored, this method will return `false`.
*/
override def add(batchId: Long, metadata: T): Boolean = {
require(metadata != null, "'null' metadata cannot written to a metadata log")
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written
writeBatchToFile(metadata, batchIdToPath(batchId))
true
}
}
只不過,當文件存在時,並沒有拋出異常,放過了,過了,了。。。。。。