以RabbitMQ為例,默認情況下 RabbitMQ 是自動ACK機制,就意味着 MQ 會在消息發送完畢后,自動幫我們去ACK,然后刪除消息的信息。
這樣依賴就存在這樣一個問題:
如果消費者處理消息需要較長時間,最好的做法是消費端處理完之后手動去確認。
1、配置文件:
rabbitmq:
host: ${yun.activity.rabbitmq.host}
port: ${yun.activity.rabbitmq.port}
username: ${yun.activity.rabbitmq.username}
password: ${yun.activity.rabbitmq.password}
virtual-host: ${yun.activity.rabbitmq.virtual-host}
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual
通過acknowledge-mode: manual 設置為手動設置Ack模式。
2、消費者:
@RabbitListener(queues = "activity-eleven-first-notify", containerFactory = "activityElevenCainerFactory")
public void processNormalOrder(Message message, Channel channel) {
try {
dealInfo(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
以上代碼通過,channel.basicAck去手動回復消息處理狀態,
除了basicAck之外,RabbitMQ還提供了其他幾個方法進行設置Ack,如下:
另外,消息的確認類型:
1)channel.basicAck(deliveryTag, multiple);
consumer處理成功后,通知broker刪除隊列中的消息,如果設置multiple=true,表示支持批量確認機制以減少網絡流量。
例如:有值為5,6,7,8 deliveryTag的投遞
如果此時channel.basicAck(8, true);則表示前面未確認的5,6,7投遞也一起確認處理完畢。
如果此時channel.basicAck(8, false);則僅表示deliveryTag=8的消息已經成功處理。
2)channel.basicNack(deliveryTag, multiple, requeue);
consumer處理失敗后,例如:有值為5,6,7,8 deliveryTag的投遞。
如果channel.basicNack(8, true, true);表示deliveryTag=8之前未確認的消息都處理失敗且將這些消息重新放回隊列中。
如果channel.basicNack(8, true, false);表示deliveryTag=8之前未確認的消息都處理失敗且將這些消息直接丟棄。
如果channel.basicNack(8, false, true);表示deliveryTag=8的消息處理失敗且將該消息重新放回隊列。
如果channel.basicNack(8, false, false);表示deliveryTag=8的消息處理失敗且將該消息直接丟棄。
3)channel.basicReject(deliveryTag, requeue);
相比channel.basicNack,除了沒有multiple批量確認機制之外,其他語義完全一樣。
如果channel.basicReject(8, true);表示deliveryTag=8的消息處理失敗且將該消息重新放回隊列。
如果channel.basicReject(8, false);表示deliveryTag=8的消息處理失敗且將該消息直接丟棄。
由此可見,手動Ack如果處理方式不對會發生一些問題。
1.沒有及時ack,或者程序出現bug,所有的消息將被存在unacked中,消耗內存
如果忘記了ack,那么后果很嚴重。當Consumer退出時,Message會重新分發。然后RabbitMQ會占用越來越多的內存,由於 RabbitMQ會長時間運行,因此這個“內存泄漏”是致命的。
2.如果使用BasicNack,將消費失敗的消息重新塞進隊列的頭部,則會造成死循環。
(解決basicNack造成的消息循環循環消費的辦法是為隊列設置“回退隊列”,設置回退隊列和閥值,如設置隊列為q1,閥值為2,則在rollback兩次后將消息轉入q1)
綜上,手動ack需要注意的是:
1.在消費者端一定要進行ack,或者是nack,可以放在try方法塊的finally中執行
2.可以對消費者的異常狀態進行捕捉,根據異常類型選擇ack,或者nack拋棄消息,nack再次嘗試
3.對於nack的再次嘗試,是進入到隊列頭的,如果一直是失敗的狀態,將會造成阻塞。所以最好是專門投遞到“死信隊列”,
【When a message is requeued, it will be placed to its original position in its queue, if possible. If not (due to concurrent deliveries and acknowledgements from other consumers when multiple consumers share a queue), the message will be requeued to a position closer to queue head.】
————————————————
原文鏈接:https://blog.csdn.net/vtopqx/article/details/104019115