Kafka 0.11.x版本(對應 Confluent Platform 3.3),該版本引入了exactly-once語義。
精確一次確實很難實現(Exactly-once is a really hard problem)
Mathias Verraes說,分布式系統中最難解決的兩個問題是:
- 消息順序保證(Guaranteed order of messages)。
- 消息的精確一次投遞(Exactly-once delivery)。
消息系統語義概述(Overview of messaging system semantics)
在一個分布式發布訂閱消息系統中,組成系統的計算機總會由於各自的故障而不能工作。在Kafka中,一個單獨的broker,可能會在生產者發送消息到一個topic的時候宕機,或者出現網絡故障,從而導致生產者發送消息失敗。根據生產者如何處理這樣的失敗,產生了不同的語義:
- 至少一次語義(At least once semantics):如果生產者收到了Kafka broker的確認(acknowledgement,ack),並且生產者的acks配置項設置為all(或-1),這就意味着消息已經被精確一次寫入Kafka topic了。然而,如果生產者接收ack超時或者收到了錯誤,它就會認為消息沒有寫入Kafka topic而嘗試重新發送消息。如果broker恰好在消息已經成功寫入Kafka topic后,發送ack前,出了故障,生產者的重試機制就會導致這條消息被寫入Kafka兩次,從而導致同樣的消息會被消費者消費不止一次。每個人都喜歡一個興高采烈的給予者,但是這種方式會導致重復的工作和錯誤的結果。
- 至多一次語義(At most once semantics):如果生產者在ack超時或者返回錯誤的時候不重試發送消息,那么消息有可能最終並沒有寫入Kafka topic中,因此也就不會被消費者消費到。但是為了避免重復處理的可能性,我們接受有些消息可能被遺漏處理。
- 精確一次語義(Exactly once semantics): 即使生產者重試發送消息,也只會讓消息被發送給消費者一次。精確一次語義是最令人滿意的保證,但也是最難理解的。因為它需要消息系統本身和生產消息的應用程序還有消費消息的應用程序一起合作。比如,在成功消費一條消息后,你又把消費的offset重置到之前的某個offset位置,那么你將收到從那個offset到最新的offset之間的所有消息。這解釋了為什么消息系統和客戶端程序必須合作來保證精確一次語義。
必須被處理的故障(Failures that must be handled)
為了描述為了支持精確一次消息投遞語義而引入的挑戰,讓我們從一個簡單的例子開始。
假設有一個單進程生產者程序,發送了消息“Hello Kafka“給一個叫做“EoS“的單分區Kafka topic。然后有一個單實例的消費者程序在另一端從topic中拉取消息,然后打印。在沒有故障的理想情況下,這能很好的工作,“Hello Kafka“只被寫入到EoS topic一次。消費者拉取消息,處理消息,提交偏移量來說明它完成了處理。然后,即使消費者程序出故障重啟也不會再收到“Hello Kafka“這條消息了。
然而,我們知道,我們不能總認為一切都是順利的。在上規模的集群中,即使最不可能發生的故障場景都可能最終發生。比如:
- broker可能故障:Kafka是一個高可用、持久化的系統,每一條寫入一個分區的消息都會被持久化並且多副本備份(假設有n個副本)。所以,Kafka可以容忍n-1個broker故障,意味着一個分區只要至少有一個broker可用,分區就可用。Kafka的副本協議保證了只要消息被成功寫入了主副本,它就會被復制到其他所有的可用副本(ISR)。
- producer到broker的RPC調用可能失敗:Kafka的持久性依賴於生產者接收broker的ack。沒有接收成功ack不代表生產請求本身失敗了。broker可能在寫入消息后,發送ack給生產者的時候掛了。甚至broker也可能在寫入消息前就掛了。由於生產者沒有辦法知道錯誤是什么造成的,所以它就只能認為消息沒寫入成功,並且會重試發送。在一些情況下,這會造成同樣的消息在Kafka分區日志中重復,進而造成消費端多次收到這條消息。
- 客戶端可能會故障:精確一次交付也必須考慮客戶端故障。但是我們如何知道一個客戶端已經故障而不是暫時和brokers斷開,或者經歷一個程序短暫的暫停?區分永久性故障和臨時故障是很重要的,為了正確性,broker應該丟棄僵住的生產者發送來的消息,同樣,也應該不向已經僵住的消費者發送消息。一旦一個新的客戶端實例啟動,它應該能夠從失敗的實例留下的任何狀態中恢復,從一個安全點開始處理。這意味着,消費的偏移量必須始終與生產的輸出保持同步。
Kafka的exactly-once語義
在0.11.x版本之前,Apache Kafka支持at-least-once delivery語義以及partition內部的順序delivery,如前所述這在某些場景下可能會導致數據重復消費。而Kafka 0.11.x支持exactly-once語義,不會導致該情況發生,其中主要包括三個內部邏輯的改造:
冪等:partition內部的exactly-once順序語義
冪等操作,是指可以執行多次,而不會產生與僅執行一次不同結果的操作,Producer的send操作現在是冪等的。在任何導致producer重試的情況下,相同的消息,如果被producer發送多次,也只會被寫入Kafka一次。要打開此功能,並讓所有partition獲得exactly-once delivery、無數據丟失和in-order語義,需要修改broker的配置:enable.idempotence = true。
這個功能如何工作?它的工作方式類似於TCP:發送到Kafka的每批消息將包含一個序列號,該序列號用於重復數據的刪除。與TCP不同,TCP只能在transient in-memory中提供保證。序列號將被持久化存儲topic中,因此即使leader replica失敗,接管的任何其他broker也將能感知到消息是否重復。
這種機制的開銷相當低:它只是在每批消息中添加了幾個額外字段:
- PID,在Producer初始化時分配,作為每個Producer會話的唯一標識;
- 序列號(sequence number),Producer發送的每條消息(更准確地說是每一個消息批次,即ProducerBatch)都會帶有此序列號,從0開始單調遞增。Broker根據它來判斷寫入的消息是否可接受。
事務:跨partition的原子性寫操作
第二點,Kafka現在支持使用新事務API原子性的對跨partition進行寫操作,該API允許producer發送批量消息到多個partition。該功能同樣支持在同一個事務中提交消費者offsets,因此真正意義上實現了end-to-end的exactly-once delivery語義。以下是一段示例代碼:
producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch(ProducerFencedException e) { producer.close(); } catch(KafkaException e) { producer.abortTransaction(); }
該代碼片段描述了如何使用新的producer事務API原子性的發送消息至多個partition。值得注意的是,某個Kafka topic partition內部的消息可能是事務完整提交后的消息,也可能是事務執行過程中的部分消息。
而從consumer的角度來看,有兩種策略去讀取事務寫入的消息,通過"isolation.level"來進行配置:
-
read_committed
:可以同時讀取事務執行過程中的部分寫入數據和已經完整提交的事務寫入數據; -
read_uncommitted
:完全不等待事務提交,按照offsets order去讀取消息,也就是兼容0.11.x版本前Kafka的語義;
我們必須通過配置consumer端的配置isolation.level
,來正確使用事務API,通過使用 new Producer API並且對一些unique ID設置transaction.id
(該配置屬於producer端),該unique ID用於提供事務狀態的連續性。
Exactly-once 流處理
基於冪等和原子性,通過Streams API實現exactly-once流處理成為可能。如果要在流應用中實現相關語義,只需要配置 processing.guarantee=exactly_once
,這會影響所有的流處理環境中的語義,包括將處理作業和由加工作業創建的所有物理狀態同時寫回到Kafka的操作。
事務機制原理
事務性消息傳遞
這一節所說的事務主要指原子性,也即Producer將多條消息作為一個事務批量發送,要么全部成功要么全部失敗。
為了實現這一點,Kafka 0.11.0.0引入了一個服務器端的模塊,名為Transaction Coordinator
,用於管理Producer發送的消息的事務性。
該Transaction Coordinator
維護Transaction Log
,該log存於一個內部的Topic內。由於Topic數據具有持久性,因此事務的狀態也具有持久性。
Producer並不直接讀寫Transaction Log
,它與Transaction Coordinator
通信,然后由Transaction Coordinator
將該事務的狀態插入相應的Transaction Log
。
Transaction Log
的設計與Offset Log
用於保存Consumer的Offset類似。
事務中Offset的提交
許多基於Kafka的應用,尤其是Kafka Stream應用中同時包含Consumer和Producer,前者負責從Kafka中獲取消息,后者負責將處理完的數據寫回Kafka的其它Topic中。
為了實現該場景下的事務的原子性,Kafka需要保證對Consumer Offset的Commit與Producer對發送消息的Commit包含在同一個事務中。否則,如果在二者Commit中間發生異常,根據二者Commit的順序可能會造成數據丟失和數據重復:
-
如果先Commit Producer發送數據的事務再Commit Consumer的Offset,即
At Least Once
語義,可能造成數據重復。 -
如果先Commit Consumer的Offset,再Commit Producer數據發送事務,即
At Most Once
語義,可能造成數據丟失。
用於事務特性的控制型消息
為了區分寫入Partition的消息被Commit還是Abort,Kafka引入了一種特殊類型的消息,即Control Message
。該類消息的Value內不包含任何應用相關的數據,並且不會暴露給應用程序。它只用於Broker與Client間的內部通信。
對於Producer端事務,Kafka以Control Message的形式引入一系列的Transaction Marker
。Consumer即可通過該標記判定對應的消息被Commit了還是Abort了,然后結合該Consumer配置的隔離級別決定是否應該將該消息返回給應用程序。
Data flow
At a high level, the data flow can be broken into four distinct types.
A: the producer and transaction coordinator interaction
When executing transactions, the producer makes requests to the transaction coordinator at the following points:
- The initTransactions API registers a transactional.id with the coordinator. At this point, the coordinator closes any pending transactions with that transactional.id and bumps the epoch to fence out zombies. This happens only once per producer session.
- When the producer is about to send data to a partition for the first time in a transaction, the partition is registered with the coordinator first.
- When the application calls commitTransaction or abortTransaction, a request is sent to the coordinator to begin the two phase commit protocol.
B: the coordinator and transaction log interaction
As the transaction progresses, the producer sends the requests above to update the state of the transaction on the coordinator. The transaction coordinator keeps the state of each transaction it owns in memory, and also writes that state to the transaction log (which is replicated three ways and hence is durable).
The transaction coordinator is the only component to read and write from the transaction log. If a given broker fails, a new coordinator is elected as the leader for the transaction log partitions the dead broker owned, and it reads the messages from the incoming partitions to rebuild its in-memory state for the transactions in those partitions.
C: the producer writing data to target topic-partitions
After registering new partitions in a transaction with the coordinator, the producer sends data to the actual partitions as normal. This is exactly the same producer.send flow, but with some extra validation to ensure that the producer isn’t fenced.
D: the coordinator to topic-partition interaction
After the producer initiates a commit (or an abort), the coordinator begins the two phase commit protocol.
In the first phase, the coordinator updates its internal state to “prepare_commit” and updates this state in the transaction log. Once this is done the transaction is guaranteed to be committed no matter what.
The coordinator then begins phase 2, where it writes transaction commit markers to the topic-partitions which are part of the transaction.
These transaction markers are not exposed to applications, but are used by consumers in read_committed mode to filter out messages from aborted transactions and to not return messages which are part of open transactions (i.e., those which are in the log but don’t have a transaction marker associated with them).
Once the markers are written, the transaction coordinator marks the transaction as “complete” and the producer can start the next transaction.
事務處理樣例代碼:
Producer<String, String> producer = new KafkaProducer<String, String>(props); // 初始化事務,包括結束該Transaction ID對應的未完成的事務(如果有) // 保證新的事務在一個正確的狀態下啟動 producer.initTransactions(); // 開始事務 producer.beginTransaction(); // 消費數據 ConsumerRecords<String, String> records = consumer.poll(100); try{ // 發送數據 producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value")); // 發送消費數據的Offset,將上述數據消費與數據發送納入同一個Transaction內 producer.sendOffsetsToTransaction(offsets, "group1"); // 數據發送及Offset發送均成功的情況下,提交事務 producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // 數據發送或者Offset發送出現異常時,終止事務 producer.abortTransaction(); } finally { // 關閉Producer和Consumer producer.close(); consumer.close(); }
參考:
Kafka 0.11.0.0 是如何實現 Exactly-once 語義的
Kafka設計解析(八)- Exactly Once語義與事務機制原理
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
https://www.confluent.io/blog/transactions-apache-kafka/