重要參考文章來源:http://gigi.nullneuron.net/gigilabs/resilient-connections-with-rabbitmq-net-client/
原因是這樣的,我在Windows客戶端有一個Windows后台服務,負責與服務端的數據交互,數據上傳及數據下載
1.數據上傳部分是使用的rabbitmq donnet庫發送消息至RabbittMQ服務器,服務器另外有一個應用程序會監控RabbitMQ服務器的指定隊列,完成數據的上傳服務
2.數據下載部分是使用的rabbitmq donnet庫監控RabbitMQ服務器指定的隊列,服務器應用程序將數據發送到指定的RabbitMQ服務器的隊列中,客戶端就會能獲取到服務器應用發送過來的數據
這樣的架構還是比較好用的,使用的效果目前還行,但遇到一個比較頭痛的問題,Windows后台服務一直在Windows平板電腦上運行,除非手動安裝及更新應用時,才會將Windows服務進行重新安裝或重啟,其他的情況是不會進行重啟的
了解到RabbitMQ是有自動重連的技術的,可以參考地址:https://yq.aliyun.com/articles/369969
這個效果只作用於,服務器沒有掛掉,只是中間有一些網絡問題時才可以進行重連
但有一種情況是沒有處理到的,我們已經在客戶端對RabbitMQ某個隊列進行監控,但服務器突然掛掉,然后幾分鍾后重新啟動了,這時,客戶端可以重新建立連接,但卻不會自動對隊列產生監控,無法拿到消息
現時對代碼做出一些處理
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using System.Threading; using RabbitMQ.Client.Events; using System.IO; namespace MES_MonitoringService.Common { public class RabbitMQClientHandler { private static string defaultRabbitMQHostName = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerHostName"); private static string defaultRabbitMQPort = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerPort"); private static string defaultRabbitMQUserName = Common.ConfigFileHandler.GetAppConfig("RabbitMQUserName"); private static string defaultRabbitMQPassword = Common.ConfigFileHandler.GetAppConfig("RabbitMQPassword"); private static string defaultRabbitVirtualHost = Common.ConfigFileHandler.GetAppConfig("RabbitMQVirtualHost"); // 定義一個靜態變量來保存類的實例 private static RabbitMQClientHandler uniqueInstance; //定義一個標識確保線程同步 private static readonly object locker = new object(); /*-------------------------------------------------------------------------------------*/ //ConnectionFactory private static ConnectionFactory mc_ConnectionFactory = null; //Connection private static IConnection Connection; //發送頻道及接收頻道分開,避免互相影響,導致整個服務不可用 //Send Channel private static IModel SendChannel; //Listen Channel private static IModel ListenChannel; //數據監控隊列 private static string MC_SyncDataConsume = string.Empty; // //private SyncDataHandler syncDataHandlerClass; /*-------------------------------------------------------------------------------------*/ /// <summary> /// 定義私有構造函數,使外界不能創建該類實例 /// </summary> public RabbitMQClientHandler() { Reconnect(); } /// <summary> /// 定義公有方法提供一個全局訪問點,同時你也可以定義公有屬性來提供全局訪問點 /// </summary> /// <returns></returns> public static RabbitMQClientHandler GetInstance() { // 當第一個線程運行到這里時,此時會對locker對象 "加鎖", // 當第二個線程運行該方法時,首先檢測到locker對象為"加鎖"狀態,該線程就會掛起等待第一個線程解鎖 // lock語句運行完之后(即線程運行完之后)會對該對象"解鎖" // 雙重鎖定只需要一句判斷就可以了 if (uniqueInstance == null) { lock (locker) { // 如果類的實例不存在則創建,否則直接返回 if (uniqueInstance == null) { uniqueInstance = new RabbitMQClientHandler(); } } } return uniqueInstance; } static void Connect() { try { Common.LogHandler.WriteLog("獲取RabbitMQ服務器參數:" + defaultRabbitMQHostName + ":" + defaultRabbitMQPort + " (" + defaultRabbitMQUserName + "/" + defaultRabbitMQPassword + ")"); //連接工廠 mc_ConnectionFactory = new ConnectionFactory(); //連接工廠信息 mc_ConnectionFactory.HostName = defaultRabbitMQHostName;// "localhost"; int rabbitmq_port = 5672;// 默認是5672端口 int.TryParse(defaultRabbitMQPort, out rabbitmq_port); mc_ConnectionFactory.Port = rabbitmq_port;// "5672" mc_ConnectionFactory.UserName = defaultRabbitMQUserName;// "guest"; mc_ConnectionFactory.Password = defaultRabbitMQPassword;// "guest"; mc_ConnectionFactory.VirtualHost = defaultRabbitVirtualHost;// "/" mc_ConnectionFactory.RequestedHeartbeat = 30;//心跳包 mc_ConnectionFactory.AutomaticRecoveryEnabled = true;//自動重連 mc_ConnectionFactory.TopologyRecoveryEnabled = true;//拓撲重連 mc_ConnectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10); //創建連接 Connection = mc_ConnectionFactory.CreateConnection(); //斷開連接時,調用方法自動重連 Connection.ConnectionShutdown += Connection_ConnectionShutdown; //創建發送頻道 SendChannel = Connection.CreateModel(); //創建接收頻道 ListenChannel = Connection.CreateModel(); //發送頻道確認模式,發送了消息后,可以收到回應 SendChannel.ConfirmSelect(); if(!string.IsNullOrEmpty(MC_SyncDataConsume)) { //重新監控消息 RabbitmqMessageConsume(MC_SyncDataConsume); } Common.LogHandler.WriteLog("嘗試連接至RabbitMQ服務器:" + defaultRabbitMQHostName); } catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e) { throw e; } catch (Exception ex) { throw ex; } } static void Cleanup() { try { if (SendChannel != null && SendChannel.IsOpen) { try { SendChannel.Close(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ重新連接,正在嘗試關閉之前的Channel[發送],但遇到錯誤", ex); } SendChannel = null; } if (ListenChannel != null && ListenChannel.IsOpen) { try { ListenChannel.Close(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ重新連接,正在嘗試關閉之前的Channel[接收],但遇到錯誤", ex); } ListenChannel = null; } if (Connection != null && Connection.IsOpen) { try { Connection.Close(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ重新連接,正在嘗試關閉之前的連接,但遇到錯誤", ex); } Connection = null; } } catch (IOException ex) { throw ex; } } private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) { LogHandler.WriteLog("): RabbitMQ已經斷開連接,正在嘗試重新連接至RabbitMQ服務器"); Reconnect(); } private static void Reconnect() { try { //清除連接及頻道 Cleanup(); var mres = new ManualResetEventSlim(false); // state is initially false while (!mres.Wait(3000)) // loop until state is true, checking every 3s { try { //連接 Connect(); mres.Set(); // state set to true - breaks out of loop } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ嘗試連接RabbitMQ服務器出現錯誤:" + ex.Message, ex); } } } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ嘗試重新連接RabbitMQ服務器出現錯誤:" + ex.Message, ex); } } /*-------------------------------------------------------------------------------------*/ /// <summary> /// Direct路由,發送消息至服務端 /// </summary> /// <param name="exchangeName">交換機名稱</param> /// <param name="routingKey">RoutingKey</param> /// <param name="queueName">隊列名稱</param> /// <param name="message">消息內容</param> /// <returns></returns> public bool DirectExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("連接為空或連接已經關閉"); if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道為空或通道已經關閉"); //創建一個持久化的隊列 bool queueDurable = true; string QueueName = queueName; string ExchangeName = exchangeName; string RoutingKey = routingKey; //聲明交換機 SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Direct); //聲明隊列 SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //路由綁定隊列 SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null); //設置消息持久性 IBasicProperties props = SendChannel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;//持久性 //消息內容轉碼,並發送至服務器 var messageBody = System.Text.Encoding.UTF8.GetBytes(message); SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody); //等待確認 return SendChannel.WaitForConfirms(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex); return false; } } /// <summary> /// Fanout路由,發送消息至服務端 /// </summary> /// <param name="exchangeName">交換機名稱</param> /// <param name="routingKey">RoutingKey</param> /// <param name="queueName">隊列名稱</param> /// <param name="message">消息內容</param> /// <returns></returns> public bool FanoutExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("連接為空或連接已經關閉"); if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道為空或通道已經關閉"); //創建一個持久化的頻道 bool queueDurable = true; string QueueName = queueName; string ExchangeName = exchangeName; string RoutingKey = routingKey; //聲明交換機 SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Fanout); //聲明隊列 SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //路由綁定隊列 SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null); //設置消息持久性 IBasicProperties props = SendChannel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;//持久性 //消息內容轉碼,並發送至服務器 var messageBody = System.Text.Encoding.UTF8.GetBytes(message); SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody); //等待確認 return SendChannel.WaitForConfirms(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex); return false; } } /// <summary> /// Topic路由,發送消息至服務端 /// </summary> /// <param name="exchangeName">交換機名稱</param> /// <param name="routingKey">RoutingKey</param> /// <param name="queueName">隊列名稱</param> /// <param name="message">消息內容</param> /// <returns></returns> public bool TopicExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("連接為空或連接已經關閉"); if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道為空或通道已經關閉"); //創建一個持久化的頻道 bool queueDurable = true; string QueueName = queueName; string ExchangeName = exchangeName; string RoutingKey = routingKey; //聲明交換機 SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic); //聲明隊列 SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //路由綁定隊列 SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null); //設置消息持久性 IBasicProperties props = SendChannel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;//持久性 //消息內容轉碼,並發送至服務器 var messageBody = System.Text.Encoding.UTF8.GetBytes(message); SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody); //等待確認 return SendChannel.WaitForConfirms(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex); return false; } } /// <summary> /// Topic路由,接收同步消息 /// </summary> /// <param name="queueName">監聽的隊列</param> public void SyncDataFromServer(string queueName) { try { //設定參數,方便重啟RabbitMQ服務器時處理 MC_SyncDataConsume = queueName; RabbitmqMessageConsume(MC_SyncDataConsume); } catch (Exception ex) { LogHandler.WriteLog("TopicExchangeConsumeMessageFromServer運行錯誤:" + ex.Message, ex); throw ex; } } private static void RabbitmqMessageConsume(string queueName) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("連接為空或連接已經關閉"); if (ListenChannel == null || !ListenChannel.IsOpen) throw new Exception("通道為空或通道已經關閉"); bool queueDurable = true; string QueueName = queueName; //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建 ListenChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 ListenChannel.BasicQos(0, 1, false); //創建基於該隊列的消費者,綁定事件 var consumer = new EventingBasicConsumer(ListenChannel); //回應消息監控 consumer.Received += SyncData_Received; //綁定消費者 ListenChannel.BasicConsume(QueueName, //隊列名 false, //false:手動應答;true:自動應答 consumer); Common.LogHandler.WriteLog("開始監控RabbitMQ服務器,隊列" + QueueName); } catch (Exception ex) { throw ex; } } private static void SyncData_Received(object sender, BasicDeliverEventArgs e) { try { //TOOD 驗證程序退出后消費者是否退出去了 var body = e.Body; //消息主體 var message = Encoding.UTF8.GetString(body); LogHandler.WriteLog("[x] 隊列接收到消息:" + message.ToString()); //處理數據 bool processSuccessFlag = new SyncDataHandler().ProcessSyncData(message); if (processSuccessFlag) { //回復確認 ListenChannel.BasicAck(e.DeliveryTag, false); } else { //未正常處理的消息,重新放回隊列 ListenChannel.BasicReject(e.DeliveryTag, true); } } catch (RabbitMQ.Client.Exceptions.OperationInterruptedException ex1) { Thread.Sleep(5000); ListenChannel.BasicNack(e.DeliveryTag, false, true); } catch (Exception ex) { Thread.Sleep(5000); ListenChannel.BasicNack(e.DeliveryTag, false, true); } } } }