一、概述
消費者處理一個任務是需要一段時間的,如果有一個消費者正在處理一個比較耗時的任務並且只處理了一部分,突然這個時候消費者宕機了,那么會出現什么情況呢?
要回答這個問題,我們先了解一下 RabbitMQ 的消息應答機制
為了保證消息從隊列可靠地達到消費者並且被消費者消費處理,RabbitMQ 提供了消息應答機制,RabbitMQ 有兩種應答機制,自動應答和手動應答
1、自動應答、RabbitMQ 只要將消息分發給消費者就被認為消息傳遞成功,就會將內存中的消息刪除,而不管消費者有沒有處理完消息
2、手動應答、RabbitMQ 將消息分發給了消費者,並且只有當消費者處理完成了整個消息之后才會被認為消息傳遞成功了,然后才會將內存中的消息刪除
可以看出,如果是自動應答模式,消費者在處理任務的過程中宕機了,那么消息將會丟失,而手動應答則能夠保證消息不會被丟失,所以在實際的應用當中絕大多數都采用手動應答
二、手動應答常用 API
// 該消息已經處理完成了,RabbitMQ 內存可以刪除該消息了
void basicAck(long deliveryTag, boolean multiple)
// 不處理該消息,直接拒絕,然后將該消息丟棄
void basicReject(long deliveryTag, boolean requeue)
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
三、原理圖
Producer 生產消息發送給消息隊列,Consumer01 消費消息1、Consumer02 消費消息2、Consumer01 接收到了消息之后,在處理完部分邏輯的時候突然宕機了,Consumer01 未發送 ACK,此時消息1 不會丟失,而是重新進入隊列,由狀態正常的 Consumer02 消費掉
四、編碼
1、RabbitmqUtils(工具類)
public class RabbitmqUtils {
private static final String HOST_ADDRESS = "192.168.59.130";
private static final String USER_NAME = "admin";
private static final String PASSWORD = "admin123";
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_ADDRESS);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2、Producer
public class Producer {
private static final String QUEUE_NAME = "ackDemo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "有意思的消息--->";
for (int i = 1; i < 11; i++) {
channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes(StandardCharsets.UTF_8));
}
System.out.println("Producer send message successfully");
}
}
3、Consumer01
public class Consumer01 {
private static final String QUEUE_NAME = "ackDemo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
try {
// 休眠 10 s
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 參數一、deliveryTag:消息應答標記
// 參數二、multiple:(false、只應答接收到的那個消息 true、應答所有傳遞過來的消息)
// 處理完邏輯之后應答 ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag);
};
// 設置手動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
4、Consumer02
public class Consumer02 {
private static final String QUEUE_NAME = "ackDemo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
// 參數一、deliveryTag:消息應答標記
// 參數二、multiple:(false、只應答接收到的那個消息 true、應答所有傳遞過來的消息)
// 處理完邏輯之后應答 ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag);
};
// 設置手動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
五、測試過程及結果
1、先啟動 Cousumer01、Consumer02
2、生產者發送 10 條消息,根據默認的輪詢規則,一個消費者(假設此時為 Consumer01)消費第 1、3、5、7、9 條消息,另外一個消費者(假設此時為 Consumer02)消費第 2、4、6、8、10 條消息
3、當 Consumer01 消費第 1、3 條消息的時候手動強制關閉 Consumer01,那么原先本應該由 Consumer01 消費的第 5、7、9 條消息不會丟失,它們將重新進入隊列由 Consumer02 消費掉
4、Consumer01、Consumer02 消費的消息如下: