RabbitMQ入門_06_深入了解ack


A. Delivery Tag

參考資料:https://www.rabbitmq.com/confirms.html

仔細查看一下 Consumer 的回調方法:

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                ......
                consumerChannel1.basicAck(envelope.getDeliveryTag(), false);
            }

當我們需要確認一條消息已經被消費時,我們調用的 basicAck 方法的第一個參數是 Delivery Tag。

Delivery Tag 用來標識信道中投遞的消息。RabbitMQ 推送消息給 Consumer 時,會附帶一個 Delivery Tag,以便 Consumer 可以在消息確認時告訴 RabbitMQ 到底是哪條消息被確認了。

RabbitMQ 保證在每個信道中,每條消息的 Delivery Tag 從 1 開始遞增。

運行下面的例子可以直觀的看到這點:

gordon.study.rabbitmq.ack.TestAckBasic.java

public class TestAckBasic {
 
    private static final String QUEUE_NAME = "hello";
 
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel consumerChannel1 = connection.createChannel();
        consumerChannel1.queueDeclare(QUEUE_NAME, false, false, false, null);
        consumerChannel1.basicQos(3);
        Consumer consumer1 = new DefaultConsumer(consumerChannel1) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.printf("in consumer A (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message);
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                }
                consumerChannel1.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        consumerChannel1.basicConsume(QUEUE_NAME, false, consumer1);
 
        final Channel consumerChannel2 = connection.createChannel();
        consumerChannel2.basicQos(3);
        Consumer consumer2 = new DefaultConsumer(consumerChannel2) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.printf("in consumer B (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message);
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                }
                consumerChannel2.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        consumerChannel2.basicConsume(QUEUE_NAME, false, consumer2);
 
        Channel senderChannel = connection.createChannel();
        for (int i = 0; i < 10;) {
            String message = "NO. " + ++i;
            TimeUnit.MILLISECONDS.sleep(100);
            senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        }
        senderChannel.close();
    }
}

result:

in consumer A (delivery tag is 1): NO. 1
in consumer B (delivery tag is 1): NO. 2
in consumer A (delivery tag is 2): NO. 3
in consumer B (delivery tag is 2): NO. 4
in consumer A (delivery tag is 3): NO. 5
in consumer B (delivery tag is 3): NO. 6
in consumer A (delivery tag is 4): NO. 7
in consumer B (delivery tag is 4): NO. 8
in consumer A (delivery tag is 5): NO. 9
in consumer B (delivery tag is 5): NO. 10

可見,兩個信道的 delivery tag 分別從 1 遞增到 5。(如果修改代碼,將兩個 Consumer 共享同一個信道,則 delivery tag 是從 1 遞增到 10,參考 gordon.study.rabbitmq.ack.TestAckInOneChannel.java

basicAck 方法的第二個參數 multiple 取值為 false 時,表示通知 RabbitMQ 當前消息被確認;如果為 true,則額外將比第一個參數指定的 delivery tag 小的消息一並確認。(批量確認針對的是整個信道,參考gordon.study.rabbitmq.ack.TestBatchAckInOneChannel.java。)

對同一消息的重復確認,或者對不存在的消息的確認,會產生 IO 異常,導致信道關閉。

B. 忘了確認會怎樣

如果我們注釋掉22行,讓 consumerChannel1 不再確認消息,世界會怎樣?

Unacked messages

只要程序還在運行,這3條消息就一直是 Unacked 狀態,無法被 RabbitMQ 重新投遞。更厲害的是,RabbitMQ 消息消費並沒有超時機制,也就是說,程序不重啟,消息就永遠是 Unacked 狀態。處理運維事件時不要忘了這些 Unacked 狀態的消息

當程序關閉時(實際只要 Consumer 關閉就行),這3條消息會恢復為 Ready 狀態。

C. 取消確認

當消費消息出現異常時,我們需要取消確認,這時我們可以使用 Channel 的 basicReject 方法。

    void basicReject(long deliveryTag, boolean requeue) throws IOException;

第一個參數指定 delivery tag,第二個參數說明如何處理這個失敗消息。requeue 值為 true 表示該消息重新放回隊列頭,值為 false 表示放棄這條消息

一般來說,如果是系統無法處理的異常,我們一般是將 requeue 設為 false,例如消息格式錯誤,再處理多少次也是異常。調用第三方接口超時這類異常 requeue 應該設為 true。

從 basicReject 方法參數可見,取消確認不支持批量操作(類似於 basicAck 的 multiple 參數)。所以,RabbitMQ 增加了 basicNack 方法以提供批量取消能力。參考 https://www.rabbitmq.com/nack.html

PS:Reject 的消息重新推送來時,delivery tag 就是新的值了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM