消息確認機制
在之前異常處理部分就已經寫了,對於consumer的異常退出導致消息丟失,可以時候consumer的消息確認機制。重復的就不說了,這里說一些不一樣的。
consumer的消息確認機制
當一個消費者收到一個快遞,但是這個包裹是破損的,這時候一般會有以下選擇
拒收快遞,讓快遞員把快遞寄回。 (如果有多個consumer可能這條消息會到其它的consumer中,如果只有一個,那么下次獲取還是可以拿到)
簽收快遞,然后偷偷的扔了(錢多任性)
拒收快遞,聯系商家再給我補發一個
下面是具體的方法,BasicReject同時承擔了扔掉消息與退回。區別在第二個參數。BasicNack則是批量進行上面兩個操作,DeliveryTag小於或等於當前消息的都會進行該操作,當然是否批量是由第二個參數來決定的
//扔掉消息 channel.BasicReject(result.DeliveryTag, false); //退回消息 channel.BasicReject(result.DeliveryTag, true); //批量退回或刪除,中間的參數 是否批量 true是/false否 (也就是只一條) channel.BasicNack(result.DeliveryTag, true, true);
BasicRecover方法則是進行補發操作,其中的參數如果為true是把消息退回到queue但是有可能被其它的consumer接收到,設置為false是只補發給當前的consumer
//補發消息 true退回到queue中/false只補發給當前的consumer channel.BasicRecover(true);
消息發布者的確認機制
消息不只是在consumer處理的時候出問題,在發布的時候也可能會出問題。不是說把消息向方法里一丟就一定會成功的。所以發布的確認機制也是為高可靠性保駕護航。
rabbitmq為我們提供了兩種方式
確認方式 (confirm)
事務控制方式 (tx)
但是要知道這些都是耗費性能的,其中事務的性能消耗最大,confirm其次
第一種confirm方式,發布后等待rabbitmq返回消息發布狀態
//創建返回一個新的頻道 using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { for (var i = 0; i < 6; i++) { channel.BasicPublish(string.Empty, "testqueue", null, Encoding.UTF8.GetBytes($"這是{i}個消息")); } //等待發布成功並返回發布狀態 bool isok = channel.WaitForConfirms(); Console.ReadKey(); }
第二種事務控制方式
//創建返回一個新的頻道 using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { try { //鎖往 channel.TxSelect(); for (var i = 0; i < 6; i++) { channel.BasicPublish(string.Empty, "testqueue", null, Encoding.UTF8.GetBytes($"這是{i}個消息")); } //提交 channel.TxCommit(); Console.ReadKey(); } catch (Exception e) { //回退 channel.TxRollback(); } }
下面是針對速度測試圖,發布一萬條消息。 第一次我使用了事務控制方式。耗時915毫秒
第二次使用了confirm方式,耗時374毫秒
第三次沒有使用消息確認機制,耗時317毫秒