如何保證消息不丟失
就我們市面上常見的消息隊列而言,只要配置得當,我們的消息就不會丟。
先來看看這個圖,
可以看到一共有三個階段,分別是生產消息、存儲消息和消費消息。我們從這三個階段分別入手來看看如何確保消息不會丟失。
生產消息
生產者發送消息至Broker
,需要處理Broker
的響應,不論是同步還是異步發送消息,同步和異步回調都需要做好try-catch
,妥善的處理響應,如果Broker
返回寫入失敗等錯誤消息,需要重試發送。當多次發送失敗需要作報警,日志記錄等。
這樣就能保證在生產消息階段消息不會丟失。
存儲消息
存儲消息階段需要在消息刷盤之后再給生產者響應,假設消息寫入緩存中就返回響應,那么機器突然斷電這消息就沒了,而生產者以為已經發送成功了。
如果Broker
是集群部署,有多副本機制,即消息不僅僅要寫入當前Broker
,還需要寫入副本機中。那配置成至少寫入兩台機子后再給生產者響應。這樣基本上就能保證存儲的可靠了。一台掛了還有一台還在呢(假如怕兩台都掛了..那就再多些)。
那假如來個地震機房機子都掛了呢?emmmmmm...大公司基本上都有異地多活。
消費消息
這里經常會有同學犯錯,有些同學當消費者拿到消息之后直接存入內存隊列中就直接返回給Broker
消費成功,這是不對的。
你需要考慮拿到消息放在內存之后消費者就宕機了怎么辦。所以我們應該在消費者真正執行完業務邏輯之后,再發送給Broker
消費成功,這才是真正的消費了。
所以只要我們在消息業務邏輯處理完成之后再給Broker
響應,那么消費階段消息就不會丟失。
小結一下
可以看出,保證消息的可靠性需要三方配合。
生產者
需要處理好Broker
的響應,出錯情況下利用重試、報警等手段。
Broker
需要控制響應的時機,單機情況下是消息刷盤后返回響應,集群多副本情況下,即發送至兩個副本及以上的情況下再返回響應。
消費者
需要在執行完真正的業務邏輯之后再返回響應給Broker
。
但是要注意消息可靠性增強了,性能就下降了,等待消息刷盤、多副本同步后返回都會影響性能。因此還是看業務,例如日志的傳輸可能丟那么一兩條關系不大,因此沒必要等消息刷盤再響應。
如何處理重復消息
我們先來看看能不能避免消息的重復。
假設我們發送消息,就管發,不管Broker
的響應,那么我們發往Broker
是不會重復的。
但是一般情況我們是不允許這樣的,這樣消息就完全不可靠了,我們的基本需求是消息至少得發到Broker
上,那就得等Broker
的響應,那么就可能存在Broker
已經寫入了,當時響應由於網絡原因生產者沒有收到,然后生產者又重發了一次,此時消息就重復了。
再看消費者消費的時候,假設我們消費者拿到消息消費了,業務邏輯已經走完了,事務提交了,此時需要更新Consumer offset
了,然后這個消費者掛了,另一個消費者頂上,此時Consumer offset
還沒更新,於是又拿到剛才那條消息,業務又被執行了一遍。於是消息又重復了。
可以看到正常業務而言消息重復是不可避免的,因此我們只能從另一個角度來解決重復消息的問題。
關鍵點就是冪等。既然我們不能防止重復消息的產生,那么我們只能在業務上處理重復消息所帶來的影響。
冪等處理重復消息
冪等是數學上的概念,我們就理解為同樣的參數多次調用同一個接口和調用一次產生的結果是一致的。
例如這條 SQLupdate t1 set money = 150 where id = 1 and money = 100;
執行多少遍money
都是150,這就叫冪等。
因此需要改造業務處理邏輯,使得在重復消息的情況下也不會影響最終的結果。
可以通過上面我那條 SQL 一樣,做了個前置條件判斷,即money = 100
情況,並且直接修改,更通用的是做個version
即版本號控制,對比消息中的版本號和數據庫中的版本號。
或者通過數據庫的約束例如唯一鍵,例如insert into update on duplicate key...
。
或者記錄關鍵的key,比如處理訂單這種,記錄訂單ID,假如有重復的消息過來,先判斷下這個ID是否已經被處理過了,如果沒處理再進行下一步。當然也可以用全局唯一ID等等。
基本上就這么幾個套路,真正應用到實際中還是得看具體業務細節。
如何保證消息的有序性
有序性分:全局有序和部分有序。
全局有序
如果要保證消息的全局有序,首先只能由一個生產者往Topic
發送消息,並且一個Topic
內部只能有一個隊列(分區)。消費者也必須是單線程消費這個隊列。這樣的消息就是全局有序的!
不過一般情況下我們都不需要全局有序,即使是同步MySQL Binlog
也只需要保證單表消息有序即可。
部分有序
因此絕大部分的有序需求是部分有序,部分有序我們就可以將Topic
內部划分成我們需要的隊列數,把消息通過特定的策略發往固定的隊列中,然后每個隊列對應一個單線程處理的消費者。這樣即完成了部分有序的需求,又可以通過隊列數量的並發來提高消息處理效率。
圖中我畫了多個生產者,一個生產者也可以,只要同類消息發往指定的隊列即可。
如果處理消息堆積
消息的堆積往往是因為生產者的生產速度與消費者的消費速度不匹配。有可能是因為消息消費失敗反復重試造成的,也有可能就是消費者消費能力弱,漸漸地消息就積壓了。
因此我們需要先定位消費慢的原因,如果是bug
則處理 bug
,如果是因為本身消費能力較弱,我們可以優化下消費邏輯,比如之前是一條一條消息消費處理的,這次我們批量處理,比如數據庫的插入,一條一條插和批量插效率是不一樣的。
假如邏輯我們已經都優化了,但還是慢,那就得考慮水平擴容了,增加Topic
的隊列數和消費者數量,注意隊列數一定要增加,不然新增加的消費者是沒東西消費的。一個Topic中,一個隊列只會分配給一個消費者。
當然你消費者內部是單線程還是多線程消費那看具體場景。不過要注意上面提高的消息丟失的問題,如果你是將接受到的消息寫入內存隊列之后,然后就返回響應給Broker
,然后多線程向內存隊列消費消息,假設此時消費者宕機了,內存隊列里面還未消費的消息也就丟了。