每個Consumer可能需要一段時間才能處理完收到的數據。如果在這個過程中,Consumer出錯了,異常退出了,而數據還沒有處理完成,那么 非常不幸,這段數據就丟失了。
因為我們采用no-ack的方式進行確認,也就是說,每次Consumer接到數據后,而不管是否處理完 成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。
如果一個Consumer異常退出了,它處理的數據能夠被另外的Consumer處理,這樣數據在這種情況下就不會丟失了(注意是這種情況下)。
為了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數據能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應該是在處理完數據后發送ack。
在處理數據后發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。
如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數據也不會丟失。
這里並沒有用到超時機制。RabbitMQ僅僅通過Consumer的連接中斷來確認該Message並沒有被正確處理。
也就是說,RabbitMQ給了Consumer足夠長的時間來做數據處理。
這樣即使你通過Ctr-C中斷了Recieve.cs,那么Message也不會丟失了,它會被分發到下一個Consumer。
如果忘記了ack,那么后果很嚴重。當Consumer退出時,Message會重新分發。然后RabbitMQ會占用越來越多的內存,由於 RabbitMQ會長時間運行,因此這個“內存泄漏”是致命的。
去調試這種錯誤,可以通過一下命令打印un-acked Messages.
如果連接沒有斷開應用要通知服務器讓消息重新發送:
可以通過channel.nack(message)來讓不通過的消息再次進入消息隊列。
if(body==’Hello World3!’){chnl.nack(msg);
//這樣就可以讓這個消息再次進入隊列而不用重啟服務。
}else{
console.log(‘ack’);
chnl.ack(msg);
}