一、前言
在正常的服務器運行過程中,時常會面臨服務器宕機重啟的情況,那么我們的消息此時會如何呢?很不幸的事情就是,我們的消息可能會消失,這肯定不是我們希望見到的結果。所以我們希望AMQP服務器崩潰了也可以將消息恢復,這稱之為消息持久化。RabbitMQ自然存在這種策略可以幫助我們完成這件事情。
二、持久化的消息
當RabbitMQ服務器重啟后,原先的隊列和交換器會隨同里面的消息一同消失。原因在於每個隊列和交換器都有durable屬性,該屬性默認是false,它決定了RabbitMQ是否需要在崩潰或者重啟之后重新創建隊列或者交換器。將它設置為true就代表了持久性,在服務器重啟之后就會重新持久的創建隊列和交換器。
當然做到這點還不夠,我們需要的是持久化的消息,所以在消息發布前,通過將消息的“投遞模式”(delivery mode)屬性設置為2將消息標記為持久化。到目前為止,消息還只是被表示為持久化,還需要被發布到持久化的交換器中並到達持久化的隊列中才行。如果不是這樣,包含持久化消息的隊列或者交換器揮着Rabbit崩潰重啟后不復存在,導致消息成為一個孤兒。因此,總結起來需要做到以下三點:
(1)將消息的投遞模式選項設置為2(持久);
(2)將消息發送到持久化的交換器;
(3)消息到達持久化的隊列。
注意,如果原先有非持久的交換器或者隊列,需要刪除后才可重新創建,否則就創建其他名稱的交換器或者隊列,代碼如下:
//聲明持久交換器 channel.ExchangeDeclare( "HelloExchange", //交換器名稱 ExchangeType.Direct,//交換器類型 true, //是否持久話 false, //是否自動刪除 null //關於交換器的詳細設置,鍵值對形式 ); //聲明持久隊列 channel.QueueDeclare( "HelloQueue",//隊列名稱 true, //是否持久化 false, //是否只對首次聲明的隊列可見 false, //是否自動刪除 null ////關於隊列和隊列內消息的詳細設置,鍵值對形式 ); //發布持久消息 string msg_str = "這是生產者第一次發布的消息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//發布的數據類型 msg_pro.DeliveryMode = 2;//標記持久化
三、事務
目前為止,我們已經將消息、隊列和交換器設置為持久化。但是事實上還存在着'最后一英里'的距離,就是在把消息寫入磁盤前,消息由於服務器宕機而消失該如何?這時候就需要使用到事務,說到事務就會想到SQL中的事務,但是不能搞混了。AMQP中,在把信道設置為事務模式后,通過信道發送消息后還有多個其他的AMQP命令,這些命令是執行還是忽略,取決於消息的發送是否成功,消息發送成功信道會在事務中完成其他AMQP命令,就可以提交事務了,發送失敗則其他AMQP命令將不會執行,我們也會知道發送失敗,而采取相應的措施。事務保證了解決這最后的問題。
代碼如下:
using (IConnection conn = conn_factory.CreateConnection()) { //2.創建信道 using (IModel channel = conn.CreateModel()) { try { channel.TxSelect();//聲明事務 //3.發布消息 string msg_str = "這是生產者發布的消息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//發布的數據類型 msg_pro.DeliveryMode = 2; channel.BasicPublish( "HelloExchange", //消息發送目標交換器名稱 "hola", //路由鍵 msg_pro, //消息的發布屬性 Encoding.UTF8.GetBytes(msg_str) //消息 ); channel.TxCommit();//提交事務 } catch(Exception ex) { channel.TxRollback();//回滾事務 } } }
四、發送方確認模式
雖然通過事務和持久化的消息、隊列和交換器可以確保消息不會丟失,但是對消息的吞吐量有着非常嚴重的影響,而且使用消息通信就是為了避免同步,可是事務卻會導致生產者程序產生同步。所以,有一個更好的方法保證消息投遞:發送方確認模式。和事務類似,我們需要將信道channel設置為confirm模式,而且只能通過重新創建信道來關閉該設置。一旦信道進入confirm模式,所有的信道上發布的消息都會被指派一個唯一的ID。當消息被投遞到隊列后,信道就會發送一個發送方確認模式給生產者程序,使得生產者知道消息安全到達隊列了。
發送發確認模式最大的好處是它們是異步的,沒有回滾的概念,更加輕量級,對性能的影響也幾乎忽略不計。
代碼如下:
channel.ConfirmSelect();//開啟發送確認模式 //3.發布消息 IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//發布的數據類型 msg_pro.DeliveryMode = 2; for(int i = 0; i < 5; i++) { string msg_str = string.Format("這是生產者發布的消息{0}", i); channel.BasicPublish( "HelloExchange", //消息發送目標交換器名稱 "hola", //路由鍵 msg_pro, //消息的發布屬性 Encoding.UTF8.GetBytes(msg_str) //消息 ); if (channel.WaitForConfirms()) Console.WriteLine(i); else Console.WriteLine("發送失敗"); }
可以看到channel.WaitForConfirms()方法是同步的,這樣的話效率會低一點,我們可以發送完所有的消息,然后用channel.WaitForConfirmsOrDie()一次性提交,如果中途有一個消息提交失敗或者超時,就會報錯Exception,需要全部重新提交。
五、小結
消息持久化的策略大致就是以上幾種,我們可以根據自己的實際需求來選擇相應的策略。如果有問題歡迎指出!