一.造成重復消費的原因
在於回饋機制。正常情況下,消費者在消費消息時候,消費完畢后,會發送一個ACK確認信息給消息隊列(broker),消息隊列(broker)就知道該消息被消費了,就會將該消息從消息隊列中刪除。
不同的消息隊列發送的確認信息形式不同,例如RabbitMQ是發送一個ACK確認消息,RocketMQ是返回一個CONSUME_SUCCESS成功標志,kafka實際上有個offset的概念。
造成重復消費的原因?,就是因為網絡原因閃斷,ACK返回失敗等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經消費過該消息了,再次將該消息分發給其他的消費者。(因為消息重試等機制的原因,如果一個consumer斷了,rocketmq有consumer集群,會將該消息重新發給其他consumer)
這個問題針對業務場景來答,分以下三種情況:
(1)比如,你拿到這個消息做數據庫的insert操作,那就容易了,給這個消息做一個唯一的主鍵,那么就算出現重復消費的情況,就會導致主鍵沖突,避免數據庫出現臟數據。
(2)再比如,你拿到這個消息做redis的set的操作,那就容易了,不用解決,因為你無論set幾次結果都是一樣的,set操作本來就算冪等操作。
(3)如果上面兩種情況還不行,上大招。准備一個第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將<id,message>以K-V形式寫入redis.那消費者開始消費前,先去redis中查詢有沒有消費記錄即可。
二.單機環境解決方案
生產者:發送消息同時set一個key做唯一標識
public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); producer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i = 0; i < 1; i++) { Thread.sleep(1000); // 每秒發送一次MQ Message msg = new Message("itmayiedu-topic", // topic 主題名稱 "TagA", // tag 臨時值 ("itmayiedu-6" + i).getBytes()// body 內容 ); //setKey,做唯一標識 msg.setKeys(System.currentTimeMillis() + ""); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); }
消費者:
//保存標識的集合 static private Map<String, String> logMap = new HashMap<>(); public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("itmayiedu-topic", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { String key = null; String msgId = null; try { for (MessageExt msg : msgs) { key = msg.getKeys(); //判斷集合當中有沒有存在key,存在就不需要重試,不存在先存key再回來重試后消費消息 if (logMap.containsKey(key)) { // 無需繼續重試。 System.out.println("key:"+key+",無需重試..."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } msgId = msg.getMsgId(); System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody())); //模擬異常 int i = 1 / 0; } } catch (Exception e) { e.printStackTrace(); //重試 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } finally { //保存key logMap.put(key, msgId); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); }
執行效果:

三.集群環境解決方案
在生產者端要保證冪等性的話,大概可以使用以下兩種方式:
① RocketMQ支持消息查詢的功能,只要去RocketMQ查詢一下是否已經發送過該條消息就可以了,不存在則發送,存在則不發送
② 引入redis,在發送消息到RocketMQ成功之后,向redis中插入一條數據,如果發生重試,則先去redis中查詢一下該條消息是否已經發送過了,存在的話就不重復發送消息了
生產者的這兩種冪等性方案都可以實現,但是都存在一定的缺陷
方案①,RocketMQ消息查詢的性能不是特別好,如果是在高並發的場景下,每條消息在發送到RocketMQ時都去查詢一下,可能會影響接口的性能
方案②,在一些極端的場景下,redis也無法保證消息發送成功之后,就一定能寫入redis成功,比如寫入消息成功而redis此時宕機,那么再次查詢redis判斷消息是否已經發送過,是無法得到正確結果的
既然在消費者做冪等性的方案都不是特別靠譜,那就再在消費者端來做吧
消息的消費,最后都對應的是數據庫的操作,只要在消息消費的時候,判斷一下數據庫中是否已經消費過了這條消息,就可以保證冪等性了,例如使用唯一索引,保證一條消息只被消費一次。
參考:https://blog.csdn.net/LO_YUN/article/details/104135197
去重原則:1.冪等性 2.業務去重
冪等性:(處理必須唯一) 無論這個業務請求被(consumer)執行多少次,我們的數據庫的結果都是唯一的,不可變的。
去重策略:去重表機制,業務拼接去重策略(比如唯一流水號)
1.建立一個消息表,拿到這個消息做數據庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現重復消費的情況,就會導致主鍵沖突。
高並發下去重:采用Redis去重(key天然支持原子性並要求不可重復),但是由於不在一個事務,要求有適當的補償策略,但是對於很重要的業務,不應該支持補償
2.利用redis事務,主鍵(我們必須把全量的操作數據都存放在redis里,然后定時去和數據庫做數據同步)—-即消費處理后,該處理本來應該保存在數據庫的,先保存在redis,再通過一定業務方式從redis中取數據進行db持久化
3.利用redis和關系型數據庫一起做去重機制
4.拿到這個消息做redis的set的操作.redis就是天然冪等性
5.准備一個第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將 < id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。
消息重復消費是一個非常常見的問題,在很多系統調用頻繁的場景下,都可能會出現超時重試的情況,還有就是系統頻繁迭代,經常重啟系統更新的場景,也會出現消息重復消費
生產者端發送重復的消息到RocketMQ中其實問題不大,消息只是在RocketMQ中重復了,並沒有影響到系統的數據,我們只需要在最后修改數據庫的時候,保證好冪等性即可
