上一篇文章所述的Exactly-Once語義是針對Flink系統內部而言的.
那么Flink和外部系統(如Kafka)之間的消息傳遞如何做到exactly once呢?
問題所在:

如上圖,當sink A已經往Kafka寫入了數據,而sink B fail.
根據Flink的exactly once保證,系統會回滾到最近的checkpoint,
但是sink A已經把數據寫入到kafka了.
Flink無法回滾kafka的state.因此,kafka將在之后再次接收到一份同樣的來自sink A的數據,
這樣的message delivery便成為了at least once
Solution ---- Two phase commit
Flink采用Two phase commit來解決這個問題.
Phase 1: Pre-commit
Flink的JobManager向source注入checkpoint barrier以開啟這次snapshot.
barrier從source流向sink.
每個進行snapshot的算子成功snapshot后,都會向JobManager發送ACK.
當sink完成snapshot后, 向JobManager發送ACK的同時向kafka進行pre-commit.
Phase 2:Commit
當JobManager接收到所有算子的ACK后,就會通知所有的算子這次checkpoint已經完成.
Sink接收到這個通知后, 就向kafka進行commit,正式把數據寫入到kafka
不同階段fail over的recovery舉措:
(1) 在pre-commit前fail over, 系統恢復到最近的checkponit
(2) 在pre-commit后,commit前fail over,系統恢復到剛完成pre-commit時的狀態
Flink的two phase commit實現 ---- 抽象類TwoPhaseCommitSinkFunction
TwoPhaseCommitSinkFunction有4個方法:
1. beginTransaction()
開啟事務.創建一個臨時文件.后續把原要寫入到外部系統的數據寫入到這個臨時文件
2. preCommit()
flush並close這個文件,之后便不再往其中寫數據.同時開啟一個新的事務供下個checkponit使用
3. commit()
把pre-committed的臨時文件移動到指定目錄
4. abort()
刪除掉pre-committed的臨時文件
