kafka冪等性和事務使用及實現原理
開篇
在開始這篇之前,先拋出問題,這章解決如下問題:
- 如何開啟冪等性?
- 如何使用事務?
- 冪等性的原理
- 事務實現原理
正文
Producer 冪等性
Producer 的冪等性指的是當發送同一條消息時,數據在 Server 端只會被持久化一次,數據不丟不重,但是這里的冪等性是有條件的:
- 只能保證 Producer 在單個會話內不丟不重,如果 Producer 出現意外掛掉再重啟是無法保證的(冪等性情況下,是無法獲取之前的狀態信息,因此是無法做到跨會話級別的不丟不重);
- 冪等性不能跨多個 Topic-Partition,只能保證單個 partition 內的冪等性,當涉及多個 Topic-Partition 時,這中間的狀態並沒有同步。
如果需要跨會話、跨多個 topic-partition 的情況,需要使用 Kafka 的事務性來實現。
使用方式:props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
當冪等性開啟的時候acks即為all。如果顯性的將acks設置為0,-1,那么將會報錯Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
示例:
1 roperties props = new Properties(); 2 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); 3 props.put(ProducerConfig.ACKS_CONFIG, "all"); 4 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 5 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 6 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); 7 8 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); 9 kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "1", "hello world.")).get(); 10 kafkaProducer.close();
冪等性是通過兩個關鍵信息保證的,PID(Producer ID)和sequence numbers。冪等性原理
- PID 用來標識每個producer client
- sequence numbers 客戶端發送的每條消息都會帶相應的 sequence number,Server 端就是根據這個值來判斷數據是否重復
producer初始化會由server端生成一個PID,然后發送每條信息都包含該PID和sequence number,在server端,是按照partition同樣存放一個sequence numbers 信息,通過判斷客戶端發送過來的sequence number與server端number+1差值來決定數據是否重復或者漏掉。
通常情況下為了保證數據順序性,我們可以通過max.in.flight.requests.per.connection=1
來保證,這個也只是針對單實例。在kafka2.0+版本上,只要開啟冪等性,不用設置這個參數也能保證發送數據的順序性。
為什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小於等於5
其實這里,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小於等於 5 的主要原因是:Server 端的 ProducerStateManager 實例會緩存每個 PID 在每個 Topic-Partition 上發送的最近 5 個batch 數據(這個 5 是寫死的,至於為什么是 5,可能跟經驗有關,當不設置冪等性時,當這個設置為 5 時,性能相對來說較高,社區是有一個相關測試文檔),如果超過 5,ProducerStateManager 就會將最舊的 batch 數據清除。
假設應用將 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設置為 6,假設發送的請求順序是 1、2、3、4、5、6,這時候 server 端只能緩存 2、3、4、5、6 請求對應的 batch 數據,這時候假設請求 1 發送失敗,需要重試,當重試的請求發送過來后,首先先檢查是否為重復的 batch,這時候檢查的結果是否,之后會開始 check 其 sequence number 值,這時候只會返回一個 OutOfOrderSequenceException 異常,client 在收到這個異常后,會再次進行重試,直到超過最大重試次數或者超時,這樣不但會影響 Producer 性能,還可能給 Server 帶來壓力(相當於client 狂發錯誤請求)。
Kafka 事務性
示例
1 //Producer 2 Properties props = new Properties(); 3 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); 4 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 5 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 6 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); 7 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id-0"); 8 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); 9 kafkaProducer.initTransactions(); 10 kafkaProducer.beginTransaction(); 11 for (int i = 0; i < 10; i++) { 12 kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "key"+i, "hello world.")).get(); 13 } 14 kafkaProducer.commitTransaction(); 15 kafkaProducer.close(); 16 //Consumer 17 Properties config = new Properties(); 18 config.put("group.id", "test11"); 19 config.put("bootstrap.servers", "127.0.0.1:9092"); 20 config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 21 config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 22 config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); 23 24 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config); 25 consumer.subscribe(Arrays.asList(TOPIC)); 26 boolean isConsumer = true; 27 while (isConsumer) { 28 ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer 29 .poll(Duration.ofMillis(100)); 30 for (ConsumerRecord<String, String> record : records) { 31 System.out.println("consumer message: key =" + record.key() + " value:" + record.value()); 32 } 33 } 34 consumer.close(); 35 }
(1)查找TransactionCoordinator事務實現原理
通過transaction_id 找到TransactionCoordinator,具體算法是Utils.abs(transaction_id.hashCode %transactionTopicPartitionCount )
,獲取到partition,再找到該partition的leader,即為TransactionCoordinator。
(2)獲取PID
凡是開啟冪等性都是需要生成PID(Producer ID),只不過未開啟事務的PID可以在任意broker生成,而開啟事務只能在TransactionCoordinator節點生成。這里只講開啟事務的情況,Producer Client的initTransactions()
方法會向TransactionCoordinator發起InitPidRequest ,這樣就能獲取PID。這里面還有一些細節問題,這里不探討,例如transaction_id 之前的事務狀態什么的。但需要說明的一點是這里會將 transaction_id 與相應的 TransactionMetadata 持久化到事務日志(_transaction_state)中。
(3)開啟事務
Producer調用beginTransaction
開始一個事務狀態,這里只是在客戶端將本地事務狀態轉移成 IN_TRANSACTION,只有在發送第一條信息后,TransactionCoordinator才會認為該事務已經開啟。
(4)Consume-Porcess-Produce Loop
這里說的是一個典型的consume-process-produce
場景:
1 while (true) { 2 ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); 3 producer.beginTransaction(); 4 //start 5 for (ConsumerRecord record : records){ 6 producer.send(producerRecord(“outputTopic1”, record)); 7 producer.send(producerRecord(“outputTopic2”, record)); 8 } 9 producer.sendOffsetsToTransaction(currentOffsets(consumer), group); 10 //end 11 producer.commitTransaction(); 12 }
AddPartitionsToTxnRequest該階段主要經歷以下幾個步驟:
- ProduceRequest
- AddOffsetsToTxnRequest
- TxnOffsetsCommitRequest
關於這里的詳細介紹可以查看參考鏈接,或者直接查看官網文檔!
(5)提交或者中斷事務
Producer 調用 commitTransaction()
或者 abortTransaction()
方法來 commit 或者 abort 這個事務操作。
基本上經歷以下三個步驟,才真正結束事務。
- EndTxnRequest
- WriteTxnMarkerRquest
- Writing the Final Commit or Abort Message
其中EndTxnRequest是在Producer發起的請求,其他階段都是在TransactionCoordinator端發起完成的。WriteTxnMarkerRquest是發送請求到partition的leader上寫入事務結果信息(ControlBatch),第三步主要是在_transaction_state
中標記事務的結束。