Kafka筆記—可靠性、冪等性和事務


這幾天很忙,但是我現在給我的要求是一周至少要出一篇文章,所以先拿這篇筆記來做開胃菜,源碼分析估計明后兩天應該能寫一篇。給自己加油~,即使沒什么人看。

可靠性

如何保證消息不丟失

Kafka只對“已提交”的消息(committed message)做有限度的持久化保證。

已提交的消息
當Kafka的若干個Broker成功地接收到一條消息並寫入到日志文件后,它們會告訴生產者程序這條消息已成功提交。

有限度的持久化保證
假如一條消息保存在N個Kafka Broker上,那么至少這N個Broker至少有一個存活,才能保證消息不丟失。

丟失數據案例

生產者程序丟失數據

由於Kafka Producer是異步發送的,調用完producer.send(msg)並不能認為消息已經發送成功。

所以,在Producer永遠要使用帶有回調通知的發送API,使用producer.send(msg,callback)。一旦出現消息提交失敗的情況,可以由針對性地進行處理。

消費者端丟失數據

消費者是先更新offset,再消費消息。如果這個時候消費者突然宕機了,那么這條消息就會丟失。

所以我們要先消費消息,再更新offset位置。但是這樣會導致消息重復消費。

還有一種情況就是consumer獲取到消息后開啟了多個線程異步處理消息,而consumer自動地向前更新offset。假如其中某個線程運行失敗了,那么消息就丟失了。

遇到這樣的情況,consumer不要開啟自動提交位移,而是要應用程序手動提交位移。

最佳實現

  1. 使用producer.send(msg,callback)。
  2. 設置acks = all。acks是Producer的參數,代表了所有副本Broker都要接收到消息,該消息才算是“已提交”。
  3. 設置retries為一個較大的值。是Producer的參數,對應Producer自動重試。如果出現網絡抖動,那么可以自動重試消息發送,避免消息丟失。
  4. unclean.leader.election.enable = false。控制有哪些Broker有資格競選分區的Leader。表示不允許落后太多的Broker競選Leader。
  5. 設置replication.factor>=3。Broker參數,冗余Broker。
  6. 設置min.insync.replicas>1。Broker參數。控制消息至少要被寫入到多少個副本才算是“已提交”。
  7. 確保replication.factor>min.insync.replicas。如果兩個相等,那么只要有一個副本掛機,整個分區就無法正常工作了。推薦設置成replication.factor=min.insync.replicas+1.
  8. 確保消息消費完成在提交。Consumer端參數enbale.auto.commit,設置成false,手動提交位移。

解釋第二條和第六條:
如果ISR中只有1個副本了,acks=all也就相當於acks=1了,引入min.insync.replicas的目的就是為了做一個下限的限制:不能只滿足於ISR全部寫入,還要保證ISR中的寫入個數不少於min.insync.replicas。

冪等性

在0.11.0.0版本引入了創建冪等性Producer的功能。僅需要設置props.put(“enable.idempotence”,true),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。

enable.idempotence設置成true后,Producer自動升級成冪等性Producer。Kafka會自動去重。Broker會多保存一些字段。當Producer發送了相同字段值的消息后,Broker能夠自動知曉這些消息已經重復了。

作用范圍:

  1. 只能保證單分區上的冪等性,即一個冪等性Producer能夠保證某個主題的一個分區上不出現重復消息。
  2. 只能實現單回話上的冪等性,這里的會話指的是Producer進程的一次運行。當重啟了Producer進程之后,冪等性不保證。

事務

Kafka在0.11版本開始提供對事務的支持,提供是read committed隔離級別的事務。保證多條消息原子性地寫入到目標分區,同時也能保證Consumer只能看到事務成功提交的消息。

事務性Producer

保證多條消息原子性地寫入到多個分區中。這批消息要么全部成功,要不全部失敗。事務性Producer也不懼進程重啟。

Producer端的設置:

  1. 開啟enable.idempotence = true
  2. 設置Producer端參數 transactional.id

除此之外,還要加上調用事務API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,分別應對事務的初始化、事務開始、事務提交以及事務終止。
如下:

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}

這段代碼能保證record1和record2被當做一個事務同一提交到Kafka,要么全部成功,要么全部寫入失敗。

Consumer端的設置:
設置isolation.level參數,目前有兩個取值:

  1. read_uncommitted:默認值表明Consumer端無論事務型Producer提交事務還是終止事務,其寫入的消息都可以讀取。
  2. read_committed:表明Consumer只會讀取事務型Producer成功提交事務寫入的消息。注意,非事務型Producer寫入的所有消息都能看到。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM