RabbitMQ 連接斷開處理-自動恢復


Rabbitmq 官方給的NET consumer示例代碼如下,但使用過程,會遇到connection斷開的問題,一旦斷開,這個代碼就會報錯,如果你的消費者端是這樣的代碼的話,就會導致消費者掛掉。

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("logs", "fanout");

                var queueName = channel.QueueDeclare().QueueName;

                channel.QueueBind(queueName, "logs", "");
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queueName, true, consumer);

                Console.WriteLine(" [*] Waiting for logs." +
                                  "To exit press CTRL+C");
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] {0}", message);
                }
            }
        }
    }
}

那么如何會異常恢復呢?

之前我的操作方式是,建立一個ConnectionPool,在出現異常后,重建channel,也就是說,整個的異常恢復過程是自己處理的。最近研究因為研究Orleans,擔心RabbitMQ的NET client使用Task時,會遇到Orleans的坑,所以順手研究了下RabbitMQ NET Client的源碼,研究發現一種自動的錯誤恢復機制 AutomaticRecoveryEnabled = true 使用方式如下

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("logs", "fanout");

                var queueName = channel.QueueDeclare().QueueName;

                channel.QueueBind(queueName, "logs", "");
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queueName, true, consumer);

                Console.WriteLine(" [*] Waiting for logs." +
                                  "To exit press CTRL+C");
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] {0}", message);
                }
            }
        }
    }
}

具體的恢復機制如下

1.在AutoRecoveringConnection初始化時,在鏈接關閉事件委托上增加斷開處理

public void init()
        {
            m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());

            AutorecoveringConnection self = this;
            EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
            {
                lock (recoveryLockTarget)
                {
                    if (ShouldTriggerConnectionRecovery(args))
                    {
                        try
                        {
                            self.BeginAutomaticRecovery();
                        }
                        catch (Exception e)
                        {
                            // TODO: logging
                            Console.WriteLine("BeginAutomaticRecovery() failed: {0}", e);
                        }
                    }
                }
            };
            lock (m_eventLock)
            {
                ConnectionShutdown += recoveryListener;
                if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))
                {
                    m_recordedShutdownEventHandlers.Add(recoveryListener);
                }
            }
        }

觀察調用的方式BeginAutomaticRecovery,可以看到這個方法內部調用了PerformAutomaticRecovery方法。我們直接看這個方法的內容,其中第一個調用的是方法RecoverConnectionDelegate

protected void PerformAutomaticRecovery()
        {
            lock (recoveryLockTarget)
            {
                RecoverConnectionDelegate();
                RecoverConnectionShutdownHandlers();
                RecoverConnectionBlockedHandlers();
                RecoverConnectionUnblockedHandlers();

                RecoverModels();
                if (m_factory.TopologyRecoveryEnabled)
                {
                    RecoverEntities();
                    RecoverConsumers();
                }

                RunRecoveryEventHandlers();
            }
        }

 

這個方法中調用的是

protected void RecoverConnectionDelegate()
        {
            bool recovering = true;
            while (recovering)
            {
                try
                {
                    m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
                    recovering = false;
                }
                catch (Exception)
                {
                    // TODO: exponential back-off
                    Thread.Sleep(m_factory.NetworkRecoveryInterval);
                    // TODO: provide a way to handle these exceptions
                }
            }
        }

可以看出,它是執行了死循環,直到連接重新打開,當然,如果遇到異常,它會調用Thread.Sleep來等待一下,然后再次執行連接恢復。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM