RabbitMQ 消息投遞以及ACK機制


RabbitMQ 消息投遞以及ACK機制

項目地址

https://gitee.com/Sir-yuChen/website.git

投遞出現問題

  • 生產者弄丟了數據
  • RabbitMQ 自己丟了數據
  • 消費端弄丟了數據

生產者丟失數據

生產者將數據發送到 RabbitMQ 的時候,可能在傳輸過程中因為網絡等問題而將數據弄丟了

方案一:開啟MQ事務

  • 也就是Transaction事務模式在創建channel的時候,可以把信道設置成事務模式,然后就可以發布消息給RabbitMQ了。如果channel.txCommit();的方法調用成功,就說明事務提交成功,則消息一定到達了RabbitMQ中。

  • 如果在事務提交執行之前由於RabbitMQ異常崩潰或者其他原因拋出異常,這個時候我們便可以將其捕獲,然后進行回滾。

  • 事務模式里面,只有收到了服務的的Commit-ok的指令,才能提交成功。所以可以解決生產者和服務端確認的問題

  • 缺點:事務模式有一個缺點,他是阻塞的,一條消息沒有發送完畢,不能發送下一條消息,嚴重降低RabbitMQ服務器的性能,吞吐量,一般不使用

image-20220322152605943

測試代碼:

springboot+RabbitMQ配置類:

rabbitTemplate.setChannelTransacted(true)

image-20220322153422686


// 開啟事務
channel.txSelect
try {
    // 這里發送消息
} catch (Exception e) {
    channel.txRollback
 
    // 這里再次重發這條消息
}
 
// 提交事務
channel.txCommit

方案二:Confirm確認模式

生產者設置開啟了 confirm 模式之后,每次寫的消息都會分配一個唯一的 ID,然后如何寫入了 RabbitMQ 之中,RabbitMQ 會給你回傳一個 ack 消息,告訴你這個消息發送 OK 了;如果RabbitMQ 沒能處理這個消息,會回調你一個 nack 接口,告訴你這個消息失敗了,你可以進行重試。

而且你可以結合這個機制知道自己在內存里維護每個消息的 ID,如果超過一定時間還沒接收到這個消息的回調,那么你可以進行手動重發。比如:像保證消息的冪等性一樣,在 Redis 中存入消息的唯一性ID,只有在成功接收到 ack 消息以后才會刪除,否則會定時重發

Confirm確認模式確認機制有三種

  • 單條普通模式
  • 批量確認模式
  • 異步手動確認模式

1. 單條普通模式

在生產者這邊通過調用channel.confirmSelect()方法將信道設置為Confirm模式,然后發送消息。一旦消息被投遞到交換機之后,RabbitMQ就會發送一個確認(Basic.ACK)給生產者,也就是調用channel.waitForConfirms()返回true,這樣生產者就知道消息被服務端接受了

如果網絡錯誤,會拋出連接異常。如果交換機不存在,會拋出404錯誤

單條普通模式效率不高,並不常用

 	【詳細測試代碼,在下面】
	//開啟發布確認
        channel.confirmSelect();
        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", QUEUE_ONE, null, message.getBytes());
            //服務端返回 false 或超時時間內未返回,生產者可以消息重發
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("消息發送成功");
            }
        }

image-20220322163821519

2. 批量確認模式

批量確認,就是在開啟confirm模式后,先發送一批消息

  		//開啟發布確認
        channel.confirmSelect();
        //批量確認消息大小
        int batchSize = 88;
        //未確認消息個數
        int outstandingMessageCount = 0;
        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", QUEUE, null, message.getBytes());
            outstandingMessageCount++;
            if (outstandingMessageCount == batchSize) {
                channel.waitForConfirms();//確認代碼
                outstandingMessageCount = 0;
            }
        }
        //為了確保還有剩余沒有確認消息 再次確認
        if (outstandingMessageCount > 0) {
            channel.waitForConfirms();
        }

image-20220322164117875

  • 只要channel.waitForConfirmsOrDie();方法沒有拋出異常,就代表消息被服務端接受了。

  • 批量確認的方式比單條確認的方式效率要高,但是也有兩個問題:

  • 問題一:首先就是批量的數量確定。對於不同的業務,到底發送多少條消息確認一次?數量太少,效率提升不上去。數量多的話

  • 問題二:比如我們發1000條消息才確認一次,如果前面999條消息都被接受了,但是最后一條失敗了,那么前面的所有數據都需要重發

3. 異步確認模式

異步確認模式需要添加一個confirmListener,並且用一個SortedSet來維護一個批次中沒有被確認的消息。

  @Test
    public  void publishMessageAsync() throws Exception {
        try (Channel channel = this.getConnectionMQ()) {
            channel.queueDeclare(QUEUE, false, false, false, null);
            //開啟發布確認
            channel.confirmSelect();
            /**
             * 線程安全有序的一個哈希表,適用於高並發的情況
             * 1.輕松的將序號與消息進行關聯
             * 2.輕松批量刪除條目 只要給到序列號
             * 3.支持並發訪問
             */
            ConcurrentSkipListMap<Long, String> outstandingConfirms = new
                    ConcurrentSkipListMap<>();
            /**
             * 確認收到消息的一個回調
             * 1.消息序列號
             * 2.true 可以確認小於等於當前序列號的消息
             * false 確認當前序列號消息
             */
            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                if (multiple) {
                    //返回的是小於等於當前序列號的未確認消息 是一個 map
                    ConcurrentNavigableMap<Long, String> confirmed =
                            outstandingConfirms.headMap(sequenceNumber, true);
                    //清除該部分未確認消息
                    confirmed.clear();
                }else{
                    //只清除當前序列號的消息
                    outstandingConfirms.remove(sequenceNumber);
                }
            };
            ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                String message = outstandingConfirms.get(sequenceNumber);
                System.out.println("發布的消息"+message+"未被確認,序列號"+sequenceNumber);
            };
            /**
             * 添加一個異步確認的監聽器
             * 1.確認收到消息的回調
             * 2.未收到消息的回調
             */
            channel.addConfirmListener(ackCallback, null);
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息" + i;
                /**
                 * channel.getNextPublishSeqNo()獲取下一個消息的序列號
                 * 通過序列號與消息體進行一個關聯
                 * 全部都是未確認的消息體
                 */
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", QUEUE, null, message.getBytes());
            }
            long end = System.currentTimeMillis();
            //發布888個異步確認消息,耗時28ms
            System.out.println("發布" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) +
                    "ms");
        }
    }

注意:

  • 在Spring boot +rabbitMq 項目confirm模式實在channel中開啟的,RabbitTemplate對channel進行了封裝

    rabbitTemplate.setConfirmCallback(自己的實現類)//實現RabbitTemplate.ConfirmCallback
    

    測試完整代碼:

    package com.zy.website.test.mq;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmCallback;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.zy.website.test.BaseTest;
    import org.junit.Test;
    
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    
    /**
     * 測試MQ Confirm確認機制
     **/
    public class MqConfirmTest extends BaseTest {
        //設置執行次數
        public static final int MESSAGE_COUNT = 888;
        private static final String QUEUE = "confirmTestQueue";
    
        public Channel getConnectionMQ() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            //設置MabbitMQ所在服務器的ip和端口
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            return channel;
        }
        /**
         * 單個發布確認
         */
        @Test
        public void publishMessageIndividually() throws Exception {
            Channel channel = this.getConnectionMQ();
            //聲明隊列
            channel.queueDeclare(QUEUE, false, false, false, null);
            //開啟發布確認
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", QUEUE, null, message.getBytes());
                //服務端返回 false 或超時時間內未返回,生產者可以消息重發
                boolean flag = channel.waitForConfirms();
                if (flag) {
                    System.out.println("消息發送成功" + i);
                }
            }
            long end = System.currentTimeMillis();
            //發布888個單獨確認消息,耗時526ms
            System.out.println("發布" + MESSAGE_COUNT + "個單獨確認消息,耗時" + (end - begin) +
                    "ms");
        }
        /**
         * 批量發布確認
         */
        @Test
        public void publishMessageBatch() throws Exception {
            Channel channel = this.getConnectionMQ();
            channel.queueDeclare(QUEUE, false, false, false, null);
            //開啟發布確認
            channel.confirmSelect();
            //批量確認消息大小
            int batchSize = 88;
            //未確認消息個數
            int outstandingMessageCount = 0;
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", QUEUE, null, message.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount == batchSize) {
                    channel.waitForConfirms();//確認代碼
                    outstandingMessageCount = 0;
                }
            }
            //為了確保還有剩余沒有確認消息 再次確認
            if (outstandingMessageCount > 0) {
                channel.waitForConfirms();
            }
            long end = System.currentTimeMillis();
            System.out.println("發布" + MESSAGE_COUNT + "個批量確認消息,耗時" + (end - begin) +
                    "ms");
        }
        /**
         * 異步發布確認
         */
        @Test
        public void publishMessageAsync() throws Exception {
            try (Channel channel = this.getConnectionMQ()) {
                channel.queueDeclare(QUEUE, false, false, false, null);
                //開啟發布確認
                channel.confirmSelect();
                /**
                 * 線程安全有序的一個哈希表,適用於高並發的情況
                 * 1.輕松的將序號與消息進行關聯
                 * 2.輕松批量刪除條目 只要給到序列號
                 * 3.支持並發訪問
                 */
                ConcurrentSkipListMap<Long, String> outstandingConfirms = new
                        ConcurrentSkipListMap<>();
                /**
                 * 確認收到消息的一個回調
                 * 1.消息序列號
                 * 2.true 可以確認小於等於當前序列號的消息
                 * false 確認當前序列號消息
                 */
                ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                    if (multiple) {
                        //返回的是小於等於當前序列號的未確認消息 是一個 map
                        ConcurrentNavigableMap<Long, String> confirmed =
                                outstandingConfirms.headMap(sequenceNumber, true);
                        //清除該部分未確認消息
                        confirmed.clear();
                    } else {
                        //只清除當前序列號的消息
                        outstandingConfirms.remove(sequenceNumber);
                    }
                };
                ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                    String message = outstandingConfirms.get(sequenceNumber);
                    System.out.println("發布的消息" + message + "未被確認,序列號" + sequenceNumber);
                };
                /**
                 * 添加一個異步確認的監聽器
                 * 1.確認收到消息的回調
                 * 2.未收到消息的回調
                 */
                channel.addConfirmListener(ackCallback, null);
                long begin = System.currentTimeMillis();
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String message = "消息" + i;
                    /**
                     * channel.getNextPublishSeqNo()獲取下一個消息的序列號
                     * 通過序列號與消息體進行一個關聯
                     * 全部都是未確認的消息體
                     */
                    outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                    channel.basicPublish("", QUEUE, null, message.getBytes());
                }
                long end = System.currentTimeMillis();
                //發布888個異步確認消息,耗時28ms
                System.out.println("發布" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) +
                        "ms");
            }
        }
    }
    
    

RabbitMq自己丟失數據

消息從交換機路由到隊列

  • 當前隊列不存在,消息丟失
  • routinKey錯誤,消息丟失

兩種處理無法路由的消息,一種就是讓服務端重發給生產者,一種是讓交換機路由到另一個備份的交換機

測試:

  1. 發送一條消息到一個不存在的routinKey
  2. 觸發回發機制

配置類:

    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback(new MqReturnCallBack());//消息回發

image-20220322170806010

MqReturnCallBack類:


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * MQ 消息回發
 **/
@Component
public class MqReturnCallBack implements RabbitTemplate.ReturnCallback {

    private static Logger logger = LogManager.getLogger(MqConfirmCallback.class);

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        try {
            logger.info("MQ消息回發  return--message:" + new String(message.getBody(), "UTF-8") + ",replyCode:" + replyCode
                    + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
        } catch (UnsupportedEncodingException e) {
            logger.error("MQ消息回發 異常{}",e);
        }
    }
}

發送消息:

image-20220322170853824

打印日志:

image-20220322170912465

備份交換機

在創建交換機的時候,從屬性中指定備份交換機即可

// 在聲明交換機的時候指定備份交換機
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("alternate-exchange", "ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE", "topic", false, false, false, arguments);

持久化

RabbitMQ的服務或者硬件發生故障,比如系統宕機、重啟、關閉等,可能會導致內存中的消息丟失,所以我們要把消息本身和元數據(隊列、交換機、綁定信息)都保存到磁盤中

1. 隊列持久化

​ queue的持久化是通過durable=true來實現的,只需要在隊列聲明的時候設置

注意:只會持久化隊列,並不會持久化隊列中的消息

image-20220322172025175

//源碼
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, (Map)null);
    }
//參數:
/*
queue:queue的名稱
exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。
    這里需要注意三點:
    1. 排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排他隊列;
    2.“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;
    3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。
autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
*/

2. 消息持久化

設置消息的持久化:

image-20220322172825062

總要參數為:setDeliveryMode()發送消息的時候會確認消息是否進行持久化

枚舉類詳情MessageDeliveryMode:
public enum MessageDeliveryMode {
    NON_PERSISTENT,
    PERSISTENT;

    private MessageDeliveryMode() {
    }

    public static int toInt(MessageDeliveryMode mode) {
        switch(mode) {
        case NON_PERSISTENT:
            return 1;
        case PERSISTENT:
                //當返回為2的時候就會進行消息的持久化
            return 2;
        default:
            return -1;
        }
    }

    public static MessageDeliveryMode fromInt(int modeAsNumber) {
        switch(modeAsNumber) {
        case 1:
            return NON_PERSISTENT;
        case 2:
            return PERSISTENT;
        default:
            return null;
        }
    }
}

    

3. 交換機持久化

如果exchange不設置持久化,那么當broker服務重啟之后,exchange將不復存在,那么既而發送方rabbitmq producer就無法正常發送消息

image-20220322173212834


消費者端丟失數據

手動ACK及自動ACK

  • 為了保證消息從隊列可靠的達到消費者,RabbitMQ 提供了消息確認機制(Message Acknowledgement)。消費者在訂閱隊列時,可以指定 autoAck 參數,當 autoAck 參數等於 false 時,RabbitMQ 會等待消費者顯式地回復確認信號后才從內存(或者磁盤)中移除消息(實際上是先打上刪除標記,之后在刪除)。當 autoAck 參數等於 true 時,RabbitMQ 會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息。

  • 采用消息確認機制后,只要設置 autoAck 參數為 false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,因為 RabbitMQ 會一直等待持有消息直到消費者顯式調用 Basic.Ack 命令為止。

  • 當autoAck 參數為 false 時,對於 RabbitMQ 服務器端而言,隊列中的消息分成了兩部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,但是還沒有收到消費者確認信號的消息。如果 RabbitMQ 服務器端一直沒有收到消費者的確認信號,並且消費此消息的消費者已經斷開連接,則服務器端會安排該消息重新進入隊列,等待投遞給下一個消費者(也可能還是原來的那個消費者)。

  • RabbitMQ 不會為未確認的消息設置過期時間,判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息連接是否已經斷開,這個設置的原因是 RabbitMQ 允許消費者消費一條消息的時間可以很久

  • RabbitMQ 的 Web 管理平台上可以看到當前隊列中的 “Ready” 狀態和 “Unacknowledged” 狀態的消息數,分別對應等待投遞給消費者的消息數和已經投遞給消費者但是未收到確認信號的消息數

image-20220322173629493

  • ACK分為手動和自動
    • 消費者確認發生在監聽隊列的消費者處理業務失敗,如:發生了異常,不符合要求的數據等,這些場景我們就需要手動處理,比如重新發送或者丟棄
    • RabbitMQ 消息確認機制(ACK)默認是自動確認的,自動確認會在消息發送給消費者后立即確認,但存在丟失消息的可能,如果消費端消費邏輯拋出異常,假如你用回滾了也只是保證了數據的一致性,但是消息還是丟了,也就是消費端沒有處理成功這條消息,那么就相當於丟失了消息
  • 消息確認模式:
    • AcknowledgeMode.NONE:自動確認
    • AcknowledgeMode.AUTO:根據情況確認
    • AcknowledgeMode.MANUAL:手動確認
  • 消費者收到消息后,手動調用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到這些消息后,才認為本次投遞完成
    • Basic.Ack 命令:用於確認當前消息
    • Basic.Nack 命令:用於否定當前消息(注意:這是AMQP 0-9-1的RabbitMQ擴展)
    • Basic.Reject 命令:用於拒絕當前消息

1. basicAck 方法

basicAck 方法用於確認當前消息

void basicAck(long var1, boolean var3) throws IOException;
/*
參數說明:
long deliveryTag:唯一標識 ID。

boolean multiple:上面已經解釋。

boolean requeue:
	如果 requeue 參數設置為 true,則 RabbitMQ 會重新將這條消息存入隊列,以便發送給下一個訂閱的消費者;
	如果 requeue 參數設置為 false,則 RabbitMQ 立即會還把消息從隊列中移除,而不會把它發送給新的消費者。
*/

image-20220322174237871

2. basicNack 方法

basicNack 方法用於否定當前消息。 由於 basicReject 方法一次只能拒絕一條消息,如果想批量拒絕消息,則可以使用 basicNack 方法。消費者客戶端可以使用 channel.basicNack 方法來實現,方法定義如下:

    void basicNack(long var1, boolean var3, boolean var4) throws IOException;
/*
參數說明:
long var1:唯一標識 ID。

boolean var3:上面已經解釋。

boolean var4:
	如果 requeue 參數設置為 true,則 RabbitMQ 會重新將這條消息存入隊列,以便發送給下一個訂閱的消費者;
    如果 requeue 參數設置為 false,則 RabbitMQ 立即會還把消息從隊列中移除,而不會把它發送給新的消費者
*/

**3.basicReject 方法 **

basicReject 方法用於明確拒絕當前的消息而不是確認。 RabbitMQ 在 2.0.0 版本開始引入 Basic.Reject 命令,消費者客戶端可以調用與其對應的 channel.basicReject 方法來告訴 RabbitMQ 拒絕這個消息

void basicReject(long deliveryTag, boolean requeue) throws IOException;
/*
參數說明:
long deliveryTag:唯一標識 ID。

boolean requeue:上面已經解釋
*/

image-20220322174443793

4. basicRecover方法

是否恢復消息到隊列,參數是是否requeue,true則重新入隊列,並且盡可能的將之前recover的消息投遞給其他消費者消費,而不是自己再次消費。false則消息會重新被投遞給自己

RecoverOk basicRecover(boolean var1) throws IOException;

RabbitMQ學習系列

RabbitMQ 安裝快速下載

springboot簡單整合RabbitMQ

RabbitMQ消息隊列

RabbitMQ 消息投遞以及ACK機制

RabbitMQ 消息冪等性&順序性&消息積壓&面試問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM