一、消息隊列如何解決消息不會丟失問題
消息從生產到消費可以經歷三個階段:生產階段、存儲階段和消費階段。
- 生產階段:在這個階段,從消息在Producer創建出來,經過網絡傳輸發送到Broker端。
- 存儲階段: 消息在Broker端存儲,如果是集群,消息會在這個階段被復制到其他的副本上。
- 消費階段:Consumer從Broker上拉取消息,經過網絡 傳輸發送在Consumer上。

在這三個階段都存在消息可能丟失的情況。
- 生產階段:消息隊列通常使用確認機制,來保證消息可靠傳遞:當你代碼調用發送消息的方法,消息隊列的客戶端會把消息發送到Broker,Broker接受到消息會返回客戶端一個確認。只要Producer收到了Broker的確認響應,就可以保證消息在生產階段不會丟失。有些消息隊列在長時間沒收到發送的確認響應后,會自動重試,如果重試再失敗,就會一返回值或者異常方式返回給客戶端。所以在編寫發送消息的代碼,需要正確處理消息發送返回值或者異常,保證這個階段消息不丟失。
- 存儲階段:如果對消息可靠性要求非常高,可以通過配置Broker參數來避免因為宕機丟消息。對於單個節點Broker,需要配置Broker參數,在收到消息后,將消息寫入磁盤再給Producer返回確認響應。如果是Broker集群,需要將Broker集群配置成:至少兩個以上節點收到消息,再給客戶端發送確認響應。
- 消費階段:消費階段采用和生產階段類似的確認機制來保證消息的可靠傳遞。Consumer收到消息后,需在執行消費邏輯后在發送確認消息。
總結:
- 生產階段,需要捕獲消息發送錯誤,並重發消息
- 存儲階段,通過配置刷盤和復制參數,讓消息寫入多個副本的磁盤上,來確保消息不會因為某個Broker宕機或者磁盤損壞而丟失。
- 消費階段:需要在處理完全部消費業務邏輯后,再發送確認消息。
二、解決消息不會重復消費
-
冪等性:一個請求,不管重復來多少次,結果是不會改變的。
-
RabbitMQ、RocketMQ、Kafka等任何隊列不保證消息不重復,如果業務需要消息不重復消費,則需要消費端處理業務消息要保持冪等性
- 方式一:Redis的setNX() , 做消息id去重 java版本目前不支持設置過期時間
//Redis中操作,判斷是否已經操作過 TODO boolean flag = jedis.setNX(key); if(flag){ //消費 }else{ //忽略,重復消費 }
- 方式二:redis的 Incr 原子操作:key自增,大於0 返回值大於0則說明消費過,(key可以是消息的md5取值, 或者如果消息id設計合理直接用id做key)
int num = jedis.incr(key); if(num == 1){ //消費 }else{ //忽略,重復消費 }
-
方式三:數據庫去重表
-
設計一個去重表,某個字段使用Message的key做唯一索引,因為存在唯一索引,所以重復消費會失敗
CREATE TABLE
message_record
(id
int(11) unsigned NOT NULL AUTO_INCREMENT,key
varchar(128) DEFAULT NULL,create_time
datetime DEFAULT NULL, PRIMARY KEY (id
), UNIQUE KEYkey
(key
) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
三、保證消息隊列的高可用行
建立高可用集群
四、消息積壓在消息隊列里面
1.大量消息在mq里積壓了幾個小時了還沒解決
場景:幾千萬條數據在MQ里積壓了七八個小時,從下午4點多,積壓到了晚上很晚,10點多,11點多。線上故障了,這個時候要不然就是修復consumer的問題,讓他恢復消費速度,然后傻傻的等待幾個小時消費完畢。這個肯定不行。一個消費者一秒是1000條,一秒3個消費者是3000條,一分鍾是18萬條,1000多萬條。
所以如果你積壓了幾百萬到上千萬的數據,即使消費者恢復了,也需要大概1小時的時間才能恢復過來。
解決方案:”
這種時候只能操作臨時擴容,以更快的速度去消費數據了。具體操作步驟和思路如下:
①先修復consumer的問題,確保其恢復消費速度,然后將現有consumer都停掉。
②臨時建立好原先10倍或者20倍的queue數量(新建一個topic,partition是原來的10倍)。
③然后寫一個臨時分發消息的consumer程序,這個程序部署上去消費積壓的消息,消費之后不做耗時處理,直接均勻輪詢寫入臨時建好分10數量的queue里面。
④緊接着征用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的消息。
⑤這種做法相當於臨時將queue資源和consumer資源擴大10倍,以正常速度的10倍來消費消息。
⑥等快速消費完了之后,恢復原來的部署架構,重新用原來的consumer機器來消費消息。

2.消息設置了過期時間,過期就丟了怎么辦
假設你用的是rabbitmq,rabbitmq是可以設置過期時間的,就是TTL,如果消息在queue中積壓超過一定的時間就會被rabbitmq給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在mq里,而是大量的數據會直接搞丟。
解決方案:
這種情況下,實際上沒有什么消息擠壓,而是丟了大量的消息。所以第一種增加consumer肯定不適用。
這種情況可以采取 “批量重導” 的方案來進行解決。
在流量低峰期(比如夜深人靜時),寫一個程序,手動去查詢丟失的那部分數據,然后將消息重新發送到mq里面,把丟失的數據重新補回來。
3.積壓消息長時間沒有處理,mq放不下了怎么辦
如果走的方式是消息積壓在mq里,那么如果你很長時間都沒處理掉,此時導致mq都快寫滿了,咋辦?這個還有別的辦法嗎?
解決方案:
這個就沒有辦法了,肯定是第一方案執行太慢,這種時候只好采用 “丟棄+批量重導” 的方式來解決了。
首先,臨時寫個程序,連接到mq里面消費數據,收到消息之后直接將其丟棄,快速消費掉積壓的消息,降低MQ的壓力,然后走第二種方案,在晚上夜深人靜時去手動查詢重導丟失的這部分數據。