Springboot整合Rabbitmq實現延時隊列之rabbitmq_delayed_message_exchange插件方式


很多時候我們想定時去做某件事情的時候我們會首先想到定時任務,quartz是個不錯的選擇,但是也有缺點,假如配置在項目中,集群部署會有重復執行的問題,如果持久化在mysql中,解決了集群的問題,但是過於依賴mysql,耦合嚴重,當然還有日志量龐大、執行時間精度、過於耗費系統資源等等問題。所以這時候使用消息隊列中間件的的延時隊列就是一個很好得解決方案,我們設置要觸發消費的時間和必要的參數入隊mq,到時監聽queue的消費者自然拿到消息然后去走業務流程,這里介紹的是基於rabbitmq中間件實現的TTL版的延時隊列。

什么是TTL?
先簡單介紹下rabbitmq執行的流程,和Spring boot整合ActiveMQ不太一樣,除了隊列(queue)之外還引入了交換機(exchange)的概念。
rabbitmq的交換機有4種模式,我不詳細介紹,簡單說下大體執行流程:

①:生產者將消息(msg)和路由鍵(routekey)發送指定的交換機(exchange)上
②:交換機(exchange)根據路由鍵(routekey)找到綁定自己的隊列(queue)並把消息給它
③:隊列(queue)再把消息發送給監聽它的消費者(customer)
那么延時隊列TTL又是什么呢?這里引入了一個死信(死亡信息)的概念,有死信必定有死亡時間,也就是我們希望延時多久的時間:

①:生產者將消息(msg)和路由鍵(routekey)發送指定的死信交換機(delayexchange)上
②:死信交換機(delayexchange)根據路由鍵(routekey1)找到綁定自己的死信隊列(delayqueue)並把消息給它
③:消息(msg)到期死亡變成死信轉發給死信接收交換機(delayexchange)
④:死信接收交換機(receiveexchange)根據路由鍵(routekey2)找到綁定自己的死信接收隊列(receivequeue)並把消息給它
⑤:死信接收隊列(receivequeue)再把消息發送給監聽它的消費者(customer)
ps:延時隊列也叫死信隊列。基於TTL模式的延時隊列會涉及到2個交換機、2個路由鍵、2個隊列…emmmmm比較麻煩
 
但是基於TTL的延時隊列存在一個問題,就是同一個隊列里的消息延時時間最好一致,比如說隊列里的延時時間都是1小時,千萬不能隊列里的消息延時時間亂七八糟多久的都有,這樣的話先入隊的消息如果延時時間過長會堵着后入隊延時時間小的消息,導致后面的消息到時也無法變成死信轉發出去,很坑!!!
舉個栗子:延時隊列里先后進入A,B,C三條消息,存活時間是3h,2h,1h,結果到了1小時C不會死,到了2hB不會死,到了3小時A死了,同時B,C也死了,意味着3h后A,B,C才能消費,很坑!!!
我本來使用時候以為會像redis的存活時間一樣,內部維護一個定時器去掃描死亡時間然后變成死信轉發,結果不是。。。
至於怎么解決這個問題,一個隊列里可以放不同死亡時間的消息,還能夠異步死亡轉發,請看下面:
 
TTL方式實現rabbitmq的延時隊列功能,在消息死亡時間比較靈活復雜的時候我們不可能聲明很多死信隊列去管理,而且聲明一個就要幾個個bean,很蛋疼,所以希望能夠有種方式使其消息死亡異步化,到期即死即消費,不會被阻塞,這里介紹使用插件的方式,不過需要rabbitmq要是3.6版本以上,也就是說,加入你的rabbitmq版本太老只能用TTL。
基於插件方式實現流程:
這里和TTL方式有個很大的不同就是TTL存放消息在死信隊列(delayqueue)里,二基於插件存放消息在延時交換機里(x-delayed-message exchange)。
①:生產者將消息(msg)和路由鍵(routekey)發送指定的延時交換機(exchange)上
②:延時交換機(exchange)存儲消息等待消息到期根據路由鍵(routekey)找到綁定自己的隊列(queue)並把消息給它
③:隊列(queue)再把消息發送給監聽它的消費者(customer)
插件可以自行去官網下載:

下載的插件放到rabbitmq的plugins里,執行命令安裝插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

流程介紹完了,看下具體代碼吧!

1.首先pom依賴:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 2.配置文件配置rabbitmq的信息

# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 手動ACK 不開啟自動ACK模式,目的是防止報錯后未正確處理消息丟失 默認 為 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual

 3.編寫rabbitmq配置類,聲明幾個bean

/**
 * rabbitmq配置類
 * 員工系統配置延時隊列
 * @author 47
 * @date 2020/1/7
 */
@Configuration
public class RabbitUserConfig {

    /**
     * 延時隊列交換機
     * 注意這里的交換機類型:CustomExchange 
     * @return
     */
    @Bean
    public CustomExchange delayExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delay_exchange","x-delayed-message",true, false,args);
    }

    /**
     * 延時隊列
     * @return
     */
    @Bean
    public Queue delayQueue(){
        return new Queue("delay_queue",true);
    }

    /**
     * 給延時隊列綁定交換機
     * @return
     */
    @Bean
    public Binding cfgDelayBinding(Queue cfgDelayQueue,CustomExchange cfgUserDelayExchange){
        return BindingBuilder.bind(cfgDelayQueue).to(cfgUserDelayExchange).with("delay_key").noargs();
    }
}

4.編寫rabbitmq生產者:

/**
 * rabbitMq生產者類
 * @author 47
 * @date 2020/1/17
 */
@Component
@Slf4j
public class RabbitProduct{

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendDelayMessage(List<Integer> list) {
    	 //這里的消息可以是任意對象,無需額外配置,直接傳即可
         log.info("===============延時隊列生產消息====================");
         log.info("發送時間:{},發送內容:{}", LocalDateTime.now(), list.toString());
         this.rabbitTemplate.convertAndSend(
                 "delay_exchange",
                 "delay_key",
                 list,
                 message -> {
                 	 //注意這里時間可以使long,而且是設置header
                     message.getMessageProperties().setHeader("x-delay",60000);
                     return message;
                 }
         );
     	 log.info("{}ms后執行", 60000);
    }

5.編寫rabbitmq消費者:

/**
 * activeMq消費者類
 * @author 47
 * @date 2020/1/7
 */
@Component
@Slf4j
public class RabbitConsumer {
    @Autowired
    private CcqCustomerCfgService ccqCustomerCfgService;

    /**
     * 默認情況下,如果沒有配置手動ACK, 那么Spring Data AMQP 會在消息消費完畢后自動幫我們去ACK
     * 存在問題:如果報錯了,消息不會丟失,但是會無限循環消費,一直報錯,如果開啟了錯誤日志很容易就吧磁盤空間耗完
     * 解決方案:手動ACK,或者try-catch 然后在 catch 里面將錯誤的消息轉移到其它的系列中去
     * spring.rabbitmq.listener.simple.acknowledge-mode = manual
     * @param list 監聽的內容
     */
    @RabbitListener(queues = "delay_queue")
    public void cfgUserReceiveDealy(List<Integer> list, Message message, Channel channel) throws IOException {
        log.info("===============接收隊列接收消息====================");
        log.info("接收時間:{},接受內容:{}", LocalDateTime.now(), list.toString());
        //通知 MQ 消息已被接收,可以ACK(從隊列中刪除)了
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        try {
            dosomething.....
        } catch (Exception e) {
            log.error("============消費失敗,嘗試消息補發再次消費!==============");
            log.error(e.getMessage());
            /**
             * basicRecover方法是進行補發操作,
             * 其中的參數如果為true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
             * 設置為false是只補發給當前的consumer
             */
            channel.basicRecover(false);
        }
    }
}

6.編寫測試類:

/**
 * @author 47
 * @date 2020/1/7
 */
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RabbitProduct rabbitProduct;
    
    @GetMapping("/sendMessage")
    public void sendMessage(){
    	List<Integer> list = new ArrayList<>();
    	list.add(1);
    	list.add(2);
        rabbitProduct.sendDelayMessage(list);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM