六、RabbitMq 發布確認模式(confirm模式)


發布確認原理

生產者將信道設置成 confirm 模式,一旦信道進入 confirm 模式,所有在該信道上面發布的消息都將會被指派一個唯一的 ID(從 1 開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一 ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發出,broker 回傳給生產者的確認消息中 delivery-tag 域包含了確認消息的序列號,此外 broker 也可以設置basic.ack 的 multiple 域,表示到這個序列號之前的所有消息都已經得到了處理。

confirm 模式最大的好處在於他是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果 RabbitMQ 因為自身內部錯誤導致消息丟失,就會發送一條 nack 消息,生產者應用程序同樣可以在回調方法中處理該 nack 消息。

發布確認的策略

開啟發布確認的方法

發布確認默認是沒有開啟的,如果要開啟需要調用方法 confirmSelect,每當你要想使用發布確認,都需要在 channel 上調用該方法

Channel channel = connection.createChannel();
channel.confirmSelect();

同步確認發布

單個確認發布

這是一種簡單的確認方式,它是一種同步確認發布的方式,也就是發布一個消息之后只有它被確認發布,后續的消息才能繼續發布,waitForConfirmsOrDie(long)這個方法只有在消息被確認的時候才返回,如果在指定時間范圍內這個消息沒有被確認那么它將拋出異常。

這種確認方式有一個最大的缺點就是:發布速度特別的慢,因為如果沒有確認發布的消息就會阻塞所有后續消息的發布,這種方式最多提供每秒不超過數百條發布消息的吞吐量。當然對於某些應用程序來說這可能已經足夠了。

private static final int MESSAGE_COUNT = 1000;

public static void publishMessageIndividually() throws Exception {
    try (Channel channel = RabbitMqUtils.getChannel()) {
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        //開啟發布確認
        channel.confirmSelect();
        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            //服務端返回 false 或超時時間內未返回,生產者可以消息重發
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("消息發送成功");
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("發布" + MESSAGE_COUNT + "個單獨確認消息,耗時" + (end - begin) + "ms");
    }
}

批量確認發布

上面那種方式非常慢,與單個等待確認消息相比,先發布一批消息然后一起確認可以極大地提高吞吐量,當然這種方式的缺點就是:當發生故障導致發布出現問題時,不知道是哪個消息出現問題了,我們必須將整個批處理保存在內存中,以記錄重要的信息而后重新發布消息。當然這種方案仍然是同步的,也一樣阻塞消息的發布。

private static final int MESSAGE_COUNT = 1000;    

public static void publishMessageBatch() throws Exception {
    try (Channel channel = RabbitMqUtils.getChannel()) {
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        //開啟發布確認
        channel.confirmSelect();
        //批量確認消息大小
        int batchSize = 100;
        //未確認消息個數
        int outstandingMessageCount = 0;
        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            outstandingMessageCount++;
            if (outstandingMessageCount == batchSize) {
                channel.waitForConfirms();
                outstandingMessageCount = 0;
            }
        }
        //為了確保還有剩余沒有確認消息 再次確認
        if (outstandingMessageCount > 0) {
            channel.waitForConfirms();
        }
        long end = System.currentTimeMillis();
        System.out.println("發布" + MESSAGE_COUNT + "個批量確認消息,耗時" + (end - begin) + "ms");
    }
}

異步確認發布

異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功,下面就讓我們來詳細講解異步確認是怎么實現的。

消息異步確認

    public static void publishMessageAsync() throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //開啟發布確認
            channel.confirmSelect();
            /**
             * 線程安全有序的一個哈希表,適用於高並發的情況
             * 1.輕松的將序號與消息進行關聯
             * 2.輕松批量刪除條目 只要給到序列號
             * 3.支持並發訪問
             */
            ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
            /**
             * 確認收到消息的一個回調
             * 1.消息序列號
             * 2.true 可以確認小於等於當前序列號的消息
             * false 確認當前序列號消息
             */
            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                if (multiple) {
                    //返回的是小於等於當前序列號的未確認消息 是一個 map
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
                    //清除該部分未確認消息
                    confirmed.clear();
                } else {
                    //只清除當前序列號的消息
                    outstandingConfirms.remove(sequenceNumber);
                }
            };
            ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                String message = outstandingConfirms.get(sequenceNumber);
                System.out.println("發布的消息" + message + "未被確認,序列號" + sequenceNumber);
            };
            /**
             * 添加一個異步確認的監聽器
             * 1.確認收到消息的回調
             * 2.未收到消息的回調
             */
            channel.addConfirmListener(ackCallback, null);
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息" + i;
                /**
                 * channel.getNextPublishSeqNo()獲取下一個消息的序列號
                 * 通過序列號與消息體進行一個關聯
                 * 全部都是未確認的消息體
                 */
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", queueName, null, message.getBytes());
            }
            long end = System.currentTimeMillis();
            System.out.println("發布" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) + "ms");
        }
    }

如何處理異步未確認消息

最好的解決的解決方案就是把未確認的消息放到一個基於內存的能被發布線程訪問的隊列,比如說用 ConcurrentLinkedQueue 這個隊列在 confirm callbacks 與發布線程之間進行消息的傳遞。

以上 3 種發布確認速度對比

單獨發布消息

  • 同步等待確認,簡單,但吞吐量非常有限。

批量發布消息

  • 批量同步等待確認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是那條消息出現了問題。

異步處理:

  • 最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是實現起來稍微難些
public static void main(String[] args) throws Exception {
 //這個消息數量設置為 1000 好些 不然花費時間太長
 publishMessagesIndividually();
 publishMessagesInBatch();
 handlePublishConfirmsAsynchronously();
}
//運行結果
發布 1,000 個單獨確認消息耗時 50,278 ms
發布 1,000 個批量確認消息耗時 635 ms
發布 1,000 個異步確認消息耗時 92 ms


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM