RabbitMQ消息隊列:ACK機制


     每個Consumer可能需要一段時間才能處理完收到的數據。如果在這個過程中,Consumer出錯了,異常退出了,而數據還沒有處理完成,那么 非常不幸,這段數據就丟失了。

因為我們采用no-ack的方式進行確認,也就是說,每次Consumer接到數據后,而不管是否處理完 成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。

     如果一個Consumer異常退出了,它處理的數據能夠被另外的Consumer處理,這樣數據在這種情況下就不會丟失了(注意是這種情況下)。

     為了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數據能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應該是在處理完數據后發送ack。

    在處理數據后發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。

    如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數據也不會丟失。

 

    這里並沒有用到超時機制。RabbitMQ僅僅通過Consumer的連接中斷來確認該Message並沒有被正確處理。

    也就是說,RabbitMQ給了Consumer足夠長的時間來做數據處理。

    這樣即使你通過Ctr-C中斷了Recieve.cs,那么Message也不會丟失了,它會被分發到下一個Consumer。

    如果忘記了ack,那么后果很嚴重。當Consumer退出時,Message會重新分發。然后RabbitMQ會占用越來越多的內存,由於 RabbitMQ會長時間運行,因此這個“內存泄漏”是致命的。

    去調試這種錯誤,可以通過一下命令打印un-acked Messages.

 

如果連接沒有斷開應用要通知服務器讓消息重新發送:

    可以通過channel.nack(message)來讓不通過的消息再次進入消息隊列。

     if(body==’Hello World3!’){chnl.nack(msg);

          //這樣就可以讓這個消息再次進入隊列而不用重啟服務。

     }else{

          console.log(‘ack’);

          chnl.ack(msg);

     }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM