RabbitMQ 延時消息隊列


一、簡述

二、示例demo

 

、簡述

延時消息在日常隨處可見:

1、訂單創建10min之后不發起支付,自動取消。

2、30min定時推送一次郵件信息。

最常用到方式為定時任務輪訓,數據量小的時候使用沒什么問題 而當有千萬甚至上億的數據量時就會出現數據讀取的瓶頸,此時全表掃面進行處理一定是下下策。但是也有比較討巧的方式,分享公司內部訂單拆分的例子:

由於線上每天訂單量50萬+的增長量,單表早已無法吃撐這個增長的速度。采取的方式為訂單歸檔:線上熱數據保留2-3天的數據,其余都歸檔進入歷史訂單表中,這樣熱數據在200萬以內。
訂單超過10min不支付即取消的功能,可以采取簡單的掃表形式而不會出現數據讀取性能的問題。

 這樣的方式很簡單,但需要跟業務進行溝通妥協,本文會講另一種方式即RabbitMQ延遲隊列。RabbitMQ實際並沒有直接實現延時隊列,但可利用RabbitMQ提供的屬性來模擬延時隊列,甚至已經有的配套的插件rabbitmq_delayed_message_exchange 下面先介紹使用到的RabbitMQ的屬性。

1、消息的Time To Live (TTL) 

x-message-ttl:消息過期時間,超過過期時間之后即變為死信(Dead-letter)不會再被消費者消費。

設置消息TTL有兩種方式:

  • 創建隊列時指定x-message-ttl,此時隊列所有的消息具有統一過期時間。
  • 發送消息為每個消息設置 expiration,此時消息之間過期時間不同。 

如果兩者都設置,過期時間取兩者最小。如果設置TTL為0即表示除非立馬能發送到隊列,否則直接丟棄該消息。利用TTL為0的特性再結合死信轉發器可以替代RabbitMQ 3.0的immediate參數。

2、隊列的TTL

x-expires: RabbitMQ會確保時間達到后將隊列刪除,但是並不保障這個動作有多及時。隊列過期代表着處於未使用狀態,即

  • 隊列無任何消費者
  • 隊列沒有被重新聲明
  • 隊列在過期未調用Basic.Get命令獲取消息

3、x-dead-letter-exchange(RabbitMQ文檔):死信轉發器(轉發器類型)當消息達到過期時間未被消費則會由該exchange按照配置的x-dead-letter-routing-key轉發到指定隊列,最后被消費者消費,如果未配置x-dead-letter-routing-key則會按照原隊列的key進行轉發。

4、隊列的消息在以下幾種情況會變成死信(Dead-letter)

  • 設置的x-message-ttl或者expiration到期,即消息過期
  • 消息被消費者拒絕(調用Basic.Reject / Basic.Nack)且 requeue參數設置為false
  • 隊列達到最大長度

 

二、示例demo

  • 單個延遲隊列

RabbitMQ延時隊列邏輯:

 

 

  

1、exchange_delay_begin:緩沖隊列exchange交換器,用於將消息轉發至緩存消息隊列 queue_delay_begin 。

2、exchange_delay_done:死信(dead-letter)隊列exchange交換器,用於將隊列 queue_delay_begin 轉發到死信隊列。

3、queue_delay_begin:緩沖消息隊列,等待消息過期。

4、queue_delay_done:死信消息隊列,消費者能夠真正消費信息。

 spring-rabbitmq.xml :

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd"> <!--配置connection-factory,指定連接rabbit server參數 --> <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="127.0.0.1" port="5672" publisher-confirms="true"/> <!-- 延時隊列 --> <rabbit:direct-exchange id="exchange_delay_begin" name="exchange_delay_begin" durable="false" auto-delete="false" > <rabbit:bindings> <rabbit:binding queue="queue_delay_begin" key="delay" /> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:queue name="queue_delay_begin" durable="false"> <rabbit:queue-arguments> <!-- 隊列過期時間 --> <entry key="x-message-ttl" value="30000" value-type="java.lang.Long" /> <entry key="x-dead-letter-exchange" value="exchange_delay_done" /> <entry key="x-dead-letter-routing-key" value="delay" /> </rabbit:queue-arguments> </rabbit:queue> <rabbit:direct-exchange id="exchange_delay_done" name="exchange_delay_done" durable="false" auto-delete="false" > <rabbit:bindings> <rabbit:binding queue="queue_delay_done" key="delay" /> <!-- binding key 相同為 【delay】exchange轉發消息到多個隊列 --> <!--<rabbit:binding queue="queue_delay_done_two" key="delay" />--> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:queue name="queue_delay_done" durable="false"/> <rabbit:template id="delayMsgTwoTemplate" connection-factory="connectionFactory" /> <bean id="messageConsumer" class="com.nancy.rabbitmq.demo.MessageConsumer"></bean> <!-- 消息接收者 --> <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" > <rabbit:listener queues="queue_delay_done" ref="messageConsumer" /> </rabbit:listener-container> </beans>

 DelayMessageProducer.java

@Service
public class DelayMessageProducer { @Resource(name="delayMsgTwoTemplate") private AmqpTemplate delayMsgTwoTemplate; public void delayMsgTwo(String exchange, String routingKey, Object msg) { delayMsgTwoTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(String.valueOf(10000)); return message; } }); } }

 MessageConsumer.java

public class MessageConsumer implements MessageListener { @Override public void onMessage(Message message) { System.out.println("consumer receive message 22------->:{}"+ message); } }

 application.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    <import resource="spring-rabbitmq.xml" />
    <!-- 掃描指定package注釋的注冊為Spring Beans -->
    <context:component-scan base-package="com.nancy.rabbitmq" />
    <!-- 激活annotation功能 -->
    <context:annotation-config />
    <!-- 激活annotation功能 -->
    <context:spring-configured />
</beans>

DelayQueueTest.java

public class DelayQueueTest {
    private ApplicationContext context = null;
    @org.junit.Before
    public void setUp() throws Exception {
        context = new ClassPathXmlApplicationContext("rabbitmq/application.xml");
    }
    @Test
    public void delayQueueTest() throws Exception {
        DelayMessageProducer messageProducer = context.getBean(DelayMessageProducer.class);
        int a = 10;
        while (a > 0) {
            System.out.println("send "+ a);
            messageProducer.delayMsgTwo("exchange_delay_begin","delay", "hello world delay2 :" + a--);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("sended ");
        Thread.sleep(1000*60);
    }
}

 運行結果: 發送消息 10s之后, 消費監聽到消息 消費。

send 10
send 9
send 8
send 7
send 6
send 5
send 4
send 3
send 2
send 1
sended 
consumer receive message 22------->:{}(Body:'hello world delay2 :10' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :9' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :8' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :7' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :6' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :5' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :4' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=7, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :3' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=8, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :2' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=9, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :1' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=10, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
View Code

  

  • 多個延遲隊列

實際的業務需求中會出現不同的時間延遲,此時可設置多個隊列以達到不同的延遲效果。例如5個隊列 common-queue_5s、common-queue_15s、common-queue_30s、common-queue_45s、common-queue_50s達到不同的延遲效果,整體的結構如下:

 

 上述bindingKey的值有所簡化,但對路由結構圖無影響。這里使用一個死信轉發器(轉發器類型)通過綁定不同的key路由到不同的死信隊列。也可以死信轉發器和死信隊列一對一綁定 即 成對出現(dlx_exchange_5s 和 dead-letter-queue_5s)

這里貼出部分xml部分配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

    <rabbit:connection-factory
            id="connectionFactory"
            host="${rabbit.host}"
            port="${rabbit.port}"
            username="${rabbit.username}"
            password="${rabbit.password}"
            publisher-confirms="true"
    />

    <rabbit:admin connection-factory="connectionFactory" ignore-declaration-exceptions="true" />

    <!-- 正常隊列 -->
    <!-- 5s過期 -->
    <rabbit:queue name="common-queue_5s">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value="5000" value-type="java.lang.Long" />
            <entry key="x-dead-letter-exchange" value="dlx-exchange" />
            <entry key="x-dead-letter-routing-key" value="dead-letter-queue_5s" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- 15s過期 -->
    <rabbit:queue name="common-queue_15s">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value="15000" value-type="java.lang.Long" />
            <entry key="x-dead-letter-exchange" value="dlx-exchange" />
            <entry key="x-dead-letter-routing-key" value="dead-letter-queue_15s" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- 30s過期 -->
    <rabbit:queue name="common-queue_30s">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value="30000" value-type="java.lang.Long" />
            <entry key="x-dead-letter-exchange" value="dlx-exchange" />
            <entry key="x-dead-letter-routing-key" value="dead-letter-queue_30s" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- 45s過期 -->
    <rabbit:queue name="common-queue_45s">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value="45000" value-type="java.lang.Long" />
            <entry key="x-dead-letter-exchange" value="dlx-exchange" />
            <entry key="x-dead-letter-routing-key" value="dead-letter-queue_45s" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- 50s過期 -->
    <rabbit:queue name="common-queue_50s">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value="50000" value-type="java.lang.Long" />
            <entry key="x-dead-letter-exchange" value="dlx-exchange" />
            <entry key="x-dead-letter-routing-key" value="dead-letter-queue_50s" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- 正常路由 -->
    <rabbit:direct-exchange name="common-exchange" durable="false" id="common-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="common-queue_5s" />
            <rabbit:binding queue="common-queue_15s" />
            <rabbit:binding queue="common-queue_30s" />
            <rabbit:binding queue="common-queue_45s" />
            <rabbit:binding queue="common-queue_50s" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 死信隊列 -->
    <rabbit:queue name="dead-letter-queue_5s" />
    <rabbit:queue name="dead-letter-queue_15s" />
    <rabbit:queue name="dead-letter-queue_30s" />
    <rabbit:queue name="dead-letter-queue_45s" />
    <rabbit:queue name="dead-letter-queue_50s" />
    <rabbit:direct-exchange name="dlx-exchange" durable="false" id="dlx-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="dead-letter-queue_5s" />
            <rabbit:binding queue="dead-letter-queue_15s" />
            <rabbit:binding queue="dead-letter-queue_30s" />
            <rabbit:binding queue="dead-letter-queue_45s" />
            <rabbit:binding queue="dead-letter-queue_50s" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    
    <!-- 配置consumer, 監聽的類和queue的對應關系 -->
    <rabbit:listener-container
        connection-factory="connectionFactory" acknowledge="manual" >
        <rabbit:listener queues="dead-letter-queue_5s" ref="receiveConfirmTestListener" />
        <rabbit:listener queues="dead-letter-queue_15s" ref="receiveConfirmTestListener" />
        <rabbit:listener queues="dead-letter-queue_30s" ref="receiveConfirmTestListener" />
        <rabbit:listener queues="dead-letter-queue_45s" ref="receiveConfirmTestListener" />
        <rabbit:listener queues="dead-letter-queue_50s" ref="receiveConfirmTestListener" />
    </rabbit:listener-container>

</beans>
View Code

 junit測試:

@RunWith(SpringJUnit4ClassRunner.class)  
@ContextConfiguration(locations = {"classpath:application-context.xml"})  
public class TestDeadLetter {  
    @Autowired  
    private DeadLetterPublishService publishService;  

    @Test
    public void testALL() throws InterruptedException{
        String message = "currentTime:" + System.currentTimeMillis();
        System.out.println("test1---message: "+ message);
        publishService.send("common-exchange","common-queue_5s", message);
        publishService.send("common-exchange","common-queue_15s", message);
        publishService.send("common-exchange","common-queue_30s", message);
        publishService.send("common-exchange","common-queue_45s", message);
        publishService.send("common-exchange","common-queue_50s", message);
        Thread.sleep(100000);
    }

} 
TestDeadLetter.java

 最后運行結果:消息實際發送時間點 和 消息被延遲消費時間點無限接近 五個消息分別延遲大約 5s 15s 30s 45s 50s  但做不到精確一致。

test1---message: currentTime:1566920053524
// 。。。。
1566920058551 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:18 CST 2019, routing-keys=[common-queue_5s], queue=common-queue_5s}], x-first-death-reason=expired, x-first-death-queue=common-queue_5s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_5s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920068578 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:28 CST 2019, routing-keys=[common-queue_15s], queue=common-queue_15s}], x-first-death-reason=expired, x-first-death-queue=common-queue_15s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_15s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920083550 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:43 CST 2019, routing-keys=[common-queue_30s], queue=common-queue_30s}], x-first-death-reason=expired, x-first-death-queue=common-queue_30s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_30s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920098549 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:58 CST 2019, routing-keys=[common-queue_45s], queue=common-queue_45s}], x-first-death-reason=expired, x-first-death-queue=common-queue_45s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_45s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920103551 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:35:03 CST 2019, routing-keys=[common-queue_50s], queue=common-queue_50s}], x-first-death-reason=expired, x-first-death-queue=common-queue_50s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_50s, deliveryTag=1, messageCount=0]:currentTime:1566920053524

 

  

參考資料:

DLX: https://www.rabbitmq.com/dlx.html

RabbitMQ如何實現延遲隊列?:https://blog.csdn.net/u013256816/article/details/55106401

消息隊列之 RabbitMQ :https://www.jianshu.com/p/79ca08116d57

使用RabbitMQ實現延遲任務 : https://www.cnblogs.com/haoxinyue/p/6613706.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM