1. Kafka事務的使用
Kafka中的事務特性主要用於以下兩種場景:
- 生產者發送多條消息可以封裝在一個事務中,形成一個原子操作。多條消息要么都發送成功,要么都發送失敗。
- read-process-write模式:將消息消費和生產封裝在一個事務中,形成一個原子操作。在一個流式處理的應用中,常常一個服務需要從上游接收消息,然后經過處理后送達到下游,這就對應着消息的消費和生成。
當事務中僅僅存在Consumer消費消息的操作時,它和Consumer手動提交Offset並沒有區別。因此單純的消費消息並不是Kafka引入事務機制的原因,單純的消費消息也沒有必要存在於一個事務中。
Kafka producer API提供了以下接口用於事務操作:
/** * 初始化事務 */ public void initTransactions(); /** * 開啟事務 */ public void beginTransaction() throws ProducerFencedException ; /** * 在事務內提交已經消費的偏移量 */ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ; /** * 提交事務 */ public void commitTransaction() throws ProducerFencedException; /** * 丟棄事務 */ public void abortTransaction() throws ProducerFencedException ;
下面是使用Kafka事務特性的例子,這段代碼Producer開啟了一個事務,然后在這個事務中發送了兩條消息。這兩條消息要么都發送成功,要么都失敗。
KafkaProducer producer = createKafkaProducer( "bootstrap.servers", "localhost:9092", "transactional.id”, “my-transactional-id"); producer.initTransactions(); producer.beginTransaction(); producer.send("outputTopic", "message1"); producer.send("outputTopic", "message2"); producer.commitTransaction();
下面這段代碼即為read-process-write模式,在一個Kafka事務中,同時涉及到了生產消息和消費消息。
KafkaProducer producer = createKafkaProducer( "bootstrap.servers", "localhost:9092", "transactional.id", "my-transactional-id"); KafkaConsumer consumer = createKafkaConsumer( "bootstrap.servers", "localhost:9092", "group.id", "my-group-id", "isolation.level", "read_committed"); consumer.subscribe(singleton("inputTopic")); producer.initTransactions(); while (true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE); producer.beginTransaction(); for (ConsumerRecord record : records) producer.send(producerRecord(“outputTopic”, record)); producer.sendOffsetsToTransaction(currentOffsets(consumer), group); producer.commitTransaction(); }
注意:在理解消息的事務時,一直處於一個錯誤理解是,把操作db的業務邏輯跟操作消息當成是一個事務,如下所示:
void kakfa_in_tranction(){ // 1.kafa的操作:讀取消息或生產消息 kafkaOperation(); // 2.db操作 dbOperation(); }
其實這個是有問題的。操作DB數據庫的數據源是DB,消息數據源是kfaka,這是完全不同兩個數據。一種數據源(如mysql,kafka)對應一個事務,所以它們是兩個獨立的事務。kafka事務指kafka一系列 生產、消費消息等操作組成一個原子操作,db事務是指操作數據庫的一系列增刪改操作組成一個原子操作。
2. Kafka事務配置
- 對於Producer,需要設置
transactional.id
屬性,這個屬性的作用下文會提到。設置了transactional.id
屬性后,enable.idempotence
屬性會自動設置為true。 - 對於Consumer,需要設置
isolation.level = read_committed
,這樣Consumer只會讀取已經提交了事務的消息。另外,需要設置enable.auto.commit = false
來關閉自動提交Offset功能。
3. Kafka事務特性
Kafka的事務特性本質上代表了三個功能:原子寫操作,拒絕僵屍實例(Zombie fencing)和讀事務消息。
3.1 原子寫
Kafka的事務特性本質上是支持了Kafka跨分區和Topic的原子寫操作。在同一個事務中的消息要么同時寫入成功,要么同時寫入失敗。我們知道,Kafka中的Offset信息存儲在一個名為_consumed_offsets的Topic中,因此read-process-write模式,除了向目標Topic寫入消息,還會向_consumed_offsets中寫入已經消費的Offsets數據。因此read-process-write本質上就是跨分區和Topic的原子寫操作。Kafka的事務特性就是要確保跨分區的多個寫操作的原子性。
3.2 拒絕僵屍實例(Zombie fencing)
在分布式系統中,一個instance的宕機或失聯,集群往往會自動啟動一個新的實例來代替它的工作。此時若原實例恢復了,那么集群中就產生了兩個具有相同職責的實例,此時前一個instance就被稱為“僵屍實例(Zombie Instance)”。在Kafka中,兩個相同的producer同時處理消息並生產出重復的消息(read-process-write模式),這樣就嚴重違反了Exactly Once Processing的語義。這就是僵屍實例問題。
Kafka事務特性通過transaction-id
屬性來解決僵屍實例問題。所有具有相同transaction-id
的Producer都會被分配相同的pid,同時每一個Producer還會被分配一個遞增的epoch。Kafka收到事務提交請求時,如果檢查當前事務提交者的epoch不是最新的,那么就會拒絕該Producer的請求。從而達成拒絕僵屍實例的目標。
3.3 讀事務消息
為了保證事務特性,Consumer如果設置了isolation.level = read_committed
,那么它只會讀取已經提交了的消息。在Producer成功提交事務后,Kafka會將所有該事務中的消息的Transaction Marker
從uncommitted
標記為committed
狀態,從而所有的Consumer都能夠消費。
4. Kafka事務原理
Kafka為了支持事務特性,引入一個新的組件:Transaction Coordinator。主要負責分配pid,記錄事務狀態等操作。下面時Kafka開啟一個事務到提交一個事務的流程圖:

主要分為以下步驟:
1. 查找Tranaction Corordinator
Producer向任意一個brokers發送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址。
2. 初始化事務 initTransaction
Producer發送InitpidRequest給Transaction Coordinator,獲取pid。Transaction Coordinator在Transaciton Log中記錄這<TransactionId,pid>的映射關系。另外,它還會做兩件事:
- 恢復(Commit或Abort)之前的Producer未完成的事務
- 對PID對應的epoch進行遞增,這樣可以保證同一個app的不同實例對應的PID是一樣,而epoch是不同的。
只要開啟了冪等特性即必須執行InitpidRequest,而無須考慮該Producer是否開啟了事務特性。
3. 開始事務beginTransaction
執行Producer的beginTransacion(),它的作用是Producer在本地記錄下這個transaction的狀態為開始狀態。這個操作並沒有通知Transaction Coordinator,因為Transaction Coordinator只有在Producer發送第一條消息后才認為事務已經開啟。
4. read-process-write流程
一旦Producer開始發送消息,Transaction Coordinator會將該<Transaction, Topic, Partition>存於Transaction Log內,並將其狀態置為BEGIN。另外,如果該<Topic, Partition>為該事務中第一個<Topic, Partition>,Transaction Coordinator還會啟動對該事務的計時(每個事務都有自己的超時時間)。
在注冊<Transaction, Topic, Partition>到Transaction Log后,生產者發送數據,雖然沒有還沒有執行commit或者abort,但是此時消息已經保存到Broker上了。即使后面執行abort,消息也不會刪除,只是更改狀態字段標識消息為abort狀態。
5. 事務提交或終結 commitTransaction/abortTransaction
在Producer執行commitTransaction/abortTransaction時,Transaction Coordinator會執行一個兩階段提交:
- 第一階段,將Transaction Log內的該事務狀態設置為
PREPARE_COMMIT
或PREPARE_ABORT
- 第二階段,將
Transaction Marker
寫入該事務涉及到的所有消息(即將消息標記為committed
或aborted
)。這一步驟Transaction Coordinator會發送給當前事務涉及到的每個<Topic, Partition>的Leader,Broker收到該請求后,會將對應的Transaction Marker
控制信息寫入日志。
一旦Transaction Marker
寫入完成,Transaction Coordinator會將最終的COMPLETE_COMMIT
或COMPLETE_ABORT
狀態寫入Transaction Log中以標明該事務結束。
5. 總結
- Transaction Marker與PID提供了識別消息是否應該被讀取的能力,從而實現了事務的隔離性。
- Offset的更新標記了消息是否被讀取,從而將對讀操作的事務處理轉換成了對寫(Offset)操作的事務處理。
- Kafka事務的本質是,將一組寫操作(如果有)對應的消息與一組讀操作(如果有)對應的Offset的更新進行同樣的標記(Transaction Marker)來實現事務中涉及的所有讀寫操作同時對外可見或同時對外不可見。
- Kafka只提供對Kafka本身的讀寫操作的事務性,不提供包含外部系統的事務性。