什么是MSMQ
Message Queuing(MSMQ) 是微軟開發的消息中間件,可應用於程序內部或程序之間的異步通信。主要的機制是:消息的發送者把自己想要發送的信息放入一個容器中(我們稱之為Message),然后把它保存至一個系統公用空間的消息隊列(Message Queue)中;本地或者是異地的消息接收程序再從該隊列中取出發給它的消息進行處理。下圖展示了這一流程

MSMQ隊列是一個可持久的隊列,因此不必擔心不間斷地插入隊列會導致數據的丟失,在網站系統中不用擔心網站掛掉后數據丟失問題。
MSMQ微軟消息隊列,這個是個很好的異步通信技術。非常類似於我們手機發短信,雖然對方關機了,但只要一開機通信服務器仍然自動會傳送信息。所以MSMQ有這樣的時間監控技術.很實用.可以確保通信的穩定性.
使用MSMQ的基本流程
1. 啟動MSMQ服務,【控制面板】--【程序與功能】--【關閉/打開windows功能】--添加MSMQ功能,勾選全部選項。
控制面板->控制面板->所有控制面板項->程序和功能->選中安裝

2. 創建Message Queue隊列。
3. 打開Message Queue隊列。
4. 將消息發送至Message Queue隊列或者從Message Queue隊列中接收消息。
5. 關閉Message Queue隊列。
C# 中使用MSMQ
使用MessageQueue類操作MSMQ,其在System.Messaging命名空間下,需要添加引用
定義的接口
public interface IMessageSender<T> : IDisposable
{
/// <summary>
/// 發送消息
/// </summary>
/// <param name="message">消息對象</param>
void SendMessage(T message);
void SendMessages(List<T> message);
/// <summary>
/// 發送消息
/// </summary>
/// <param name="message">消息對象</param>
/// <param name="label">消息標簽</param>
void SendMessage(T message, string label);
}
接口實現
/// <summary>
/// 消息隊列對象,由MQueueFactory創建指定路徑的隊列對象,可發送或批量接收消息。
/// </summary>
/// <typeparam name="T">消息隊列存儲的消息對象類型</typeparam>
public sealed class MQueue<T> : IDisposable, IMessageSender<T>, IMessageReceiver<T>
{
public MQueue(MessageQueue mq, string user = "Everyone")
{
InnerQueue = mq;
InnerQueue.Formatter = new XmlMessageFormatter(new[] { typeof(T) });
InnerQueue.SetPermissions(user ?? "Everyone",
MessageQueueAccessRights.GenericRead | MessageQueueAccessRights.DeleteMessage |
MessageQueueAccessRights.DeleteQueue | MessageQueueAccessRights.DeleteJournalMessage);
}
#region IMessageSender
/// <summary>
/// 內部消息隊列對象
/// </summary>
private MessageQueue InnerQueue { get; set; }
/// <summary>
/// 發送消息
/// </summary>
/// <param name="message">消息對象</param>
public void SendMessage(T message)
{
InnerQueue.Send(message);
}
public void SendMessages(List<T> message)
{
foreach (var item in message)
{
InnerQueue.Send(item);
}
}
/// <summary>
/// 發送消息
/// </summary>
/// <param name="message">消息對象</param>
/// <param name="label">消息標簽</param>
public void SendMessage(T message, string label)
{
try
{
InnerQueue.Send(message, label);
}
catch (Exception ex)
{
var path = InnerQueue.Path;
InnerQueue = new MessageQueue(path);
}
}
#endregion
#region IMessageReceiver
/// <summary>
/// 獲取隊列中所有消息
/// </summary>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="exTarget">異常時觸發</param>
/// <returns></returns>
public List<T> GetAllMessages<T>(MQExceptionTarget exTarget = null)
{
return GetMessagesByNum<T>(null, exTarget);
}
/// <summary>
/// 獲取隊列中指定數量消息
/// </summary>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="num">一次最多取num條數據,默認取所有數據</param>
/// <param name="exTarget">異常委托</param>
/// <returns></returns>
public List<T> GetMessagesByNum<T>(int? num = null, MQExceptionTarget exTarget = null)
{
var list = new List<T>();
if (num.HasValue && num <= 0)
{
return list;
}
MessageEnumerator enumerator = InnerQueue.GetMessageEnumerator2();
while (enumerator.MoveNext())
{
Message msg = enumerator.RemoveCurrent();
enumerator.Reset();
if (msg != null)
{
try
{
list.Add((T)msg.Body);
if (num.HasValue && list.Count >= num)
{
break;
}
}
catch (Exception ex)
{
if (exTarget != null)
exTarget(ex);
}
}
}
return list;
}
#endregion
public void Dispose()
{
if (InnerQueue != null)
{
InnerQueue.Dispose();
}
}
}
建立隊列工廠
/// <summary>
/// 消息隊列工廠,通過指定路徑創建或獲取相應隊列對象
/// </summary>
public class MQueueFactory
{
/// <summary>
/// 默認隊列路徑,在未指定路徑的情況下,將創建並返回該路徑的消息隊列對象
/// </summary>
private const string DefaultPath = @".\private$\myQueue";
/// <summary>
/// 創建默認路徑的消息隊列對象
/// </summary>
/// <typeparam name="T">消息隊列存儲的消息對象類型</typeparam>
/// <returns></returns>
public static MQueue<T> Create<T>()
{
return Create<T>(DefaultPath);
}
/// <summary>
/// 創建指定路徑的消息隊列路徑
/// </summary>
/// <typeparam name="T">消息隊列存儲的消息對象類型</typeparam>
/// <param name="connStr">指定的消息隊列鏈接字符串 def:".\private$\myQueue"</param>
/// <param name="autoCreate">不存在則創建隊列 遠程地址不能創建</param>
/// <param name="user">獲得隊列額外權限的個人、組或計算機</param>
/// <param name="cacheKey">web中Cache鍵值</param>
/// <returns></returns>
public static MQueue<T> Create<T>(string connStr=@".\private$\myQueue", bool autoCreate = false, string user = "Everyone", string cacheKey = "MQCache")
{
string path = connStr ?? DefaultPath;
HttpContext httpContext = HttpContext.Current;
if (autoCreate && !MessageQueue.Exists(path))
{
MessageQueue.Create(path);
}
var mq = new MessageQueue(path);
if (httpContext != null)
{
string key = "MQueue" + typeof(T).Name + cacheKey;
if ((httpContext.Cache[key] == null))
{
httpContext.Cache[key] = new MQueue<T>(mq);
}
return httpContext.Cache[key] as MQueue<T>;
}
return new MQueue<T>(mq,user);
}
}
寫入隊列
MQueueFactory.Create<string>(@".\private$\myQueue", autoCreate: true).SendMessage("我是寫入的數據~~~");
獲取消息
MQueueFactory.Create<string>(@".\private$\myQueue").GetAllMessages<string>();
