Kafka消息投遞語義-消息不丟失,不重復,不丟不重


Kafka消息投遞語義-消息不丟失,不重復,不丟不重

介紹

kafka支持3種消息投遞語義:

  • At most once——最多一次,消息可能會丟失,但不會重復
  • At least once——最少一次,消息不會丟失,可能會重復
  • Exactly once——只且一次,消息不丟失不重復,只且消費一次。

但是整體的消息投遞語義需要Producer端和Consumer端兩者來保證。

Producer 消息生產者端

一個場景例子:
當producer向broker發送一條消息,這時網絡出錯了,producer無法得知broker是否接受到了這條消息。
網絡出錯可能是發生在消息傳遞的過程中,也可能發生在broker已經接受到了消息,並返回ack給producer的過程中。

這時,producer只能進行重發,消息可能會重復,但是保證了at least once。

0.11.0的版本通過給每個producer一個唯一ID,並且在每條消息中生成一個sequence num,
這樣就能對消息去重,達到producer端的exactly once。

這里還涉及到producer端的acks設置和broker端的副本數量,以及min.insync.replicas的設置。
比如producer端的acks設置如下:
acks=0 //消息發了就發了,不等任何響應就認為消息發送成功
acks=1 //leader分片寫消息成功就返回響應給producer
acks=all(-1) //當acks=all, min.insync.replicas=2,就要求INSRNC列表中必須要有2個副本都寫成功,才返回響應給producer,
如果INSRNC中已同步副本數量不足2,就會報異常,如果沒有2個副本寫成功,也會報異常,消息就會認為沒有寫成功。

Broker 消息接收端

上文說過acks=1,表示當leader分片副本寫消息成功就返回響應給producer,此時認為消息發送成功。
如果leader寫成功單馬上掛了,還沒有將這個寫成功的消息同步給其他的分片副本,那么這個分片此時的ISR列表為空,
如果unclean.leader.election.enable=true,就會發生log truncation(日志截取),同樣會發生消息丟失。
如果unclean.leader.election.enable=false,那么這個分片上的服務就不可用了,producer向這個分片發消息就會拋異常。

所以我們設置min.insync.replicas=2,unclean.leader.election.enable=false,producer端的acks=all,這樣發送成功的消息就絕不會丟失。

Consumer 消息消費者端

所有分片的副本都有自己的log文件(保存消息)和相同的offset值。當consumer沒掛的時候,offset直接保存在內存中,
如果掛了,就會發生負載均衡,需要consumer group中另外的consumer來接管並繼續消費。

consumer消費消息的方式有以下2種;

  1. consumer讀取消息,保存offset,然后處理消息。
    現在假設一個場景:保存offset成功,但是消息處理失敗,consumer又掛了,這時來接管的consumer
    就只能從上次保存的offset繼續消費,這種情況下就有可能丟消息,但是保證了at most once語義。

  2. consumer讀取消息,處理消息,處理成功,保存offset。
    如果消息處理成功,但是在保存offset時,consumer掛了,這時來接管的consumer也只能
    從上一次保存的offset開始消費,這時消息就會被重復消費,也就是保證了at least once語義。

以上這些機制的保證都不是直接一個配置可以解決的,而是你的consumer代碼來完成的,只是一個處理順序先后問題。 
第一種對應的代碼:

List<String> messages = consumer.poll(); consumer.commitOffset(); processMsg(messages); 

第二種對應的代碼:

List<String> messages = consumer.poll(); processMsg(messages); consumer.commitOffset(); 

Exactly Once實現原理

下面詳細說說exactly once的實現原理。

Producer端的消息冪等性保證

每個Producer在初始化的時候都會被分配一個唯一的PID,
Producer向指定的Topic的特定Partition發送的消息都攜帶一個sequence number(簡稱seqNum),從零開始的單調遞增的。

Broker會將Topic-Partition對應的seqNum在內存中維護,每次接受到Producer的消息都會進行校驗;
只有seqNum比上次提交的seqNum剛好大一,才被認為是合法的。比它大的,說明消息有丟失;比它小的,說明消息重復發送了。

以上說的這個只是針對單個Producer在一個session內的情況,假設Producer掛了,又重新啟動一個Producer被而且分配了另外一個PID,
這樣就不能達到防重的目的了,所以kafka又引進了Transactional Guarantees(事務性保證)。

Transactional Guarantees 事務性保證

kafka的事務性保證說的是:同時向多個TopicPartitions發送消息,要么都成功,要么都失敗。

為什么搞這么個東西出來?我想了下有可能是這種例子:
用戶定了一張機票,付款成功之后,訂單的狀態改了,飛機座位也被占了,這樣相當於是
2條消息,那么保證這個事務性就是:向訂單狀態的Topic和飛機座位的Topic分別發送一條消息,
這樣就需要kafka的這種事務性保證。

這種功能可以使得consumer offset的提交(也是向broker產生消息)和producer的發送消息綁定在一起。
用戶需要提供一個唯一的全局性TransactionalId,這樣就能將PID和TransactionalId映射起來,就能解決
producer掛掉后跨session的問題,應該是將之前PID的TransactionalId賦值給新的producer。

Consumer端

以上的事務性保證只是針對的producer端,對consumer端無法保證,有以下原因:

  1. 壓實類型的topics,有些事務消息可能被新版本的producer重寫
  2. 事務可能跨坐2個log segments,這時舊的segments可能被刪除,就會丟消息
  3. 消費者可能尋址到事務中任意一點,也會丟失一些初始化的消息
  4. 消費者可能不會同時從所有的參與事務的TopicPartitions分片中消費消息

如果是消費kafka中的topic,並且將結果寫回到kafka中另外的topic,
可以將消息處理后結果的保存和offset的保存綁定為一個事務,這時就能保證
消息的處理和offset的提交要么都成功,要么都失敗。

如果是將處理消息后的結果保存到外部系統,這時就要用到兩階段提交(tow-phase commit),
但是這樣做很麻煩,較好的方式是offset自己管理,將它和消息的結果保存到同一個地方,整體上進行綁定, 
可以參考Kafka Connect中HDFS的例子。

參考資料及相關閱讀

Message Delivery Semantics
KIP-98 - Exactly Once Delivery and Transactional Messaging
Kafka Connect Details 詳解

 

更多精彩干貨,盡請關注我的個人微信公眾號
wechat


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM