1. 場景:“訂單下單成功后,15分鍾未支付自動取消”
1.傳統處理超時訂單
采取定時任務輪訓數據庫訂單,並且批量處理。其弊端也是顯而易見的;對服務器、數據庫性會有很大的要求,
並且當處理大量訂單起來會很力不從心,而且實時性也不是特別好。當然傳統的手法還可以再優化一下,
即存入訂單的時候就算出訂單的過期時間插入數據庫,設置定時任務查詢數據庫的時候就只需要查詢過期了的訂單,
然后再做其他的業務操作
2.rabbitMQ延時隊列方案
一台普通的rabbitmq服務器單隊列容納千萬級別的消息還是沒什么壓力的,而且rabbitmq集群擴展支持的也是非常好的,
並且隊列中的消息是可以進行持久化,即使我們重啟或者宕機也能保證數據不丟失
2. TTL和DLX
rabbitMQ中是沒有延時隊列的,也沒有屬性可以設置,只能通過死信交換器(DLX)和設置過期時間(TTL)結合起來實現延遲隊列
1.TTL
TTL是Time To Live的縮寫, 也就是生存時間。
RabbitMq支持對消息和隊列設置TTL,對消息這設置是在發送的時候指定,對隊列設置是從消息入隊列開始計算, 只要超過了隊列的超時時間配置, 那么消息會自動清除。
如果兩種方式一起使用消息對TTL和隊列的TTL之間較小的為准,也就是消息5s過期,隊列是10s,那么5s的生效。
默認是沒有過期時間的,表示消息沒有過期時間;如果設置為0,表示消息在投遞到消費者的時候直接被消息,否則丟棄。
設置消息的過期時間用 x-message-ttl 參數實現,單位毫秒。
設置隊列的過期時間用 x-expires 參數,單位毫秒,注意,不能設置為0。
2.DLX和死信隊列
DLX即Dead-Letter-Exchange(死信交換機),它其實就是一個正常的交換機,能夠與任何隊列綁定。
死信隊列是指隊列(正常)上的消息(過期)變成死信后,能夠后發送到另外一個交換機(DLX),然后被路由到一個隊列上,
這個隊列,就是死信隊列
成為死信一般有以下幾種情況:
消息被拒絕(basic.reject or basic.nack)且帶requeue=false參數
消息的TTL-存活時間已經過期
隊列長度限制被超越(隊列滿)
注1:如果隊列上存在死信, RabbitMq會將死信消息投遞到設置的DLX上去 ,
注2:通過在隊列里設置x-dead-letter-exchange參數來聲明DLX,如果當前DLX是direct類型還要聲明
x-dead-letter-routing-key參數來指定路由鍵,如果沒有指定,則使用原隊列的路由鍵
3. 延遲隊列
通過DLX和TTL模擬出延遲隊列的功能,即,消息發送以后,不讓消費者拿到,而是等待過期時間,變成死信后,發送給死信交換機再路由到死信隊列進行消費
注1:延遲隊列(即死信隊列)產生流程見“images/01 死信隊列產生流程.png”

創建主模塊,及子模塊

2.主模塊
<!-- 1.packaging模式改為pom -->
<packaging>pom</packaging>

<!-- 2.添加子模塊 -->
<modules>
<module>rabbitmq-provider</module>
<module>rabbitmq-consumer</module>
<module>common-vo</module>
</modules>

在主模塊的POM的<dependencies>中添加公共子模塊common

1.生產者創建一個正常消息,並添加消息過期時間/死信交換機/死信路由鍵這3個參數
關鍵代碼如下
package com.hmc.rabbitmqprovider.rabbitmq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.xml.ws.WebEndpoint; import java.util.HashMap; import java.util.Map; /** * @author胡明財 * @site www.xiaomage.com * @company xxx公司 * @create 2019-12-23 23:39 */ @Configuration public class RabbitQueueConfig { //定義隊列,交換機,路由鍵(正常) public static final String NORMAL_QUEUE="normar-queue"; public static final String NORMAL_EXCHANGE="normar-exchange"; public static final String NORMAL_ROUTINGKEY="normar-routingkey"; //定義隊列,交換機,路由鍵(死信) public static final String DELAY_QUEUE="delay-queue"; public static final String DELAY_EXCHANGE="delay-exchange"; public static final String DELAY_ROUTINGKEY="delay-routingkey"; @Bean public Queue norma1Queue(){ Map<String,Object> map=new HashMap<>(); map.put("x-message-ttl", 15000);//message在該隊列queue的存活時間最大為10秒 map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange參數是設置該隊列的死信交換器(DLX) map.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);//x-dead-letter-routing-key參數是給這個DLX指定路由鍵 return new Queue(NORMAL_QUEUE,true,false,false,map); } //直連交換機 @Bean public DirectExchange normalExchange(){ return new DirectExchange(NORMAL_EXCHANGE,true,false); } @Bean public Binding normalBinding(){ return BindingBuilder.bind(norma1Queue()).to(normalExchange()) .with(NORMAL_ROUTINGKEY); } //死信 @Bean public Queue delaQueue(){ return new Queue(DELAY_QUEUE,true); } @Bean public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE,true,false); } @Bean public Binding delayBinding(){ return BindingBuilder.bind(delaQueue()).to(delayExchange()) .with(DELAY_ROUTINGKEY); } }
controller層
package com.hmc.rabbitmqprovider.controller; import com.hmc.commonvo.vo.OrderVo; import com.hmc.rabbitmqprovider.rabbitmq.RabbitQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * @author胡明財 * @site www.xiaomage.com * @company xxx公司 * @create 2019-12-24 00:02 */ @RestController @Slf4j public class SendController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/senderBymodel") public Map<String,Object> senderBymodel(String orderNo){ OrderVo orderVo=new OrderVo(); orderVo.setOrderId(10000001L); orderVo.setOrderNo(orderNo); orderVo.setCreatedate(new Date()); //推送消息 rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE, RabbitQueueConfig.NORMAL_ROUTINGKEY,orderVo); log.info("生產者發送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE, RabbitQueueConfig.NORMAL_ROUTINGKEY); Map<String,Object> json=new HashMap<>(); json.put("code",1); json.put("msg","發送消息成功。。。"); return json; } @RequestMapping("/sender") public Map<String,Object> sender(){ Map<String,Object> data=this.createData(); //推送消息 rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE, RabbitQueueConfig.NORMAL_ROUTINGKEY,data); log.info("生產者發送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE, RabbitQueueConfig.NORMAL_ROUTINGKEY); Map<String,Object> json=new HashMap<>(); json.put("code",1); json.put("msg","發送消息成功。。。"); return json; } private Map<String,Object> createData(){ Map<String,Object> data=new HashMap<>(); String createData= LocalDateTime.now(). format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String message="hello rabbitmq!!!"; data.put("createData",createData); data.put("message",message); return data; } }
yml文件配置
#服務的端口和項目名 server: port: 8081 servlet: context-path: /rabbitmq-provider ## rabbitmq config spring: rabbitmq: host: 192.168.197.134 port: 5672 username: springcloud password: 123456 ## 與啟動容器時虛擬主機名字一致~~~與啟動容器時虛擬主機名字一致~~~與啟動容器時虛擬主機名字一致~~~ virtual-host: my_vhost
pom配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.hmc</groupId> <artifactId>rabbitmq03</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <artifactId>rabbitmq-provider</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq-provider</name> <packaging>jar</packaging> <description>Demo project for Spring Boot</description> </project>
開啟虛擬機測試生產者



這里可以看出消息10秒未消費的情況下,會推送到死信隊列,測試是成功的。

接下來我們用消費者進行消費
RabbitDelayReceiver


3條消息已經成功消費

json轉換
生產者代碼
package com.hmc.rabbitmqprovider.controller; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author胡明財 * @site www.xiaomage.com * @company xxx公司 * @create 2019-12-24 00:57 */ /** * json轉換 1.生產者 */ @Configuration public class RabbitTemplateConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }
消費者代碼
package com.hmc.rabbitmqconsumer.rabbitmq; /** * @author胡明財 * @site www.xiaomage.com * @company xxx公司 * @create 2019-12-24 01:16 */ import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * json轉換 * 消費者 */ @Configuration public class RabbitListenerReceiverConfig { @Bean public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(jackson2JsonMessageConverter()); return factory; } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }
yml文件配置
#服務的端口和項目名 server: port: 8082 servlet: context-path: /rabbitmq-consumer ## rabbitmq config spring: rabbitmq: host: 192.168.197.134 port: 5672 username: springcloud password: 123456 ## 與啟動容器時虛擬主機名字一致~~~與啟動容器時虛擬主機名字一致~~~與啟動容器時虛擬主機名字一致~~~ virtual-host: my_vhost
pom配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.hmc</groupId> <artifactId>rabbitmq03</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <artifactId>rabbitmq-cousumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq-cousumer</name> <packaging>jar</packaging> <description>Demo project for Spring Boot</description> </project>
controller
RequestMapping("/senderBymodel") public Map<String,Object> senderBymodel(String orderNo){ OrderVo orderVo=new OrderVo(); orderVo.setOrderId(10000001L); orderVo.setOrderNo(orderNo); orderVo.setCreatedate(new Date()); //推送消息 rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE, RabbitQueueConfig.NORMAL_ROUTINGKEY,orderVo); log.info("生產者發送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE, RabbitQueueConfig.NORMAL_ROUTINGKEY); Map<String,Object> json=new HashMap<>(); json.put("code",1); json.put("msg","發送消息成功。。。"); return json; }
測試

消息格式

消費者接收消息格式

