flink的事務之兩階段提交
場景描述:
兩階段提交(two-phase commit, 2PC)是最基礎的分布式一致性協議,應用廣泛。本文來介紹它的相關細節以及它在Flink中的典型應用場景。。
簡介:
2PC 在分布式系統中,為了讓每個節點能夠感知其他所有節點的事務執行情況,需要我們引入一個中心節點的凡是統一所有節點的執行邏輯和進度。這個中心節點叫做協調者(coordinator),而其中向中心節點匯報或者被中心節點調度的其他節點叫做參與者。
具體過程
請求階段
1、協調者向所有參與者發送准備請求與事務內容,詢問是否可以准備事務提交,並等待參與者的響應。
2、參與者執行事務中的包含操作,並記錄undo日志(用於回滾)和redo日志(用於重放),但是不真正提交。
3、參與者向協調者返回事務才做的執行結果,執行陳工返回yes,否則返回no.
提交階段(分成成功和失敗兩種情況)
若所有的參與者都返回yes,說明事務可以提交。
1、協調者向所有參與者發送commit請求。
2、參與者收到commit 請求后,將事務真正的提交上去,並釋放占用的事務資源,並向協調者返回ack。
3、協調者收到所有參與者ack消息,事務成功完成。
若有參與者返回no或者超時未返回,說明事務終端,需要回滾。
1、協調者向所有參與者發送rollback請求。
2、參與者收到rollback請求后,根據undo日志回滾到事務執行前的狀態,釋放占用的事務資源,並向協調者返回ack。
3、協調者收到所有參與者的ack消息,事務回滾完成。


2pc 的優缺點
2PC的優點在於原理非常簡單,容易理解及實現。缺點主要有3個,列舉如下:
(1)協調者存在單點問題。如果協調者掛了,整個2PC邏輯就徹底不能運行。
(2)、執行過程是完全同步的。各參與者在等待其他參與者響應的過程中都處於阻塞狀態,大並發下有性能問題。
(3)、仍然存在不一致風險。如果由於網絡異常等意外導致只有部分參與者收到了commit請求,就會造成部分參與者提交了事務而其他參與者未提交的情況。
不過,現在人們在分布式一致性領域做了很多工作,以ZooKeeper為代表的分布式協調框架也數不勝數,2PC有了這些的加持,可靠性大大提升了,也就能夠真正用在要求高的生產環境中了。下面看看2PC與Flink是怎么扯上關系的。
flink基於2PC 應用
2PC 的最常見應用場景其實是關系型數據庫,比如mysql InnoDB 存儲引擎的XA事務系統。
Flink作為流式處理引擎,自然也提供了對exactly once語義的保證。flink的內部意圖檢查點機制和輕量級分布式快照算法ABS 保證exactly once .。二我們要實現端到端的精確一次的輸出邏輯,則需要施加以下兩種限制之一:冪等性寫入(idempotent write)、事務性寫入(transactional write)。
在Spark Streaming中,要實現事務性寫入完全靠用戶自己,框架本身並沒有提供任何實現。但是在Flink中提供了基於2PC的SinkFunction,名為TwoPhaseCommitSinkFunction,幫助我們做了一些基礎的工作。

flink 官方推薦所有需要保證exactly once 的sink 邏輯都繼承該抽象類。它具體定義如下四個抽象方法。需要我們去在子類中實現。
protected abstract TXN beginTransaction() throws Exception;
protected abstract void preCommit(TXN transaction) throws Exception;
protected abstract void commit(TXN transaction);
protected abstract void abort(TXN transaction);
beginTransaction(): 開始一個事務,返回事務信息的句柄。
preCommit :預提交(即提交請求)階段的邏輯
commit():正式提交階段的邏輯
abort():取消事務
下面以Flink與Kafka的集成來說明2PC的具體流程。注意這里的Kafka版本必須是0.11及以上,因為只有0.11+的版本才支持冪等producer以及事務性,從而2PC才有存在的意義。Kafka內部事務性的機制如下框圖所示。
kafka 的事務和冪等性參考。
https://blog.csdn.net/weixin_40809627/article/details/106918385
flink 實現兩階段提交具體實現為:
FlinkKafkaProducer011.commit()方法實際上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事務。
@Override
protected void commit(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
transaction.producer.commitTransaction();
} finally {
recycleTransactionalProducer(transaction.producer);
}
}
}
該方法的調用點位於 TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中,顧名思義,當所有的檢查點都成功后,會調用這個方法。
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
firstError);
}
}
從代碼中可以看出,該方法每次從賑災等待提交的事務句柄中取出一個,檢查他的檢查點ID,並調用commit()方法提交,這個階段流程圖為
可見,只有在所有的檢查點都成功的這個前提下,寫入才會成功。這符合前文描述2PC的流程。其中jobmanager為協調者,各個算子為參與者,並且中有sink一個參與者會執行提交。一旦有了檢查點失敗,notifyCheckpointComplete()方法不會執行,如果重試不成功的化。最后會調用abort()方法回滾事務。
@Override
protected void abort(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
transaction.producer.abortTransaction();
recycleTransactionalProducer(transaction.producer);
}
}
待確認點:具體代碼實現邏輯(感覺有部分說的不是清楚)
1、數據怎么提交到kafk中 兩步 提交請求 和執行
2、具體代碼實現
