RabbitMQ的消息可靠性(五)


一、可靠性問題分析

消息的可靠性投遞是使用消息中間件不可避免的問題,不管是使用哪種MQ都存在這種問題,接下來要說的就是在RabbitMQ中如何解決可靠性問題;在前面

 

 在前面說過消息的傳遞過程中有三個對象參與分別是:生產者、RabbitMQ(broker)、消費者;接下來就是要圍繞這三個對象來分析消息在傳遞過程中會在哪些環節出來可靠性問題;

RabbitMQ消息的可靠性投遞主要兩種實現:
1、通過實現消費的重試機制,通過@Retryable來實現重試,可以設置重試次數和重試頻率;
2、生產端實現消息可靠性投遞。
兩種方法消費端都可能收到重復消息,要求消費端必須實現冪等性消費。

 

 

 1.1、生產者丟失消息

生產者發送消息到broker時,要保證消息的可靠性,主要的方案有以下2種:

1.事務

2.confirm機制

1.1.1、事務

RabbitMQ提供了事務功能,也即在生產者發送數據之前開啟RabbitMQ事務,然后再發送消息,如果消息沒有成功發送到RabbitMQ,那么就拋出異常,然后進行事務回滾,回滾之后再重新發送消息,如果RabbitMQ接收到了消息,那么進行事務提交,再開始發送下一條數據。

優點

保證消息一定能夠發送到RabbitMQ中,發送端不會出現消息丟失的情況;

缺點

事務機制是阻塞(同步)的,每次發送消息必須要等到mq回應之后才能繼續發送消息,比較耗費性能,會導致吞吐量降下來

1.1.2、confirm模式

基於事務的特性,作為補償,RabbitMQ添加了消息確認機制,也即confirm機制。confirm機制和事務機制最大的不同就是事務是同步的,confirm是異步的,發送完一個消息后可以繼續發送下一個消息,mq接收到消息后會異步回調接口告知消息接收結果。生產者開啟confirm模式后,每次發送的消息都會分配一個唯一id,如果消息成功發送到了mq中,那么就會返回一個ack消息,表示消息接收成功,反之會返回一個nack,告訴你消息接收失敗,可以進行重試。依據這個機制,我們可以維護每個消息id的狀態,如果超過一定時間還是沒有接收到mq的回調,那么就重發消息。

1.1.3、confirm模式代碼演示

其實這塊代碼在前面幾篇文章的代碼中有體現過;下面以springboot集成的方式再演示一種

 pom.xml文件

 
         
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--web包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
 

application.yml

spring:
  rabbitmq:  #rabbitmq 連接配置
    publisher-confirm-type: correlated # 開啟confirm確認模式
    host: 192.168.0.1
    port: 5672
    username: admin
    password: admin

server:
  port: 8081
實現confirm方法
實現ConfirmCallback接口中的confirm方法,消息只要被 rabbitmq broker接收到就會觸ConfirmCallback 回調,ack為true表示消息發送成功,ack為false表示消息發送失敗
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    /***
     * @param correlationData 相關配置信息
     *  @param ack exchange交換機 是否成功收到了消息。true 成功,false代表失敗
     *  @param cause 失敗原因
     * */
    @Override
    public void confirm(CorrelationData correlationData ,boolean ack ,String cause) {

        if (ack){
            //消息發送成功
            System.out.println ("消息發送成功到交換機");
        }else{
            System.out.println ("發送失敗"+cause);
        }
    }
}
定義 Exchange 和 Queue
定義交換機 confirmTestExchange 和隊列 confirm_test_queue ,並將隊列綁定在交換機上。
/**
 * 定義隊列和交換機
 */
@Configuration
public class QueueConfig {
    @Bean(name="confirmTestExchange")
    public FanoutExchange confirmTestExchange(){
        return new FanoutExchange("confirmTestExchange",true,false);
    }
    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue(){
        return new Queue("confirm_test_queue",true,false,false);
    }

    @Bean
    public Binding confirmTestFanoutExcangeAndQueue(@Qualifier("confirmTestQueue")Queue queue,@Qualifier("confirmTestExchange") FanoutExchange fanoutExchange){
        return  BindingBuilder.bind(queue).to(fanoutExchange);
    }
}
生產者
@RestController
@RequestMapping(value = "/producer")
@CrossOrigin
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;
    @GetMapping
    public void producer(){

        rabbitTemplate.setConfirmCallback ( confirmCallbackService );
        rabbitTemplate.convertAndSend ( "confirmTestExchange","","測試RabbitTemplate功能" );
    }
}
正確情況,ack返回true,表示投遞成功;下面測試下,發一個正常的截圖和非正常截圖,非正常的截圖只用把生產者的交換機名稱就行

消息未投遞到queue的退回模式

上面演示了消息投放到交換機的案例,下面演示一個消息從 exchange–>queue 投遞失敗則會返回一個 returnCallback的案例;生產端通過實現ReturnCallback接口,啟動消息失敗返回,消息路由不到隊列時會觸發該回調接口

修改yml文件
spring:
  rabbitmq:  #rabbitmq 連接配置
    publisher-confirm-type: correlated # 開啟confirm確認模式
    publisher-returns: true #開啟退回模式
    host: 192.168.0.1
    port: 5672
    username: admin
    password: admin

server:
  port: 8081
設置投遞失敗的模式
根據前面文章的講解可知如果消息沒有路由到Queue,則丟棄消息(默認);但開啟ReturnCallBack后,如果消息沒有路由到Queue,返回給消息發送方ReturnCallBack(開啟后)
rabbitTemplate.setMandatory(true);
實現returnedMessage方法
啟動消息失敗返回,消息路由不到隊列時會觸發該回調接口
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
    /**
     *
     * @param message 消息對象
     * @param i 錯誤碼
     * @param s 錯誤信息
     * @param s1 交換機
     * @param s2 路由鍵
     */
    @Override
    public void returnedMessage(Message message ,int i ,String s ,String s1 ,String s2) {
        System.out.println("消息對象:" + message);
        System.out.println("錯誤碼:" + i);
        System.out.println("錯誤信息:" + s);
        System.out.println("消息使用的交換器:" + s1);
        System.out.println("消息使用的路由key:" + s2);
        //業務代碼處理
    }
}

yml配置

spring:
  rabbitmq:  #rabbitmq 連接配置
    publisher-confirm-type: correlated # 開啟confirm確認模式
    publisher-returns: true #開啟退回模式
    host: 124.71.33.75
    port: 5672
    username: admin
    password: ghy20200707rabbitmq

server:
  port: 8081
public void producerLose(){

        /**
         *確保消息發送失敗后可以重新返回到隊列中
         */
        rabbitTemplate.setMandatory(true);

        /**
         * 消息投遞確認模式
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        /**
         * 消息投遞到隊列失敗回調處理
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);
        CorrelationData correlationData = new CorrelationData("id_"+System.currentTimeMillis()+"");
        //發送消息
        rabbitTemplate.convertAndSend("directExchange", "RabbitTemplate","測試RabbitTemplate功能" ,correlationData);
    }

測試接口

 

 1.2、消費者丟失消息

其實在生產者和消費者中間,rabbitmq也是會丟失消息的,解決方案就是持久化存儲,這個方案在前面有講過;所以在這里就跳過;下面直接說消息確認機制ack,ack指Acknowledge確認。 表示消費端收到消息后的確認方式

消費端消息的確認分為:自動確認(默認)、手動確認、不確認
  • AcknowledgeMode.NONE:不確認
  • AcknowledgeMode.AUTO:自動確認
  • AcknowledgeMode.MANUAL:手動確認
其中自動確認是指,當消息一旦被Consumer接收到,則自動確認收到,並將相應 message 從RabbitMQ 的消息 緩存中移除。但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那么該消息就會丟失。如果設置了手動確認方式,則需要在業務處理成功后,調用channel.basicAck(),手動簽收,如果出現異常,則調用channel.basicNack()方法,讓其自動重新發送消息。
yml配置
spring:
  rabbitmq:  #rabbitmq 連接配置
    publisher-confirm-type: correlated # 開啟confirm確認模式
    publisher-returns: true #開啟退回模式
    host: 124.71.33.75
    port: 5672
    username: admin
    password: ghy20200707rabbitmq
    listener:
      simple:
        acknowledge-mode: manual #手動確認

server:
  port: 8081
確認配置
/**
 * 消費者消息確認機制
 */
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage {
    @RabbitHandler
    public void processHandler(String msg,Channel channel,Message message) throws IOException { 
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("消息內容" + new String(message.getBody())); 
            //TODO 具體業務邏輯 
            // 手動簽收[參數1:消息投遞序號,參數2:批量簽收] 
            channel.basicAck(deliveryTag, true); 
        } catch (Exception e) { 
            //拒絕簽收[參數1:消息投遞序號,參數2:批量拒絕,參數3:是否重新加入隊列] 
            channel.basicNack(deliveryTag, true, true); 
        } 
    }
}
channel.basicNack 方法與 channel.basicReject 方法區別在於basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。測試效果如下:

 

 要想測試異常很簡單,在代碼加一個報錯語句就可以測試了,我這里就不搞事了;

二、消費端限流

假設一個場景,首先,在 Rabbitmq 服務器積壓了有上萬條未處理的消息,這時隨便打開一個消費者客戶端,會出現這樣情況: 巨量的消息瞬間全部推送過來,但是單個客戶端無法同時處理這么多數據!當數據量特別大的時候,如果對生產端限流肯定是不科學的,因為有時候並發量就是特別大,有時候並發量又特別少,這是用戶的行為,用戶的行為是不可控的。所以正確的處理方案應該是對消費端限流,用於保持消費端的穩定,當消息數量激增的時候很有可能造成資源耗盡,以及影響服務的性能,導致系統的卡頓甚至直接崩潰。

2.1、TTL

Time To Live,消息過期時間設置
聲明隊列時,指定即可
TTL:過期時間
1. 隊列統一過期
2. 消息單獨過期
如果設置了消息的過期時間,也設置了隊列的過期時間,它以時間短的為准。隊列過期后,會將隊列所有消息全部移除;消息過期后,只有消息在隊列頂端,才會判斷其是否過期(移除掉)

三、死信隊列

死信隊列,當消息成為Dead message后,可以被重新發送到另一個交換機,這個交換機就是DLX;關於死信隊列的演示代碼在第一篇中有上傳過;這里就不再演示了;
 
 

 

 

消息成為死信的三種情況:
  1. 隊列消息長度到達限制;
  2. 消費者拒接消費消息,basicNack/basicReject,並且不把消息重新放入原目標隊列,requeue=false;
  3.  原隊列存在消息過期設置,消息到達超時時間未被消費;
 
隊列綁定死信交換機:
給隊列設置參數: x-dead-letter-exchange 和 x-dead-letter-routing-key也就是說此時Queue作為"生產者"

 

 

四、延遲隊列

延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費,最常見的業務就是定單服務,例如:一個定單下單后如果30分鍾內沒有支持,就要取消定單,回回滾庫存。
其實在RabbitMQ中並未提供延遲隊列功能 ,但是有替代方案,他的替代方案就是TTL+死信隊列組合實現延遲隊列的效果 ;
設置隊列過期時間30分鍾,當30分鍾過后,消息未被消費,進入死信隊列,路由到指定隊列,調用庫存系統,判斷訂單狀態。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM