RocketMQ定時(延遲)消息


RocketMQ 不支持任意時間自定義的延遲消息,僅支持內置預設值的延遲時間間隔的延遲消息。

預設值的延遲時間間隔為:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

延時消息的使用場景

比如電商里,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。

生產

package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerDelay {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("192.168.10.11:9876");

        producer.start();
        Message msg1 = new Message(
                JmsConfig.TOPIC,
                "訂單001".getBytes());

        msg1.setDelayTimeLevel(2);//延遲5秒

        Message msg2 = new Message(
                JmsConfig.TOPIC,
                "訂單001".getBytes());

        msg2.setDelayTimeLevel(4);//延遲30秒

        SendResult sendResult1 = producer.send(msg1);
        SendResult sendResult2 = producer.send(msg2);
        System.out.println("Product1-同步發送-Product信息={}" + sendResult1);
        System.out.println("Product2-同步發送-Product信息={}" + sendResult2);
        producer.shutdown();
    }
}

消費

package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerDelay {

    public static void main(String[] args) throws Exception {
        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 設置NameServer的地址
        consumer.setNamesrvAddr("192.168.10.11:9876");

        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // 注冊消息監聽者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者
        consumer.start();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM