什么是消息應答
消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務並僅只完成 了部分突然它掛掉了,會發生什么情況。RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消 息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續 發送給該消費這的消息,因為它無法接收到。 為了保證消息在發送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是:消費者在接收 到消息並且處理該消息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該消息刪除了。
消費者收到的每一條消息都必須進行確認。消息確認后,RabbitMQ才會從隊列刪除這條消息,RabbitMQ不會為未確認的消息設置超時時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否已經斷開。這么設計的原因是RabbitMQ允許消費者消費一條消息的時間可以很久很久。
消息應答方式
RabbitMQ中消息的應答,共有兩種方式:
自動應答
手動應答
自動應答
消費者在消費消息時,可以指定autoAck參數,當autoAck=true
時,一旦消費者接收到了消息,就視為自動應答了消息。
RabbitMQ中聲明消費者處理業務邏輯之前自動確認就已經完成了(推送Consume、消息拉取Ge都是如此),可由下面代碼進行測試,如下:
上述代碼我們采用了自動確認,並且在業務處理的最后手動拋出異常( 模擬出錯 ),然后我們進行測試,發現重啟消費者后不會再收到消息( 本來是應該再次接收到的,因為我們模擬出錯了,這條消息應該算是消費失敗的 ),說明該消息已經被消費者消費掉並確認了,RabbitMQ已經將其移除了,我們模擬出錯了 但是消息仍然被移除了 , 可以理解為消息丟失了 , 自動確認優點是快 , 缺點是有丟失消息的風險
手動應答
手動應答分為三種情況:
(1) 手動不確認
channel.basicNack(deliveryTag, false, true);
basic.nack方法為不確認deliveryTag對應的消息,第二個參數是否應用於多消息(這個參數不細講了 ,不建議使用 , 有丟失消息風險),第三個參數是否requeue,與basic.reject區別就是同時支持多個消息,可以nack該消費者先前接收未ack的所有消息。nack后的消息也會被自己消費到。
(2) 手動拒絕
channel.basicReject(deliveryTag, true);
basic.reject方法拒絕deliveryTag對應的消息,第二個參數是否requeue,true則重新入隊列,否則丟棄或者進入死信隊列。
該方法reject后,該消費者還是會消費到該條被reject的消息。
(3) 手動確認應答
從手動應答的介紹,我們發現在自動確認機制,如果消費者在處理消息的過程中,出了錯,就沒有什么辦法重新處理這條消息,所以我們很多時候,需要在消息處理成功后,再確認消息,這就需要手動確認。
當autoAck=false時,RabbitMQ會等待消費者手動發回ack信號后 , 才從內存(和磁盤,如果是持久化消息的話)中移除消息。
采用消息確認機制后,只要令autoAck=false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,因為RabbitMQ會一直持有消息直到消費者手動調用channel.basicAck為止。
當autoAck=false時,對於RabbitMQ服務器端而言,如果服務器端一直沒有收到消費者的ack信號,並且消費此消息的消費者已經斷開連接,則服務器端會安排該消息重新進入隊列,等待投遞給下一個消費者(也可能還是原來的那個消費者)。
這里我們啟動了手動確認后,就必須調用channel.basicAck方法進行確認,否則的話RabbitMQ會一直進行等待,當我們這個消費者關閉后,RabbitMQ會將該條消息再發給對應的消費者進行消費,直到有消費者對該條消息進行消費並應答完成。
看到上述對消息的處理后,我們還發現一個問題,如果channel.basicAck收到確認前的代碼有問題,會拋出異常無法進行手動確認怎么辦,一般消費者也不會連接中斷,那么該消息就一直無法被處理,連被其他消費者處理的機會都沒有
所以一般我們會進行try-catch處理,處理成功則手動確認,失敗或有異常則拒絕。
上面抄襲自 : https://blog.csdn.net/newbie0107/article/details/106820615
下面是我覺得需要補充的幾點:
1)上面的例子中有一點 ,我覺得非常重要 .
"在手動應答的情況下 , 如果channel.basicAck收到確認前的代碼有問題,會拋出異常,導致無法進行手動確認,一般消費者也不會連接中斷,那么該消息就一直無法被處理,連被其他消費者處理的機會都沒有,所以一般我們會進行try-catch處理,處理成功則手動確認,失敗或有異常則拒絕。"
這點我自己寫了例子驗證過的,確實是這樣, 所以一定要進行try-catch處理 ,不然這個消費者就會卡住不動 ,神馬操作都不做 ,也不會往后繼續消費別的消息.
2) 我看完博客一直有疑問 , 在手動應答的情景中 , 如果程序都沒有問題 , 但是我就是一直不確認 , 會發生什么事情,
結果就是如下圖:
隊列還是會被消費, 只是從ready狀態轉換成了 unacked狀態 , 而且當前消費者 ,消費完這200個后,不會因為這200個處於unacked狀態 , 就重復再把這200個再消費一邊 , 但是 !!! 如果重新打開一個新的消費者(或者重啟當前消費者) , 這個新的消費者會把這200個unacked的再消費一遍 , 這個新消費者消費完這200個之后 , 也就停住腳步了 , 等待新的消息進來, 不會一遍又一遍的重復消費這200個
3)
手動確認模式,消息手動拒絕中如果requeue為true會重新放入隊列,但是如果消費者在處理過程中一直拋出異常,會導致入隊-》拒絕-》入隊的循環,該怎么處理呢?
第一種方法是根據異常類型來選擇是否重新放入隊列。
第二種方法是先成功確認,然后通過channel.basicPublish()重新發布這個消息。重新發布的消息網上說會放到隊列后面,進而不會影響已經進入隊列的消息處理。
推薦一篇關於rabbitmq消息應答的特別有意思的博客
https://www.cnblogs.com/englefly/p/8435998.html