C# RabbitMq 連接池封裝


設計思路,基於前人的傑作,略作改造。

首先我們要知道:

1.創建Connection代價是巨大的(Rabbitmq沒有實現連接池機制)。

2.基於Connection創建Channel代價小的多,理論上,一個connection創建channel次數是沒有限制的。

(說得再多,還是圖片具體點。)流程如下圖所示:

 

這里做了個小小改造,就是根據系統自身的需要創建自己所需要的連接。優先使用空閑連接,而不是還沒達到最大的連接限制時,優先進行創建新連接。

代碼改造實現如下:

 public class MQHelper
    {
        private const string CacheKey_MQConnectionSetting = "MQConnectionSetting";
        private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount";

        // 空閑連接對象隊列
        private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;
        //使用中(忙)連接對象字典
        private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;
        //連接池使用率
        private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;

        private readonly static Semaphore MQConnectionPoolSemaphore;
        //釋放和添加連接時的鎖對象
        private readonly static object freeConnLock = new object(), addConnLock = new object(), getConnLock = new object();

        // 連接總數
        private static int connCount = 0;
        //默認最大保持可用連接數
        public const int DefaultMaxConnectionCount = 50;

        //默認最大連接數可訪問次數
        public const int DefaultMaxConnectionUsingCount = 10000;

        public const int DefaultRetryConnectionCount = 1;//默認重試連接次數

        /// <summary>
        /// 初始化最大連接數
        /// </summary>
        private static int MaxConnectionCount
        {
            get
            {
                //if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null)
                //{
                //    return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]);
                //}
                //else
                //{
                //    int mqMaxConnectionCount = 0;
                //    string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount];
                //    if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0)
                //    {
                //        mqMaxConnectionCount = DefaultMaxConnectionCount;
                //    }

                //    string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
                //    HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath));
                    return 50;
                //}

            }

        }

        /// <summary>
        /// 建立連接
        /// </summary>
        /// <param name="hostName">服務器地址</param>
        /// <param name="userName">登錄賬號</param>
        /// <param name="passWord">登錄密碼</param>
        /// <returns></returns>
        private ConnectionFactory CrateFactory()
        {
            var mqConfigDom = MqConfigDomFactory.CreateConfigDomInstance(); //獲取MQ的配置
            var connectionfactory = new ConnectionFactory();
            connectionfactory.HostName = mqConfigDom.MqHost;
            connectionfactory.UserName = mqConfigDom.MqUserName;
            connectionfactory.Password = mqConfigDom.MqPassword;
            connectionfactory.Port = mqConfigDom.MqPort;
            connectionfactory.VirtualHost = mqConfigDom.MqVirtualHost;
        
            return connectionfactory;
        }

        /// <summary>
        /// 創建connection連接
        /// </summary>
        /// <returns></returns>
        public IConnection CreateMQConnection()
        {
            var factory = CrateFactory();
            factory.AutomaticRecoveryEnabled = true;//自動重連
            var connection = factory.CreateConnection();
            connection.AutoClose = false;
            return connection;
        }

        /// <summary>
        /// 初始化
        /// </summary>
        static MQHelper()
        {
            FreeConnectionQueue = new ConcurrentQueue<IConnection>();
            BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>();
            MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//連接池使用率
            string semaphoreName = "MQConnectionPoolSemaphore";
            try
            {
                if (null == MQConnectionPoolSemaphore)
                {
                    bool semaphoreWasCreated;
                    SemaphoreSecurity sems = new SemaphoreSecurity();
                    MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore", out semaphoreWasCreated);//信號量,控制同時並發可用線程數

                    if (semaphoreWasCreated)
                    {
                        MQConnectionPoolSemaphore = Semaphore.OpenExisting("MQConnectionPoolSemaphore", SemaphoreRights.FullControl);
                    }

                }
            }
            catch (WaitHandleCannotBeOpenedException)
            {
                bool semaphoreWasCreated;


                string user = Environment.UserDomainName + "\\" + Environment.UserName;
                SemaphoreSecurity semSec = new SemaphoreSecurity();

                SemaphoreAccessRule rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Deny);
                semSec.AddAccessRule(rule);

                rule = new SemaphoreAccessRule(user, SemaphoreRights.ReadPermissions | SemaphoreRights.ChangePermissions, AccessControlType.Allow);
                semSec.AddAccessRule(rule);

                // Create a Semaphore object that represents the system
                // semaphore named by the constant 'semaphoreName', with
                // maximum count three, initial count three, and the
                // specified security access. The Boolean value that 
                // indicates creation of the underlying system object is
                // placed in semaphoreWasCreated.
                //
                MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, semaphoreName, out semaphoreWasCreated, semSec);
            }
            catch (UnauthorizedAccessException ex)
            {
                MQConnectionPoolSemaphore = Semaphore.OpenExisting(semaphoreName, SemaphoreRights.ReadPermissions | SemaphoreRights.ChangePermissions);
                // Get the current ACL. This requires 
                // SemaphoreRights.ReadPermissions.
                SemaphoreSecurity semSec = MQConnectionPoolSemaphore.GetAccessControl();
                string user = Environment.UserDomainName + "\\" + Environment.UserName;
                SemaphoreAccessRule rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Deny);
                semSec.RemoveAccessRule(rule);//移除
                // Now grant the user the correct rights.
                rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Allow);
                semSec.AddAccessRule(rule); //重新授權
                MQConnectionPoolSemaphore.SetAccessControl(semSec);
                MQConnectionPoolSemaphore = Semaphore.OpenExisting(semaphoreName);
            }
        }

//釋放連接
public void CreateNewConnection2FreeQueue() { IConnection mqConnection = null; if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有連接數小於最大可用連接數,則直接創建新連接 { lock (addConnLock) { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount) { mqConnection = CreateMQConnection(); FreeConnectionQueue.Enqueue(mqConnection);//加入到空閑隊列連接集合中 MQConnectionPoolUsingDicNew[mqConnection] = 0; } } } } public string MqConnectionInfo() { int scount = 0; try { scount=MQConnectionPoolSemaphore.Release(); MQConnectionPoolSemaphore.WaitOne(1); scount -= 1; } catch(SemaphoreFullException ex) { scount = MaxConnectionCount; } return $"當前信號量計數={scount},當前空閑連接長度 ={ FreeConnectionQueue.Count},當前忙連接長度 ={ BusyConnectionDic.Count},連接使用頻率信息如下:已達最大使用次數的有:{MQConnectionPoolUsingDicNew.Where(l=>l.Value>= DefaultMaxConnectionUsingCount).Count()},剩余{MQConnectionPoolUsingDicNew.Where(l => l.Value < DefaultMaxConnectionUsingCount).Count()}\r\n {JsonConvert.SerializeObject(MQConnectionPoolUsingDicNew)}"; } /// <summary> /// 在mq連接池中創建新連接 /// </summary> /// <returns></returns> public IConnection CreateMQConnectionInPoolNew(ref StringBuilder spanMsg) { Stopwatch sw = new Stopwatch(); long spanSum = 0; sw.Start(); // IConnection mqConnection = null; bool waitFree = false; int tryTimeCount = 0; try { TryEnter: waitFree = MQConnectionPoolSemaphore.WaitOne(10);//當<MaxConnectionCount時,會直接進入,否則會等待10ms繼續監測直到空閑連接信號出現 if(!waitFree) { tryTimeCount++; spanMsg.AppendLine($"阻塞10ms,空閑信號=[{waitFree}],進入第[{tryTimeCount}]次嘗試,"); if (tryTimeCount <= 99) { goto TryEnter; } } spanMsg.Append($"空閑信號=[{waitFree}],"); if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //沒有可用的 { sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"嘗試獲取可用空閑連接,沒有可用空閑連接,span:{sw.ElapsedMilliseconds}ms,"); sw.Restart(); if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有連接數小於最大可用連接數,則直接創建新連接 { lock (addConnLock) { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount) { mqConnection = CreateMQConnection(); BusyConnectionDic[mqConnection] = true;//加入到忙連接集合中 MQConnectionPoolUsingDicNew[mqConnection] = 1; sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"創建一個新連接,並加到使用中連接集合中,span:{sw.ElapsedMilliseconds}ms,"); return mqConnection; } } } sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"沒有空閑連接,已到最大連接數{FreeConnectionQueue.Count + BusyConnectionDic.Count},等待連接釋放,span:{sw.ElapsedMilliseconds}ms,"); if(waitFree)//重試需要釋放之前占用的信號量 { int scount=MQConnectionPoolSemaphore.Release(); waitFree = false; spanMsg.Append($"釋放空閑信號,當前信號計數={scount},"); } return CreateMQConnectionInPoolNew(); } else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //如果取到空閑連接,判斷是否使用次數是否超過最大限制,超過則釋放連接並重新創建 { if (mqConnection.IsOpen) { mqConnection.Close(); } mqConnection.Dispose(); mqConnection = CreateMQConnection(); MQConnectionPoolUsingDicNew[mqConnection] = 0; sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"獲取到的可用空閑連接可能因累計使用通道次數{MQConnectionPoolUsingDicNew[mqConnection] + 1 }>最大可用通道次數{DefaultMaxConnectionUsingCount},或連接狀態處於不是開啟狀態={mqConnection.IsOpen},釋放當前連接並重建一個連接,span:{sw.ElapsedMilliseconds}ms,"); } sw.Restart(); BusyConnectionDic[mqConnection] = true;//加入到忙連接集合中 MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次數加1 sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.AppendLine($"將獲取到得空閑連接放入到忙集合中,並累加使用次數+1={MQConnectionPoolUsingDicNew[mqConnection] + 1},span:{sw.ElapsedMilliseconds}ms,"); return mqConnection; } catch(UnauthorizedAccessException ex) { throw ex; } catch (Exception ex) { if (null != mqConnection) { ResetMQConnectionToFree(mqConnection); } else if(waitFree)//信號量沒釋放,則進行釋放 { MQConnectionPoolSemaphore.Release(); } return null; } finally { spanMsg.AppendLine( $"獲取一個可用連接過程耗費{spanSum}ms,當前空閑連接長度={FreeConnectionQueue.Count},當前忙連接長度={BusyConnectionDic.Count}"); } } /// <summary> /// 在mq連接池中創建新連接 /// </summary> /// <returns></returns> public IConnection CreateMQConnectionInPoolNew() { //string spanMsg = string.Empty; StringBuilder spanMsg = new StringBuilder(); return CreateMQConnectionInPoolNew(ref spanMsg); } /// <summary> /// 釋放連接池中的連接 /// </summary> /// <param name="connection"></param> private void ResetMQConnectionToFree(IConnection connection) { try { lock (freeConnLock) { bool result = false; if (BusyConnectionDic.TryRemove(connection, out result)) //從忙隊列中取出 { } else { //if(!BusyConnectionDic.TryRemove(connection,out result)) } if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//如果因為高並發出現極少概率的>MaxConnectionCount,則直接釋放該連接 { connection.Close(); connection.Dispose(); } else if (connection.IsOpen) //如果OPEN狀態才加入空閑鏈接隊列 { FreeConnectionQueue.Enqueue(connection);//加入到空閑隊列,以便持續提供連接服務 } } } catch { throw; } finally { MQConnectionPoolSemaphore.Release();//釋放一個空閑連接信號 } } /// <summary> /// 發送消息 /// </summary> /// <param name="connection">消息隊列連接對象</param> /// <typeparam name="T">消息類型</typeparam> /// <param name="queueName">隊列名稱</param> /// <param name="durable">是否持久化</param> /// <param name="msg">消息</param> /// <returns></returns> public string SendMsg(IConnection connection, string queueName, string msg, bool durable = true, string exchange = "", string type = "fanout") { bool reTry = false; int reTryCount = 0; string sendErrMsg = string.Empty; do { reTry = false; try { using (var channel = connection.CreateModel())//建立通訊信道 { // 參數從前面開始分別意思為:隊列名稱,是否持久化,獨占的隊列,不使用時是否自動刪除,其他參數 channel.QueueDeclare(queueName, durable, false, false, null); if (!exchange.IsNullOrEmpty()) { channel.ExchangeDeclare(exchange: exchange, type: type, durable: durable); } channel.QueueBind(queueName, exchange, ""); //ExchangeDeclare(model, exchange, RabbitMqProxyConfig.ExchangeType.Fanout, isProperties); //QueueDeclare(model, queue, isProperties); //model.QueueBind(queue, exchange, routingKey); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 ////properties.Type = ""; ////properties.CorrelationId if (!durable) properties = null; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange, queueName, properties, body); } sendErrMsg = string.Empty; } catch (Exception ex) { if ((++reTryCount) <= DefaultRetryConnectionCount) { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } return ex.ToString(); } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return sendErrMsg; } /// <summary> /// 消費消息 /// </summary> /// <param name="connection">消息隊列連接對象</param> /// <param name="queueName">隊列名稱</param> /// <param name="durable">是否持久化</param> /// <param name="dealMessage">消息處理函數</param> /// <param name="saveLog">保存日志方法,可選</param> public void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, durable, false, false, null); //獲取隊列 channel.BasicQos(0, 1, false); //分發機制為觸發式 var consumer = new QueueingBasicConsumer(channel); //建立消費者 // 從左到右參數意思分別是:隊列名稱、是否讀取消息后直接刪除消息,消費者 channel.BasicConsume(queueName, false, consumer); while (true) //如果隊列中有消息 { ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息 string message = null; try { var body = ea.Body; message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { if (saveLog != null) { saveLog(message, ex); } } if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息從隊列中刪除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄 } } } } catch (Exception ex) { if (saveLog != null) { saveLog("QueueName:" + queueName, ex); } throw ex; } finally { ResetMQConnectionToFree(connection); } } /// <summary> /// 依次獲取單個消息 /// </summary> /// <param name="connection">消息隊列連接對象</param> /// <param name="QueueName">隊列名稱</param> /// <param name="durable">持久化</param> /// <param name="dealMessage">處理消息委托</param> public void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, durable, false, false, null); //獲取隊列 channel.BasicQos(0, 1, false); //分發機制為觸發式 uint msgCount = channel.MessageCount(QueueName); if (msgCount > 0) { var consumer = new QueueingBasicConsumer(channel); //建立消費者 // 從左到右參數意思分別是:隊列名稱、是否讀取消息后直接刪除消息,消費者 channel.BasicConsume(QueueName, false, consumer); ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息 try { var body = ea.Body; var message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { throw ex; } finally { if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息從隊列中刪除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄 } } } else { dealMessage(string.Empty); } } } catch (Exception ex) { throw ex; } finally { ResetMQConnectionToFree(connection); } } /// <summary> /// 獲取隊列消息數 /// </summary> /// <param name="connection"></param> /// <param name="QueueName"></param> /// <returns></returns> public int GetMessageCount(IConnection connection, string QueueName) { int msgCount = 0; bool reTry = false; int reTryCount = 0; do { reTry = false; try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, true, false, false, null); //獲取隊列 msgCount = (int)channel.MessageCount(QueueName); } } catch (Exception ex) { //if (BaseUtil.IsIncludeException<SocketException>(ex)) { if ((++reTryCount) <= DefaultRetryConnectionCount)//可重試1次 { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } } throw ex; } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return msgCount; } } public enum ConsumeAction { ACCEPT, // 消費成功 RETRY, // 消費失敗,可以放回隊列重新消費 REJECT, // 消費失敗,直接丟棄 }

在站點開啟多個工作進程的情況下,信號量的控制顯得很重要,它可以有效的控制並發。信號量是針對單機而言的。通過計數可以有效控制使用中的連接個數。從而有效控制連接池的總體數量,防止過度的創建帶來的毀滅性打擊。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM