kafka事務原理與使用


項目要求使用kafka的事務,遇到了一些問題,研究了下kafka的事務機制記錄一下。

kafka事務是為了實現:

  • Exactly Once即正好一次語義

  • 操作的原子性

  • 有狀態操作的可恢復性
        kafka的冪等性可以實現Exactly Once語義,冪等性提供了單會話單分區的Exactly-Once 語義的實現,冪等性實現是事務性實現的基礎。Kafka在引入冪等性之前,Producer向Broker發送消息,然后Broker將消息追加到消息流中后給Producer返回Ack信號值。實現流程如下:

    然而實際生產環境中會出現各種不確定的因素,比如在Producer在發送給Broker的時候出現網絡異常。比如以下這種異常情況的出現:

    上圖這種情況,當Producer第一次發送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時失敗了(比如網絡異常) 。此時,Producer端觸發重試機制,將消息(x2,y2)重新發送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然后成功返回Ack信號給Producer。這樣下來,消息流中就被重復追加了兩條相同的(x2,y2)的消息。

冪等性:

    保證在消息重發的時候,消費者不會重復處理。即使在消費者收到重復消息的時候,重復處理,也要保證最終結果的一致性。所謂冪等性是指producer向server發送多條重復數據,server端只會持久化一條數據;數學概念就是: f(f(x)) = f(x) 。f函數表示對消息的處理。比如,銀行轉賬,如果失敗,需要重試。不管重試多少次,都要保證最終結果一定是一致的

冪等性的實現機制:

    冪等性的實現離不開ack機制,ack=1 只要下端的broker的leader分區寫入成功則任務是成功的;ack =0 producer之發送一次,不管對端有沒有寫入成功,對應語義是at most once;ack = -1 確保所有分區均寫入成功,對應語義是at least once;

at least once確保了至少會發送一次,冪等性確保了及時收到重復消息也不會重復處理,因此想要實現 Exactly once ,可以將at least once與冪等性結合,即Exactly once = at least once + 冪等性。當使用冪等性時,此時默認ack=-1。 而冪等性確保了不會發送重復的數據,

為實現冪等性,引入了兩個概念:

  • ProducerID:在每個新的Producer初始化時,會被分配一個唯一的ProducerID,這個ProducerID對客戶端使用者是不可見的。

  • SequenceNumber:對於每個ProducerID,Producer發送數據的每個Topic和Partition都對應一個從0開始單調遞增的SequenceNumber值。

Broker端會將<Pid,Partition,SequenceNumber> 持久化,具有相同主鍵的消息只會持久化一條。

冪等性解決的問題:

    當Producer發送消息(x2,y2)給Broker時,Broker接收到消息並將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發生異常導致Producer接收Ack信號失敗。對於Producer來說,會觸發重試機制,將消息(x2,y2)再次發送,但是,由於引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發送給Broker,而之前Broker緩存過之前發送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會出現重復發送的情況;

    如果消息序號比Broker維護的序號大一以上,說明中間有數據尚未寫入,也即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber

  • Broker保存消息后,發送ACK前宕機,Producer認為消息未發送成功並重試,造成數據重復。

  • 前一條消息發送失敗,后一條消息發送成功,前一條消息重試后成功,造成數據亂序(由SequenceNumber)。

局限性:

    只能保證 Producer 在單個會話內不丟不重,如果 Producer 出現意外掛掉再重啟是無法保證的(冪等性情況下,是無法獲取之前的狀態信息,因此是無法做到跨會話級別的不丟不重);冪等性不能跨多個 Topic-Partition,只能保證單個 partition 內的冪等性,當涉及多個 Topic-Partition 時,這中間的狀態並沒有同步。如果需要跨會話、跨多個 topic-partition 的情況,需要使用 Kafka 的事務性來實現。

事務:

    事務可以保證讀寫操作的原子性,要么全部成功,要么全部失敗,即使該生產或消費跨多個<Topic, Partition>。尤其對於Kafka Stream應用而言,典型的操作即是從某個Topic消費數據,經過一系列轉換后寫回另一個Topic,保證從源Topic的讀取與向目標Topic的寫入的原子性有助於從故障中恢復。

  • Exactly Once即正好一次語義

  • 操作的原子性

  • 有狀態操作的可恢復性

實現機制
    為實現這種效果應用程序必須提供一個穩定的(重啟后不變)唯一的ID,也即Transaction ID。Transactin ID與PID可能一一對應。區別在於Transaction ID由用戶提供,而PID是內部的實現對用戶透明。<Pid,Partition,SequenceNumber>+Transaction ID,通過不變的Transactin ID保證了跨會話的精准一次。

    另外,為了保證新的Producer啟動后,舊的具有相同Transaction ID的Producer即失效,每次Producer通過Transaction ID拿到PID的同時,還會獲取一個單調遞增的epoch。由於舊的Producer的epoch比新Producer的epoch小,Kafka可以很容易識別出該Producer是老的Producer並拒絕其請求。如果使用同一個TransactionID 開啟兩個生產者,那么前一個開啟的生產者會報錯:Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.

有了Transaction ID后,Kafka可保證:

  • 跨Session的數據冪等發送。當具有相同Transaction ID的新的Producer實例被創建且工作時,舊的且擁有相同Transaction ID的Producer將不再工作。

  • 跨Session的事務恢復。如果某個應用實例宕機,新的實例可以保證任何未完成的舊的事務要么Commit要么Abort,使得新實例從一個正常狀態開始工作。

事務流程圖:

具體的執行步驟:

1.查找Tranaction Corordinator,Producer向任意一個brokers發送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址;根據設置的transactional-id1的哈希值計算對_transaction_state分區數取余運算,找到分區編號,該分區對應的leader副本所對應的broker的即為Tranaction Coordinator所在節點。

2.初始化事務 initTransaction,此步驟為了獲得和保存<TransactionId,pid>的映射關系,將<TransactionId,pid>持久化到內部主題中。恢復(Commit或Abort)之前的Producer未完成的事務,對PID對應的epoch進行遞增,這樣可以保證同一個app的不同實例對應的PID是一樣,而epoch是不同的。

3.開始事務beginTransaction,執行Producer的beginTransacion(),它的作用是Producer在本地記錄下這個transaction的狀態為開始狀態。這個操作並沒有通知Transaction Coordinator,因為Transaction Coordinator只有在Producer發送第一條消息后才認為事務已經開啟。

4.read-process-write流程。這一階段,包含了整個事務的數據處理過程,並且包含了多種請求。包括存儲<TransactionId,TopicPartition>關系到_transaction_state,有了這個對應關系就可以后續給每個分區設置COMMIT和ABORT,根據groupId推導出在_consumer_offsets中的分區,並將該分區存儲到_transaction_state中,發送請求給GroupCoordinator,從而將本次事務消費的offsets存儲至_consumer_offsets.

5.事務提交或終結 commitTransaction/abortTransaction。第一階段,將Transaction Log內的該事務狀態設置為PREPARE_COMMIT或PREPARE_ABORT,第二階段,將Transaction Marker寫入該事務涉及到的所有消息(即將消息標記為committed或aborted)。這一步驟Transaction Coordinator會發送給當前事務涉及到的每個<Topic, Partition>的Leader,Broker收到該請求后,會將對應的Transaction Marker控制信息寫入日志。一旦Transaction Marker寫入完成,Transaction Coordinator會將最終的COMPLETE_COMMIT或COMPLETE_ABORT狀態寫入Transaction Log中以標明該事務結束。

事務使用的場景:

  • 只有Producer生產消息,這種場景需要事務的介入;
  • 消費消息和生產消息並存,比如Consumer&Producer模式,這種場景是一般Kafka項目中比較常見的模式,需要事務介入;
  • 只有Consumer消費消息,這種操作在實際項目中意義不大,和手動Commit Offsets的結果一樣,而且這種場景不是事務的引入目的。

事務提供了哪些可使用的API?

// 初始化事務,需要注意確保transation.id屬性被分配
void initTransactions();
// 開啟事務
void beginTransaction() throws ProducerFencedException;
// 為Consumer提供的在事務內Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;
// 提交事務
void commitTransaction() throws ProducerFencedException;
// 放棄事務,類似於回滾事務的操作
void abortTransaction() throws ProducerFencedException;

事務幾個重要的配置:

  • transactional.id=transactional-id1,transactional.id唯一不變的,便於程序異常重啟后恢復事務;

  • enable.idempotence 冪等性,在開啟事務的時候自動設置enable.idempotence=true。

  • isolation.level =read_committed 當commitTransaction()方法執行成功后,讀取已經提交事務的消息。

事務樣例

  • Producer 模式:

保證的producer發送一批數據,要么全部成功要么全部失敗。

// 初始化事務
producer.initTransactions();
// 開啟事務
producer.beginTransaction();
try {
producer.send(new ProducerRecord<>("topic1", "tx_msg_01"));
producer.send(new ProducerRecord<>("topic2", "tx_msg_02"));
// int i = 1 / 0;  ##制造異常情況
// 提交事務
producer.commitTransaction();
} catch (Exception ex) {
// 中止事務
producer.abortTransaction();
} finally {
// 關閉生產者
producer.close();
  • 消費-轉換-發送 模式:
Consumer Offset的Commit與Producer對發送消息的Commit包含在同一個事務中。
producer.initTransactions(); ##初始化事務

try {

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        if(!records.isEmpty()){
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            producer.beginTransaction();      ##開始事務
            System.out.println("beginTransaction");
            
           ## 獲取每一個分區的數據
            for (TopicPartition partition:records.partitions()){

                List<ConsumerRecord<String,String>> partitionsRecords = records.records(partition);

                for (ConsumerRecord<String,String> record : partitionsRecords){

                    String msg = record.value();
                    ## 發送數據
                    producer.send(new ProducerRecord<>(sendTopicName, record.key(),record.value()));
                    System.out.println("send msg!");
                }

                long lastConsumedOffset = partitionsRecords.get(partitionsRecords.size() - 1).offset();
                ##保存每個分區最后一個offset
                commits.put(partition,new OffsetAndMetadata(lastConsumedOffset + 1));
            }
            ## 發送消費數據的Offset,將上述數據消費與數據發送納入同一個Transaction內
            producer.sendOffsetsToTransaction(commits,groupid );
            ## 提交事務
            producer.commitTransaction();
        }

    }
} catch (Exception e) {
    if (e instanceof ProducerFencedException){
        System.out.println("ProducerFencedException");
    }
    if (e instanceof InterruptedException) {
        System.out.println("thread interrupted");
    } else {
        System.out.println("error");
        e.printStackTrace();
    }
} finally {
    System.out.println("finally");
    producer.abortTransaction();
    consumer.close();
    producer.flush();
    producer.close();
}

異常情況:
producer.send();上述代碼中,使用的是異步發送,若是發送過程中出現異常,是不會倍捕獲到的,使用get()方法能夠捕獲到。使用同步發送方法producer.send().get(),當發送過程中發生異常是可以捕獲到的,但是使用同步發送方法消息是極低的,大概幾百tps。

局限性:
經過測試kafka1.8 與2.5版本,使用consume-transform-produce 的事務模型,在本集群中從topic A--topic B 發送沒有什么問題;但是跨集群從集群A 的topicA--集群B topicB 時offset 無法更新,kafka不支持跨集群事務。在kafka集群中存在一個協調者,由協調者統一提交consumer的offsets以及producer的messages。整個read-process-write流程都必須在協調者的監控之下。這也是為什么消費者和生產者必須在同一個集群內的原因。

小結:

  • PID與Sequence Number的引入實現了寫操作的冪等性
  • 寫操作的冪等性結合At Least Once語義實現了單一Session內的Exactly Once語義
  • Transaction Marker與PID提供了識別消息是否應該被讀取的能力,從而實現了事務的隔離性
  • Offset的更新標記了消息是否被讀取,從而將對讀操作的事務處理轉換成了對寫(Offset)操作的事務處理
  • Kafka事務的本質是,將一組寫操作(如果有)對應的消息與一組讀操作(如果有)對應的Offset的更新進行同樣的標記(即Transaction Marker)來實現事務中涉及的所有讀寫操作同時對外可見或同時對外不可見
  • Kafka只提供對Kafka本身的讀寫操作的事務性,不提供包含外部系統的事務性

參考:

http://www.jasongj.com/kafka/transaction/
http://matt33.com/2018/10/24/kafka-idempotent/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM