rabbitmq隊列延遲


1. 場景:“訂單下單成功后,15分鍾未支付自動取消”


1.傳統處理超時訂單

采取定時任務輪訓數據庫訂單,並且批量處理。其弊端也是顯而易見的;對服務器、數據庫性會有很大的要求,
並且當處理大量訂單起來會很力不從心,而且實時性也不是特別好。當然傳統的手法還可以再優化一下,
即存入訂單的時候就算出訂單的過期時間插入數據庫,設置定時任務查詢數據庫的時候就只需要查詢過期了的訂單,
然后再做其他的業務操作

 

 

2.rabbitMQ延時隊列方案


一台普通的rabbitmq服務器單隊列容納千萬級別的消息還是沒什么壓力的,而且rabbitmq集群擴展支持的也是非常好的,
並且隊列中的消息是可以進行持久化,即使我們重啟或者宕機也能保證數據不丟失


2. TTL和DLX


rabbitMQ中是沒有延時隊列的,也沒有屬性可以設置,只能通過死信交換器(DLX)和設置過期時間(TTL)結合起來實現延遲隊列

 

1.TTL


TTL是Time To Live的縮寫, 也就是生存時間。
RabbitMq支持對消息和隊列設置TTL,對消息這設置是在發送的時候指定,對隊列設置是從消息入隊列開始計算, 只要超過了隊列的超時時間配置, 那么消息會自動清除。
如果兩種方式一起使用消息對TTL和隊列的TTL之間較小的為准,也就是消息5s過期,隊列是10s,那么5s的生效。
默認是沒有過期時間的,表示消息沒有過期時間;如果設置為0,表示消息在投遞到消費者的時候直接被消息,否則丟棄。

設置消息的過期時間用 x-message-ttl 參數實現,單位毫秒。
設置隊列的過期時間用 x-expires 參數,單位毫秒,注意,不能設置為0

2.DLX和死信隊列


DLX即Dead-Letter-Exchange(死信交換機),它其實就是一個正常的交換機,能夠與任何隊列綁定。

死信隊列是指隊列(正常)上的消息(過期)變成死信后,能夠后發送到另外一個交換機(DLX),然后被路由到一個隊列上,
這個隊列,就是死信隊列

成為死信一般有以下幾種情況:
消息被拒絕(basic.reject or basic.nack)且帶requeue=false參數
消息的TTL-存活時間已經過期
隊列長度限制被超越(隊列滿)


注1:如果隊列上存在死信, RabbitMq會將死信消息投遞到設置的DLX上去 ,
注2:通過在隊列里設置x-dead-letter-exchange參數來聲明DLX,如果當前DLX是direct類型還要聲明
x-dead-letter-routing-key參數來指定路由鍵,如果沒有指定,則使用原隊列的路由鍵

3. 延遲隊列


通過DLX和TTL模擬出延遲隊列的功能,即,消息發送以后,不讓消費者拿到,而是等待過期時間,變成死信后,發送給死信交換機再路由到死信隊列進行消費


注1:延遲隊列(即死信隊列)產生流程見“images/01 死信隊列產生流程.png”

創建主模塊,及子模塊

 

 

 

2.主模塊
<!-- 1.packaging模式改為pom -->
<packaging>pom</packaging>

 

 

<!-- 2.添加子模塊 -->
<modules>
<module>rabbitmq-provider</module>
<module>rabbitmq-consumer</module>
<module>common-vo</module>
</modules>

 

 

在主模塊的POM的<dependencies>中添加公共子模塊common

 

 

 

1.生產者創建一個正常消息,並添加消息過期時間/死信交換機/死信路由鍵這3個參數

關鍵代碼如下

package com.hmc.rabbitmqprovider.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.xml.ws.WebEndpoint;
import java.util.HashMap;
import java.util.Map;

/**
 * @author胡明財
 * @site www.xiaomage.com
 * @company xxx公司
 * @create  2019-12-23 23:39
 */
@Configuration
public class RabbitQueueConfig {

    //定義隊列,交換機,路由鍵(正常)
    public  static  final  String NORMAL_QUEUE="normar-queue";
    public  static  final  String NORMAL_EXCHANGE="normar-exchange";
    public  static  final  String NORMAL_ROUTINGKEY="normar-routingkey";



    //定義隊列,交換機,路由鍵(死信)
    public  static  final  String DELAY_QUEUE="delay-queue";
    public  static  final  String DELAY_EXCHANGE="delay-exchange";
    public  static  final  String DELAY_ROUTINGKEY="delay-routingkey";


     @Bean
    public Queue norma1Queue(){
         Map<String,Object> map=new HashMap<>();
         map.put("x-message-ttl", 15000);//message在該隊列queue的存活時間最大為10秒
         map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange參數是設置該隊列的死信交換器(DLX)
         map.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);//x-dead-letter-routing-key參數是給這個DLX指定路由鍵
        return  new Queue(NORMAL_QUEUE,true,false,false,map);
    }
    //直連交換機
    @Bean
    public DirectExchange normalExchange(){
         return  new DirectExchange(NORMAL_EXCHANGE,true,false);
    }

    @Bean
    public Binding normalBinding(){
         return BindingBuilder.bind(norma1Queue()).to(normalExchange())
                 .with(NORMAL_ROUTINGKEY);
    }

//死信

   @Bean
   public  Queue delaQueue(){
         return new Queue(DELAY_QUEUE,true);
   }

   @Bean
   public  DirectExchange  delayExchange(){
         return  new DirectExchange(DELAY_EXCHANGE,true,false);

   }

     @Bean
    public  Binding delayBinding(){
      return  BindingBuilder.bind(delaQueue()).to(delayExchange())
           .with(DELAY_ROUTINGKEY);
   }
}

 

 

 controller層

package com.hmc.rabbitmqprovider.controller;

import com.hmc.commonvo.vo.OrderVo;
import com.hmc.rabbitmqprovider.rabbitmq.RabbitQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @author胡明財
 * @site www.xiaomage.com
 * @company xxx公司
 * @create  2019-12-24 00:02
 */
@RestController
@Slf4j
public class SendController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/senderBymodel")
    public  Map<String,Object> senderBymodel(String orderNo){
        OrderVo orderVo=new OrderVo();

        orderVo.setOrderId(10000001L);
        orderVo.setOrderNo(orderNo);
        orderVo.setCreatedate(new Date());
        //推送消息
        rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE,
                RabbitQueueConfig.NORMAL_ROUTINGKEY,orderVo);
        log.info("生產者發送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE,
                RabbitQueueConfig.NORMAL_ROUTINGKEY);
        Map<String,Object> json=new HashMap<>();
        json.put("code",1);
        json.put("msg","發送消息成功。。。");
        return json;
    }


     @RequestMapping("/sender")
    public  Map<String,Object> sender(){
        Map<String,Object> data=this.createData();

        //推送消息
         rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE,
                 RabbitQueueConfig.NORMAL_ROUTINGKEY,data);
         log.info("生產者發送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE,
                 RabbitQueueConfig.NORMAL_ROUTINGKEY);
        Map<String,Object> json=new HashMap<>();
        json.put("code",1);
        json.put("msg","發送消息成功。。。");
        return json;
    }

    private Map<String,Object> createData(){
        Map<String,Object> data=new HashMap<>();
        String  createData= LocalDateTime.now().
                format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        String message="hello rabbitmq!!!";
        data.put("createData",createData);
        data.put("message",message);
        return data;
    }


}

 

 

yml文件配置

#服務的端口和項目名
server:
  port: 8081
  servlet:
    context-path: /rabbitmq-provider

## rabbitmq config
spring:
    rabbitmq:
      host: 192.168.197.134
      port: 5672
      username: springcloud
      password: 123456

     ## 與啟動容器時虛擬主機名字一致~~~與啟動容器時虛擬主機名字一致~~~與啟動容器時虛擬主機名字一致~~~
      virtual-host: my_vhost

 

pom配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.hmc</groupId>
        <artifactId>rabbitmq03</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>rabbitmq-provider</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-provider</name>
    <packaging>jar</packaging>
    <description>Demo project for Spring Boot</description>


</project>

 

  

 

開啟虛擬機測試生產者

 

 

 

 

 

 

 

 

 

這里可以看出消息10秒未消費的情況下,會推送到死信隊列,測試是成功的。

 

 

接下來我們用消費者進行消費

RabbitDelayReceiver

 

 
        

 

 
        

 

 3條消息已經成功消費

 

 

 

json轉換

  生產者代碼

package com.hmc.rabbitmqprovider.controller;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author胡明財
 * @site www.xiaomage.com
 * @company xxx公司
 * @create  2019-12-24 00:57
 */

/**
 * json轉換
 1.生產者
 */
@Configuration
public class RabbitTemplateConfig {


    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
           RabbitTemplate rabbitTemplate=new RabbitTemplate();
           rabbitTemplate.setConnectionFactory(connectionFactory);
           rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
           return  rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
     return new Jackson2JsonMessageConverter();
    }
}

 

消費者代碼

package com.hmc.rabbitmqconsumer.rabbitmq;

/**
 * @author胡明財
 * @site www.xiaomage.com
 * @company xxx公司
 * @create  2019-12-24 01:16
 */

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * json轉換
 * 消費者
 */
@Configuration
public class RabbitListenerReceiverConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jackson2JsonMessageConverter());
        return factory;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }


}

 

yml文件配置

#服務的端口和項目名
server:
  port: 8082
  servlet:
    context-path: /rabbitmq-consumer

## rabbitmq config
spring:
    rabbitmq:
      host: 192.168.197.134
      port: 5672
      username: springcloud
      password: 123456

     ## 與啟動容器時虛擬主機名字一致~~~與啟動容器時虛擬主機名字一致~~~與啟動容器時虛擬主機名字一致~~~
      virtual-host: my_vhost

 

pom配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.hmc</groupId>
        <artifactId>rabbitmq03</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>rabbitmq-cousumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-cousumer</name>
    <packaging>jar</packaging>
    <description>Demo project for Spring Boot</description>


</project>

 

 

controller

RequestMapping("/senderBymodel")
    public  Map<String,Object> senderBymodel(String orderNo){
        OrderVo orderVo=new OrderVo();

        orderVo.setOrderId(10000001L);
        orderVo.setOrderNo(orderNo);
        orderVo.setCreatedate(new Date());
        //推送消息
        rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE,
                RabbitQueueConfig.NORMAL_ROUTINGKEY,orderVo);
        log.info("生產者發送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE,
                RabbitQueueConfig.NORMAL_ROUTINGKEY);
        Map<String,Object> json=new HashMap<>();
        json.put("code",1);
        json.put("msg","發送消息成功。。。");
        return json;
    }

 

測試

 

 

消息格式

 

 

消費者接收消息格式


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM