flink的二階段提交


flink的事務之兩階段提交

場景描述:

兩階段提交(two-phase commit, 2PC)是最基礎的分布式一致性協議,應用廣泛。本文來介紹它的相關細節以及它在Flink中的典型應用場景。。

簡介:

2PC 在分布式系統中,為了讓每個節點能夠感知其他所有節點的事務執行情況,需要我們引入一個中心節點的凡是統一所有節點的執行邏輯和進度。這個中心節點叫做協調者(coordinator),而其中向中心節點匯報或者被中心節點調度的其他節點叫做參與者。

具體過程

請求階段
1、協調者向所有參與者發送准備請求與事務內容,詢問是否可以准備事務提交,並等待參與者的響應。
2、參與者執行事務中的包含操作,並記錄undo日志(用於回滾)和redo日志(用於重放),但是不真正提交。
3、參與者向協調者返回事務才做的執行結果,執行陳工返回yes,否則返回no.
提交階段(分成成功和失敗兩種情況)
若所有的參與者都返回yes,說明事務可以提交。
1、協調者向所有參與者發送commit請求。
2、參與者收到commit 請求后,將事務真正的提交上去,並釋放占用的事務資源,並向協調者返回ack。
3、協調者收到所有參與者ack消息,事務成功完成。
若有參與者返回no或者超時未返回,說明事務終端,需要回滾。

1、協調者向所有參與者發送rollback請求。
2、參與者收到rollback請求后,根據undo日志回滾到事務執行前的狀態,釋放占用的事務資源,並向協調者返回ack。
3、協調者收到所有參與者的ack消息,事務回滾完成。

在這里插入圖片描述
在這里插入圖片描述

2pc 的優缺點
2PC的優點在於原理非常簡單,容易理解及實現。缺點主要有3個,列舉如下:
(1)協調者存在單點問題。如果協調者掛了,整個2PC邏輯就徹底不能運行。
(2)、執行過程是完全同步的。各參與者在等待其他參與者響應的過程中都處於阻塞狀態,大並發下有性能問題。
(3)、仍然存在不一致風險。如果由於網絡異常等意外導致只有部分參與者收到了commit請求,就會造成部分參與者提交了事務而其他參與者未提交的情況。
不過,現在人們在分布式一致性領域做了很多工作,以ZooKeeper為代表的分布式協調框架也數不勝數,2PC有了這些的加持,可靠性大大提升了,也就能夠真正用在要求高的生產環境中了。下面看看2PC與Flink是怎么扯上關系的。

flink基於2PC 應用

2PC 的最常見應用場景其實是關系型數據庫,比如mysql InnoDB 存儲引擎的XA事務系統。
Flink作為流式處理引擎,自然也提供了對exactly once語義的保證。flink的內部意圖檢查點機制和輕量級分布式快照算法ABS 保證exactly once .。二我們要實現端到端的精確一次的輸出邏輯,則需要施加以下兩種限制之一:冪等性寫入(idempotent write)、事務性寫入(transactional write)。

在Spark Streaming中,要實現事務性寫入完全靠用戶自己,框架本身並沒有提供任何實現。但是在Flink中提供了基於2PC的SinkFunction,名為TwoPhaseCommitSinkFunction,幫助我們做了一些基礎的工作。

在這里插入圖片描述
flink 官方推薦所有需要保證exactly once 的sink 邏輯都繼承該抽象類。它具體定義如下四個抽象方法。需要我們去在子類中實現。

   protected abstract TXN beginTransaction() throws Exception;
    protected abstract void preCommit(TXN transaction) throws Exception;
    protected abstract void commit(TXN transaction);
    protected abstract void abort(TXN transaction);

 

beginTransaction(): 開始一個事務,返回事務信息的句柄。
preCommit :預提交(即提交請求)階段的邏輯
commit():正式提交階段的邏輯
abort():取消事務

下面以Flink與Kafka的集成來說明2PC的具體流程。注意這里的Kafka版本必須是0.11及以上,因為只有0.11+的版本才支持冪等producer以及事務性,從而2PC才有存在的意義。Kafka內部事務性的機制如下框圖所示。
kafka 的事務和冪等性參考。
https://blog.csdn.net/weixin_40809627/article/details/106918385

flink 實現兩階段提交具體實現為:
FlinkKafkaProducer011.commit()方法實際上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事務。

   @Override
    protected void commit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                transaction.producer.commitTransaction();
            } finally {
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }

 

該方法的調用點位於 TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中,顧名思義,當所有的檢查點都成功后,會調用這個方法。

   @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
        checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

            pendingTransactionIterator.remove();
        }

        if (firstError != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
        }
    }

從代碼中可以看出,該方法每次從賑災等待提交的事務句柄中取出一個,檢查他的檢查點ID,並調用commit()方法提交,這個階段流程圖為
在這里插入圖片描述
可見,只有在所有的檢查點都成功的這個前提下,寫入才會成功。這符合前文描述2PC的流程。其中jobmanager為協調者,各個算子為參與者,並且中有sink一個參與者會執行提交。一旦有了檢查點失敗,notifyCheckpointComplete()方法不會執行,如果重試不成功的化。最后會調用abort()方法回滾事務。

 @Override
    protected void abort(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            transaction.producer.abortTransaction();
            recycleTransactionalProducer(transaction.producer);
        }
    }

 

待確認點:具體代碼實現邏輯(感覺有部分說的不是清楚)
1、數據怎么提交到kafk中 兩步 提交請求 和執行
2、具體代碼實現


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM