rocketmq之延遲隊列(按照18個等級來發送)


 

1 啟動消費者等待傳入的訂閱消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List;  public class ScheduledMessageConsumer {   public static void main(String[] args) throws Exception {  // Instantiate message consumer  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");  // Subscribe topics  consumer.subscribe("TestTopic", "*");  // Register message listener  consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {  for (MessageExt message : messages) {  // Print approximate delay time period  System.out.println("Receive message[msgId=" + message.getMsgId() + "] "  + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");  }  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  });  // Launch consumer  consumer.start();  } }
 

2 發送延遲消息

import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message;  public class ScheduledMessageProducer {   public static void main(String[] args) throws Exception {  // Instantiate a producer to send scheduled messages  DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");  // Launch producer  producer.start();  int totalMessagesToSend = 100;  for (int i = 0; i < totalMessagesToSend; i++) {  Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());  // This message will be delivered to consumer 10 seconds later.  message.setDelayTimeLevel(3);  // Send the message  producer.send(message);  }   // Shutdown producer after use.  producer.shutdown();  }  }
 

3 確認

您應該會看到消息在其存儲時間后大約 10 秒被消耗。

4 延遲消息的使用場景

例如在電子商務中,如果提交訂單,可以發送延遲消息,1小時后可以查看訂單狀態。 如果訂單仍未付款,則可以取消訂單並釋放庫存。

5 使用延遲消息的限制

// org/apache/rocketmq/store/config/MessageStoreConfig.java  private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
 

當前 RocketMQ 不支持任意時間的延遲。 生產者發送延遲消息前需要設置幾個固定的延遲級別,分別對應1s到2h的1到18個延遲級,消息消費失敗會進入延遲消息隊列,消息發送時間與設置的延遲級別和重試次數有關。

See SendMessageProcessor.java

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM