RabbitMq(7)消息延時推送


 

應用場景

目前常見的應用軟件都有消息的延遲推送的影子,應用也極為廣泛,例如:

  • 淘寶七天自動確認收貨。在我們簽收商品后,物流系統會在七天后延時發送一個消息給支付系統,通知支付系統將款打給商家,這個過程持續七天,就是使用了消息中間件的延遲推送功能。
  • 12306 購票支付確認頁面。我們在選好票點擊確定跳轉的頁面中往往都會有倒計時,代表着 30 分鍾內訂單不確認的話將會自動取消訂單。其實在下訂單那一刻開始購票業務系統就會發送一個延時消息給訂單系統,延時30分鍾,告訴訂單系統訂單未完成,如果我們在30分鍾內完成了訂單,則可以通過邏輯代碼判斷來忽略掉收到的消息。

在上面兩種場景中,如果我們使用下面兩種傳統解決方案無疑大大降低了系統的整體性能和吞吐量:

  • 使用 redis 給訂單設置過期時間,最后通過判斷 redis 中是否還有該訂單來決定訂單是否已經完成。這種解決方案相較於消息的延遲推送性能較低,因為我們知道 redis 都是存儲於內存中,我們遇到惡意下單或者刷單的將會給內存帶來巨大壓力。
  • 使用傳統的數據庫輪詢來判斷數據庫表中訂單的狀態,這無疑增加了IO次數,性能極低。
  • 使用 jvm 原生的 DelayQueue ,也是大量占用內存,而且沒有持久化策略,系統宕機或者重啟都會丟失訂單信息。

消息延時推送實現

在 RabbitMQ 3.6.x 之前我們一般采用死信隊列+TTL過期時間來實現延遲隊列。

在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲隊列的插件,可以下載放置到 RabbitMQ 根目錄下的 plugins 下

資源地址:https://www.rabbitmq.com/community-plugins.html

我這邊是在windows下,下載解壓放如plugin,然后cmd:

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmq-plugins enable rabbitmq_delayed_message_exchange

代碼樣例

  /**
     * 延時Queue
     */
    public static final String DELAY_QUEUE = "delay.queue";

   @Bean
    public Queue delayQueue(){
        return new Queue(DELAY_QUEUE, true);
    }

    @Bean
    TopicExchange delayExchange() {
        Map<String, Object> pros = new HashMap<String, Object>();
        //設置交換機支持延遲消息推送
        pros.put("x-delayed-message", "topic");
        TopicExchange delayTopicExchange = new TopicExchange("delayTopicExchange",true,false,pros);

        delayTopicExchange.setDelayed(true);
        return delayTopicExchange;
    }

    @Bean
    Binding bindingDelayExchange(Queue delayQueue, TopicExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.#");
    }

生產者:

 public void delaySend() throws IOException {
        User user = new User();
        user.setUserName("Sender1.....");
        user.setMobile("6666666");
        byte[] body = Base64Utils.obj2byte(user);

        Message message = new Message(body,new MessageProperties());

        //延時插件https://www.rabbitmq.com/community-plugins.html
        //然后放plugin包
        //啟用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
        //Lambda表達式
        MessagePostProcessor messagePostProcessor = message1 -> {
            //設置消息持久化
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            message.getMessageProperties().setHeader("x-delay", 10000);
            //message.getMessageProperties().setDelay(20000);
            return message;
        };

        System.out.println("發送演示消息>>>>>"+new Date());
        rabbitTemplate.convertAndSend("delayTopicExchange","delay.message",message,messagePostProcessor);
    }

rabbitTemplate 通常在SpringBoot 中默認配置

測試用例:

   @Test
    public void send6() throws Exception {
        topicSender.delaySend();
    }

運行下:

 

 

消費端(另一工程):

 

 /**
     * 延時Queue
     */
    public static final String DELAY_QUEUE = "delay.queue";
package com.example.demo.rabbitMq.exchange.topic;

import com.example.demo.utils.Base64Utils;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

@Component
public class TopicReceiver6 {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitListener(queues = TopicRabbitConstant.DELAY_QUEUE)
    public void process(Message message, Channel channel) throws IOException {
        try {
            System.out.println("Receiver6  : " + new Date() + ">>>>" + Base64Utils.byteToObj(message.getBody()));

        } catch (Exception e) {
            logger.error("接收失敗",e);
        }
    }
}

整體測試驗證

1、啟動消費端

2、生產者發送消息

結果:

生產者工程Console:

消費這邊工程Console:

時間間隔10s

 

參考:

https://www.cnblogs.com/haixiang/p/10966985.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM