A. Delivery Tag
參考資料:https://www.rabbitmq.com/confirms.html
仔細查看一下 Consumer 的回調方法:
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
......
consumerChannel1.basicAck(envelope.getDeliveryTag(), false);
}
當我們需要確認一條消息已經被消費時,我們調用的 basicAck 方法的第一個參數是 Delivery Tag。
Delivery Tag 用來標識信道中投遞的消息。RabbitMQ 推送消息給 Consumer 時,會附帶一個 Delivery Tag,以便 Consumer 可以在消息確認時告訴 RabbitMQ 到底是哪條消息被確認了。
RabbitMQ 保證在每個信道中,每條消息的 Delivery Tag 從 1 開始遞增。
運行下面的例子可以直觀的看到這點:
gordon.study.rabbitmq.ack.TestAckBasic.java
public class TestAckBasic {
private static final String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel consumerChannel1 = connection.createChannel();
consumerChannel1.queueDeclare(QUEUE_NAME, false, false, false, null);
consumerChannel1.basicQos(3);
Consumer consumer1 = new DefaultConsumer(consumerChannel1) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf("in consumer A (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message);
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
}
consumerChannel1.basicAck(envelope.getDeliveryTag(), false);
}
};
consumerChannel1.basicConsume(QUEUE_NAME, false, consumer1);
final Channel consumerChannel2 = connection.createChannel();
consumerChannel2.basicQos(3);
Consumer consumer2 = new DefaultConsumer(consumerChannel2) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf("in consumer B (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message);
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
}
consumerChannel2.basicAck(envelope.getDeliveryTag(), false);
}
};
consumerChannel2.basicConsume(QUEUE_NAME, false, consumer2);
Channel senderChannel = connection.createChannel();
for (int i = 0; i < 10;) {
String message = "NO. " + ++i;
TimeUnit.MILLISECONDS.sleep(100);
senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
senderChannel.close();
}
}
result:
in consumer A (delivery tag is 1): NO. 1
in consumer B (delivery tag is 1): NO. 2
in consumer A (delivery tag is 2): NO. 3
in consumer B (delivery tag is 2): NO. 4
in consumer A (delivery tag is 3): NO. 5
in consumer B (delivery tag is 3): NO. 6
in consumer A (delivery tag is 4): NO. 7
in consumer B (delivery tag is 4): NO. 8
in consumer A (delivery tag is 5): NO. 9
in consumer B (delivery tag is 5): NO. 10
可見,兩個信道的 delivery tag 分別從 1 遞增到 5。(如果修改代碼,將兩個 Consumer 共享同一個信道,則 delivery tag 是從 1 遞增到 10,參考 gordon.study.rabbitmq.ack.TestAckInOneChannel.java)
basicAck 方法的第二個參數 multiple 取值為 false 時,表示通知 RabbitMQ 當前消息被確認;如果為 true,則額外將比第一個參數指定的 delivery tag 小的消息一並確認。(批量確認針對的是整個信道,參考gordon.study.rabbitmq.ack.TestBatchAckInOneChannel.java。)
對同一消息的重復確認,或者對不存在的消息的確認,會產生 IO 異常,導致信道關閉。
B. 忘了確認會怎樣
如果我們注釋掉22行,讓 consumerChannel1 不再確認消息,世界會怎樣?
只要程序還在運行,這3條消息就一直是 Unacked 狀態,無法被 RabbitMQ 重新投遞。更厲害的是,RabbitMQ 消息消費並沒有超時機制,也就是說,程序不重啟,消息就永遠是 Unacked 狀態。處理運維事件時不要忘了這些 Unacked 狀態的消息。
當程序關閉時(實際只要 Consumer 關閉就行),這3條消息會恢復為 Ready 狀態。
C. 取消確認
當消費消息出現異常時,我們需要取消確認,這時我們可以使用 Channel 的 basicReject 方法。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
第一個參數指定 delivery tag,第二個參數說明如何處理這個失敗消息。requeue 值為 true 表示該消息重新放回隊列頭,值為 false 表示放棄這條消息。
一般來說,如果是系統無法處理的異常,我們一般是將 requeue 設為 false,例如消息格式錯誤,再處理多少次也是異常。調用第三方接口超時這類異常 requeue 應該設為 true。
從 basicReject 方法參數可見,取消確認不支持批量操作(類似於 basicAck 的 multiple 參數)。所以,RabbitMQ 增加了 basicNack 方法以提供批量取消能力。參考 https://www.rabbitmq.com/nack.html
PS:Reject 的消息重新推送來時,delivery tag 就是新的值了。