1 啟動消費者等待傳入的訂閱消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // Instantiate message consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); // Subscribe topics consumer.subscribe("TestTopic", "*"); // Register message listener consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Launch consumer consumer.start(); } }
2 發送延遲消息
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); // Send the message producer.send(message); } // Shutdown producer after use. producer.shutdown(); } }
3 確認
您應該會看到消息在其存儲時間后大約 10 秒被消耗。
4 延遲消息的使用場景
例如在電子商務中,如果提交訂單,可以發送延遲消息,1小時后可以查看訂單狀態。 如果訂單仍未付款,則可以取消訂單並釋放庫存。
5 使用延遲消息的限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
當前 RocketMQ 不支持任意時間的延遲。 生產者發送延遲消息前需要設置幾個固定的延遲級別,分別對應1s到2h的1到18個延遲級,消息消費失敗會進入延遲消息隊列,消息發送時間與設置的延遲級別和重試次數有關。
See SendMessageProcessor.java