Kafka事務特性詳解


1. Kafka事務的使用

Kafka中的事務特性主要用於以下兩種場景:

  • 生產者發送多條消息可以封裝在一個事務中,形成一個原子操作。多條消息要么都發送成功,要么都發送失敗。
  • read-process-write模式:將消息消費和生產封裝在一個事務中,形成一個原子操作。在一個流式處理的應用中,常常一個服務需要從上游接收消息,然后經過處理后送達到下游,這就對應着消息的消費和生成。

當事務中僅僅存在Consumer消費消息的操作時,它和Consumer手動提交Offset並沒有區別。因此單純的消費消息並不是Kafka引入事務機制的原因,單純的消費消息也沒有必要存在於一個事務中。

Kafka producer API提供了以下接口用於事務操作:

    /**
     * 初始化事務
     */
    public void initTransactions();
 
    /**
     * 開啟事務
     */
    public void beginTransaction() throws ProducerFencedException ;
 
    /**
     * 在事務內提交已經消費的偏移量
     */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, 
                                         String consumerGroupId) throws ProducerFencedException ;
 
    /**
     * 提交事務
     */
    public void commitTransaction() throws ProducerFencedException;
 
    /**
     * 丟棄事務
     */
    public void abortTransaction() throws ProducerFencedException ;

下面是使用Kafka事務特性的例子,這段代碼Producer開啟了一個事務,然后在這個事務中發送了兩條消息。這兩條消息要么都發送成功,要么都失敗。

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id”, “my-transactional-id");

producer.initTransactions();
producer.beginTransaction();
producer.send("outputTopic", "message1");
producer.send("outputTopic", "message2");
producer.commitTransaction();

下面這段代碼即為read-process-write模式,在一個Kafka事務中,同時涉及到了生產消息和消費消息。

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id", "my-transactional-id");

KafkaConsumer consumer = createKafkaConsumer(
  "bootstrap.servers", "localhost:9092",
  "group.id", "my-group-id",
  "isolation.level", "read_committed");

consumer.subscribe(singleton("inputTopic"));

producer.initTransactions();

while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  for (ConsumerRecord record : records)
    producer.send(producerRecord(“outputTopic”, record));
  producer.sendOffsetsToTransaction(currentOffsets(consumer), group);  
  producer.commitTransaction();
}

注意:在理解消息的事務時,一直處於一個錯誤理解是,把操作db的業務邏輯跟操作消息當成是一個事務,如下所示:

void  kakfa_in_tranction(){
  // 1.kafa的操作:讀取消息或生產消息
  kafkaOperation();
  // 2.db操作
  dbOperation();
}

其實這個是有問題的。操作DB數據庫的數據源是DB,消息數據源是kfaka,這是完全不同兩個數據。一種數據源(如mysql,kafka)對應一個事務,所以它們是兩個獨立的事務。kafka事務指kafka一系列 生產、消費消息等操作組成一個原子操作,db事務是指操作數據庫的一系列增刪改操作組成一個原子操作。

2. Kafka事務配置

  • 對於Producer,需要設置transactional.id屬性,這個屬性的作用下文會提到。設置了transactional.id屬性后,enable.idempotence屬性會自動設置為true。
  • 對於Consumer,需要設置isolation.level = read_committed,這樣Consumer只會讀取已經提交了事務的消息。另外,需要設置enable.auto.commit = false來關閉自動提交Offset功能。

3. Kafka事務特性

Kafka的事務特性本質上代表了三個功能:原子寫操作拒絕僵屍實例(Zombie fencing)和讀事務消息

3.1 原子寫

Kafka的事務特性本質上是支持了Kafka跨分區和Topic的原子寫操作。在同一個事務中的消息要么同時寫入成功,要么同時寫入失敗。我們知道,Kafka中的Offset信息存儲在一個名為_consumed_offsets的Topic中,因此read-process-write模式,除了向目標Topic寫入消息,還會向_consumed_offsets中寫入已經消費的Offsets數據。因此read-process-write本質上就是跨分區和Topic的原子寫操作。Kafka的事務特性就是要確保跨分區的多個寫操作的原子性。

3.2 拒絕僵屍實例(Zombie fencing)

在分布式系統中,一個instance的宕機或失聯,集群往往會自動啟動一個新的實例來代替它的工作。此時若原實例恢復了,那么集群中就產生了兩個具有相同職責的實例,此時前一個instance就被稱為“僵屍實例(Zombie Instance)”。在Kafka中,兩個相同的producer同時處理消息並生產出重復的消息(read-process-write模式),這樣就嚴重違反了Exactly Once Processing的語義。這就是僵屍實例問題。

Kafka事務特性通過transaction-id屬性來解決僵屍實例問題。所有具有相同transaction-id的Producer都會被分配相同的pid,同時每一個Producer還會被分配一個遞增的epoch。Kafka收到事務提交請求時,如果檢查當前事務提交者的epoch不是最新的,那么就會拒絕該Producer的請求。從而達成拒絕僵屍實例的目標。

3.3 讀事務消息

為了保證事務特性,Consumer如果設置了isolation.level = read_committed,那么它只會讀取已經提交了的消息。在Producer成功提交事務后,Kafka會將所有該事務中的消息的Transaction Markeruncommitted標記為committed狀態,從而所有的Consumer都能夠消費。

4. Kafka事務原理

Kafka為了支持事務特性,引入一個新的組件:Transaction Coordinator。主要負責分配pid,記錄事務狀態等操作。下面時Kafka開啟一個事務到提交一個事務的流程圖:

 
KafkaTransaction.png

主要分為以下步驟:

1. 查找Tranaction Corordinator

Producer向任意一個brokers發送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址。

2. 初始化事務 initTransaction

Producer發送InitpidRequest給Transaction Coordinator,獲取pid。Transaction Coordinator在Transaciton Log中記錄這<TransactionId,pid>的映射關系。另外,它還會做兩件事:

  • 恢復(Commit或Abort)之前的Producer未完成的事務
  • 對PID對應的epoch進行遞增,這樣可以保證同一個app的不同實例對應的PID是一樣,而epoch是不同的。

只要開啟了冪等特性即必須執行InitpidRequest,而無須考慮該Producer是否開啟了事務特性。

3. 開始事務beginTransaction

執行Producer的beginTransacion(),它的作用是Producer在本地記錄下這個transaction的狀態為開始狀態。這個操作並沒有通知Transaction Coordinator,因為Transaction Coordinator只有在Producer發送第一條消息后才認為事務已經開啟。

4. read-process-write流程

一旦Producer開始發送消息,Transaction Coordinator會將該<Transaction, Topic, Partition>存於Transaction Log內,並將其狀態置為BEGIN。另外,如果該<Topic, Partition>為該事務中第一個<Topic, Partition>,Transaction Coordinator還會啟動對該事務的計時(每個事務都有自己的超時時間)。

在注冊<Transaction, Topic, Partition>到Transaction Log后,生產者發送數據,雖然沒有還沒有執行commit或者abort,但是此時消息已經保存到Broker上了。即使后面執行abort,消息也不會刪除,只是更改狀態字段標識消息為abort狀態。

5. 事務提交或終結 commitTransaction/abortTransaction

在Producer執行commitTransaction/abortTransaction時,Transaction Coordinator會執行一個兩階段提交:

  • 第一階段,將Transaction Log內的該事務狀態設置為PREPARE_COMMITPREPARE_ABORT
  • 第二階段,將Transaction Marker寫入該事務涉及到的所有消息(即將消息標記為committedaborted)。這一步驟Transaction Coordinator會發送給當前事務涉及到的每個<Topic, Partition>的Leader,Broker收到該請求后,會將對應的Transaction Marker控制信息寫入日志。

一旦Transaction Marker寫入完成,Transaction Coordinator會將最終的COMPLETE_COMMITCOMPLETE_ABORT狀態寫入Transaction Log中以標明該事務結束。

5. 總結

  • Transaction Marker與PID提供了識別消息是否應該被讀取的能力,從而實現了事務的隔離性。
  • Offset的更新標記了消息是否被讀取,從而將對讀操作的事務處理轉換成了對寫(Offset)操作的事務處理。
  • Kafka事務的本質是,將一組寫操作(如果有)對應的消息與一組讀操作(如果有)對應的Offset的更新進行同樣的標記(Transaction Marker)來實現事務中涉及的所有讀寫操作同時對外可見或同時對外不可見。
  • Kafka只提供對Kafka本身的讀寫操作的事務性,不提供包含外部系統的事務性。

參考文章




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM