持久化
持久化可以提高RabbitMQ的可靠性,防止異常情況下的數據丟失。RabbitMQ的持久化分為三個部分:交換器的持久化、隊列的持久化和消息的持久化。
交換器的持久化通過聲明隊列時將durable參數置為true實現。如果交換器不設置持久化,在RabbitMQ服務重啟之后,相關交換器的元數據會丟失,但消息不會丟失,只是不能再將消息發送到這個交換器。一個長期使用的交換器來建議將其置為持久化的。
隊列的持久化通過在聲明隊列時將durable參數置為true實現。如果隊列不設置持久化,在RabbitMQ服務重啟之后,相關隊列的元數據會丟失,此時數據也會丟失。
消息的持久化在消息的投遞模式(BasicProperties中的DeliveryMode屬性)設置為2即可實現消息的持久化。單單設置消息持久化而不設置隊列的持久化毫無意義,只有同時設置隊列和消息的持久化才能保證RabbitMQ服務重啟后,消息依舊存在。
之前的示例出現過該代碼:
var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; string message = "RabbitMQ Test"; //傳遞的消息內容 channel.BasicPublish("normalExchange", "normalKey", properties, Encoding.UTF8.GetBytes(message));
寫入磁盤的速度比寫入內存的速度慢得多,將所有的消息都設置為持久化,會嚴重影響RabbitMQ的性能(隨機)。在選擇是否要將消息持久化時,需要在可靠性和吐吞量之間做一個權衡。
將交換器、隊列、消息都設置持久化並不能百分之百保證數據不丟失。消費者訂閱消費隊列時將autoAck參數設置為true,並在接收消息之后沒來得及處理就發生宕機,這也算數據丟失。另外一種情況就是RabbitMQ接收到消息,在持久化(保存到磁盤)之前,服務節點發生了宕機、重啟等異常情況,也會造成消息丟失。
生產者確認
在使用RabbitMQ的時候,我們還會考慮的一個問題就是消息的生產者將消息發送出去之后,消息有沒有正確地到達服務器?默認情況下發送消息的操作不會返回任何信息給生產者,也就是默認情況下生產者是不能確認消息有沒有正確地到達服務器。
RabbitMQ針對這個問題,提供了兩種解決方式:
通過事務機制實現(與數據庫中的事務概念並不相同);
通過發送方確認(publisher confirm)機制實現。
事務機制
事務機制相關的方法有:channel.TxSelect、channel.TxCommit和channel.TxRollback。channel.TxSelect將當前的信道設置成事務模式,channel.TxCommit用於提交事務,channel.TxRollback用於事務回滾。通過channel.TxSelect方法開啟事務之后,發布消息給RabbitMQ了,如果事務提交成功,則消息一定到達了RabbitMQ,如果在事務提交之前由於RabbitMQ異常崩潰或者其他原因拋出異常,通過執行channel.TxRollback方法來實現事務回滾。
示例代碼:
using (var channel = connection.CreateModel()) { channel.TxSelect(); try { //發送消息 channel.TxCommit(); } catch (Exception) { channel.TxRollback(); } }
對一個通道而言TxSelect只需執行一次,TxCommit和TxRollback則需要多次執行,如循環發送多條消息時BasicPublish、TxCommit和TxRollback方法包裹進循環內即可,TxSelect在循環外調用。
協議流轉過程(左為事務確認右為事務回滾):
事務確實能夠解決消息發送方和RabbitMQ之間消息確認的問題,但是事務機制(不管是確認還是回滾多了Tx.Select和Tx.Commit或Tx.Rollback四個步驟)會嚴重影響RabbitMQ的性能。事務機制在一條消息發送之后會使發送端阻塞,等待RabbitMQ回應之后才能繼續發送下一條消息。
發送方確認機制
發送方確認(publisher confirm)機制是RabbitMQ提供的一個相比於事務機制的改進方案。
生產者將信道設置成confirm(確認)模式【調用channel.ConfirmSelect方法(即Confirm.Select命令)將信道設置為confirm模式】,一旦信道進入confirm模式,在該信道上面發布的消息都會被指派一個唯一ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,RabbitMQ會發送一個確認(Basic.Ack)給生產者(包含消息的唯一ID),告知生產者消息已經正確到達目的地。如果消息和隊列是可持久化的,確認消息會在消息寫入磁盤之后發出。RabbitMQ回傳給生產者的確認消息中的DeliveryTag包含了確認消息的序號,RabbitMQ也可以設置channel.BasicAck方法中的multiple參數,表示這個序號之前的所有消息都已經確認。
confirm模式有普通confirm、批量confirm和異步confirm三種模式。
普通confirm
調用WaitForConfirmsOrDie等待消息返回。如果消息nack或者超時則該方法將拋出異常。異常的處理通常包括記錄錯誤消息和/或重新嘗試發送消息。
示例代碼:
channel.QueueDeclare("confirm_queue", false, false, false, null); var message = "Confirm Message"; var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.BasicPublish("", "confirm_queue", properties, Encoding.UTF8.GetBytes(message)); // uses a 5 second timeout channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
消息的確認阻礙了所有后續消息的發布,顯著減慢了發布速度,所以普通confirm是三種方式中性能最低的,但是是實現方式最簡單的,對性能要求不高的程序很適合。
批量confirm
批量confirm表示我們發布一批消息,對整批消息執行等待確認。
批量confirm示例代碼(一次確認10條消息):
for (int i = 0; i < 10; i++) { var msg = $"Confirm Message {i}"; channel.BasicPublish("", "confirm_queue", null, Encoding.UTF8.GetBytes(msg)); } channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
上面的代碼只是最簡單的示例,實際使用中可能需要增加消息緩存的代碼,批量confirm異常時對這批消息進行重新發布,確認成功是情況消息緩存。
批量confirm出現返回Basic.Nack或者超時時,客戶端需要將這一批次的消息全部重發會造成重復消息的情況,這一點需要注意。
異步confirm
異步confirm是為channel怎加兩個事件BasicAcks和BasicNacks,分別用來處理RabbitMQ回傳的Basic.Ack和Basic.Nack。兩個事件的回調函數都有一個對應的 EventArgs 參數(BasicNackEventArgs類型) ,包含如下兩個參數:
DeliveryTag:表示對應消息的序號
Multiple:表示是確認一條消息(false)還是確認當前序號前的所有消息(false)。
示例代碼:
var outstandingConfirms = new ConcurrentDictionary<ulong, string>(); channel.BasicAcks += (sender, ea) => { if (ea.Multiple) { var confirmed = outstandingConfirms.Where(k => k.Key <= ea.DeliveryTag); foreach (var entry in confirmed) { outstandingConfirms.TryRemove(entry.Key, out _); } } else { outstandingConfirms.TryRemove(ea.DeliveryTag, out _); } }; channel.BasicNacks += (sender, ea) => { outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body); Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}"); //同理BasicAcks維護outstandingConfirms }; var msg = "Async Msg"; outstandingConfirms.TryAdd(channel.NextPublishSeqNo, msg); channel.BasicPublish("", "confirm_queue", null, Encoding.UTF8.GetBytes(msg));
上述代碼定義了一個ConcurrentDictionary對象將消息和對應的消息序號關聯起來,發布消息前通過channel.NextPublishSeqNo獲取消息的序號,然后在回調確認時清理消息緩存字典。
消息分發
當RabbitMQ隊列有多個消費者時,默認情況下,隊列收到的消息將以輪詢(round-robin)分發的方式發送給消費者,每條消息只發送給訂閱列表里的一個消費者。如果有n個消費者,那么RabbitMQ會將第m條消息分發給第m%n(取余的方式)個消費者。如果個別消費者來不及消費那么多的消息,而其他消費者由於某些原因(比如業務邏輯簡單、機器性能卓越等)很快地處理完了所分配到的消息,進而進程空閑,這就造成整體應用吞吐量的下降,此時輪詢分發機制就不是那么的優雅了。可以借助channel.BasicQos方法限制允許信道上的消費者所能保持的最大未確認消息的數量,未確認消息達到上限后RabbitMQ就不會再向這個消費者再發送消息,直至消費者確認了某條消息。
BasicQos方法參數介紹:
prefetchSize,預取大小服務器將傳遞的最大內容量(以八位字節為單位),如果不受限制,則為0。默認值:0。
prefetchCount,服務器一次請求將傳遞的最大郵件數,如果沒有限制,則為0。調用此方法時,該值必填。默認值:0
global,是否將設置應用於整個頻道,而不是每個消費者
默認值:false,應用於本身(一個消費者)
true:應用於整個頻道
關於BasicQos的示例RabbitMQ官方文檔Fair Dispatch部分有完成的示例代碼:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
消息順序性
消息的順序性是指消費者消費到的消息和發送者發布的消息的順序是一致的。如果生產者發布的消息分別為msg1、msg2、msg3,那么消費者也是按照msg1、msg2、msg3的順序進行消費的。
但是實際使用時會有很多情況打破消息順序性,如生成者啟用事務機制,某種原因進行了事務回滾由一個新線程補發消息以及消息設置了優先級等。消息的順序必然不會和生產者發送消息的順序一致。
如果要保證消息的順序性,根據具體業務處理,比如在消息體內添加全局有序標識來實現。
Github
https://github.com/MayueCif/RabbitMQ