消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了三條消息分別是訂單創建、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以並行消費的。RocketMQ可以嚴格的保證消息有序。
順序消息分為全局順序消息與分區順序消息,全局順序是指某個Topic下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可。
- 全局順序 對於指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。 適用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景
- 分區順序 對於指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。 同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。 Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。 適用場景:性能要求高,以 sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息發布和消費的場景。
RocketMQ可以嚴格的保證消息有序。但這個順序,不是全局順序,只是分區(queue)順序。要全局順序只能一個分區。
非順序的原因,是因為發送消息的時候,消息發送默認是會采用輪詢的方式發送到不通的queue(分區)。而消費端消費的時候,是會分配到多個queue的,多個queue是同時拉取提交消費。如果是輪詢的生產者,加上同時的多線程讀取,自然不能保證順序。
如果把一個分區順序投放在同一個分區里面,RocketMQ的確是能保證FIFO的。那么要做到順序消息,應該怎么實現呢——把消息確保投遞到同一條queue。
比如一個訂單,根據同一個id,把創建-扣款(買家)-付款(給商家)的三條消息,按順序推入一個隊列,那消費的時候也是順序執行的。
關鍵代碼,按訂單ID放入同一分區:
producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; //根據訂單id選擇發送queue long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId());//訂單id
完整的生產者
package com.xin.rocketmq.demo.testrun; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; /** * Producer,發送順序消息 */ public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.10.11:9876"); producer.start(); String[] tags = new String[]{"TagA", "TagC", "TagD"}; // 訂單列表 List<OrderStep> orderList = new Producer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 0; i < 10; i++) { // 加個時間前綴 String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; //根據訂單id選擇發送queue long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId());//訂單id System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } /** * 訂單的步驟 */ private static class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } } /** * 生成模擬訂單數據 */ private List<OrderStep> buildOrders() { List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }
結果把同一個id放在一個分區了
SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='創建'} SendResult status:SEND_OK, queueId:1, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='創建'} SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='付款'} SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='創建'} SendResult status:SEND_OK, queueId:1, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='付款'} SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='付款'} SendResult status:SEND_OK, queueId:1, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='完成'} SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='推送'} SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='完成'} SendResult status:SEND_OK, queueId:3, body:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='完成'}
消費者
package com.xin.rocketmq.demo.testrun; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; /** * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交) */ public class ConsumerInOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("192.168.10.11:9876"); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { // 可以看到每個queue有唯一的consume線程來消費, 訂單對每個queue(分區)有序 System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模擬業務邏輯處理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
消費也是分區的順序消費
consumeThread=ConsumeMessageThread_1queueId=1, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='創建'} consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='創建'} consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='付款'} consumeThread=ConsumeMessageThread_1queueId=1, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='付款'} consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='創建'} consumeThread=ConsumeMessageThread_1queueId=1, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111065, desc='完成'} consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='付款'} consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='推送'} consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103117235, desc='完成'} consumeThread=ConsumeMessageThread_2queueId=3, content:2020-06-07 14:28:49 Hello RocketMQ OrderStep{orderId=15103111039, desc='完成'}
消費者將鎖定每個消息隊列,以確保他們被逐個消費,雖然這將會導致性能下降,但是當你關心消息順序的時候會很有用。我們不建議拋出異常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作為替代。