一、本文產生原由:
之前文章《總結消息隊列RabbitMQ的基本用法》已對RabbitMQ的安裝、用法都做了詳細說明,而本文主要是針對在高並發且單次從RabbitMQ中消費消息時,出現了連接數不足、連接響應較慢、RabbitMQ服務器崩潰等各種性能問題的解方案,之所以會出現我列舉的這些問題,究基根源,其實是TCP連接創建與斷開太過頻繁所致,這與我們使用ADO.NET來訪問常規的關系型DB(如:SQL SERVER、MYSQL)有所不同,在訪問DB時,我們一般都建議大家使用using包裹,目的是每次創建完DB連接,使用完成后自動釋放連接,避免不必要的連接數及資源占用。可能有人會問,為何訪問DB,可以每次創建再斷開連接,都沒有問題,而同樣訪問MQ(本文所指的MQ均是RabbitMQ),每次創建再斷開連接,如果在高並發且創建與斷開頻率高的時候,會出現性能問題呢?其實如果了解了DB的連接創建與斷開以及MQ的連接創建與斷開原理就知道其中的區別了。這里我簡要說明一下,DB連接與MQ連接 其實底層都是基於TCP連接,創建TCP連接肯定是有資源消耗的,是非常昂貴的,原則上盡可能少的去創建與斷開TCP連接,DB創建連接、MQ創建連接可以說是一樣的,但在斷開銷毀連接上就有很大的不同,DB創建連接再斷開時,默認情況下是把該連接回收到連接池中,下次如果再有DB連接創建請求,則先判斷DB連接池中是否有空閑的連接,若有則直接復用,若沒有才創建連接,這樣就達到了TCP連接的復用,而MQ創建連接都是新創建的TCP連接,斷開時則直接斷開TCP連接,簡單粗暴,看似資源清理更徹底,但若在高並發高頻率每次都重新創建與斷開MQ連接,則性能只會越來越差(上面說過TCP連接是非常昂貴的),我在公司項目中就出現了該問題,后面在技術總監的指導下,對MQ的連接創建與斷開作了優化,實現了類似DB連接池的概念。
連接池,故名思義,連接的池子,所有的連接作為一種資源集中存放在池中,需要使用時就可以到池中獲取空閑連接資源,用完后再放回池中,以此達到連接資源的有效重用,同時也控制了資源的過度消耗與浪費(資源多少取決於池子的容量)
二、源代碼奉獻(可直接復制應用到大家的項目中)
下面就先貼出實現MQHelper(含連接池)的源代碼:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Util;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Web.Caching;
using System.Web;
using System.Configuration;
using System.IO;
using System.Collections.Concurrent;
using System.Threading;
using System.Runtime.CompilerServices;
namespace Zuowj.Core
{
public class MQHelper
{
private const string CacheKey_MQConnectionSetting = "MQConnectionSetting";
private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount";
private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;//空閑連接對象隊列
private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;//使用中(忙)連接對象集合
private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;//連接池使用率
private readonly static Semaphore MQConnectionPoolSemaphore;
private readonly static object freeConnLock = new object(), addConnLock = new object();
private static int connCount = 0;
public const int DefaultMaxConnectionCount = 30;//默認最大保持可用連接數
public const int DefaultMaxConnectionUsingCount = 10000;//默認最大連接可訪問次數
private static int MaxConnectionCount
{
get
{
if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null)
{
return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]);
}
else
{
int mqMaxConnectionCount = 0;
string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount];
if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0)
{
mqMaxConnectionCount = DefaultMaxConnectionCount;
}
string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath));
return mqMaxConnectionCount;
}
}
}
/// <summary>
/// 建立連接
/// </summary>
/// <param name="hostName">服務器地址</param>
/// <param name="userName">登錄賬號</param>
/// <param name="passWord">登錄密碼</param>
/// <returns></returns>
private static ConnectionFactory CrateFactory()
{
var mqConnectionSetting = GetMQConnectionSetting();
var connectionfactory = new ConnectionFactory();
connectionfactory.HostName = mqConnectionSetting[0];
connectionfactory.UserName = mqConnectionSetting[1];
connectionfactory.Password = mqConnectionSetting[2];
if (mqConnectionSetting.Length > 3) //增加端口號
{
connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]);
}
return connectionfactory;
}
private static string[] GetMQConnectionSetting()
{
string[] mqConnectionSetting = null;
if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null)
{
//MQConnectionSetting=Host IP|;userid;|;password
string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting];
if (!string.IsNullOrWhiteSpace(mqConnSettingStr))
{
mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr);//解密MQ連接字符串,若項目中無此需求可移除,EncryptUtility是一個AES的加解密工具類,大家網上可自行查找
if (mqConnSettingStr.Contains(";|;"))
{
mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries);
}
}
if (mqConnectionSetting == null || mqConnectionSetting.Length < 3)
{
throw new Exception("MQConnectionSetting未配置或配置不正確");
}
string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath));
}
else
{
mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[];
}
return mqConnectionSetting;
}
public static IConnection CreateMQConnection()
{
var factory = CrateFactory();
factory.AutomaticRecoveryEnabled = true;//自動重連
var connection = factory.CreateConnection();
connection.AutoClose = false;
return connection;
}
static MQHelper()
{
FreeConnectionQueue = new ConcurrentQueue<IConnection>();
BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>();
MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//連接池使用率
MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//信號量,控制同時並發可用線程數
}
public static IConnection CreateMQConnectionInPoolNew()
{
SelectMQConnectionLine:
MQConnectionPoolSemaphore.WaitOne();//當<MaxConnectionCount時,會直接進入,否則會等待直到空閑連接出現
IConnection mqConnection = null;
if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有連接數小於最大可用連接數,則直接創建新連接
{
lock (addConnLock)
{
if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)
{
mqConnection = CreateMQConnection();
BusyConnectionDic[mqConnection] = true;//加入到忙連接集合中
MQConnectionPoolUsingDicNew[mqConnection] = 1;
// BaseUtil.Logger.DebugFormat("Create a MQConnection:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
return mqConnection;
}
}
}
if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //如果沒有可用空閑連接,則重新進入等待排隊
{
// BaseUtil.Logger.DebugFormat("no FreeConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
goto SelectMQConnectionLine;
}
else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //如果取到空閑連接,判斷是否使用次數是否超過最大限制,超過則釋放連接並重新創建
{
mqConnection.Close();
mqConnection.Dispose();
// BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
mqConnection = CreateMQConnection();
MQConnectionPoolUsingDicNew[mqConnection] = 0;
// BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
}
BusyConnectionDic[mqConnection] = true;//加入到忙連接集合中
MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次數加1
// BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
return mqConnection;
}
private static void ResetMQConnectionToFree(IConnection connection)
{
lock (freeConnLock)
{
bool result = false;
if (BusyConnectionDic.TryRemove(connection, out result)) //從忙隊列中取出
{
// BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
}
else
{
// BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
}
if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//如果因為高並發出現極少概率的>MaxConnectionCount,則直接釋放該連接
{
connection.Close();
connection.Dispose();
}
else
{
FreeConnectionQueue.Enqueue(connection);//加入到空閑隊列,以便持續提供連接服務
}
MQConnectionPoolSemaphore.Release();//釋放一個空閑連接信號
//Interlocked.Decrement(ref connCount);
//BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount);
}
}
/// <summary>
/// 發送消息
/// </summary>
/// <param name="connection">消息隊列連接對象</param>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="queueName">隊列名稱</param>
/// <param name="durable">是否持久化</param>
/// <param name="msg">消息</param>
/// <returns></returns>
public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true)
{
try
{
using (var channel = connection.CreateModel())//建立通訊信道
{
// 參數從前面開始分別意思為:隊列名稱,是否持久化,獨占的隊列,不使用時是否自動刪除,其他參數
channel.QueueDeclare(queueName, durable, false, false, null);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
if (!durable)
properties = null;
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish("", queueName, properties, body);
}
return string.Empty;
}
catch (Exception ex)
{
return ex.ToString();
}
finally
{
ResetMQConnectionToFree(connection);
}
}
/// <summary>
/// 消費消息
/// </summary>
/// <param name="connection">消息隊列連接對象</param>
/// <param name="queueName">隊列名稱</param>
/// <param name="durable">是否持久化</param>
/// <param name="dealMessage">消息處理函數</param>
/// <param name="saveLog">保存日志方法,可選</param>
public static void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null)
{
try
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, durable, false, false, null); //獲取隊列
channel.BasicQos(0, 1, false); //分發機制為觸發式
var consumer = new QueueingBasicConsumer(channel); //建立消費者
// 從左到右參數意思分別是:隊列名稱、是否讀取消息后直接刪除消息,消費者
channel.BasicConsume(queueName, false, consumer);
while (true) //如果隊列中有消息
{
ConsumeAction consumeResult = ConsumeAction.RETRY;
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息
string message = null;
try
{
var body = ea.Body;
message = Encoding.UTF8.GetString(body);
consumeResult = dealMessage(message);
}
catch (Exception ex)
{
if (saveLog != null)
{
saveLog(message, ex);
}
}
if (consumeResult == ConsumeAction.ACCEPT)
{
channel.BasicAck(ea.DeliveryTag, false); //消息從隊列中刪除
}
else if (consumeResult == ConsumeAction.RETRY)
{
channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列
}
else
{
channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄
}
}
}
}
catch (Exception ex)
{
if (saveLog != null)
{
saveLog("QueueName:" + queueName, ex);
}
throw ex;
}
finally
{
ResetMQConnectionToFree(connection);
}
}
/// <summary>
/// 依次獲取單個消息
/// </summary>
/// <param name="connection">消息隊列連接對象</param>
/// <param name="QueueName">隊列名稱</param>
/// <param name="durable">持久化</param>
/// <param name="dealMessage">處理消息委托</param>
public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage)
{
try
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(QueueName, durable, false, false, null); //獲取隊列
channel.BasicQos(0, 1, false); //分發機制為觸發式
uint msgCount = channel.MessageCount(QueueName);
if (msgCount > 0)
{
var consumer = new QueueingBasicConsumer(channel); //建立消費者
// 從左到右參數意思分別是:隊列名稱、是否讀取消息后直接刪除消息,消費者
channel.BasicConsume(QueueName, false, consumer);
ConsumeAction consumeResult = ConsumeAction.RETRY;
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息
try
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
consumeResult = dealMessage(message);
}
catch (Exception ex)
{
throw ex;
}
finally
{
if (consumeResult == ConsumeAction.ACCEPT)
{
channel.BasicAck(ea.DeliveryTag, false); //消息從隊列中刪除
}
else if (consumeResult == ConsumeAction.RETRY)
{
channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列
}
else
{
channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄
}
}
}
else
{
dealMessage(string.Empty);
}
}
}
catch (Exception ex)
{
throw ex;
}
finally
{
ResetMQConnectionToFree(connection);
}
}
/// <summary>
/// 獲取隊列消息數
/// </summary>
/// <param name="connection"></param>
/// <param name="QueueName"></param>
/// <returns></returns>
public static int GetMessageCount(IConnection connection, string QueueName)
{
int msgCount = 0;
try
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(QueueName, true, false, false, null); //獲取隊列
msgCount = (int)channel.MessageCount(QueueName);
}
}
catch (Exception ex)
{
throw ex;
}
finally
{
ResetMQConnectionToFree(connection);
}
return msgCount;
}
}
public enum ConsumeAction
{
ACCEPT, // 消費成功
RETRY, // 消費失敗,可以放回隊列重新消費
REJECT, // 消費失敗,直接丟棄
}
}
現在對上述代碼的核心點作一個簡要的說明:
先說一下靜態構造函數:
FreeConnectionQueue 用於存放空閑連接對象隊列,為何使用Queue,因為當我從中取出1個空閑連接后,空閑連接數就應該少1個,這個Queue很好滿足這個需求,而且這個Queue是並發安全的Queue哦(ConcurrentQueue)
BusyConnectionDic 忙(使用中)連接對象集合,為何這里使用字典對象呢,因為當我用完后,需要能夠快速的找出使用中的連接對象,並能快速移出,同時重新放入到空閑隊列FreeConnectionQueue ,達到連接復用
MQConnectionPoolUsingDicNew 連接使用次數記錄集合,這個只是輔助記錄連接使用次數,以便可以計算一個連接的已使用次數,當達到最大使用次數時,則應斷開重新創建
MQConnectionPoolSemaphore 這個是信號量,這是控制並發連接的重要手段,連接池的容量等同於這個信號量的最大可並行數,保證同時使用的連接數不超過連接池的容量,若超過則會等待;
具體步驟說明:
1.MaxConnectionCount:最大保持可用連接數(可以理解為連接池的容量),可以通過CONFIG配置,默認為30;
2.DefaultMaxConnectionUsingCount:默認最大連接可訪問次數,我這里沒有使用配置,而是直接使用常量固定為1000,大家若有需要可以改成從CONFIG配置,參考MaxConnectionCount的屬性設置(采取了依賴緩存)
3.CreateMQConnectionInPoolNew:從連接池中創建MQ連接對象,這個是核心方法,是實現連接池的地方,代碼中已注釋了重要的步驟邏輯,這里說一下實現思路:
3.1 通過MQConnectionPoolSemaphore.WaitOne() 利用信號量的並行等待方法,如果當前並發超過信號量的最大並行度(也就是作為連接池的最大容量),則需要等待空閑連接池,防止連接數超過池的容量,如果並發沒有超過池的容量,則可以進入獲取連接的邏輯;
3.2FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount,如果空閑連接隊列+忙連接集合的總數小於連接池的容量,則可以直接創建新的MQ連接,否則FreeConnectionQueue.TryDequeue(out mqConnection) 嘗試從空閑連接隊列中獲取一個可用的空閑連接使用,若空閑連接都沒有,則需要返回到方法首行,重新等待空閑連接;
3.3MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen 如果取到空閑連接,則先判斷使用次數是否超過最大限制,超過則釋放連接或空閑連接已斷開連接也需要重新創建,否則該連接可用;
3.4BusyConnectionDic[mqConnection] = true;加入到忙連接集合中,MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1; 使用次數加1,確保每使用一次連接,連接次數能記錄
4.ResetMQConnectionToFree:重置釋放連接對象,這個是保證MQ連接用完后能夠回收到空閑連接隊列中(即:回到連接池中),而不是直接斷開連接,這個方法很簡單就不作作過多說明。
好了,都說明了如何實現含連接池的MQHelper,現在再來舉幾個例子來說明如何用:
三、實際應用(簡單易上手)
獲取並消費一個消息:
public string GetMessage(string queueName)
{
string message = null;
try
{
var connection = MQHelper.CreateMQConnectionInPoolNew();
MQHelper.ConsumeMsgSingle(connection, queueName, true, (msg) =>
{
message = msg;
return ConsumeAction.ACCEPT;
});
}
catch (Exception ex)
{
BaseUtil.Logger.Error(string.Format("MQHelper.ConsumeMsgSingle Error:{0}", ex.Message), ex);
message = "ERROR:" + ex.Message;
}
//BaseUtil.Logger.InfoFormat("第{0}次請求,從消息隊列(隊列名稱:{1})中獲取消息值為:{2}", Interlocked.Increment(ref requestCount), queueName, message);
return message;
}
發送一個消息:
public string SendMessage(string queueName, string msg)
{
string result = null;
try
{
var connection = MQHelper.CreateMQConnectionInPoolNew();
result = MQHelper.SendMsg(connection, queueName, msg);
}
catch (Exception ex)
{
BaseUtil.Logger.Error(string.Format("MQHelper.SendMessage Error:{0}", ex.Message), ex);
result = ex.Message;
}
return result;
}
獲取消息隊列消息數:
public int GetMessageCount(string queueName)
{
int result = -1;
try
{
var connection = MQHelper.CreateMQConnectionInPoolNew();
result = MQHelper.GetMessageCount(connection, queueName);
}
catch (Exception ex)
{
BaseUtil.Logger.Error(string.Format("MQHelper.GetMessageCount Error:{0}", ex.Message), ex);
result = -1;
}
return result;
}
這里說一下:BaseUtil.Logger 是Log4Net的實例對象,另外上面沒有針對持續訂閱消費消息(ConsumeMsg)作說明,因為這個其實可以不用連接池也不會有問題,因為它是一個持久訂閱並持久消費的過程,不會出現頻繁創建連接對象的情況。
最后要說的是,雖說代碼貼出來,大家一看就覺得很簡單,好像沒有什么技術含量,但如果沒有完整的思路也還是需要花費一些時間和精力的,代碼中核心是如何簡單高效的解決並發及連接復用的的問題,該MQHelper有經過壓力測試並順利在我司項目中使用,完美解決了之前的問題,由於這個方案是我在公司通宵實現的,可能有一些方面的不足,大家可以相互交流或完善后入到自己的項目中。
2019-7-3更新:優化解決當已緩存的連接不可用時,導致無法復用,連接池一直被無效的長連接占滿問題,以及處理消息時增加失敗自動重試功能,代碼如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Util;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Web.Caching;
using System.Web;
using System.Configuration;
using System.IO;
using System.Collections.Concurrent;
using System.Threading;
using System.Runtime.CompilerServices;
using System.Net.Sockets;
namespace KYLDMQService.Core
{
public class MQHelper
{
private const string CacheKey_MQConnectionSetting = "MQConnectionSetting";
private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount";
public const int DefaultMaxConnectionCount = 30;//默認最大保持可用連接數
public const int DefaultMaxConnectionUsingCount = 10000;//默認最大連接可訪問次數
public const int DefaultReTryConnectionCount = 1;//默認重試連接次數
private static int MaxConnectionCount
{
get
{
if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null)
{
return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]);
}
else
{
int mqMaxConnectionCount = 0;
string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount];
if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0)
{
mqMaxConnectionCount = DefaultMaxConnectionCount;
}
string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath));
return mqMaxConnectionCount;
}
}
}
/// <summary>
/// 建立連接
/// </summary>
/// <param name="hostName">服務器地址</param>
/// <param name="userName">登錄賬號</param>
/// <param name="passWord">登錄密碼</param>
/// <returns></returns>
private static ConnectionFactory CrateFactory()
{
var mqConnectionSetting = GetMQConnectionSetting();
var connectionfactory = new ConnectionFactory();
connectionfactory.HostName = mqConnectionSetting[0];
connectionfactory.UserName = mqConnectionSetting[1];
connectionfactory.Password = mqConnectionSetting[2];
if (mqConnectionSetting.Length > 3) //增加端口號
{
connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]);
}
return connectionfactory;
}
private static string[] GetMQConnectionSetting()
{
string[] mqConnectionSetting = null;
if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null)
{
//MQConnectionSetting=Host IP|;userid;|;password
string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting];
if (!string.IsNullOrWhiteSpace(mqConnSettingStr))
{
mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr);
if (mqConnSettingStr.Contains(";|;"))
{
mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries);
}
}
if (mqConnectionSetting == null || mqConnectionSetting.Length < 3)
{
throw new Exception("MQConnectionSetting未配置或配置不正確");
}
string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath));
}
else
{
mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[];
}
return mqConnectionSetting;
}
public static IConnection CreateMQConnection()
{
var factory = CrateFactory();
factory.AutomaticRecoveryEnabled = true;//自動重連
var connection = factory.CreateConnection();
connection.AutoClose = false;
return connection;
}
private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;//空閑連接對象隊列
private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;//使用中(忙)連接對象集合
private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;//連接池使用率
private readonly static Semaphore MQConnectionPoolSemaphore;
private readonly static object freeConnLock = new object(), addConnLock = new object();
private static int connCount = 0;
static MQHelper()
{
FreeConnectionQueue = new ConcurrentQueue<IConnection>();
BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>();
MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//連接池使用率
MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//信號量,控制同時並發可用線程數
}
public static IConnection CreateMQConnectionInPoolNew()
{
MQConnectionPoolSemaphore.WaitOne(10000);//當<MaxConnectionCount時,會直接進入,否則會等待直到空閑連接出現
//Interlocked.Increment(ref connCount);
//BaseUtil.Logger.DebugFormat("thread Concurrent count:{0}", connCount);
//int totalCount = FreeConnectionQueue.Count + BusyConnectionDic.Count;
//BaseUtil.Logger.DebugFormat("totalCount:{0}", totalCount);
//if (totalCount > MaxConnectionCount)
//{
// System.Diagnostics.Debug.WriteLine("ConnectionCount:" + totalCount);
// BaseUtil.Logger.DebugFormat("more than totalCount:{0}",totalCount);
//}
IConnection mqConnection = null;
try
{
if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有連接數小於最大可用連接數,則直接創建新連接
{
lock (addConnLock)
{
if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)
{
mqConnection = CreateMQConnection();
BusyConnectionDic[mqConnection] = true;//加入到忙連接集合中
MQConnectionPoolUsingDicNew[mqConnection] = 1;
// BaseUtil.Logger.DebugFormat("Create a MQConnection:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
return mqConnection;
}
}
}
if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //如果沒有可用空閑連接,則重新進入等待排隊
{
// BaseUtil.Logger.DebugFormat("no FreeConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
return CreateMQConnectionInPoolNew();
}
else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //如果取到空閑連接,判斷是否使用次數是否超過最大限制,超過則釋放連接並重新創建
{
if (mqConnection.IsOpen)
{
mqConnection.Close();
}
mqConnection.Dispose();
// BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
mqConnection = CreateMQConnection();
MQConnectionPoolUsingDicNew[mqConnection] = 0;
// BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
}
BusyConnectionDic[mqConnection] = true;//加入到忙連接集合中
MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次數加1
// BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
return mqConnection;
}
catch //如果在創建連接發生錯誤,則判斷當前是否已獲得Connection,如果獲得則釋放連接,最終都會釋放連接池計數
{
if (mqConnection != null)
{
ResetMQConnectionToFree(mqConnection);
}
else
{
MQConnectionPoolSemaphore.Release();
}
throw;
}
}
private static void ResetMQConnectionToFree(IConnection connection)
{
try
{
lock (freeConnLock)
{
bool result = false;
if (BusyConnectionDic.TryRemove(connection, out result)) //從忙隊列中取出
{
// BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
}
else//若極小概率移除失敗,則再重試一次
{
if (!BusyConnectionDic.TryRemove(connection, out result))
{
BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic(2 times):{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
}
}
if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//如果因為高並發出現極少概率的>MaxConnectionCount,則直接釋放該連接
{
connection.Close();
connection.Dispose();
}
else if (connection.IsOpen)//如果是OPEN狀態才加入空閑隊列,否則直接丟棄
{
FreeConnectionQueue.Enqueue(connection);//加入到空閑隊列,以便持續提供連接服務
}
}
}
catch
{
throw;
}
finally
{
MQConnectionPoolSemaphore.Release();//釋放一個空閑連接信號
}
//Interlocked.Decrement(ref connCount);
//BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount);
}
/// <summary>
/// 發送消息
/// </summary>
/// <param name="connection">消息隊列連接對象</param>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="queueName">隊列名稱</param>
/// <param name="durable">是否持久化</param>
/// <param name="msg">消息</param>
/// <returns></returns>
public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true)
{
bool reTry = false;
int reTryCount = 0;
string sendErrMsg = null;
do
{
reTry = false;
try
{
using (var channel = connection.CreateModel())//建立通訊信道
{
// 參數從前面開始分別意思為:隊列名稱,是否持久化,獨占的隊列,不使用時是否自動刪除,其他參數
channel.QueueDeclare(queueName, durable, false, false, null);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
if (!durable)
properties = null;
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish("", queueName, properties, body);
}
sendErrMsg = string.Empty;
}
catch (Exception ex)
{
if (BaseUtil.IsIncludeException<SocketException>(ex))
{
if ((++reTryCount) <= DefaultReTryConnectionCount)//可重試1次
{
ResetMQConnectionToFree(connection);
connection = CreateMQConnectionInPoolNew();
reTry = true;
}
}
sendErrMsg = ex.ToString();
}
finally
{
if (!reTry)
{
ResetMQConnectionToFree(connection);
}
}
} while (reTry);
return sendErrMsg;
}
/// <summary>
/// 消費消息
/// </summary>
/// <param name="connection">消息隊列連接對象</param>
/// <param name="queueName">隊列名稱</param>
/// <param name="durable">是否持久化</param>
/// <param name="dealMessage">消息處理函數</param>
/// <param name="saveLog">保存日志方法,可選</param>
public static void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null)
{
try
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, durable, false, false, null); //獲取隊列
channel.BasicQos(0, 1, false); //分發機制為觸發式
var consumer = new QueueingBasicConsumer(channel); //建立消費者
// 從左到右參數意思分別是:隊列名稱、是否讀取消息后直接刪除消息,消費者
channel.BasicConsume(queueName, false, consumer);
while (true) //如果隊列中有消息
{
ConsumeAction consumeResult = ConsumeAction.RETRY;
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息
string message = null;
try
{
var body = ea.Body;
message = Encoding.UTF8.GetString(body);
consumeResult = dealMessage(message);
}
catch (Exception ex)
{
if (saveLog != null)
{
saveLog(message, ex);
}
}
if (consumeResult == ConsumeAction.ACCEPT)
{
channel.BasicAck(ea.DeliveryTag, false); //消息從隊列中刪除
}
else if (consumeResult == ConsumeAction.RETRY)
{
channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列
}
else
{
channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄
}
}
}
}
catch (Exception ex)
{
if (saveLog != null)
{
saveLog("QueueName:" + queueName, ex);
}
throw ex;
}
finally
{
//MQConnectionPool[connection] = false;//改為空閑
ResetMQConnectionToFree(connection);
}
}
/// <summary>
/// 依次獲取單個消息
/// </summary>
/// <param name="connection">消息隊列連接對象</param>
/// <param name="QueueName">隊列名稱</param>
/// <param name="durable">持久化</param>
/// <param name="dealMessage">處理消息委托</param>
public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage)
{
bool reTry = false;
int reTryCount = 0;
ConsumeAction consumeResult = ConsumeAction.RETRY;
IModel channel = null;
BasicDeliverEventArgs ea = null;
do
{
reTry = false;
try
{
channel = connection.CreateModel();
channel.QueueDeclare(QueueName, durable, false, false, null); //獲取隊列
channel.BasicQos(0, 1, false); //分發機制為觸發式
uint msgCount = channel.MessageCount(QueueName);
if (msgCount > 0)
{
var consumer = new QueueingBasicConsumer(channel); //建立消費者
// 從左到右參數意思分別是:隊列名稱、是否讀取消息后直接刪除消息,消費者
channel.BasicConsume(QueueName, false, consumer);
ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
consumeResult = dealMessage(message);
}
else
{
dealMessage(string.Empty);
}
}
catch (Exception ex)
{
if (BaseUtil.IsIncludeException<SocketException>(ex))
{
if ((++reTryCount) <= DefaultReTryConnectionCount)//可重試1次
{
if (channel != null) channel.Dispose();
ResetMQConnectionToFree(connection);
connection = CreateMQConnectionInPoolNew();
reTry = true;
}
}
throw ex;
}
finally
{
if (!reTry)
{
if (channel != null && ea != null)
{
if (consumeResult == ConsumeAction.ACCEPT)
{
channel.BasicAck(ea.DeliveryTag, false); //消息從隊列中刪除
}
else if (consumeResult == ConsumeAction.RETRY)
{
channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列
}
else
{
channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄
}
}
if (channel != null) channel.Dispose();
ResetMQConnectionToFree(connection);
}
}
} while (reTry);
}
/// <summary>
/// 獲取隊列消息數
/// </summary>
/// <param name="connection"></param>
/// <param name="QueueName"></param>
/// <returns></returns>
public static int GetMessageCount(IConnection connection, string QueueName)
{
int msgCount = 0;
bool reTry = false;
int reTryCount = 0;
do
{
reTry = false;
try
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(QueueName, true, false, false, null); //獲取隊列
msgCount = (int)channel.MessageCount(QueueName);
}
}
catch (Exception ex)
{
if (BaseUtil.IsIncludeException<SocketException>(ex))
{
if ((++reTryCount) <= DefaultReTryConnectionCount)//可重試1次
{
ResetMQConnectionToFree(connection);
connection = CreateMQConnectionInPoolNew();
reTry = true;
}
}
throw ex;
}
finally
{
if (!reTry)
{
ResetMQConnectionToFree(connection);
}
}
} while (reTry);
return msgCount;
}
}
public enum ConsumeAction
{
ACCEPT, // 消費成功
RETRY, // 消費失敗,可以放回隊列重新消費
REJECT, // 消費失敗,直接丟棄
}
}
