kafka冪等性和事務使用及實現原理


kafka冪等性和事務使用及實現原理

開篇

在開始這篇之前,先拋出問題,這章解決如下問題:

  1. 如何開啟冪等性?
  2. 如何使用事務?
  3. 冪等性的原理
  4. 事務實現原理

正文

Producer 冪等性

Producer 的冪等性指的是當發送同一條消息時,數據在 Server 端只會被持久化一次,數據不丟不重,但是這里的冪等性是有條件的:

  • 只能保證 Producer 在單個會話內不丟不重,如果 Producer 出現意外掛掉再重啟是無法保證的(冪等性情況下,是無法獲取之前的狀態信息,因此是無法做到跨會話級別的不丟不重);
  • 冪等性不能跨多個 Topic-Partition,只能保證單個 partition 內的冪等性,當涉及多個 Topic-Partition 時,這中間的狀態並沒有同步。

如果需要跨會話、跨多個 topic-partition 的情況,需要使用 Kafka 的事務性來實現。

使用方式:props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

當冪等性開啟的時候acks即為all。如果顯性的將acks設置為0,-1,那么將會報錯Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

示例:

 1 roperties props = new Properties();
 2 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
 3 props.put(ProducerConfig.ACKS_CONFIG, "all");
 4 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 5 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 6 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
 7 
 8 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
 9 kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "1", "hello world.")).get();
10 kafkaProducer.close();


冪等性是通過兩個關鍵信息保證的,PID(Producer ID)和sequence numbers。
冪等性原理

  • PID 用來標識每個producer client
  • sequence numbers 客戶端發送的每條消息都會帶相應的 sequence number,Server 端就是根據這個值來判斷數據是否重復

producer初始化會由server端生成一個PID,然后發送每條信息都包含該PID和sequence number,在server端,是按照partition同樣存放一個sequence numbers 信息,通過判斷客戶端發送過來的sequence number與server端number+1差值來決定數據是否重復或者漏掉。

通常情況下為了保證數據順序性,我們可以通過max.in.flight.requests.per.connection=1來保證,這個也只是針對單實例。在kafka2.0+版本上,只要開啟冪等性,不用設置這個參數也能保證發送數據的順序性。

為什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小於等於5

其實這里,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小於等於 5 的主要原因是:Server 端的 ProducerStateManager 實例會緩存每個 PID 在每個 Topic-Partition 上發送的最近 5 個batch 數據(這個 5 是寫死的,至於為什么是 5,可能跟經驗有關,當不設置冪等性時,當這個設置為 5 時,性能相對來說較高,社區是有一個相關測試文檔),如果超過 5,ProducerStateManager 就會將最舊的 batch 數據清除。

假設應用將 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設置為 6,假設發送的請求順序是 1、2、3、4、5、6,這時候 server 端只能緩存 2、3、4、5、6 請求對應的 batch 數據,這時候假設請求 1 發送失敗,需要重試,當重試的請求發送過來后,首先先檢查是否為重復的 batch,這時候檢查的結果是否,之后會開始 check 其 sequence number 值,這時候只會返回一個 OutOfOrderSequenceException 異常,client 在收到這個異常后,會再次進行重試,直到超過最大重試次數或者超時,這樣不但會影響 Producer 性能,還可能給 Server 帶來壓力(相當於client 狂發錯誤請求)。

Kafka 事務性

示例

 1 //Producer
 2 Properties props = new Properties();
 3 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
 4 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 5 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 6 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
 7 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id-0");
 8 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
 9 kafkaProducer.initTransactions();
10 kafkaProducer.beginTransaction();
11 for (int i = 0; i < 10; i++) {
12     kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "key"+i, "hello world.")).get();
13 }
14 kafkaProducer.commitTransaction();
15 kafkaProducer.close();
16 //Consumer
17 Properties config = new Properties();
18 config.put("group.id", "test11");
19 config.put("bootstrap.servers", "127.0.0.1:9092");
20 config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
21 config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
22 config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
23 
24 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
25 consumer.subscribe(Arrays.asList(TOPIC));
26 boolean isConsumer = true;
27 while (isConsumer) {
28     ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer
29         .poll(Duration.ofMillis(100));
30     for (ConsumerRecord<String, String> record : records) {
31         System.out.println("consumer message: key =" + record.key() + " value:" + record.value());
32     }
33 }
34 consumer.close();
35 }

(1)查找TransactionCoordinator事務實現原理

通過transaction_id 找到TransactionCoordinator,具體算法是Utils.abs(transaction_id.hashCode %transactionTopicPartitionCount ),獲取到partition,再找到該partition的leader,即為TransactionCoordinator。

(2)獲取PID

凡是開啟冪等性都是需要生成PID(Producer ID),只不過未開啟事務的PID可以在任意broker生成,而開啟事務只能在TransactionCoordinator節點生成。這里只講開啟事務的情況,Producer Client的initTransactions()方法會向TransactionCoordinator發起InitPidRequest ,這樣就能獲取PID。這里面還有一些細節問題,這里不探討,例如transaction_id 之前的事務狀態什么的。但需要說明的一點是這里會將 transaction_id 與相應的 TransactionMetadata 持久化到事務日志(_transaction_state)中。

(3)開啟事務

Producer調用beginTransaction開始一個事務狀態,這里只是在客戶端將本地事務狀態轉移成 IN_TRANSACTION,只有在發送第一條信息后,TransactionCoordinator才會認為該事務已經開啟。

(4)Consume-Porcess-Produce Loop

這里說的是一個典型的consume-process-produce場景:

 1 while (true) {
 2     ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
 3     producer.beginTransaction();
 4     //start
 5     for (ConsumerRecord record : records){
 6         producer.send(producerRecord(“outputTopic1”, record));
 7         producer.send(producerRecord(“outputTopic2”, record));
 8     }
 9     producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
10     //end
11     producer.commitTransaction();
12 }

AddPartitionsToTxnRequest該階段主要經歷以下幾個步驟:

  1. ProduceRequest
  2. AddOffsetsToTxnRequest
  3. TxnOffsetsCommitRequest

關於這里的詳細介紹可以查看參考鏈接,或者直接查看官網文檔!

(5)提交或者中斷事務

Producer 調用 commitTransaction() 或者 abortTransaction() 方法來 commit 或者 abort 這個事務操作。

基本上經歷以下三個步驟,才真正結束事務。

  1. EndTxnRequest
  2. WriteTxnMarkerRquest
  3. Writing the Final Commit or Abort Message

其中EndTxnRequest是在Producer發起的請求,其他階段都是在TransactionCoordinator端發起完成的。WriteTxnMarkerRquest是發送請求到partition的leader上寫入事務結果信息(ControlBatch),第三步主要是在_transaction_state中標記事務的結束。

 

轉自:kafka冪等性和事務使用及實現原理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM