rocketmq之延迟队列(按照18个等级来发送)


 

1 启动消费者等待传入的订阅消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List;  public class ScheduledMessageConsumer {   public static void main(String[] args) throws Exception {  // Instantiate message consumer  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");  // Subscribe topics  consumer.subscribe("TestTopic", "*");  // Register message listener  consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {  for (MessageExt message : messages) {  // Print approximate delay time period  System.out.println("Receive message[msgId=" + message.getMsgId() + "] "  + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");  }  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  });  // Launch consumer  consumer.start();  } }
 

2 发送延迟消息

import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message;  public class ScheduledMessageProducer {   public static void main(String[] args) throws Exception {  // Instantiate a producer to send scheduled messages  DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");  // Launch producer  producer.start();  int totalMessagesToSend = 100;  for (int i = 0; i < totalMessagesToSend; i++) {  Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());  // This message will be delivered to consumer 10 seconds later.  message.setDelayTimeLevel(3);  // Send the message  producer.send(message);  }   // Shutdown producer after use.  producer.shutdown();  }  }
 

3 确认

您应该会看到消息在其存储时间后大约 10 秒被消耗。

4 延迟消息的使用场景

例如在电子商务中,如果提交订单,可以发送延迟消息,1小时后可以查看订单状态。 如果订单仍未付款,则可以取消订单并释放库存。

5 使用延迟消息的限制

// org/apache/rocketmq/store/config/MessageStoreConfig.java  private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
 

当前 RocketMQ 不支持任意时间的延迟。 生产者发送延迟消息前需要设置几个固定的延迟级别,分别对应1s到2h的1到18个延迟级,消息消费失败会进入延迟消息队列,消息发送时间与设置的延迟级别和重试次数有关。

See SendMessageProcessor.java

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM