Structured Streaming 的異常處理 【Concurrent update to the log. Multiple streaming jobs detected】


版本號:
spark 2.3
structured streaming代碼

異常信息

KafkaSource[Subscribe[test]]
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 1470
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:339)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	... 1 more

一.異常表象原因

1.異常源碼:

          currentBatchId,
          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
          s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
        logInfo(s"Committed offsets for batch $currentBatchId. " +
          s"Metadata ${offsetSeqMetadata.toString}")

這是一個斷言,assert,其中

          currentBatchId,
          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))

這個函數返回一個布爾值,如果返回為false將拋出目前這樣的異常。
那么,這個函數是干嘛的?

    require(metadata != null, "'null' metadata cannot written to a metadata log")
    get(batchId).map(_ => false).getOrElse {
      // Only write metadata when the batch has not yet been written
      writeBatch(batchId, metadata)
      true
    }
  } 

從注釋中可以看出,當一個structured streaming程序啟動時,會去判斷當前batch批次是否已經store。如果當前批次的元存儲信息已經被存儲,那么將返回false,程序將異常退出。

/**

  • Store the metadata for the specified batchId and return true if successful. If the batchId's
  • metadata has already been stored, this method will return false.
    */

2.打個斷點

通過斷點可以看到:

1.當前batchId = 1470
2.當前kafka里的最新offset是41

然后查看本地checkpoint里的消息

通過checkpoint可以看到

1.在offset文件里,確實存在1470的批次。表面當前批次的元信息已經被存儲了。
2.打開1470文件可以看到,里面的最新消息offset為36

二.解決方案

最簡單粗暴的就是直接刪除checkpoint文件夾。但這樣的話,會丟失中間部份數據。即36到41這向條數據。

Current Committed Offsets: {KafkaSource[Subscribe[test]]: {"test":{"0":36}}}
Current Available Offsets: {KafkaSource[Subscribe[test]]: {"test":{"0":41}}}

然后可以指定開始offset.

1.可以通過代碼指定各分區的開始offset

.option("startingOffsets", """{"test":{"0":36,"1":-2},"topic2":{"0":-2}}""")
這種方式需要改代碼,不推薦。

2.不刪除而是更改checkpoint offset下的批次文件

如本例中,刪除1470,並將1470里的信息復制到1469.用1470替代前一個批次1469的信息。

三.異常背后的原因

目前只是解決了這個問題。但背后的原因呢?

首先是什么情況導致的?

根據字面意思【Concurrent update to the log. Multiple streaming jobs detected】,在寫checkpoint日志時,系統認為有多個streaming程序在寫。系統認為不行。不管是不是有多個streaming任務在執行,既然系統有這么判斷,那就去看checkpoint日志。
在這之前,先弄清structured streaming的checkpoint機制。

1). StreamExecution 通過 Source.getOffset() 獲取最新的 offsets,即最新的數據進度;
2). StreamExecution 將 offsets 等寫入到 offsetLog 里, 這里的 offsetLog 是一個持久化的 WAL (Write-Ahead-Log),是將來可用作故障恢復;
3). StreamExecution 構造本次執行的 LogicalPlan
(3a) 將預先定義好的邏輯(即 StreamExecution 里的 logicalPlan 成員變量)制作一個副本出來
(3b) 給定剛剛取到的 offsets,通過 Source.getBatch(offsets) 獲取本執行新收到的數據的 Dataset/DataFrame 表示,並替換到 (3a) 中的副本里
經過 (3a), (3b) 兩步,構造完成的 LogicalPlan 就是針對本執行新收到的數據的 Dataset/DataFrame 變換(即整個處理邏輯)了
4). 觸發對本次執行的 LogicalPlan 的優化,得到 IncrementalExecution
邏輯計划的優化:通過 Catalyst 優化器完成
物理計划的生成與選擇:結果是可以直接用於執行的 RDD DAG
邏輯計划、優化的邏輯計划、物理計划、及最后結果 RDD DAG,合並起來就是 IncrementalExecution
5). 將表示計算結果的 Dataset/DataFrame (包含 IncrementalExecution) 交給 Sink,即調用 Sink.add(ds/df)
6). 計算完成后的 commit
(6a) 通過 Source.commit() 告知 Source 數據已經完整處理結束;Source 可按需完成數據的 garbage-collection
(6b) 將本次執行的批次 id 寫入到 batchCommitLog 里

注意2和6。前面我們知道,checkpoint下面的有三個文件commit,offset,source。在2的步驟,接收到kafka消息時,會將當前Batch的信息寫入offset。直到當前batch處理完畢,再通過6的步驟commit到檢查點。

如果2->6正常執行,這是一種正常情況,offset.maxBatchid>commit.maxBatchid。不做討論。
如果2和6的步驟之間出現異常,執行了2,而沒有執行6呢?
這種情況下又分為二種情況:

1.offset.maxBatchid>commit.maxBatchid 這種情況最為常見。表示接收到了kafka消息,但處理過程中異常退出。那么下次重啟時,首先從offset.maxBatchId作為開始offset.

2.offset.maxBatchid>commit.maxBatchid 如果僅有一個strutured streaming任務在寫checkpoint目錄的話,永遠不可能出現這種情況。因為2->6順序執行的。如果出現了這種情況,那么系統就認為有多個任務使用了同一個檢查點。這種情況是不被允許的,直接拋出異常。

所以,之所以出現這種異常,就是上述第2種情況。

四驗證

反向驗證上述猜測。
在檢查點目錄下,手動保留一個commit/1679的文件,手動刪除offset/1679文件,這樣offset下最大的batchid將小於commit下的最大batchid.

具體驗證程序就不放上來了。有興趣的可以自行測試一下。
結果是成功復現此異常。

spark官方為什么這樣設計?

首先這肯定不是一個BUG。spark問題鏈接

image

我的猜測,這是官方的一種保護機制。多個任務使用同一個檢查點,相當於多個spark streaming設置了同一個group.id,將會造成kafka數據源混亂。

五后續

同樣的代碼,保留同一樣的checkpoint結構,即commit.maxBatchid>offset.maxBatchid,將系統升級到2.4版本,然后異常消失了!

所以這是打臉了嗎?

加了斷點,看了一下。

關鍵代碼在

MicroBatchExecution.class line 547
commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))

在這里和2.3版本一樣,調用了這段代碼,檢查了commit offset目錄。

/**
   * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
   * metadata has already been stored, this method will return `false`.
   */
  override def add(batchId: Long, metadata: T): Boolean = {
    require(metadata != null, "'null' metadata cannot written to a metadata log")
    get(batchId).map(_ => false).getOrElse {
      // Only write metadata when the batch has not yet been written
      writeBatchToFile(metadata, batchIdToPath(batchId))
      true
    }
  }

只不過,當文件存在時,並沒有拋出異常,放過了,過了,了。。。。。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM