設計思路,基於前人的傑作,略作改造。
首先我們要知道:
1.創建Connection代價是巨大的(Rabbitmq沒有實現連接池機制)。
2.基於Connection創建Channel代價小的多,理論上,一個connection創建channel次數是沒有限制的。
(說得再多,還是圖片具體點。)流程如下圖所示:
這里做了個小小改造,就是根據系統自身的需要創建自己所需要的連接。優先使用空閑連接,而不是還沒達到最大的連接限制時,優先進行創建新連接。
代碼改造實現如下:
public class MQHelper { private const string CacheKey_MQConnectionSetting = "MQConnectionSetting"; private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount"; // 空閑連接對象隊列 private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue; //使用中(忙)連接對象字典 private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic; //連接池使用率 private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew; private readonly static Semaphore MQConnectionPoolSemaphore; //釋放和添加連接時的鎖對象 private readonly static object freeConnLock = new object(), addConnLock = new object(), getConnLock = new object(); // 連接總數 private static int connCount = 0; //默認最大保持可用連接數 public const int DefaultMaxConnectionCount = 50; //默認最大連接數可訪問次數 public const int DefaultMaxConnectionUsingCount = 10000; public const int DefaultRetryConnectionCount = 1;//默認重試連接次數 /// <summary> /// 初始化最大連接數 /// </summary> private static int MaxConnectionCount { get { //if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null) //{ // return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]); //} //else //{ // int mqMaxConnectionCount = 0; // string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount]; // if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0) // { // mqMaxConnectionCount = DefaultMaxConnectionCount; // } // string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); // HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath)); return 50; //} } } /// <summary> /// 建立連接 /// </summary> /// <param name="hostName">服務器地址</param> /// <param name="userName">登錄賬號</param> /// <param name="passWord">登錄密碼</param> /// <returns></returns> private ConnectionFactory CrateFactory() { var mqConfigDom = MqConfigDomFactory.CreateConfigDomInstance(); //獲取MQ的配置 var connectionfactory = new ConnectionFactory(); connectionfactory.HostName = mqConfigDom.MqHost; connectionfactory.UserName = mqConfigDom.MqUserName; connectionfactory.Password = mqConfigDom.MqPassword; connectionfactory.Port = mqConfigDom.MqPort; connectionfactory.VirtualHost = mqConfigDom.MqVirtualHost; return connectionfactory; } /// <summary> /// 創建connection連接 /// </summary> /// <returns></returns> public IConnection CreateMQConnection() { var factory = CrateFactory(); factory.AutomaticRecoveryEnabled = true;//自動重連 var connection = factory.CreateConnection(); connection.AutoClose = false; return connection; } /// <summary> /// 初始化 /// </summary> static MQHelper() { FreeConnectionQueue = new ConcurrentQueue<IConnection>(); BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>(); MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//連接池使用率 string semaphoreName = "MQConnectionPoolSemaphore"; try { if (null == MQConnectionPoolSemaphore) { bool semaphoreWasCreated; SemaphoreSecurity sems = new SemaphoreSecurity(); MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore", out semaphoreWasCreated);//信號量,控制同時並發可用線程數 if (semaphoreWasCreated) { MQConnectionPoolSemaphore = Semaphore.OpenExisting("MQConnectionPoolSemaphore", SemaphoreRights.FullControl); } } } catch (WaitHandleCannotBeOpenedException) { bool semaphoreWasCreated; string user = Environment.UserDomainName + "\\" + Environment.UserName; SemaphoreSecurity semSec = new SemaphoreSecurity(); SemaphoreAccessRule rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Deny); semSec.AddAccessRule(rule); rule = new SemaphoreAccessRule(user, SemaphoreRights.ReadPermissions | SemaphoreRights.ChangePermissions, AccessControlType.Allow); semSec.AddAccessRule(rule); // Create a Semaphore object that represents the system // semaphore named by the constant 'semaphoreName', with // maximum count three, initial count three, and the // specified security access. The Boolean value that // indicates creation of the underlying system object is // placed in semaphoreWasCreated. // MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, semaphoreName, out semaphoreWasCreated, semSec); } catch (UnauthorizedAccessException ex) { MQConnectionPoolSemaphore = Semaphore.OpenExisting(semaphoreName, SemaphoreRights.ReadPermissions | SemaphoreRights.ChangePermissions); // Get the current ACL. This requires // SemaphoreRights.ReadPermissions. SemaphoreSecurity semSec = MQConnectionPoolSemaphore.GetAccessControl(); string user = Environment.UserDomainName + "\\" + Environment.UserName; SemaphoreAccessRule rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Deny); semSec.RemoveAccessRule(rule);//移除 // Now grant the user the correct rights. rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Allow); semSec.AddAccessRule(rule); //重新授權 MQConnectionPoolSemaphore.SetAccessControl(semSec); MQConnectionPoolSemaphore = Semaphore.OpenExisting(semaphoreName); } }
//釋放連接 public void CreateNewConnection2FreeQueue() { IConnection mqConnection = null; if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有連接數小於最大可用連接數,則直接創建新連接 { lock (addConnLock) { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount) { mqConnection = CreateMQConnection(); FreeConnectionQueue.Enqueue(mqConnection);//加入到空閑隊列連接集合中 MQConnectionPoolUsingDicNew[mqConnection] = 0; } } } } public string MqConnectionInfo() { int scount = 0; try { scount=MQConnectionPoolSemaphore.Release(); MQConnectionPoolSemaphore.WaitOne(1); scount -= 1; } catch(SemaphoreFullException ex) { scount = MaxConnectionCount; } return $"當前信號量計數={scount},當前空閑連接長度 ={ FreeConnectionQueue.Count},當前忙連接長度 ={ BusyConnectionDic.Count},連接使用頻率信息如下:已達最大使用次數的有:{MQConnectionPoolUsingDicNew.Where(l=>l.Value>= DefaultMaxConnectionUsingCount).Count()},剩余{MQConnectionPoolUsingDicNew.Where(l => l.Value < DefaultMaxConnectionUsingCount).Count()}\r\n {JsonConvert.SerializeObject(MQConnectionPoolUsingDicNew)}"; } /// <summary> /// 在mq連接池中創建新連接 /// </summary> /// <returns></returns> public IConnection CreateMQConnectionInPoolNew(ref StringBuilder spanMsg) { Stopwatch sw = new Stopwatch(); long spanSum = 0; sw.Start(); // IConnection mqConnection = null; bool waitFree = false; int tryTimeCount = 0; try { TryEnter: waitFree = MQConnectionPoolSemaphore.WaitOne(10);//當<MaxConnectionCount時,會直接進入,否則會等待10ms繼續監測直到空閑連接信號出現 if(!waitFree) { tryTimeCount++; spanMsg.AppendLine($"阻塞10ms,空閑信號=[{waitFree}],進入第[{tryTimeCount}]次嘗試,"); if (tryTimeCount <= 99) { goto TryEnter; } } spanMsg.Append($"空閑信號=[{waitFree}],"); if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //沒有可用的 { sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"嘗試獲取可用空閑連接,沒有可用空閑連接,span:{sw.ElapsedMilliseconds}ms,"); sw.Restart(); if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有連接數小於最大可用連接數,則直接創建新連接 { lock (addConnLock) { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount) { mqConnection = CreateMQConnection(); BusyConnectionDic[mqConnection] = true;//加入到忙連接集合中 MQConnectionPoolUsingDicNew[mqConnection] = 1; sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"創建一個新連接,並加到使用中連接集合中,span:{sw.ElapsedMilliseconds}ms,"); return mqConnection; } } } sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"沒有空閑連接,已到最大連接數{FreeConnectionQueue.Count + BusyConnectionDic.Count},等待連接釋放,span:{sw.ElapsedMilliseconds}ms,"); if(waitFree)//重試需要釋放之前占用的信號量 { int scount=MQConnectionPoolSemaphore.Release(); waitFree = false; spanMsg.Append($"釋放空閑信號,當前信號計數={scount},"); } return CreateMQConnectionInPoolNew(); } else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //如果取到空閑連接,判斷是否使用次數是否超過最大限制,超過則釋放連接並重新創建 { if (mqConnection.IsOpen) { mqConnection.Close(); } mqConnection.Dispose(); mqConnection = CreateMQConnection(); MQConnectionPoolUsingDicNew[mqConnection] = 0; sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"獲取到的可用空閑連接可能因累計使用通道次數{MQConnectionPoolUsingDicNew[mqConnection] + 1 }>最大可用通道次數{DefaultMaxConnectionUsingCount},或連接狀態處於不是開啟狀態={mqConnection.IsOpen},釋放當前連接並重建一個連接,span:{sw.ElapsedMilliseconds}ms,"); } sw.Restart(); BusyConnectionDic[mqConnection] = true;//加入到忙連接集合中 MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次數加1 sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.AppendLine($"將獲取到得空閑連接放入到忙集合中,並累加使用次數+1={MQConnectionPoolUsingDicNew[mqConnection] + 1},span:{sw.ElapsedMilliseconds}ms,"); return mqConnection; } catch(UnauthorizedAccessException ex) { throw ex; } catch (Exception ex) { if (null != mqConnection) { ResetMQConnectionToFree(mqConnection); } else if(waitFree)//信號量沒釋放,則進行釋放 { MQConnectionPoolSemaphore.Release(); } return null; } finally { spanMsg.AppendLine( $"獲取一個可用連接過程耗費{spanSum}ms,當前空閑連接長度={FreeConnectionQueue.Count},當前忙連接長度={BusyConnectionDic.Count}"); } } /// <summary> /// 在mq連接池中創建新連接 /// </summary> /// <returns></returns> public IConnection CreateMQConnectionInPoolNew() { //string spanMsg = string.Empty; StringBuilder spanMsg = new StringBuilder(); return CreateMQConnectionInPoolNew(ref spanMsg); } /// <summary> /// 釋放連接池中的連接 /// </summary> /// <param name="connection"></param> private void ResetMQConnectionToFree(IConnection connection) { try { lock (freeConnLock) { bool result = false; if (BusyConnectionDic.TryRemove(connection, out result)) //從忙隊列中取出 { } else { //if(!BusyConnectionDic.TryRemove(connection,out result)) } if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//如果因為高並發出現極少概率的>MaxConnectionCount,則直接釋放該連接 { connection.Close(); connection.Dispose(); } else if (connection.IsOpen) //如果OPEN狀態才加入空閑鏈接隊列 { FreeConnectionQueue.Enqueue(connection);//加入到空閑隊列,以便持續提供連接服務 } } } catch { throw; } finally { MQConnectionPoolSemaphore.Release();//釋放一個空閑連接信號 } } /// <summary> /// 發送消息 /// </summary> /// <param name="connection">消息隊列連接對象</param> /// <typeparam name="T">消息類型</typeparam> /// <param name="queueName">隊列名稱</param> /// <param name="durable">是否持久化</param> /// <param name="msg">消息</param> /// <returns></returns> public string SendMsg(IConnection connection, string queueName, string msg, bool durable = true, string exchange = "", string type = "fanout") { bool reTry = false; int reTryCount = 0; string sendErrMsg = string.Empty; do { reTry = false; try { using (var channel = connection.CreateModel())//建立通訊信道 { // 參數從前面開始分別意思為:隊列名稱,是否持久化,獨占的隊列,不使用時是否自動刪除,其他參數 channel.QueueDeclare(queueName, durable, false, false, null); if (!exchange.IsNullOrEmpty()) { channel.ExchangeDeclare(exchange: exchange, type: type, durable: durable); } channel.QueueBind(queueName, exchange, ""); //ExchangeDeclare(model, exchange, RabbitMqProxyConfig.ExchangeType.Fanout, isProperties); //QueueDeclare(model, queue, isProperties); //model.QueueBind(queue, exchange, routingKey); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 ////properties.Type = ""; ////properties.CorrelationId if (!durable) properties = null; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange, queueName, properties, body); } sendErrMsg = string.Empty; } catch (Exception ex) { if ((++reTryCount) <= DefaultRetryConnectionCount) { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } return ex.ToString(); } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return sendErrMsg; } /// <summary> /// 消費消息 /// </summary> /// <param name="connection">消息隊列連接對象</param> /// <param name="queueName">隊列名稱</param> /// <param name="durable">是否持久化</param> /// <param name="dealMessage">消息處理函數</param> /// <param name="saveLog">保存日志方法,可選</param> public void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, durable, false, false, null); //獲取隊列 channel.BasicQos(0, 1, false); //分發機制為觸發式 var consumer = new QueueingBasicConsumer(channel); //建立消費者 // 從左到右參數意思分別是:隊列名稱、是否讀取消息后直接刪除消息,消費者 channel.BasicConsume(queueName, false, consumer); while (true) //如果隊列中有消息 { ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息 string message = null; try { var body = ea.Body; message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { if (saveLog != null) { saveLog(message, ex); } } if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息從隊列中刪除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄 } } } } catch (Exception ex) { if (saveLog != null) { saveLog("QueueName:" + queueName, ex); } throw ex; } finally { ResetMQConnectionToFree(connection); } } /// <summary> /// 依次獲取單個消息 /// </summary> /// <param name="connection">消息隊列連接對象</param> /// <param name="QueueName">隊列名稱</param> /// <param name="durable">持久化</param> /// <param name="dealMessage">處理消息委托</param> public void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, durable, false, false, null); //獲取隊列 channel.BasicQos(0, 1, false); //分發機制為觸發式 uint msgCount = channel.MessageCount(QueueName); if (msgCount > 0) { var consumer = new QueueingBasicConsumer(channel); //建立消費者 // 從左到右參數意思分別是:隊列名稱、是否讀取消息后直接刪除消息,消費者 channel.BasicConsume(QueueName, false, consumer); ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息 try { var body = ea.Body; var message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { throw ex; } finally { if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息從隊列中刪除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄 } } } else { dealMessage(string.Empty); } } } catch (Exception ex) { throw ex; } finally { ResetMQConnectionToFree(connection); } } /// <summary> /// 獲取隊列消息數 /// </summary> /// <param name="connection"></param> /// <param name="QueueName"></param> /// <returns></returns> public int GetMessageCount(IConnection connection, string QueueName) { int msgCount = 0; bool reTry = false; int reTryCount = 0; do { reTry = false; try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, true, false, false, null); //獲取隊列 msgCount = (int)channel.MessageCount(QueueName); } } catch (Exception ex) { //if (BaseUtil.IsIncludeException<SocketException>(ex)) { if ((++reTryCount) <= DefaultRetryConnectionCount)//可重試1次 { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } } throw ex; } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return msgCount; } } public enum ConsumeAction { ACCEPT, // 消費成功 RETRY, // 消費失敗,可以放回隊列重新消費 REJECT, // 消費失敗,直接丟棄 }
在站點開啟多個工作進程的情況下,信號量的控制顯得很重要,它可以有效的控制並發。信號量是針對單機而言的。通過計數可以有效控制使用中的連接個數。從而有效控制連接池的總體數量,防止過度的創建帶來的毀滅性打擊。