一般我們在用到這種消息中件的時候,肯定會考慮要怎樣才能保證數據不丟失,在面試中也會問到相關的問題。但凡遇到這種問題,是指3個方面的數據不丟失,即:producer consumer 端數據不丟失 broker端數據不丟失下面我們分別從這三個方面來學習,kafka是如何保證數據不丟失的
一.producer 生產端是如何保證數據不丟失的
1.ack的配置策略
acks = 0
生產者發送消息之后 不需要等待服務端的任何響應,它不管消息有沒有發送成功,如果發送過程中遇到了異常,導致broker端沒有收到消息,消息也就丟失了。實際上它只是把消息發送到了socketBuffer(緩存)中,而socketBuffer什么時候被提交到broker端並不關心,它不擔保broker端是否收到了消息,但是這樣的配置對retry是不起作用的,因為producer端都不知道是否發生了錯誤,而且對於offset的獲取永遠都是-1,因為broker端可能還沒有開始寫數據。這樣不保險的操作為什么還有這樣的配置?kafka對於收集海量數據,如果在收集某一項日志時是允許數據量有一定丟失的話,是可以用這種配置來收集日志。
acks = 1(默認值)
生產者發送消息之后,只要分區的leader副本成功寫入消息,那么它就會收到來自服務端的成功響應。其實就是消息只發給了leader leader收到消息后會返回ack到producer端。如果消息無法寫入leader時(選舉、宕機等情況時),生產都會收到一個錯誤的響應,為了避免消息丟失,生產者可以選擇重發消息,如果消息成功寫入,在被其它副本同步數據時leader 崩潰,那么此條數據還是會丟失,因為新選舉的leader是沒有收到這條消息,ack設置為1是消息可靠性和吞吐量折中的方案。
acks = all (或-1)
生產者在發送消息之后,需要等待ISR中所有的副本都成功寫入消息之后才能夠收到來自服務端的成功響應,在配置環境相同的情況下此種配置可以達到最強的可靠性。即:在發送消息時,需要leader 向fllow 同步完數據之后,也就是ISR隊列中所有的broker全部保存完這條消息后,才會向ack發送消息,表示發送成功。
2.retries的配置策略
在kafka中錯誤分為2種,一種是可恢復的,另一種是不可恢復的。
可恢復性的錯誤:
如遇到在leader的選舉、網絡的抖動等這些異常時,如果我們在這個時候配置的retries大於0的,也就是可以進行重試操作,那么等到leader選舉完成后、網絡穩定后,這些異常就會消息,錯誤也就可以恢復,數據再次重發時就會正常發送到broker端。需要注意retries(重試)之間的時間間隔,以確保在重試時可恢復性錯誤都已恢復。
不可恢復性的錯誤:
如:超過了發送消息的最大值(max.request.size)時,這種錯誤是不可恢復的,如果不做處理,那么數據就會丟失,因此我們需要注意在發生異常時把這些消息寫入到DB、緩存本地文件中等等,把這些不成功的數據記錄下來,等錯誤修復后,再把這些數據發送到broker端。
我們上面講了2個配置項的作用,下面結合實際場景如何使用
3.如何選取
1.高可用型
配置:acks = all,retries > 0 retry.backoff.ms=100(毫秒) (並根據實際情況設置retry可能恢復的間隔時間)
優點:這樣保證了producer端每發送一條消息都要成功,如果不成功並將消息緩存起來,等異常恢復后再次發送。
缺點:這樣保證了高可用,但是這會導致集群的吞吐量不是很高,因為數據發送到broker之后,leader要將數據同步到fllower上,如果網絡帶寬、不穩定等情況時,ack響應時間會更長
2.折中型
配置:acks = 1 retries > 0 retries 時間間隔設置 (並根據實際情況設置retries可能恢復的間隔時間)
優點:保證了消息的可靠性和吞吐量,是個折中的方案
缺點:性能處於2者中間
3.高吞吐型
配置:acks = 0
優點:可以相對容忍一些數據的丟失,吞吐量大,可以接收大量請求
缺點:不知道發送的消息是 否成功
二.consumer端是如何保證數據不丟失的
1.consumer端的配置項
group.id: consumer group 分組的一個id
消費者隸屬的消費組名稱。在kafka中只允許消息只能被某個組里面的一個consumer端消費,如果為空,則會報異常。
對於一個新的consumer加入到消費時,肯定會隸屬於哪個組,只有這樣才能消費數據
auto.offset.reset = earliest(最早) /latest(最晚)
從何處開始進行消費
當一個新加入的consumer要進行消費數據,如果這個consumer是做數據分析工作的,是需要以前的歷史數據那就需要從最早的位置消費數據,如果僅僅是查看消費情況,那可以從最晚位置開始消費數據
enable.auto.commit = true/false(默認true)
是否開啟自動提交消費位移的功能,默認開啟.
當設置為true時,意味着由kafka的consumer端自己間隔一定的時間會自動提交offset,如果設置成了fasle,也就是由客戶端(自己寫代碼)來提交,那就還得控制提交的時間間隔auto.commit.interval.ms
auto.commit.interval.ms
當enable.auto.commit設置為true時才生效,表示開啟自動提交消費位移功能時自動提交消費位移的時間間隔。
2.consumer端的配置策略
在consumer消費階段,對offset的處理,關系到是否丟失數據,是否重復消費數據,因此,我們把處理好offset就可以做到exactly-once && at-least-once(只消費一次)數據。
當enable.auto.commit=true時
表示由kafka的consumer端自動提交offset,當你在pull(拉取)30條數據,在處理到第20條時自動提交了offset,但是在處理21條的時候出現了異常,當你再次pull數據時,由於之前是自動提交的offset,所以是從30條之后開始拉取數據,這也就意味着21-30條的數據發生了丟失。
當enable.auto.commit=false時
由於上面的情況可知自動提交offset時,如果處理數據失敗就會發生數據丟失的情況。那我們設置成手動提交。
當設置成false時,由於是手動提交的,可以處理一條提交一條,也可以處理一批,提交一批,由於consumer在消費數據時是按一個batch來的,當pull了30條數據時,如果我們處理一條,提交一個offset,這樣會嚴重影響消費的能力,那就需要我們來按一批來處理,或者設置一個累加器,處理一條加1,如果在處理數據時發生了異常,那就把當前處理失敗的offset進行提交(放在finally代碼塊中)注意一定要確保offset的正確性,當下次再次消費的時候就可以從提交的offset處進行再次消費。
3.comsumer 的應用場景
1.一直commit offset的處理
假如poll了100條數據,每處理1條,commit offset一次,這樣會嚴重影響性能,在處理的時候設置1個計數器(或累加器),按一批來提交,但要確保提交offset的准確性
2.rebalance的影響
在處理數據時,有2種情況會發生,一種情況是處理了一半的時候,發生了rebalance,但是offset還沒有來得及提交,另一種情況是rebalance發生后,重新分配了offset,在這種情況時會發生錯誤。
3.消息處理錯誤時的處理
假如consumer在處理數據的時候失敗了,那么可以把這條數據給緩存起來,可以是redis、DB、file等,也可以把這條消息存入專門用於存儲失敗消息的topic中,讓其它的consumer專門處理失敗的消息。
4.處理消息的時間過長
假如poll一批100條消息的時間是1秒鍾,但是在每處理1條需要花費1秒鍾,這樣來說極其影響消費能力,那我們可以把100條消息放到1個線程池中處理。這里特別特別注意,由於線程池的處理行為是並行的,所以要做對offset的判斷。這里先說正常情況,如果消息都能被正常處理,那么會提交1個offset,並把這個offset存起來,假如此時又提交了1個offset,把2個offset相對比,哪個大把哪個存起來並做提交。如果消息處理發生了錯誤,我們在前面講過,把這個錯誤消息發送到專門處理錯誤的topic中,讓專門的consumer來處理。
4.consumer 保證確保消息只被處理一次處理,同時確保冪等性
exactly-once & at-least-once
如何保證消息只獲取一次並且確定被處理呢?這就需要我們在處理消息的時候要添加一個unique key
假如pull 一個batch 100條的消息,在處理到第80條的時候,由於網絡延遲、或者crash的原因沒有來得及提交offset,被處理的80條數據都添加了unique key, 可以存到到DB中或者redis中(推薦,因為這樣更快),當consumer端會再次poll消費數據時,因為沒有提交offset,所以會從0開始消費數據,如果對之前已經消息過的數據沒有做unique key的處理,那么會造成重復消息之前的80條數據,但是如果把每條對應的消息都添加了unique key,那就只需要對被處理的消息進行判斷,有沒有unique key 就可以做到不重復消費數據的問題,這樣也同時保證了冪等性。
三.broker端是如何保證數據不丟失的
1.broker端的配置項
以下參數都是在創建topic時進行設置
1.replication-factor 3
在創建topic時會通過replication-factor來創建副本的個數,它提高了kafka的高可用性,同時,它允許n-1台broker掛掉,設置好合理的副本因子對kafka整體性能是非常有幫助的,通常是3個,極限是5個,如果多了也會影響開銷。 2.min.insync.replicas = 2
分區ISR隊列集合中最少有多少個副本,默認值是1
3.unclean.leander.election.enable = false
是否允許從ISR隊列中選舉leader副本,默認值是false,如果設置成true,則可能會造成數據丟失。
2.leader選舉造成的數據丟失
3個replica分別為0 1 2,0為leader,數據都能完全同步到100,在某一時刻,分別有2個fllow掛掉了,此時有producer往0 的replica上發送50條數據完后,此時的leader掛掉了,而此時剛好的1個fllow起來了,它沒有向leader上feach數據,因為leader已經不存在了,此時有2種處理方法:重新起來的fllow可以成為1個leader,需要通過 unclean.leader.election.enable=true,這樣做保證了高可用,但是這樣做的弊端是:新起來的fllow成為了leader,但是它會丟失部分數據,雖然這樣保證了高可用。另一種情況是設置為false,不讓fllow競選leader,但是這樣也會造成數據的丟失。假如在ISR的隊列里面,只有0 1,但此時replica 1 沒有來得及向leader feach數據leader掛掉了,這樣也會造成數據的丟失。
3.broker端的配置策略
min.insync.replica
在一個topic中,1個分區 有3個副本,在創建時設置了min.insync.replica=2,假如此時在ISR中只有leader副本(1個)存在,在producer端生產數據時,此時的acks=all,這也就意味着在producer向broker端寫數據時,必須保證ISR中指定數量的副本(包含leader、fllow副本)全部同步完成才算寫成功,這個數量就是由min.insync.replica來控制的,這樣producer端向broker端寫數據是不成功,因為ISR中只有leader副本,min.insync.replica要求2個副本,此時的producer生產數據失敗(異常),當然consumer端是可以消費數據的,只不過是沒有新數據產生而已.這樣保證了數據的一致性,但這樣會導致高可用性降低了。一般的配置是按: n/2 +1 來配置min.insync.replicas 的數量的,同時也要將unclean.leader.election.enable=false
unclean.leader.election.enable
假如現在有leader 0 fllow 1 fllow 2 三個副本,存儲的數據量分別是10 9 8,此時的broker的配置是:min.insync.replica=2 acks=all,leader的數據更新到了15,在沒有同步到fllow 1 fllow 2時掛掉了,此時的ISR隊列中是有fllow 1 和fllow 2的,如果unclean.leader.election.enable設置的是true,表示在ISR中的副本是可以競選leader這樣就會造成9-15或8-15之間的數據丟失,所以unclean.leader.election.enable必須設置成成false,這樣整個kafka cluster都不讀寫了,這樣就保證了數據的高度一致性.
我們通過producer consumer broker 三個方面來講述怎樣保證數據在生產過程中不丟失,在發到broker(服務端)不丟失,在消費時不消費重復數據,其中通過學習kafka就是了解各種配置項控制的功能,后續我會總結梳理這三塊的服務參數。