記錄下和kafka相關的語義、不重復消息、不丟失數據、分區有序的內容,文中很多理解參考文末博文、書籍還有某前輩。
kafka語義
kafka中有三種語義,它對理解下面的不重復消費有幫助。
最多一次(at most once):消息最多被處理一次,可能有消息丟失的風險。
至少一次(at least once):這種語義下消息可能被處理多次,可以保證消息不丟失,但是可能導致重復消息。
精確一次(exactly once):消息只會被處理一次,at least once+冪等性,可以實現精確一次語義。
重復消費
這是一個很常見的問題,如果保證消費者不重復消費數據,博客上有很多的方法,簡單羅列幾條如下。
(1)給每條消息加一個獨一無二的key,如uuid,消費數據的時候同時記錄這些key,下次消費數據時需要檢查消息的key,是否已經被消費過了,這樣是可以避免重復消費的。
(2)at least once+冪等性,也可以實現,在保證至少一次的語義下,有多種方式實現冪等性,如在關系型數據庫如Oracle、MySQL表中設置唯一約束、將數據存儲到redis的set、使用set [NX]存儲數據到redis,都是一種實現方式。
- 使用主鍵約束,當插入數據到關系型數據庫的時候,第一次插入成功后,下次再次消費數據后插入數據到數據庫,會由於主鍵的唯一約束導致插入失敗,這就保證了冪等性。
- redis的set,可以不斷的set數據到里面,重復消費和一次消費的效果一樣,這也可以保證冪等性。
# 添加set數據 football
127.0.0.1:6379> sadd likes football
(integer) 1
# 查看有一個football
127.0.0.1:6379> smembers likes
1) "football"
# 多次添加football,相當於多次消費到football,進行處理
127.0.0.1:6379> sadd likes football football football
(integer) 0
# 結果依然只有一個,多次sadd,和一次sadd的效果一樣,實現冪等
127.0.0.1:6379> smembers likes
1) "football"
- redis可以使用set+[NX]也可以實現冪等,NX選項只容許沒有這個數據時才能添加,有就不能添加。
# 添加key=name,value='clyang'成功
127.0.0.1:6379> set name clyang NX
OK
127.0.0.1:6379> get name
"clyang"
# 再次添加,失敗,實現冪等
127.0.0.1:6379> set name clyang NX
(nil)
以上的例子可以看出,不僅僅關系型數據庫,nosql也可以實現,只要能實現"INSERT IF NOT EXISTS"語義的存儲系統都可以實現冪等效果。
(3)在消費數據之前,設置一個前提條件,並且消費完數據之后,需要修改前提條件的狀態,這樣也可以避免重復消費。
- 比如給用戶轉賬,必須滿足余額等於100,余額加100元才能執行,消息里需要帶上余額信息,當消息里余額信息和數據庫里的余額不相等時,就不會再執行修改余額的操作。當第一次有類似{'余額':100,'加金額':100}的消息要處理時,由於滿足條件會執行余額更新的操作變成200,但后面的重復消息,因為消息里的余額不等於數據庫里的余額(上一次更新為200了),導致修改失敗,這樣也保證了冪等性。
- 上面是數字類型的的情況,如果是非數字或其他復雜的情況,該如何處理,參考前人,通用的方法是給消息里的數據,增加一個版本號,消費消息更新數據前,需要檢查數據里的版本號和消息里的版本號,如果不一樣就不能更新,並且更新后數據里版本號+1,這樣也可以實現冪等性。
# 第一次消費,數據里沒有這條消息,可以消費,並且數據里版本號+1變成2
{'msg':'當光照進來的時候,你嘴角上揚的驕傲就是最大的回報',versionId=1}
# 再一次消費,由於消息版本號是1,數據里版本號是2,無法消費
{'msg':'當光照進來的時候,你嘴角上揚的驕傲就是最大的回報',versionId=1}
# 同上,無法消費
{'msg':'當光照進來的時候,你嘴角上揚的驕傲就是最大的回報',versionId=1}
丟失數據
如何規避數據不丟失,參考文末博文,需要從producer、consumer和broker三方面考慮。關於丟失數據問題,如果當一個學術問題來考慮,是一種情況,如果是實際生產環境,又是另外一種情況,需要注意區分。
producer
(1)acks
生產者可以設置acks=-1或者all,保證發送到broker的消息不丟失。這樣設置是當消息發送到了leader副本,所有處於ISR列表中的follower副本都需要同步到這條數據,才可以。
- acks=-1:代表所有處於ISR列表中的follower partition都會同步寫入消息成功,才會返回ack到生產者
- acks=0:代表消息只要發送出去就行,其他不管,無需返回ack到生產者
- acks=1:代表發送消息到leader partition寫入成功就返回ack到生產者
//注意代碼中配置時,-1是字符串,不是數字
props.put("acks","-1");
(2)retry
在kafka中錯誤分為兩種,一種是可恢復的,另一種是不可恢復的。生產時,使用帶有回調的send方法,當遇到可以恢復的錯誤(如網絡波動、leader選舉中leader副本不可用的情況),設置retry次數和retry時間間隔后,在retry次數范圍內都不會進入onCompletition方法,多次嘗試(可以設置為Integer.MAX_VALUE)就會大概率成功發送。如果是不可以恢復的錯誤(如一條消息的最大大小超過max.request.size設置),最后肯定會進入下面的方法,可以做退而求其次的操作保證數據能保存,如將數據存儲到redis。
//生產者部分代碼
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception==null){
System.out.println("消息發送到分區成功");
}else{
System.out.println("消息發送失敗");
//TODO 寫入到redis
}
}
});
其他參考《Apache Kafka實戰》 4.6.1。
broker
(1)unclean.leader.election.enable=false
這個參數設置是否允許從非ISR列表中選取副本為leader副本,默認值是false,如果設置為了true,意味着可以從ISR列表以外選舉leader副本,這些ISR列表之外的副本,由於同步趕不上leader副本的更新進度,讓它們變成leader副本,就會出現HW水位被截斷的情況,導致數據丟失。
(2)replication.factor>=2
多副本是保證HA的前提,它使得某些broker即使宕機,依然可以對外提供服務,提高容錯性能。上面數值設置為2代表需要設置多個副本,不是說2個就行,一般3-5個,太多也會提高網絡開銷。
(3)min.insync.replicas>=2
分區ISR中至少有多少個副本,它至少有一個,即leader副本,需要設置為大於1個。如果只有1個則leader副本掛掉則不能提供服務。它需要配合上面acks=-1來使用,代表所有IRS中的副本都數據同步,其中一個掛掉,只要能保證有一個能提供服務,就可以。
注意,上面replication.factor需要配置大於min.insync.replicas,代表容許一些副本可以'掉隊',如果設置相等,則系統變脆了,即一個副本都不能落后,只要一個落后就會導致不能滿足上面最后一個條件,可用性降低。
如下圖所示,當配置replication.factor=3,min.insync.replicas=2,這樣保證在ISR中至少有兩個副本(圖示為[0,1]),萬一當前leader副本宕機了,ISR為1的broker上的副本將頂替成為leader副本,配合acks=-1的設置,數據不會丟失。當配置replication.factor=3,min.insync.replicas=1,則壞的情況是兩個follower副本都跟不上leader副本的節奏,導致IRS中只有1個副本,這樣萬一這個副本宕機,其他的副本由於數據不同步,unclean.leader.election.enable設置為true就會被選舉為leader副本就會出現數據丟失,這個時候設置acks=-1也顯示沒啥意義,該丟還是得丟。
另外查看主題topicA的1號分區ISR列表[1,2,0],可以看出有三個,數字代表所在的broker id。
[zk: localhost:2181(CONNECTED) 14] get /brokers/topics/topicA/partitions/1/state
{"controller_epoch":242,"leader":1,"version":1,"leader_epoch":75,"isr":[1,2,0]}
cZxid = 0x7500000106
ctime = Fri Mar 20 20:29:52 CST 2020
mZxid = 0x9200000064
mtime = Fri Apr 10 19:44:54 CST 2020
pZxid = 0x7500000106
cversion = 0
dataVersion = 186
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 79
numChildren = 0
consumer
消費者部分,一般來說現在使用spark streaming或者flink來處理,很少單獨寫消費者代碼,為了理解問題暫時按照普通的消費者代碼來處理。
一般來說,消費者需要設置enable.auto.commit = false來關閉自動提交消費者offset,改為手動提交的方式,在正常處理完數據再提交,如果處理失敗就保存上次處理成功的offset,這次的不提交,這樣可以規避丟失數據。
enable.auto.commit默認是true的,並且設置true的情況下還需要通過auto.commit.interval.ms設置提交的時間間隔來一起使用,代表每隔多久自動提交一次。這樣是有丟失風險的,假設消費者代碼按照如下的設置,如果某一次消費了100條數據,過了1秒自動提交了消費者offset,但是消費者還沒處理完,可能處理到了第80條,就不幸宕機了,再次重啟消費,發現81-100這個區間的消息消費不到了,造成數據的丟失。
//設置自動提交offset
props.put("enable.auto.commit","true");//注意kafka版本
//多久自動提交offset
props.put("auto.commit.interval.ms",1000);
分區有序
同一個分區的數據,默認是有序的,因此有如下兩種方案。
-
方案1:topic只設置一個分區,這樣消息就是全局有序,但是consumer group中只有一個consumer能消費,不能多個線程同時消費數據,一定程度上降低了性能。
-
方案2:topic可能有多個分區,但是可以指定消息的key,使得需要保證順序的消息都發送到同一個分區,這樣消費數據時消息也是有序的。
但是以上兩種情況存在一個坑,參考文末博文,當producer發送消息時因為某些原因如網絡延遲導致retry時,消息有重新排列失去有序性的風險,具體需要修改max.in.flight.requests.per.connection參數的值為1(默認為5),以下是官網對這個參數的解釋。
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
翻譯一下大概意思就是,在producer向broker發送消息時,在一個connection上不阻塞的前提下,可以存在未及時返回ack的請求的最大數量。如果這個數值超過了1,並且用戶設置了retry功能,則在請求發送存在失敗時將有消息重排的風險。
如下所示,假設上面參數設置為4,則一個連接中可以發送4個請求,如果正常發送到了broker,則消息按照正常的順序保存在log文件,最后是買房->買車->彩禮->成家的順序。如果消息1、2、3在發送中有retry,多次發送后才成功,則有可能導致最后消息在log文件中順序是成家->買房->買車->彩禮的順序,如果按照這個順序消費,估計作為消費者的丈母娘是不能容忍的,所以上面的參數需要設置為1,這樣將保證買房->買車->彩禮->結婚的順序,即真正的分區有序,但是也是有代價的,會降低一點性能。
以上,理解不一定正確,學習就是一個不斷認識和糾錯的過程。
參考博文:
(1)《Apache Kafka實戰》
(2)https://segmentfault.com/a/1190000015316545 retry消息重排風險
(3)https://www.cnblogs.com/youngchaolin/p/11972899.html#_label4_4
(4)https://www.cnblogs.com/MrRightZhao/p/11498952.html kafka數據不丟失