至少一次(At least Once)指每個消息必須投遞一次。Consumer先Pull消息到本地,消費完成后,才向服務器返回ack,如果沒有消費一定不會ack消息,所以RocketMQ可以很好的支持此特性。
生產者
在同步非順序投遞的時候,每次都是輪詢到不同的隊列:
Message message = new Message("topic_family", (" 同步發送 ").getBytes()); SendResult sendResult = producer.getProducer().send(message);
結果:
Product-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221C3BBE0000, offsetMsgId=C0A80A0B00002A9F00000000007B4A5C, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=1], queueOffset=11014] Product-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221C6A3D0002, offsetMsgId=C0A80A0B00002A9F00000000007B4BB0, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=3], queueOffset=11012] Product-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221CAEE20004, offsetMsgId=C0A80A0B00002A9F00000000007B4D04, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=2], queueOffset=11004] Product-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221CEA6D0006, offsetMsgId=C0A80A0B00002A9F00000000007B4E58, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=2], queueOffset=11005]
異步
寫入補償log,進行重發
@GetMapping("/async") private void async() throws Exception { //創建消息 Message message = null; for (int i=0;i<100;i++){ if (i==90) { new RuntimeException(""); } message = new Message("topic_family", ("異步發送:" + i).getBytes()); System.out.println("異步發送:"+ i); //異步發送消息 producer.getProducer().send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { //log.info("Product-異步發送-輸出信息={}", sendResult); System.out.println("Product-異步發送-輸出信息={}" + sendResult); } @Override public void onException(Throwable e) { //e.printStackTrace(); System.out.println("Product-異步發送-異常" + e.getMessage()); //寫入補償log,進行重發 } }); } }
重發帶來的重復消息問題-上半場冪等
package com.xin.rocketmq.demo.testrun; import com.xin.rocketmq.demo.config.JmsConfig; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class ProduceOnce { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.10.11:9876"); producer.start(); Message sendMessage = new Message( JmsConfig.TOPIC, "訂單001".getBytes()); sendMessage.setKeys("OD0000000001");//模擬同一個ID SendResult sendResult1 = producer.send(sendMessage); SendResult sendResult2 = producer.send(sendMessage); System.out.println("Product1-同步發送-Product信息={}" + sendResult1); System.out.println("Product2-同步發送-Product信息={}" + sendResult2); producer.shutdown(); } }
結果:
Product1-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC088C18B4AAC2224396E40000, offsetMsgId=C0A80A0B00002A9F00000000007DE870, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=0], queueOffset=11258]
Product2-同步發送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC088C18B4AAC2224396E40000, offsetMsgId=C0A80A0B00002A9F00000000007DE926, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=1], queueOffset=11262]
消費端
package com.xin.rocketmq.demo.testrun; import com.xin.rocketmq.demo.config.JmsConfig; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class ConsumerOnce { public static void main(String[] args) throws InterruptedException, MQClientException { // 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 設置NameServer的地址 consumer.setNamesrvAddr("192.168.10.11:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息 consumer.subscribe(JmsConfig.TOPIC, "*"); // 注冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
結果,消費了2次
ConsumeMessageThread_1 Receive New Messages: [properties={MIN_OFFSET=0, MAX_OFFSET=11260, KEYS=OD0000000001, CONSUME_START_TIME=1591515973997, UNIQ_KEY=A9FEC2CC1B4418B4AAC22248484E0000, WAIT=true}, body=[-24, -82, -94, -27, -115, -107, 48, 48, 49], transactionId='null'}]] ConsumeMessageThread_2 Receive New Messages: properties={MIN_OFFSET=0, MAX_OFFSET=11264, KEYS=OD0000000001, CONSUME_START_TIME=1591515974001, UNIQ_KEY=A9FEC2CC1B4418B4AAC22248484E0000, WAIT=true}, body=[-24, -82, -94, -27, -115, -107, 48, 48, 49], transactionId='null'}]]
業務實現消費回調的時候,當且僅當此回調函數返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,RocketMQ才會認為這批消息(默認是1條)是消費完成的。(具體如何ACK見后面章節)
如果這時候消息消費失敗,例如數據庫異常,余額不足扣款失敗等一切業務認為消息需要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER
,RocketMQ就會認為這批消息消費失敗了。
為了保證消息是肯定被至少消費成功一次,RocketMQ會把這批消息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續失敗到一定次數(默認16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工干預。
注:
- 如果業務的回調沒有處理好而拋出異常,會認為是消費失敗當
ConsumeConcurrentlyStatus.RECONSUME_LATER
處理。 - 當使用順序消費的回調
MessageListenerOrderly
時,由於順序消費是要前者消費成功才能繼續消費,所以沒有RECONSUME_LATER
的這個狀態,只有SUSPEND_CURRENT_QUEUE_A_MOMENT
來暫停隊列的其余消費,直到原消息不斷重試成功為止才能繼續消費。
我們可以使用db的唯一鍵,或者緩存的唯一Id來記錄需要消費一次的id。
RocketMQ無法避免消息重復(Exactly-Once),所以如果業務對消費重復非常敏感,務必要在業務層面進行去重處理。可以借助關系數據庫進行去重。首先需要確定消息的唯一鍵,可以是msgId,也可以是消息內容中的唯一標識字段,例如訂單Id等。在消費之前判斷唯一鍵是否在關系數據庫中存在。如果不存在則插入,並消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過)
msgId一定是全局唯一標識符,但是實際使用中,可能會存在相同的消息有兩個不同msgId的情況(消費者主動重發、因客戶端重投機制導致的重復等),這種情況就需要使業務字段進行重復消費。
每個消息在業務層面的唯一標識碼要設置到keys字段,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過topic、key來查詢這條消息內容,以及消息被誰消費。由於是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。
// 訂單Id String orderId = "20034568923546"; message.setKeys(orderId);
模擬代碼:
package com.xin.rocketmq.demo.testrun; import com.xin.rocketmq.demo.config.JmsConfig; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.ArrayList; import java.util.List; public class ConsumerOnce { public static void main(String[] args) throws InterruptedException, MQClientException { List<String> redisKeyList = new ArrayList<String>(); // 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 設置NameServer的地址 consumer.setNamesrvAddr("192.168.10.11:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息 consumer.subscribe(JmsConfig.TOPIC, "*"); // 注冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { // 默認msgs只有一條消息 String repeatID = ""; for (MessageExt msg : msgs) { String key = msg.getKeys(); return noRepeat(key); //return noRepeatConsume(repeatID,key); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } private ConsumeConcurrentlyStatus noRepeat(String key) { for (String item : redisKeyList){ System.out.println("Redis 緩存中的keys:" + item); } if (!redisKeyList.contains(key)) { redisKeyList.add(key); System.out.println("Redis redisKeyList.size():" + redisKeyList.size()); System.out.println("Redis 緩存插入:" + key); System.out.printf("%s Receive Messages: %s %n", Thread.currentThread().getName(), key); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else{ System.out.printf("%s 重復Messages: %s %n", Thread.currentThread().getName(), key); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
對於並發的消費監聽器,你可以返回 RECONSUME_LATER 來通知消費者現在不能消費這條消息,並且希望可以稍后重新消費它。然后,你可以繼續消費其他消息。對於有序的消息監聽器,因為你關心它的順序,所以不能跳過消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告訴消費者等待片刻。
結果沒有重復消費了
Redis redisKeyList.size():1
Redis 緩存插入:OD0000000001
ConsumeMessageThread_2 Receive Messages: OD0000000001
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_1 重復Messages: OD0000000001
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_3 重復Messages: OD0000000001
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_4 重復Messages: OD0000000001
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_5 重復Messages: OD0000000001
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_6 重復Messages: OD0000000001
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_7 重復Messages: OD0000000001
Redis 緩存中的keys:OD0000000001
ConsumeMessageThread_8 重復Messages: OD0000000001
結果是不斷地重復嘗試消費,該怎么處理?可以刪除重復的記錄。
消費端常見的冪等操作總結
- 業務操作之前進行狀態查詢 消費端開始執行業務操作時,通過冪等id首先進行業務狀態的查詢,如:修改訂單狀態環節,當訂單狀態為成功/失敗則不需要再進行處理。那么我們只需要在消費邏輯執行之前通過訂單號進行訂單狀態查詢,一旦獲取到確定的訂單狀態則對消息進行提交,通知broker消息狀態為:ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。
- 業務操作前進行數據的檢索 邏輯和第一點相似,即消費之前進行數據的檢索,如果能夠通過業務唯一id查詢到對應的數據則不需要進行再后續的業務邏輯。如:下單環節中,在消費者執行異步下單之前首先通過訂單號查詢訂單是否已經存在,這里可以查庫也可以查緩存。如果存在則直接返回消費成功,否則進行下單操作。
- 唯一性約束保證最后一道防線 上述第二點操作並不能保證一定不出現重復的數據,如:並發插入的場景下,如果沒有樂觀鎖、分布式鎖作為保證的前提下,很有可能出現數據的重復插入操作,因此我們務必要對冪等id添加唯一性索引,這樣就能夠保證在並發場景下也能保證數據的唯一性。
- 引入鎖機制 上述的第一點中,如果是並發更新的情況,沒有使用悲觀鎖、樂觀鎖、分布式鎖等機制的前提下,進行更新,很可能會出現多次更新導致狀態的不准確。如:對訂單狀態的更新,業務要求訂單只能從初始化->處理中,處理中->成功,處理中->失敗,不允許跨狀態更新。如果沒有鎖機制,很可能會將初始化的訂單更新為成功,成功訂單更新為失敗等異常的情況。 高並發下,建議通過狀態機的方式定義好業務狀態的變遷,通過樂觀鎖、分布式鎖機制保證多次更新的結果是確定的,悲觀鎖在並發環境不利於業務吞吐量的提高因此不建議使用。
- 消息記錄表 這種方案和業務層做的冪等操作類似,由於我們的消息id是唯一的,可以借助該id進行消息的去重操作,間接實現消費的冪等。
首先准備一個消息記錄表,在消費成功的同時插入一條已經處理成功的消息id記錄到該表中,注意一定要 與業務操作處於同一個事物 中,當新的消息到達的時候,根據新消息的id在該表中查詢是否已經存在該id,如果存在則表明消息已經被消費過,那么丟棄該消息不再進行業務操作即可。
肯定還有更多的場景我沒有涉及到,這里說到的操作均是互相之間有關聯的,將他們配合使用更能夠保證消費業務的冪等性。
不論怎樣,請牢記:緩存是不可靠的,在享受異步化、削峰、消息堆積等的好處之外,增加了業務復雜性,需要謹慎處理冪等操作 。