深入理解Flink ---- End-to-End Exactly-Once語義


上一篇文章所述的Exactly-Once語義是針對Flink系統內部而言的.

那么Flink和外部系統(如Kafka)之間的消息傳遞如何做到exactly once呢?

 

問題所在:

如上圖,當sink A已經往Kafka寫入了數據,而sink B fail.

根據Flink的exactly once保證,系統會回滾到最近的checkpoint,

但是sink A已經把數據寫入到kafka了.

Flink無法回滾kafka的state.因此,kafka將在之后再次接收到一份同樣的來自sink A的數據,

這樣的message delivery便成為了at least once

 

Solution ---- Two phase commit

Flink采用Two phase commit來解決這個問題.

Phase 1: Pre-commit

Flink的JobManager向source注入checkpoint barrier以開啟這次snapshot.

barrier從source流向sink.

每個進行snapshot的算子成功snapshot后,都會向JobManager發送ACK.

當sink完成snapshot后, 向JobManager發送ACK的同時向kafka進行pre-commit.

Phase 2:Commit

當JobManager接收到所有算子的ACK后,就會通知所有的算子這次checkpoint已經完成.

Sink接收到這個通知后, 就向kafka進行commit,正式把數據寫入到kafka

 

不同階段fail over的recovery舉措:

(1)     在pre-commit前fail over, 系統恢復到最近的checkponit

(2)     在pre-commit后,commit前fail over,系統恢復到剛完成pre-commit時的狀態

 

Flink的two phase commit實現 ---- 抽象類TwoPhaseCommitSinkFunction

TwoPhaseCommitSinkFunction有4個方法:

1. beginTransaction()

  開啟事務.創建一個臨時文件.后續把原要寫入到外部系統的數據寫入到這個臨時文件

2. preCommit()

  flush並close這個文件,之后便不再往其中寫數據.同時開啟一個新的事務供下個checkponit使用

3. commit()

  把pre-committed的臨時文件移動到指定目錄

4. abort()

  刪除掉pre-committed的臨時文件

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM