項目要求使用kafka的事務,遇到了一些問題,研究了下kafka的事務機制記錄一下。
kafka事務是為了實現:
-
Exactly Once即正好一次語義
-
操作的原子性
-
有狀態操作的可恢復性
kafka的冪等性可以實現Exactly Once語義,冪等性提供了單會話單分區的Exactly-Once 語義的實現,冪等性實現是事務性實現的基礎。Kafka在引入冪等性之前,Producer向Broker發送消息,然后Broker將消息追加到消息流中后給Producer返回Ack信號值。實現流程如下:

然而實際生產環境中會出現各種不確定的因素,比如在Producer在發送給Broker的時候出現網絡異常。比如以下這種異常情況的出現:

上圖這種情況,當Producer第一次發送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時失敗了(比如網絡異常) 。此時,Producer端觸發重試機制,將消息(x2,y2)重新發送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然后成功返回Ack信號給Producer。這樣下來,消息流中就被重復追加了兩條相同的(x2,y2)的消息。
冪等性:
保證在消息重發的時候,消費者不會重復處理。即使在消費者收到重復消息的時候,重復處理,也要保證最終結果的一致性。所謂冪等性是指producer向server發送多條重復數據,server端只會持久化一條數據;數學概念就是: f(f(x)) = f(x) 。f函數表示對消息的處理。比如,銀行轉賬,如果失敗,需要重試。不管重試多少次,都要保證最終結果一定是一致的
冪等性的實現機制:
冪等性的實現離不開ack機制,ack=1 只要下端的broker的leader分區寫入成功則任務是成功的;ack =0 producer之發送一次,不管對端有沒有寫入成功,對應語義是at most once;ack = -1 確保所有分區均寫入成功,對應語義是at least once;
at least once確保了至少會發送一次,冪等性確保了及時收到重復消息也不會重復處理,因此想要實現 Exactly once ,可以將at least once與冪等性結合,即Exactly once = at least once + 冪等性。當使用冪等性時,此時默認ack=-1。 而冪等性確保了不會發送重復的數據,
為實現冪等性,引入了兩個概念:
-
ProducerID:在每個新的Producer初始化時,會被分配一個唯一的ProducerID,這個ProducerID對客戶端使用者是不可見的。
-
SequenceNumber:對於每個ProducerID,Producer發送數據的每個Topic和Partition都對應一個從0開始單調遞增的SequenceNumber值。
Broker端會將<Pid,Partition,SequenceNumber> 持久化,具有相同主鍵的消息只會持久化一條。
冪等性解決的問題:

當Producer發送消息(x2,y2)給Broker時,Broker接收到消息並將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發生異常導致Producer接收Ack信號失敗。對於Producer來說,會觸發重試機制,將消息(x2,y2)再次發送,但是,由於引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發送給Broker,而之前Broker緩存過之前發送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會出現重復發送的情況;
如果消息序號比Broker維護的序號大一以上,說明中間有數據尚未寫入,也即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
-
Broker保存消息后,發送ACK前宕機,Producer認為消息未發送成功並重試,造成數據重復。
-
前一條消息發送失敗,后一條消息發送成功,前一條消息重試后成功,造成數據亂序(由SequenceNumber)。
局限性:

只能保證 Producer 在單個會話內不丟不重,如果 Producer 出現意外掛掉再重啟是無法保證的(冪等性情況下,是無法獲取之前的狀態信息,因此是無法做到跨會話級別的不丟不重);冪等性不能跨多個 Topic-Partition,只能保證單個 partition 內的冪等性,當涉及多個 Topic-Partition 時,這中間的狀態並沒有同步。如果需要跨會話、跨多個 topic-partition 的情況,需要使用 Kafka 的事務性來實現。
事務:
事務可以保證讀寫操作的原子性,要么全部成功,要么全部失敗,即使該生產或消費跨多個<Topic, Partition>。尤其對於Kafka Stream應用而言,典型的操作即是從某個Topic消費數據,經過一系列轉換后寫回另一個Topic,保證從源Topic的讀取與向目標Topic的寫入的原子性有助於從故障中恢復。
-
Exactly Once即正好一次語義
-
操作的原子性
-
有狀態操作的可恢復性
實現機制
為實現這種效果應用程序必須提供一個穩定的(重啟后不變)唯一的ID,也即Transaction ID。Transactin ID與PID可能一一對應。區別在於Transaction ID由用戶提供,而PID是內部的實現對用戶透明。<Pid,Partition,SequenceNumber>+Transaction ID,通過不變的Transactin ID保證了跨會話的精准一次。
另外,為了保證新的Producer啟動后,舊的具有相同Transaction ID的Producer即失效,每次Producer通過Transaction ID拿到PID的同時,還會獲取一個單調遞增的epoch。由於舊的Producer的epoch比新Producer的epoch小,Kafka可以很容易識別出該Producer是老的Producer並拒絕其請求。如果使用同一個TransactionID 開啟兩個生產者,那么前一個開啟的生產者會報錯:Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.
有了Transaction ID后,Kafka可保證:
-
跨Session的數據冪等發送。當具有相同Transaction ID的新的Producer實例被創建且工作時,舊的且擁有相同Transaction ID的Producer將不再工作。
-
跨Session的事務恢復。如果某個應用實例宕機,新的實例可以保證任何未完成的舊的事務要么Commit要么Abort,使得新實例從一個正常狀態開始工作。
事務流程圖:

具體的執行步驟:
1.查找Tranaction Corordinator,Producer向任意一個brokers發送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址;根據設置的transactional-id1的哈希值計算對_transaction_state分區數取余運算,找到分區編號,該分區對應的leader副本所對應的broker的即為Tranaction Coordinator所在節點。
2.初始化事務 initTransaction,此步驟為了獲得和保存<TransactionId,pid>的映射關系,將<TransactionId,pid>持久化到內部主題中。恢復(Commit或Abort)之前的Producer未完成的事務,對PID對應的epoch進行遞增,這樣可以保證同一個app的不同實例對應的PID是一樣,而epoch是不同的。
3.開始事務beginTransaction,執行Producer的beginTransacion(),它的作用是Producer在本地記錄下這個transaction的狀態為開始狀態。這個操作並沒有通知Transaction Coordinator,因為Transaction Coordinator只有在Producer發送第一條消息后才認為事務已經開啟。
4.read-process-write流程。這一階段,包含了整個事務的數據處理過程,並且包含了多種請求。包括存儲<TransactionId,TopicPartition>關系到_transaction_state,有了這個對應關系就可以后續給每個分區設置COMMIT和ABORT,根據groupId推導出在_consumer_offsets中的分區,並將該分區存儲到_transaction_state中,發送請求給GroupCoordinator,從而將本次事務消費的offsets存儲至_consumer_offsets.
5.事務提交或終結 commitTransaction/abortTransaction。第一階段,將Transaction Log內的該事務狀態設置為PREPARE_COMMIT或PREPARE_ABORT,第二階段,將Transaction Marker寫入該事務涉及到的所有消息(即將消息標記為committed或aborted)。這一步驟Transaction Coordinator會發送給當前事務涉及到的每個<Topic, Partition>的Leader,Broker收到該請求后,會將對應的Transaction Marker控制信息寫入日志。一旦Transaction Marker寫入完成,Transaction Coordinator會將最終的COMPLETE_COMMIT或COMPLETE_ABORT狀態寫入Transaction Log中以標明該事務結束。
事務使用的場景:
- 只有Producer生產消息,這種場景需要事務的介入;
- 消費消息和生產消息並存,比如Consumer&Producer模式,這種場景是一般Kafka項目中比較常見的模式,需要事務介入;
- 只有Consumer消費消息,這種操作在實際項目中意義不大,和手動Commit Offsets的結果一樣,而且這種場景不是事務的引入目的。
事務提供了哪些可使用的API?
// 初始化事務,需要注意確保transation.id屬性被分配
void initTransactions();
// 開啟事務
void beginTransaction() throws ProducerFencedException;
// 為Consumer提供的在事務內Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 提交事務
void commitTransaction() throws ProducerFencedException;
// 放棄事務,類似於回滾事務的操作
void abortTransaction() throws ProducerFencedException;
事務幾個重要的配置:
-
transactional.id=transactional-id1,transactional.id唯一不變的,便於程序異常重啟后恢復事務;
-
enable.idempotence 冪等性,在開啟事務的時候自動設置enable.idempotence=true。
-
isolation.level =read_committed 當commitTransaction()方法執行成功后,讀取已經提交事務的消息。
事務樣例
- Producer 模式:
保證的producer發送一批數據,要么全部成功要么全部失敗。
// 初始化事務
producer.initTransactions();
// 開啟事務
producer.beginTransaction();
try {
producer.send(new ProducerRecord<>("topic1", "tx_msg_01"));
producer.send(new ProducerRecord<>("topic2", "tx_msg_02"));
// int i = 1 / 0; ##制造異常情況
// 提交事務
producer.commitTransaction();
} catch (Exception ex) {
// 中止事務
producer.abortTransaction();
} finally {
// 關閉生產者
producer.close();
- 消費-轉換-發送 模式:
Consumer Offset的Commit與Producer對發送消息的Commit包含在同一個事務中。
producer.initTransactions(); ##初始化事務
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
if(!records.isEmpty()){
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
producer.beginTransaction(); ##開始事務
System.out.println("beginTransaction");
## 獲取每一個分區的數據
for (TopicPartition partition:records.partitions()){
List<ConsumerRecord<String,String>> partitionsRecords = records.records(partition);
for (ConsumerRecord<String,String> record : partitionsRecords){
String msg = record.value();
## 發送數據
producer.send(new ProducerRecord<>(sendTopicName, record.key(),record.value()));
System.out.println("send msg!");
}
long lastConsumedOffset = partitionsRecords.get(partitionsRecords.size() - 1).offset();
##保存每個分區最后一個offset
commits.put(partition,new OffsetAndMetadata(lastConsumedOffset + 1));
}
## 發送消費數據的Offset,將上述數據消費與數據發送納入同一個Transaction內
producer.sendOffsetsToTransaction(commits,groupid );
## 提交事務
producer.commitTransaction();
}
}
} catch (Exception e) {
if (e instanceof ProducerFencedException){
System.out.println("ProducerFencedException");
}
if (e instanceof InterruptedException) {
System.out.println("thread interrupted");
} else {
System.out.println("error");
e.printStackTrace();
}
} finally {
System.out.println("finally");
producer.abortTransaction();
consumer.close();
producer.flush();
producer.close();
}
異常情況:
producer.send();上述代碼中,使用的是異步發送,若是發送過程中出現異常,是不會倍捕獲到的,使用get()方法能夠捕獲到。使用同步發送方法producer.send().get(),當發送過程中發生異常是可以捕獲到的,但是使用同步發送方法消息是極低的,大概幾百tps。
局限性:
經過測試kafka1.8 與2.5版本,使用consume-transform-produce 的事務模型,在本集群中從topic A--topic B 發送沒有什么問題;但是跨集群從集群A 的topicA--集群B topicB 時offset 無法更新,kafka不支持跨集群事務。在kafka集群中存在一個協調者,由協調者統一提交consumer的offsets以及producer的messages。整個read-process-write流程都必須在協調者的監控之下。這也是為什么消費者和生產者必須在同一個集群內的原因。
小結:
- PID與Sequence Number的引入實現了寫操作的冪等性
- 寫操作的冪等性結合At Least Once語義實現了單一Session內的Exactly Once語義
- Transaction Marker與PID提供了識別消息是否應該被讀取的能力,從而實現了事務的隔離性
- Offset的更新標記了消息是否被讀取,從而將對讀操作的事務處理轉換成了對寫(Offset)操作的事務處理
- Kafka事務的本質是,將一組寫操作(如果有)對應的消息與一組讀操作(如果有)對應的Offset的更新進行同樣的標記(即Transaction Marker)來實現事務中涉及的所有讀寫操作同時對外可見或同時對外不可見
- Kafka只提供對Kafka本身的讀寫操作的事務性,不提供包含外部系統的事務性
參考:
http://www.jasongj.com/kafka/transaction/
http://matt33.com/2018/10/24/kafka-idempotent/
