Flink 如何通過2PC實現Exactly-once語義 (源碼分析)


Flink通過全局快照能保證內部處理的Exactly-once語義

但是端到端的Exactly-once還需要下游數據源配合,常見的通過冪等或者二階段提交這兩種方式保證

這里就來分析一下Sink二階段提交的Flink源碼是如何實現的

本文源碼基於Flink1.14

老版本的話看TwoPhaseCommitSinkFunction,現在用SinkWriter邏輯都是差不多的

先來看下我們的主角  org.apache.flink.streaming.runtime.operators.sink.SinkOperator 類

1階段. 在barrier到齊准備觸發checkpoint之前

調用了數據源的預提交方法 prepareCommit

來看下已kafka為例具體做了什么

 kafkaWriter就是調用了生產者的flush方法,在已經開始的事務里面刷數據

2階段. 觸發checkpoint保存狀態數據的時候 snapshotState 方法

 以kafka為例

 會啟動下一個checkpoint的kafka事務,直接就begin事務了,接着

用這次checkpoint需要commit的kafkaCommiter更新了狀態, 會被保存下來,這里有事務信息的后面會用到

3階段. 當checkpoint完成以后

 已kafka為例,會直接提交事務了commit

 

這里可能會有疑問,,如果我只預提交了,還沒有commit這時候跪了,那我從checkpoint恢復起來,那不就有問題了嗎

帶着疑問看下最后一個階段

4階段. 當任務失敗從checkpoint恢復的時候

初始化的時候會恢復狀態

可以看到會將上面說的上次checkpoint需要commiter的放到recoveredCommittables恢復隊列里面

然后retrayWithDelay,就會根據我們保存的kafka事務信息id等去判斷,上一次事務的狀態,如果是預提交的話,就會先去commit了

總結一下流程:

prepareSnapshotPreBarrier快照觸發前, 預提交事務,kafka里面就是flash
snapshotState快照保存時,開啟一個新的事務kafka就是beginTransation,並且保存這次要提交的事務信息
notifyCheckpointComplete快照完成以后,調用對應的commit提交事務 , kafka就是commitTransation
initializeState從快照恢復,會先判斷上次事務的狀態如果還沒提交會先提交



 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM