Rabbitmq 官方給的NET consumer示例代碼如下,但使用過程,會遇到connection斷開的問題,一旦斷開,這個代碼就會報錯,如果你的消費者端是這樣的代碼的話,就會導致消費者掛掉。
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class ReceiveLogs { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare("logs", "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queueName, "logs", ""); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queueName, true, consumer); Console.WriteLine(" [*] Waiting for logs." + "To exit press CTRL+C"); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); } } } } }
那么如何會異常恢復呢?
之前我的操作方式是,建立一個ConnectionPool,在出現異常后,重建channel,也就是說,整個的異常恢復過程是自己處理的。最近研究因為研究Orleans,擔心RabbitMQ的NET client使用Task時,會遇到Orleans的坑,所以順手研究了下RabbitMQ NET Client的源碼,研究發現一種自動的錯誤恢復機制 AutomaticRecoveryEnabled = true 使用方式如下
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class ReceiveLogs { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare("logs", "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queueName, "logs", ""); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queueName, true, consumer); Console.WriteLine(" [*] Waiting for logs." + "To exit press CTRL+C"); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); } } } } }
具體的恢復機制如下
1.在AutoRecoveringConnection初始化時,在鏈接關閉事件委托上增加斷開處理
public void init() { m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler()); AutorecoveringConnection self = this; EventHandler<ShutdownEventArgs> recoveryListener = (_, args) => { lock (recoveryLockTarget) { if (ShouldTriggerConnectionRecovery(args)) { try { self.BeginAutomaticRecovery(); } catch (Exception e) { // TODO: logging Console.WriteLine("BeginAutomaticRecovery() failed: {0}", e); } } } }; lock (m_eventLock) { ConnectionShutdown += recoveryListener; if (!m_recordedShutdownEventHandlers.Contains(recoveryListener)) { m_recordedShutdownEventHandlers.Add(recoveryListener); } } }
觀察調用的方式BeginAutomaticRecovery,可以看到這個方法內部調用了PerformAutomaticRecovery方法。我們直接看這個方法的內容,其中第一個調用的是方法RecoverConnectionDelegate
protected void PerformAutomaticRecovery() { lock (recoveryLockTarget) { RecoverConnectionDelegate(); RecoverConnectionShutdownHandlers(); RecoverConnectionBlockedHandlers(); RecoverConnectionUnblockedHandlers(); RecoverModels(); if (m_factory.TopologyRecoveryEnabled) { RecoverEntities(); RecoverConsumers(); } RunRecoveryEventHandlers(); } }
這個方法中調用的是
protected void RecoverConnectionDelegate() { bool recovering = true; while (recovering) { try { m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler()); recovering = false; } catch (Exception) { // TODO: exponential back-off Thread.Sleep(m_factory.NetworkRecoveryInterval); // TODO: provide a way to handle these exceptions } } }
可以看出,它是執行了死循環,直到連接重新打開,當然,如果遇到異常,它會調用Thread.Sleep來等待一下,然后再次執行連接恢復。