rocketmq的順序消息需要滿足2點:
1.Producer端保證發送消息有序,且發送到同一個隊列。
2.consumer端保證消費同一個隊列。
生產端:
RocketMQ可以嚴格的保證消息有序。但這個順序,不是全局順序,只是分區(queue)順序。要全局順序只能一個分區。
但是同一條queue里面,RocketMQ的確是能保證FIFO的
確保消息放到同一個queue中,需要使用 MessageQueueSelector
列如:
String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes()); //確保同一個訂單號的數據放到同一個queue中 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int)index); } }, orderList.get(i).getOrderId());//訂單id
消費端:
需要使用 MessageListenerOrderly 來消費數據。
MessageListenerOrderly與MessageListenerConcurrently區別
MessageListenerOrderly:有序消費,同一隊列的消息同一時刻只能一個線程消費,可保證消息在同一隊列嚴格有序消費
MessageListenerConcurrently:並發消費
public class ConsumerInOrder { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876"); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.print(Thread.currentThread().getName() + " Receive New Messages: " ); for (MessageExt msg: msgs) { System.out.println(msg + ", content:" + new String(msg.getBody())); } try { //模擬業務邏輯處理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
參考文檔
https://blog.csdn.net/earthhour/article/details/78323026