RabbitMQ---8、連接斷開處理-斷線重連


本文轉載於:https://www.itsvse.com/thread-4636-1-1.html;

參考文獻:http://www.likecs.com/show-29874.html;https://stackoverflow.com/questions/41279186/guaranteed-publishing-of-messages-on-rabbitmq-on-network-loss;

Rabbitmq 官方給的NET consumer示例代碼如下,但使用過程,會遇到connection斷開的問題,一旦斷開,這個代碼就會報錯,就會導致消費者或者生產者掛掉。

下圖是生產者發送消息,我手動停止了rabbitmq,然后又重新啟動了rabbitmq,大概等啟動成功以后,為了防止服務沒有完全啟動,我又等待了10秒鍾

服務完全啟動成功以后,我嘗試重新發送一些消息,報錯,如下:

************** 異常文本 **************
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=320, text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'", classId=0, methodId=0, cause=
   在 RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
   在 RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
   在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, Byte[] body)
   在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)
   在 rabbitMQ_Publish.Form1.button1_Click(Object sender, EventArgs e) 位置 C:\project\my\RabbitMQ-demo\rabbitMQ-Publish\Form1.cs:行號 37
   在 System.Windows.Forms.Control.OnClick(EventArgs e)
   在 System.Windows.Forms.Button.OnClick(EventArgs e)
   在 System.Windows.Forms.Button.PerformClick()
   在 System.Windows.Forms.Form.ProcessDialogKey(Keys keyData)
   在 System.Windows.Forms.TextBoxBase.ProcessDialogKey(Keys keyData)
   在 System.Windows.Forms.Control.PreProcessMessage(Message& msg)
   在 System.Windows.Forms.Control.PreProcessControlMessageInternal(Control target, Message& msg)
   在 System.Windows.Forms.Application.ThreadContext.PreTranslateMessage(MSG& msg)




<ignore_js_op> 

那么如何會異常恢復呢?或者說斷線重連呢?

RabbitMQ NET Client的源碼,研究發現一種自動的錯誤恢復機制 AutomaticRecoveryEnabled = true 使用方式如下


  1. var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true };
復制代碼

具體的恢復機制如下

1.在AutoRecoveringConnection初始化時,在鏈接關閉事件委托上增加斷開處理


  1. public void init()
  2.         {
  3.             m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
  4.             AutorecoveringConnection self = this;
  5.             EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
  6.             {
  7.                 lock (recoveryLockTarget)
  8.                 {
  9.                     if (ShouldTriggerConnectionRecovery(args))
  10.                     {
  11.                         try
  12.                         {
  13.                             self.BeginAutomaticRecovery();
  14.                         }
  15.                         catch (Exception e)
  16.                         {
  17.                             // TODO: logging
  18.                             Console.WriteLine("BeginAutomaticRecovery() failed: {0}", e);
  19.                         }
  20.                     }
  21.                 }
  22.             };
  23.             lock (m_eventLock)
  24.             {
  25.                 ConnectionShutdown += recoveryListener;
  26.                 if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))
  27.                 {
  28.                     m_recordedShutdownEventHandlers.Add(recoveryListener);
  29.                 }
  30.             }
  31.         }
復制代碼

觀察調用的方式BeginAutomaticRecovery,可以看到這個方法內部調用了PerformAutomaticRecovery方法。我們直接看這個方法的內容,其中第一個調用的是方法RecoverConnectionDelegate

  1. protected void PerformAutomaticRecovery()
  2.         {
  3.             lock (recoveryLockTarget)
  4.             {
  5.                 RecoverConnectionDelegate();
  6.                 RecoverConnectionShutdownHandlers();
  7.                 RecoverConnectionBlockedHandlers();
  8.                 RecoverConnectionUnblockedHandlers();
  9.                 RecoverModels();
  10.                 if (m_factory.TopologyRecoveryEnabled)
  11.                 {
  12.                     RecoverEntities();
  13.                     RecoverConsumers();
  14.                 }
  15.                 RunRecoveryEventHandlers();
  16.             }
  17.         }
復制代碼



這個方法中調用的是

  1. protected void RecoverConnectionDelegate()
  2.         {
  3.             bool recovering = true;
  4.             while (recovering)
  5.             {
  6.                 try
  7.                 {
  8.                     m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
  9.                     recovering = false;
  10.                 }
  11.                 catch (Exception)
  12.                 {
  13.                     // TODO: exponential back-off
  14.                     Thread.Sleep(m_factory.NetworkRecoveryInterval);
  15.                     // TODO: provide a way to handle these exceptions
  16.                 }
  17.             }
  18.         }
復制代碼

可以看出,它是執行了死循環,直到連接重新打開,當然,如果遇到異常,它會調用Thread.Sleep來等待一下,然后再次執行連接恢復。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM