RocketMQ 不支持任意時間自定義的延遲消息,僅支持內置預設值的延遲時間間隔的延遲消息。
預設值的延遲時間間隔為:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
延時消息的使用場景
比如電商里,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
生產
package com.xin.rocketmq.demo.testrun; import com.xin.rocketmq.demo.config.JmsConfig; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class ProducerDelay { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.10.11:9876"); producer.start(); Message msg1 = new Message( JmsConfig.TOPIC, "訂單001".getBytes()); msg1.setDelayTimeLevel(2);//延遲5秒 Message msg2 = new Message( JmsConfig.TOPIC, "訂單001".getBytes()); msg2.setDelayTimeLevel(4);//延遲30秒 SendResult sendResult1 = producer.send(msg1); SendResult sendResult2 = producer.send(msg2); System.out.println("Product1-同步發送-Product信息={}" + sendResult1); System.out.println("Product2-同步發送-Product信息={}" + sendResult2); producer.shutdown(); } }
消費
package com.xin.rocketmq.demo.testrun; import com.xin.rocketmq.demo.config.JmsConfig; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class ConsumerDelay { public static void main(String[] args) throws Exception { // 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 設置NameServer的地址 consumer.setNamesrvAddr("192.168.10.11:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息 consumer.subscribe(JmsConfig.TOPIC, "*"); // 注冊消息監聽者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者 consumer.start(); } }