Kafka 事務


在了解 Kafka的事務之前,先說一下 Kafka中冪等和事務(Kafka 0.11.0.0版本引入的兩個特性)以此來實現 Exactly once(精確一次)了解更多鏈接。
冪等:生產者在進行重試的時候有可能會重復寫入消息,而使用 Kafka的冪等性功能之后就可以避免這種情況。
生產者事務相關配置
開啟冪等性功能的方式很簡單,只需顯式地將生產者客戶端參數 enable.idempotence=true(默認值為false)Kafka的冪等只能保證單個生產者會話(session)中單分區的冪等。冪等性不能跨多個分區運作,而事務可以彌補這個缺陷。事務可以保證對多個分區寫入操作的原子性。操作的原子性是指多個操作要么全部成功,要么全部失敗,不存在部分成功、部分失敗的可能。
為了使用事務,Producer 必須顯式設置唯一的 transactionalId。事務要求生產者開啟冪等性,因此通過將 transactional.id參數設置為非空從而開啟事務特性的同時需要將 enable.idempotence設置為true(設置 transactional.id后,enable.idempotence會自動設置為true),如果用戶顯式地將 enable.idempotence設置為false,則會報出 ConfigException的異常。
transactionalId 與 PID一一對應,兩者不同的是 transactionalId由用戶顯式設置,而 PID是由 Kafka內部分配的。
拒絕僵屍實例(Zombie fencing):為了保證新的生產者啟動后具有相同 transactionalId的舊生產者能夠立即失效,每個生產者通過 transactionalId獲取 PID的同時,還會獲取一個單調遞增的 producer epoch。如果使用同一個 transactionalId開啟兩個生產者,Kafka收到事務提交請求時檢查當前事務提交者的 epoch不是最新的,那么就會拒絕該 Producer的請求。從而達成拒絕僵屍實例的目標。
Kafka中的事務特性主要用於以下兩種場景:
【1】生產者發送多條消息可以封裝在一個事務中,形成一個原子操作。多條消息要么都發送成功,要么都發送失敗。
【2】read-process-write模式:將消息消費和生產封裝在一個事務中,形成一個原子操作。在一個流式處理的應用中,常常一個服務需要從上游接收消息,然后經過處理后送達到下游,這就對應着消息的消費和生成。
從生產者的角度分析,通過事務,Kafka可以保證跨生產者會話的消息冪等發送,以及跨生產者會話的事務恢復。前者表示具有相同 transactionalId的新生產者實例被創建且工作的時候,舊的且擁有相同 transactionalId的生產者實例將不再工作。后者指當某個生產者實例宕機后,新的生產者實例可以保證任何未完成的舊事務要么被提交(Commit),要么被中止(Abort),如此可以使新的生產者實例從一個正常的狀態開始工作。KafkaProducer提供了5個與事務相關的方法,詳細如下:

/**
 * 初始化事務
 */
public void initTransactions();
/**
 * 開啟事務
 */
public void beginTransaction() throws ProducerFencedException ;
/**
 * 在事務內提交已經消費的偏移量
 */
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                     String consumerGroupId) throws ProducerFencedException ;
/**
 * 提交事務
 */
public void commitTransaction() throws ProducerFencedException;
/**
 * 丟棄事務
 */
public void abortTransaction() throws ProducerFencedException ;

initTransactions()方法用來初始化事務;beginTransaction()方法用來開啟事務;sendOffsetsToTransaction()方法為消費者提供在事務內的位移提交的操作;commitTransaction()方法用來提交事務;abortTransaction()方法用來中止事務,類似於事務回滾。
下面是使用 Kafka事務特性的例子,這段代碼 Producer開啟了一個事務,然后在這個事務中發送了兩條消息。這兩條消息要么都發送成功,要么都失敗。

KafkaProducer producer = createKafkaProducer("bootstrap.servers", "localhost:9092",  "transactional.id”, “my-transactional-id");
producer.initTransactions();
producer.beginTransaction();
producer.send("outputTopic", "message1");
producer.send("outputTopic", "message2");
producer.commitTransaction();

下面這段代碼即為 read-process-write模式,在一個 Kafka事務中,同時涉及到了生產消息和消費消息。 

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id", "my-transactional-id");
 
KafkaConsumer consumer = createKafkaConsumer(
  "bootstrap.servers", "localhost:9092",
  "group.id", "my-group-id",
  "isolation.level", "read_committed");
 
consumer.subscribe(singleton("inputTopic"));
 
producer.initTransactions();
 
while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  for (ConsumerRecord record : records)
    producer.send(producerRecord(“outputTopic”, record));
  producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
  producer.commitTransaction();
}

注意:在理解消息的事務時,一直處於一個錯誤理解是,把操作 db的業務邏輯跟操作消息當成是一個事務,如下所示: 

void  kakfa_in_tranction(){
  // 1.kafa的操作:讀取消息或生產消息
  kafkaOperation();
  // 2.db操作
  dbOperation();
}

其實這個是有問題的。操作 DB數據庫的數據源是DB,消息數據源是kfaka,這是完全不同兩個數據。一種數據源(如mysql,kafka)對應一個事務,所以它們是兩個獨立的事務。kafka事務指 kafka一系列生產、消費消息等操作組成一個原子操作,db事務是指操作數據庫的一系列增刪改操作組成一個原子操作。
消費者事務相關配置
在消費端有一個參數isolation.level,與事務有關,這個參數的默認值為“read_uncommitted”,意思是說消費端應用可以看到(消費到)未提交的事務,當然對於已提交的事務也是可見的。這個參數還可以設置為“read_committed”,表示消費端應用不可以看到尚未提交的事務內的消息。另外,需要設置 enable.auto.commit = false來關閉自動提交 Offset功能。
舉個例子,如果生產者開啟事務並向某個分區值發送3條消息msg1、msg2和msg3,在執行 commitTransaction()或abortTransaction()方法前,設置為 “read_committed”的消費端應用是消費不到這些消息的,不過在 KafkaConsumer內部會緩存這些消息,直到生產者執行 commitTransaction()方法之后它才能將這些消息推送給消費端應用。反之,如果生產者執行了abortTransaction()方法,那么 KafkaConsumer會將這些緩存的消息丟棄而不推送給消費端應用。

日志文件中除了普通的消息,還有一種消息專門用來標志一個事務的結束,它就是控制消息(ControlBatch)。控制消息一共有兩種類型:COMMIT 和 ABORT,分別用來表征事務已經成功提交或已經被成功中止。RecordBatch 中 attributes字段的第6位用來標識當前消息是否是控制消息。如果是控制消息,那么這一位會置為1,否則會置為0,如上圖所示。attributes字段中的第5位用來標識當前消息是否處於事務中,如果是事務中的消息,那么這一位置為1,否則置為0。由於控制消息也處於事務中,所以attributes字段的第5位和第6位都被置為1。

Kafka 事務原理
Kafka為了支持事務特性,引入一個新的組件:Transaction Coordinator。主要負責分配pid,記錄事務狀態等操作。下面時Kafka開啟一個事務到提交一個事務的流程圖:

  • 恢復(Commit 或 Abort)之前的 Producer未完成的事務
  • 對 PID對應的 epoch進行遞增,這樣可以保證同一個 app的不同實例對應的 PID是一樣,而 epoch是不同的。
只要開啟了冪等性即必須執行 InitpidRequest,而無須考慮該 Producer是否開啟了事務特性。

【3】開始事務beginTransaction:執行 Producer的 beginTransacion(),它的作用是 Producer在本地記錄下這個 transaction的狀態為開始狀態。這個操作並沒有通知 Transaction Coordinator,因為 Transaction Coordinator只有在 Producer發送第一條消息后才認為事務已經開啟。
【4】read-process-write流程:一旦 Producer開始發送消息,Transaction Coordinator會將該<Transaction, Topic, Partition>存於 Transaction Log內,並將其狀態置為 BEGIN。另外,如果該 <Topic, Partition>為該事務中第一個 <Topic, Partition>,Transaction Coordinator 還會啟動對該事務的計時(每個事務都有自己的超時時間)。
在注冊<Transaction, Topic, Partition> 到 Transaction Log后,生產者發送數據,雖然沒有還沒有執行 commit或者 abort,但是此時消息已經保存到 Broker上了。即使后面執行 abort,消息也不會刪除,只是更改狀態字段標識消息為 abort狀態。
【5】事務提交或終結 commitTransaction/abortTransaction:在 Producer執行 commitTransaction/abortTransaction時,Transaction Coordinator會執行一個兩階段提交:
第一階段,將 Transaction Log內的該事務狀態設置為 PREPARE_COMMIT或 PREPARE_ABORT
第二階段,將 Transaction Marker寫入該事務涉及到的所有消息(即將消息標記為committed 或 aborted)。這一步驟Transaction Coordinator會發送給當前事務涉及到的每個<Topic, Partition>的 Leader,Broker收到該請求后,會將對應的Transaction Marker控制信息寫入日志。
一旦 Transaction Marker寫入完成,Transaction Coordinator會將最終的 COMPLETE_COMMIT 或 COMPLETE_ABORT狀態寫入Transaction Log中以標明該事務結束。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM