SpringBoot+RabbitMQ學習筆記(五)RabbitMQ消息持久化處理


一丶簡介

 

在@Queue和@Exchange注解中都有autoDelete屬性,值是布爾類型的字符串。如:autoDelete=“false”。

@Queue:當所有消費客戶端斷開連接后,是否自動刪除隊列: true:刪除,false:不刪除。

@Exchange:當所有綁定隊列都不在使用時,是否自動刪除交換器: true:刪除,false:不刪除。

當所有消費客戶端斷開連接時,而我們對RabbitMQ消息進行了持久化,那么這時未被消費的消息存於RabbitMQ服務器的內存中,如果RabbitMQ服務器都關閉了,那么未被消費的數據也都會丟失了。

下面編寫代碼試試RabbitMQ的消息持久化處理。

二丶配置文件

這里使用的是前面博客里寫的error日志消息隊列來測試的。這里還是創建兩個項目,一個作為生產者,一個作為消費者。

生產者配置:

server.port=8883

spring.application.name=hello-world
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.thymeleaf.cache=false

#設置交換器名稱
mq.config.exchange=log.direct
#設置error隊列的路由鍵
mq.config.queue.error.routing.key=log.error.routing.key
View Code

消費者配置

server.port=8884

spring.application.name=lesson1

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#設置交換器名稱
mq.config.exchange=log.direct
#設置error隊列名稱
mq.config.queue.error=log.error
#設置error路由鍵
mq.config.queue.error.routing.key=log.error.routing.key
View Code

三丶編寫生產者

package com.example.rabbitdurableprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:發送消息
 */
@Component
public class DurableSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //exChange 交換器
    @Value("${mq.config.exchange}")
    private String exChange;

    //routingkey 路由鍵
    @Value("${mq.config.queue.error.routing.key}")
    private String routingKey;
    /**
     * 發送消息的方法
     * @param msg
     */
    public void send(String msg){
        //向消息隊列發送消息
        //參數1:交換器名稱
        //參數2:路由鍵
        //參數3:消息
        this.amqpTemplate.convertAndSend(exChange,routingKey,msg);

    }
}
View Code

四丶編寫消費者

這里消費者服務配置中@Queue中的autoDelete屬性設置的是true,即未持久化,一會兒測試下看沒有持久化的消息隊列在所有的消費者服務器斷開后是怎樣的。

package com.ant.rabbitdurableconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:消息接收者
 * @RabbitListener bindings:綁定隊列
 * @QueueBinding  value:綁定隊列的名稱
 *                  exchange:配置交換器
 * @Queue : value:配置隊列名稱
 *          autoDelete:是否是一個可刪除的臨時隊列
 * @Exchange value:為交換器起個名稱
 *           type:指定具體的交換器類型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.error.routing.key}"
        )
)
public class DurableErrorReceiver {

    /**
     * 接收消息的方法,采用消息隊列監聽機制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("error-receiver:"+msg);
    }
}
View Code

 

五丶編寫測試類

這里寫了一個死循環持續向消息隊列中發送消息,用變量falg來記錄發送編號。

package com.example.amqp;

import com.example.helloworld.HelloworldApplication;
import com.example.rabbitdurableprovider.DurableSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HelloworldApplication.class)
public class QueueTest {

    @Autowired
    private DurableSender durableSender;

    @Test
    public void test3() throws InterruptedException {
        int flag = 0;
        while (true){
            flag++;
            Thread.sleep(2000);
            durableSender.send("hello--"+flag);

        }


    }
}
View Code

先啟動消費者服務器,然后啟動測試類,控制台輸出如下信息,然后關閉tomcat,模擬消費者服務器故障。這里可以看到消費者接受到的消息停留在第81條就出現了“故障”,但是生產者還是在持續不斷的向消費者發送消息。

這時再重啟tomcat,消費者又接受到了消息,但是是從第111條消息開始的,那么81-111之間的這些消息就都丟失了。

 

修改消費者服務的代碼,將autoDelete設置為“false”,將RabbitMQ消息進行持久化處理。

@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}",autoDelete = "false"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.error.routing.key}"
        )
)

修改后重啟消費者服務器,再次調用測試方法。然后關閉消費者服務器,模擬“故障”。這時看到消息接收到第15條服務器就“故障”了。

 

重啟消費者服務器。可以看到服務器一啟動,消費者就從消息隊列中讀取到了服務器“故障”時緩存在RabbitMQ中的消息,消息並未丟失,RabbitMQ消息持久化處理成功。

 

OK!以上就是今天學習的RabbitMQ消息持久化處理,如有不對之處,歡迎指正!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM