一、兩階段提交2PC
在分布式系統中,可以使用兩階段提交來實現事務性從而保證數據的一致性,兩階段提交分為:預提交階段與提交階段,通常包含兩個角色:協調者與執行者,協調者用於用於管理所有執行者的操作,執行者用於執行具體的提交操作,具體的操作流程:
1. 首先協調者會送預提交(pre-commit)命令有的執行者
2. 執行者執行預提交操作然后發送一條反饋(ack)消息給協調者
3. 待協調者收到所有執行者的成功反饋,則發送一條提交信息(commit)給執行者
4. 執行者執行提交操作

如果在流程2中部分預提交失敗,那么協調者就會收到一條失敗的反饋,則會發送一條rollback消息給所有執行者,執行回滾操作,保證數據一致性;但是如果在流程4中,出現部分提交成功部分提交失敗,那么就會造成數據的不一致,因此后面也提出了3PC或者通過其他補償機制來保證數據最終一致性,接下看看flink 是如何做到2PC,保證數據的一致性。
二、Flink中兩階段提交
1. jobMaster 會周期性的發送執行checkpoint命令(start checkpoint);
2.當source端收到執行指令后會產生一條barrier消息插入到input消息隊列中,當處理到barrier時會執行本地checkpoint, 並且會將barrier發送到下一個節點,當checkpoint完成之后會發送一條ack信息給jobMaster ;
3. 當DAG圖中所有節點都完成checkpoint之后,jobMaster會收到來自所有節點的ack信息,那么就表示一次完整的checkpoint的完成;
4. JobMaster會給所有節點發送一條callback信息,表示通知checkpoint完成消息,這個過程是異步的,並非必須的,方便做一些其他的事情,例如kafka offset提交到kafka。
對比Flink整個checkpoint機制調用流程可以發現與2PC非常相似,JobMaster相當於協調者,所有的處理節點相當於執行者,start-checkpoint消息相當於pre-commit消息,每個處理節點的checkpoint相當於pre-commit過程,checkpoint ack消息相當於執行者反饋信息,最后callback消息相當於commit消息,完成具體的提交動作。那么我們應該怎么去使用這種機制來實現2PC呢?
Flink提供了CheckpointedFunction與CheckpointListener這樣兩個接口,CheckpointedFunction中有snapshotState方法,每次checkpoint觸發執行方法,通常會將緩存數據放入狀態中,可以理解為是一個hook,這個方法里面可以實現預提交,CheckpointListener中有notifyCheckpointComplete方法,checkpoint完成之后的通知方法,這里可以做一些額外的操作,例如FlinkKafakConsumerBase 使用這個來完成kafka offset的提交,在這個方法里面可以實現提交操作。
在2PC中提到如果對應流程2預提交失敗,那么本次checkpoint就被取消不會執行,不會影響數據一致性,那么如果流程4提交失敗了,在flink中可以怎么處理的呢? 我們可以在預提交階段(snapshotState)將事務的信息保存在state狀態中,如果流程4失敗,那么就可以從狀態中恢復事務信息,並且在CheckpointedFunction的initializeState方法中完成事務的提交,該方法是初始化方法只會執行一次,從而保證數據一致性。
參考:1、flink exactly-once系列之兩階段提交概述
