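Delayed messages show up everywhere in day-to-day systems:
1. An order that has not been paid within 10 minutes of creation is cancelled automatically.
2. An email notification is pushed once every 30 minutes.
The most common approach is a background scheduled task that polls for due records. This works at low volume, but at high volume the repeated reads become a performance problem. RabbitMQ does not implement delay queues directly, but two of its queue arguments can be combined to get delay-queue behaviour: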
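1. x-message-ttl: the message time to live (TTL). A message that stays in the queue longer than its TTL becomes a dead letter and is no longer delivered to consumers of that queue.
There are two ways to set a TTL:
(1) Specify x-message-ttl when declaring the queue; every message in the queue then shares the same TTL.
(2) Set the expiration property on each message when publishing; each message can then have its own TTL.
Note: if both are set, the smaller of the two values applies.
2. x-dead-letter-exchange (see the RabbitMQ documentation): the exchange that expired messages are forwarded to. When a message reaches its TTL, this exchange routes it with the configured x-dead-letter-routing-key to the target queue, where a consumer finally consumes it.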
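As a rough illustration of the two properties above, the following sketch builds the buffer queue's arguments as a plain map and computes which TTL takes effect when both a queue-level and a per-message value are present. The class and method names (DelayQueueArgs, bufferQueueArguments, effectiveTtlMillis) are hypothetical helpers for this article, not part of RabbitMQ or Spring AMQP; only the argument keys come from the text.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not a RabbitMQ or Spring AMQP API.
final class DelayQueueArgs {

    private DelayQueueArgs() {
    }

    // Arguments for a buffer queue: a queue-wide TTL plus a dead-letter target.
    static Map<String, Object> bufferQueueArguments(long ttlMillis,
                                                    String deadLetterExchange,
                                                    String deadLetterRoutingKey) {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-message-ttl", ttlMillis);
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return args;
    }

    // When both TTLs are set, the smaller one applies. A null argument means "not set".
    static Long effectiveTtlMillis(Long queueTtlMillis, Long messageExpirationMillis) {
        if (queueTtlMillis == null) {
            return messageExpirationMillis;
        }
        if (messageExpirationMillis == null) {
            return queueTtlMillis;
        }
        return Math.min(queueTtlMillis, messageExpirationMillis);
    }

    public static void main(String[] args) {
        System.out.println(bufferQueueArguments(30000L, "exchange_delay_done", "delay"));
        System.out.println(effectiveTtlMillis(30000L, 10000L));
    }
}
```

With a 30000 ms queue TTL and a 10000 ms message expiration, the helper returns the per-message value, because it is the smaller of the two.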
Integrating RabbitMQ with Spring:
How the RabbitMQ delay queue works:

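1. exchange_delay_begin: the exchange in front of the buffer queue; it routes messages to the buffer queue queue_delay_begin.
2. exchange_delay_done: the dead-letter exchange; it routes messages that expire in queue_delay_begin to the dead-letter queue.
3. queue_delay_begin: the buffer queue, where messages wait until they expire.
4. queue_delay_done: the dead-letter queue, from which consumers actually consume the messages.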
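The four components above can be pictured with a toy in-memory model. This is only a sketch of the flow under simplifying assumptions (a manually advanced clock, one routing key, expiry checked from the head of the buffer queue); DelayFlowSketch and its methods are hypothetical names and involve no RabbitMQ client code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the delay flow; it imitates the topology only.
final class DelayFlowSketch {

    // A buffered message together with the time at which it expires.
    private static final class Pending {
        final String body;
        final long expiresAtMillis;

        Pending(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    // Stands in for queue_delay_begin: messages wait here and nobody consumes them.
    private final Deque<Pending> queueDelayBegin = new ArrayDeque<>();
    // Stands in for queue_delay_done: the queue the real consumer listens on.
    private final List<String> queueDelayDone = new ArrayList<>();

    // Publishing through exchange_delay_begin parks the message in the buffer queue.
    void publish(String body, long nowMillis, long ttlMillis) {
        queueDelayBegin.addLast(new Pending(body, nowMillis + ttlMillis));
    }

    // Advancing the clock moves expired head messages through exchange_delay_done
    // into queue_delay_done.
    void tick(long nowMillis) {
        while (!queueDelayBegin.isEmpty()
                && queueDelayBegin.peekFirst().expiresAtMillis <= nowMillis) {
            queueDelayDone.add(queueDelayBegin.pollFirst().body);
        }
    }

    // Messages that a consumer of queue_delay_done could receive right now.
    List<String> consumable() {
        return new ArrayList<>(queueDelayDone);
    }

    public static void main(String[] args) {
        DelayFlowSketch flow = new DelayFlowSketch();
        flow.publish("order-1", 0L, 10000L);
        flow.tick(5000L);
        System.out.println(flow.consumable());
        flow.tick(10000L);
        System.out.println(flow.consumable());
    }
}
```

The point of the model is that the producer never talks to queue_delay_done directly: a message only becomes consumable after it has waited out its TTL in the buffer queue.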
spring-rabbitmq.xml :
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
<!-- Configure the connection-factory with the RabbitMQ server connection parameters -->
<rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="127.0.0.1" port="5672" publisher-confirms="true"/>
<!-- Delay queue -->
<rabbit:direct-exchange id="exchange_delay_begin" name="exchange_delay_begin" durable="false" auto-delete="false" >
<rabbit:bindings>
<rabbit:binding queue="queue_delay_begin" key="delay" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:queue name="queue_delay_begin" durable="false">
<rabbit:queue-arguments>
<!-- Queue-level message TTL in milliseconds -->
<entry key="x-message-ttl" value="30000" value-type="java.lang.Long" />
<entry key="x-dead-letter-exchange" value="exchange_delay_done" />
<entry key="x-dead-letter-routing-key" value="delay" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:direct-exchange id="exchange_delay_done" name="exchange_delay_done" durable="false" auto-delete="false" >
<rabbit:bindings>
<rabbit:binding queue="queue_delay_done" key="delay" />
<!-- When several bindings share the same binding key (delay), the exchange routes each message to all of the bound queues -->
<!--<rabbit:binding queue="queue_delay_done_two" key="delay" />-->
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:queue name="queue_delay_done" durable="false"/>
<rabbit:template id="delayMsgTwoTemplate" connection-factory="connectionFactory" />
<bean id="messageConsumer" class="com.nancy.rabbitmq.demo.MessageConsumer"></bean>
<!-- Message consumer -->
<rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" >
<rabbit:listener queues="queue_delay_done" ref="messageConsumer" />
</rabbit:listener-container>
</beans>
DelayMessageProducer.java
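package com.nancy.rabbitmq.delay;

import javax.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Service;

@Service
public class DelayMessageProducer {
    @Resource(name = "delayMsgTwoTemplate")
    private AmqpTemplate delayMsgTwoTemplate;

    public void delayMsgTwo(String exchange, String routingKey, Object msg) {
        delayMsgTwoTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Per-message TTL in milliseconds; smaller than the queue's 30000 ms, so it applies.
                message.getMessageProperties().setExpiration(String.valueOf(10000));
                return message;
            }
        });
    }
}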
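The producer above passes the per-message TTL as a string holding a number of milliseconds ("10000"). A small helper can make that conversion explicit when the delay is expressed as a java.time.Duration. ExpirationSketch and toExpiration are hypothetical names introduced here for illustration; they are not part of Spring AMQP.

```java
import java.time.Duration;

// Hypothetical helper: turns a delay into the string form used for the expiration property.
final class ExpirationSketch {

    private ExpirationSketch() {
    }

    // Returns the delay as a decimal number of milliseconds, e.g. "10000" for 10 seconds.
    static String toExpiration(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return String.valueOf(delay.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(toExpiration(Duration.ofSeconds(10)));
    }
}
```

Inside postProcessMessage, the result could then be passed to setExpiration in place of the hard-coded String.valueOf(10000).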
MessageConsumer.java
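package com.nancy.rabbitmq.demo;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

// Listener registered on queue_delay_done in spring-rabbitmq.xml.
public class MessageConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // Prints the dead-lettered message, including its x-death headers.
        System.out.println("consumer receive message 22------->:{}"+ message);
    }
}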
application.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<import resource="spring-rabbitmq.xml" />
<!-- Scan the given package and register annotated classes as Spring beans -->
<context:component-scan base-package="com.nancy.rabbitmq" />
<!-- Enable annotation-based configuration -->
<context:annotation-config />
<!-- Enable @Configurable support (spring-configured) -->
<context:spring-configured />
</beans>
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>learning-demo</artifactId>
<groupId>com.nancy</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>demo-jms</artifactId>
<packaging>jar</packaging>
<name>demo-jms</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>5.0.0.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.6</version>
</dependency>
<!-- Dependencies -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!--rabbit MQ -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.6.6.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.6.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
<exclusions>
<exclusion>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- junit: JUnit 4 is required for the org.junit.Test and org.junit.Before annotations used in DelayQueueTest -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
DelayQueueTest.java
import com.nancy.rabbitmq.delay.DelayMessageProducer;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DelayQueueTest {

    private ApplicationContext context = null;

    @Before
    public void setUp() throws Exception {
        context = new ClassPathXmlApplicationContext("rabbitmq/application.xml");
    }

    @Test
    public void delayQueueTest() throws Exception {
        DelayMessageProducer messageProducer = context.getBean(DelayMessageProducer.class);
        int a = 10;
        while (a > 0) {
            System.out.println("send " + a);
            messageProducer.delayMsgTwo("exchange_delay_begin", "delay", "hello world delay2 :" + a--);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("sended ");
        // Keep the JVM alive for 60 s so the listener can receive the expired messages.
        Thread.sleep(1000 * 60);
    }
}
Result: about 10 s after the messages are sent, the consumer's listener receives and consumes them.
send 10
send 9
send 8
send 7
send 6
send 5
send 4
send 3
send 2
send 1
sended
consumer receive message 22------->:{}(Body:'hello world delay2 :10' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :9' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :8' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :7' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :6' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :5' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :4' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=7, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :3' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=8, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :2' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=9, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :1' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=10, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
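The log lines above show that each dead-lettered message carries an x-death header whose first entry records reason=expired and queue=queue_delay_begin. A consumer that wants to confirm why a message arrived could read that header roughly as follows. This is a sketch that assumes the headers are available as a Map<String, Object> with x-death holding a list of maps, as the printed output suggests; XDeathSketch and firstDeathReason are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reads the reason of the first x-death entry from message headers.
final class XDeathSketch {

    private XDeathSketch() {
    }

    // Returns the reason (for example "expired"), or null if no usable x-death entry exists.
    static String firstDeathReason(Map<String, Object> headers) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List)) {
            return null;
        }
        List<?> entries = (List<?>) xDeath;
        if (entries.isEmpty() || !(entries.get(0) instanceof Map)) {
            return null;
        }
        Object reason = ((Map<?, ?>) entries.get(0)).get("reason");
        return reason == null ? null : reason.toString();
    }

    public static void main(String[] args) {
        // Headers shaped like the ones printed in the log output.
        Map<String, Object> entry = new HashMap<>();
        entry.put("reason", "expired");
        entry.put("queue", "queue_delay_begin");
        List<Map<String, Object>> xDeath = new ArrayList<>();
        xDeath.add(entry);
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-death", xDeath);
        System.out.println(firstDeathReason(headers));
    }
}
```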
References:
DLX: https://www.rabbitmq.com/dlx.html
How does RabbitMQ implement delay queues?: https://blog.csdn.net/u013256816/article/details/55106401
Message queues: RabbitMQ: https://www.jianshu.com/p/79ca08116d57
Implementing delayed tasks with RabbitMQ: https://www.cnblogs.com/haoxinyue/p/6613706.html
Reposted from: https://www.cnblogs.com/xiaoxing/p/9250823.html

