Flink通過全局快照能保證內部處理的Exactly-once語義
但是端到端的Exactly-once還需要下游數據源配合,常見的通過冪等或者二階段提交這兩種方式保證
這里就來分析一下Sink二階段提交的Flink源碼是如何實現的
本文源碼基於Flink1.14
老版本的話看TwoPhaseCommitSinkFunction,現在用SinkWriter邏輯都是差不多的
先來看下我們的主角 org.apache.flink.streaming.runtime.operators.sink.SinkOperator 類
1階段. 在barrier到齊准備觸發checkpoint之前
調用了數據源的預提交方法 prepareCommit
來看下已kafka為例具體做了什么
kafkaWriter就是調用了生產者的flush方法,在已經開始的事務里面刷數據
2階段. 觸發checkpoint保存狀態數據的時候 snapshotState 方法
以kafka為例
會啟動下一個checkpoint的kafka事務,直接就begin事務了,接着
用這次checkpoint需要commit的kafkaCommiter更新了狀態, 會被保存下來,這里有事務信息的后面會用到
3階段. 當checkpoint完成以后
已kafka為例,會直接提交事務了commit
這里可能會有疑問,,如果我只預提交了,還沒有commit這時候跪了,那我從checkpoint恢復起來,那不就有問題了嗎
帶着疑問看下最后一個階段
4階段. 當任務失敗從checkpoint恢復的時候
初始化的時候會恢復狀態
可以看到會將上面說的上次checkpoint需要commiter的放到recoveredCommittables恢復隊列里面
然后retrayWithDelay,就會根據我們保存的kafka事務信息id等去判斷,上一次事務的狀態,如果是預提交的話,就會先去commit了
總結一下流程:
prepareSnapshotPreBarrier快照觸發前, 預提交事務,kafka里面就是flash
snapshotState快照保存時,開啟一個新的事務kafka就是beginTransation,並且保存這次要提交的事務信息
notifyCheckpointComplete快照完成以后,調用對應的commit提交事務 , kafka就是commitTransation
initializeState從快照恢復,會先判斷上次事務的狀態如果還沒提交會先提交