RabbitMQ消息可靠性傳輸


消息的可靠性投遞是使用消息中間件不可避免的問題,不管是使用kafka、rocketMQ或者rabbitMQ,那么在RabbitMQ中如何保證消息的可靠性投遞呢?

先再看一下RabbitMQ消息傳遞的流程圖:

從上面的圖可以看到,消息的投遞有三個對象參與:

  • 生產者
  • RabbitMQ(broker)
  • 消費者

那么消息的可靠性傳輸也主要是針對以上三個對象來分析,首先是生產者。

生產者丟失消息

生產者發送消息到broker時,要保證消息的可靠性,主要的方案有以下2種:

1.事務

2.confirm機制

事務

RabbitMQ提供了事務功能,也即在生產者發送數據之前開啟RabbitMQ事務,然后再發送消息,如果消息沒有成功發送到RabbitMQ,那么就拋出異常,然后進行事務回滾,回滾之后再重新發送消息,如果RabbitMQ接收到了消息,那么進行事務提交,再開始發送下一條數據。

優點

保證消息一定能夠發送到RabbitMQ中,發送端不會出現消息丟失的情況;

缺點

事務機制是阻塞(同步)的,每次發送消息必須要等到mq回應之后才能繼續發送消息,比較耗費性能,會導致吞吐量降下來

confirm模式

基於事務的特性,作為補償,RabbitMQ添加了消息確認機制,也即confirm機制。

confirm機制和事務機制最大的不同就是事務是同步的,confirm是異步的,發送完一個消息后可以繼續發送下一個消息,mq接收到消息后會異步回調接口告知消息接收結果。

生產者開啟confirm模式后,每次發送的消息都會分配一個唯一id,如果消息成功發送到了mq中,那么就會返回一個ack消息,表示消息接收成功,反之會返回一個nack,告訴你消息接收失敗,可以進行重試。依據這個機制,我們可以維護每個消息id的狀態,如果超過一定時間還是沒有接收到mq的回調,那么就重發消息。

代碼實現

配置文件

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true

此處省略了mq的其他配置項,只留下了開啟confirm機制的三個配置項

publisher-confirm: 開啟消息到達exchange的回調,發送成功失敗都會觸發回調

publisher-returns: 開啟消息從exhcange路由到queue的回調,只有路由失敗時才會觸發回調

mandatory: 為true時,如果exchange根據routingKey將消息路由到queue時找不到匹配的queue,觸發return回調,為false時,exchange直接丟棄消息。

創建交換機、隊列以及綁定

@Bean
public FanoutExchange fanoutExchange(){
	return new FanoutExchange(exchangeName,true,false);
}
@Bean
public Queue queue(){
	return new Queue(queueName,true);
}

@Bean
public Binding binding(Queue queue, FanoutExchange fanoutExchange){
	return  BindingBuilder.bind(queue).to(fanoutExchange);
}

實現接口

實現RabbitTemplate.ConfirmCallbackRabbitTemplete.ReturnCallback接口,並且重寫confirm和returnedMessage方法,並將其添加到RabbitTemplate的回調中,完整的生產者如下所示:

@Component
@Slf4j
public class MyProducer {

    @Value("${platform.exchange-name}")
    private String exchangeName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(){
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
            if(ack){
                log.info("消息{}接收成功",correlationData.getId());
            }else{
                log.info("消息{}接收失敗,原因{}",correlationData.getId(),cause);
            }
        });

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
            log.info("消息{}發送失敗,應答碼{},原因{},交換機{},路由鍵{}",message.toString(),replyCode,replyText,exchange,routingKey);
        });

        for (int i = 0; i < 10; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,correlationData);
        }
    }
}

此時調用生產者發送消息,可以看到控制台輸出以下內容:

從mq的可視化管理界面上也可以看到,消息成功進入了隊列中

上述情況是消息正確發送到交換機的情況,那么如果我在發送消息時,故意寫錯交換機的名稱會有什么情況呢,假設我們把生產者代碼改為如下所示:

for (int i = 0; i < 10; i++) {
	CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
	//rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,correlationData);
	rabbitTemplate.convertAndSend("test-confirm","","消息==>"+i,correlationData);
}

此時再次啟動生產者,得到的結果如下:

消息f2214022-b3c0-462e-8a71-b0fb209bb784處理失敗,失敗原因channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test-confirm' in vhost 'test', class-id=60, method-id=40)

消息90046cf8-9f84-4aba-b2e5-e667e8466e8c處理失敗,失敗原因channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test-confirm' in vhost 'test', class-id=60, method-id=40)

通過上述的兩個例子我們可以知道,當消息成功發送到交換機的時候,返回的是ack,當消息沒有成功發送到交換機的時候,返回的是nack,並且會將異常的原因一起返回回來,通過分析異常原因可以知道消息沒有正確發送的原因進而進行修改后重新發送消息。

上述的兩個例子主要是針對ConfirmCallback的,什么情況下會觸發ReturnCallback呢?前面也說過,就是當消息已經成功到達交換機,當交換機根據routingKey將消息路由到隊列時,發現沒有匹配的隊列或者交換機根本就沒有綁定隊列,此時就會觸發RetrunCallback,但是如果消息成功路由到隊列中,Returncallback是不會觸發的。

還是上面的例子,我們把交換機綁定的隊列給刪除掉,讓交換機不綁定任何隊列

此時再執行上述的生產者代碼,可以看到控制台的輸出結果

通過控制台信息我們可以看出來,消息成功到達了交換機,觸發了ConfirmCallback回調,但是從交換機路由到隊列時由於找不到匹配的隊列,因此觸發了ReturnCallback回調。

此外,要想在消息不能正確路由到隊列時觸發ReturnCallback回調,還必須設置rabbitmq.template.mandary=true,否則,消息直接被交換機丟棄,不會觸發ReturnCallback回調。

confirm總結

confirm機制通過異步回調的方式來確認消息是否到達交換機以及消息是否正確路由到隊列,主要可以總結為以下4點:

消息正確到達交換機,觸發ConfirmCallback回調,返回ack;

消息沒有正確到達交換機,觸發ConfirmReturnCallback回調,返回nack;

消息正確的從交換機路由到隊列,不觸發ReturnCallback回調;

消息沒有正確的從交換機路由到隊列,設置mandory=true的情況下,觸發ReturnCallback回調;

RabbitMQ(broker)丟失消息

前面我們從生產者的角度分析了消息可靠性傳輸的原理和實現,這一部分我們從broker的角度來看一下如何能保證消息的可靠性傳輸?

假設有現在一種情況,生產者已經成功將消息發送到了交換機,並且交換機也成功的將消息路由到了隊列中,但是在消費者還未進行消費時,mq掛掉了,那么重啟mq之后消息還會存在嗎?如果消息不存在,那就造成了消息的丟失,也就不能保證消息的可靠性傳輸了。

也就是現在的問題變成了如何在mq掛掉重啟之后還能保證消息是存在的?

解決方案:

開啟RabbitMQ的持久化,也即消息寫入后會持久化到磁盤,此時即使mq掛掉了,重啟之后也會自動讀取之前存儲的額數據

開啟持久化的步驟:

  • 創建交換機時,設置durable=true

  • 創建queue時,設置durable=true

這只會持久化當前隊列的元數據,不會持久化消息數據

  • 發送消息時,設置消息的deliveryMode=2

此時才會將消息持久化到磁盤上去,如果使用SpringBoot的話,發送消息時自動設置deliveryMode=2,不需要人工再去設置

使用可視化管理頁面,可以看到隊列中的數據的deliveryMode=2

通過以上方式,可以保證大部分消息在broker不會丟失,但是還是有很小的概率會丟失消息,什么情況下會丟失呢?

假如消息到達隊列之后,還未保存到磁盤mq就掛掉了,此時還是有很小的幾率會導致消息丟失的。

這就要mq的持久化和前面的confirm進行配合使用,只有當消息寫入磁盤后才返回ack,那么就是在持久化之前mq掛掉了,但是由於生產者沒有接收到ack信號,此時可以進行消息重發。

消費者丟失消息

消費者什么情況下會丟失消息呢?

消費者接收到消息,但是還未處理或者還未處理完,此時消費者進程掛掉了,比如重啟或者異常斷電等,此時mq認為消費者已經完成消息消費,就會從隊列中刪除消息,從而導致消息丟失。

那該如何避免這種情況呢?這就要用到RabbitMQ提供的ack機制,RabbitMQ默認是自動ack的,此時需要將其修改為手動ack,也即自己的程序確定消息已經處理完成后,手動提交ack,此時如果再遇到消息未處理進程就掛掉的情況,由於沒有提交ack,RabbitMQ就不會刪除這條消息,而是會把這條消息發送給其他消費者處理,但是消息是不會丟的。

代碼實現

配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手動ack
        prefetch: 5

消費者實現

@Component
@Slf4j
public class MyConsumer {
    @RabbitHandler
    @RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
    public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
        try {
            //int temp = 10/0;
            log.info("消息{}消費成功",msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("接收消息過程中出現異常,執行nack");
            //第三個參數為true表示異常消息重新返回隊列,會導致一直在刷新消息,且返回的消息處於隊列頭部,影響后續消息的處理
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.error("消息{}異常",message.getMessageProperties().getHeaders());
        }
    }
}

acknowledge-mode: manual就表示開啟手動ack,該配置項的其他兩個值分別是none和auto

auto:消費者根據程序執行正常或者拋出異常來決定是提交ack或者nack,不要把none和auto搞混了

manual: 手動ack,用戶必須手動提交ack或者nack

none: 沒有ack機制

默認值是auto,如果將ack的模式設置為auto,此時如果消費者執行異常的話,就相當於執行了nack方法,消息會被放置到隊列頭部,消息會被無限期的執行,從而導致后續的消息無法消費。

對於channel.basicNack方法的第三個參數,表示消息nack后是否返回隊列,如果設置為true,表示返回隊列,此時消息處於隊列頭部,消費者會一直處理該消息,影響后續消息的消費,設置為false時表示不返回隊列,此時如果設置有DLX(死信隊列),那么消息會進入DLX中,后續再對該消息進行相應的處理,如果沒有設置DLX,此時消息就會被丟棄。關於私信隊列后續再單獨來說。

根據以上分析,一般情況下,為了保證消息不丟失,還是建議使用手動ack的方式。

總結

本文主要從生產者、broker以及消費者三個層面分析了消息可能丟失的原因以及相應的解決方案,也就是生產者要確認消息發送到交換機,交換機要確認消息路由到隊列,隊列要對存儲的消息進行持久化存儲,消費者在消費時使用手動ack的方式確認消息完成消息,進過上述一系列的步驟達到消息可靠性傳輸的目的,下一篇是RabbitMQ的重試機制。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM