Kafka無消息丟失配置
Kafka到底會不會丟數據(data loss)? 網上各種說法都有,在回答這個問題之前, 我們要明確“責任邊界”。所謂責任邊界就是要確定消息在生產和消費的完整流程中是由誰來負責,確保它不會丟失。這樣即使真的出現了消息丟失,也能明確是責任主體,有針對性地進行改進和調整。
個人認為,關於責任的划定,官方其實已經給出了很明確的答案:
Once a published message is committed it will not be lost as long as one broker that replicates the partition to which this message was written remains "alive".
倘若我們完全理解這句話,那么“是否丟失消息”的問題自可迎刃而解。這句話有兩個關鍵要點:
-
committed: Kafka只對已提交的消息做出交付保證(delivery guarantee),沒有成功提交的消息Kafka不對其做出任何承諾
-
alive:只要有一個保存了該條消息的broker還活着(alive)就不會丟失消息
Kafka如何定義一個broker是否存活(alive)呢? 很簡單,也是兩個條件:
-
節點進程必須存活,且一直維持與zookeeper的會話
-
如果是follower節點,它與leader節點相差的消息數不能過大,即不能遠遠落后於leader節點的進度。如果按照Kafka的術語來說,就是這個follower節點必須是一個ISR(in-sync replica,即與leader保持同步的副本節點)
當然,我個人絕對相信,因為一些默認的配置和尚未發現的bug等原因,上面Kafka所做的保證也不一定百分之百能夠實現,但大多數情況下通過本文的配置是可以幫助你做到無消息丟失的。
okay,閑言少敘,直接上配置了。下面的參數配置及Best practice列表可以較好地保證數據的持久性(當然是trade-off,犧牲了吞吐量)。我會在該列表之后對列表中的每一項進行討論,有興趣的同學可以看下后面的分析。
-
block.on.buffer.full = true
-
acks = all
-
retries = MAX_VALUE
-
max.in.flight.requests.per.connection = 1
-
使用KafkaProducer.send(record, callback)
-
如果僅僅是要消息無丟失,使用帶callback的send方法;如果還要保證無亂序問題,那么發送失敗時一定要在callback邏輯中立即關閉producer:close(0)
-
unclean.leader.election.enable=false
-
replication.factor = 3
-
min.insync.replicas = 2
-
replication.factor > min.insync.replicas
-
enable.auto.commit=false
-
使用手動提交位移,消息處理完成之后再提交位移
給出列表之后,我們從兩個方面來探討一下數據為什么會丟失:
1. Producer端
本文討論的是Kafka 0.9版本之后的producer——Kafka0.9正式使用java版producer替換了老版的scala producer。
新版本默認使用異步發送機制,所以KafkaProducer.send僅僅是把這條消息放入一個緩存中(即RecordAccumulator,本質上使用了隊列來緩存記錄),同時后台的Sender IO線程會不斷掃描該緩存區,將滿足條件的消息封裝到某個batch中然后發送出去。顯然,這個過程中就有一個數據丟失的窗口:若IO線程發送之前client端掛掉了,累積在accumulator中的數據的確有可能會丟失。但顯然,這不在Kafka做出保證的責任邊界內,畢竟消息沒有提交成功,尚未被Kafka接管。不過上面列表中的一些參數配置仍然可以幫助你避免這種情況下的數據丟失。
Producer的另一個問題是消息的亂序問題。假設客戶端代碼依次執行下面的語句將兩條消息發到相同的分區
producer.send(record1); producer.send(record2);
如果此時由於某些原因(比如瞬時的網絡抖動)導致record1沒有成功發送,同時Kafka又配置了重試機制和max.in.flight.requests.per.connection大於1(默認值是5,本來就是大於1的),那么重試record1成功后,record1在分區中就在record2之后,從而造成消息的亂序。很多某些要求強順序保證的場景是不允許出現這種情況的。
鑒於producer的這兩個問題,我們應該如何規避呢??對於消息丟失的問題,很容易想到的一個方案就是:既然異步發送有可能丟失數據, 我改成同步發送總可以吧?比如這樣:
producer.send(record).get();
這樣當然是可以的,但是性能會很差,不建議這樣使用。因此特意總結了一份配置列表。個人認為該配置清單應該能夠比較好地規避producer端數據丟失情況的發生:(特此說明一下,軟件配置的很多決策都是trade-off,下面的配置也不例外:應用了這些配置,你可能會發現你的producer/consumer 吞吐量會下降,這是正常的,因為你換取了更高的數據安全性)
-
block.on.buffer.full = true 盡管該參數在0.9.0.0已經被標記為“deprecated”,但鑒於它的含義非常直觀,所以這里還是顯式設置它為true,使得producer將一直等待緩沖區直至其變為可用。否則如果producer生產速度過快耗盡了緩沖區,producer將拋出異常
-
acks=all 很好理解,所有follower都響應了才認為消息提交成功,即"committed"
-
retries = MAX 無限重試,直到你意識到出現了問題:)
-
max.in.flight.requests.per.connection = 1 限制客戶端在單個連接上能夠發送的未響應請求的個數。設置此值是1表示kafka broker在響應請求之前client不能再向同一個broker發送請求。注意:設置此參數是為了避免消息亂序
-
使用KafkaProducer.send(record, callback)而不是send(record)方法 自定義回調邏輯處理消息發送失敗
-
callback邏輯中最好顯式關閉producer:close(0) 注意:設置此參數是為了避免消息亂序
-
unclean.leader.election.enable=false 關閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免數據丟失
-
replication.factor >= 3 這個完全是個人建議了,參考了Hadoop及業界通用的三備份原則
-
min.insync.replicas > 1 消息至少要被寫入到這么多副本才算成功,也是提升數據持久性的一個參數。與acks配合使用
-
保證replication.factor > min.insync.replicas 如果兩者相等,當一個副本掛掉了分區也就沒法正常工作了。通常設置replication.factor = min.insync.replicas + 1即可
2. Consumer端
consumer端丟失消息的情形比較簡單:如果在消息處理完成前就提交了offset,那么就有可能造成數據的丟失。由於Kafka consumer默認是自動提交位移的,所以在后台提交位移前一定要保證消息被正常處理了,因此不建議采用很重的處理邏輯,如果處理耗時很長,則建議把邏輯放到另一個線程中去做。為了避免數據丟失,現給出兩點建議:
-
enable.auto.commit=false 關閉自動提交位移
-
在消息被完整處理之后再手動提交位移
okay,總結一下,本文給出了Kafka關於交付保證的基本定義以及無消息丟失配置。這只是一個best practice,具體的使用還要結合各自的業務特點進行展開,有針對性地進行設置。