在上一篇我簡單介紹了MSMQ的相關概念,本篇將以代碼說明
Message
Message是MSMQ的數據存儲單元,我們的用戶數據一般也被填充在Message的body當中,因此很重要,讓我們來看一看其在.net中的體現,如圖:
在圖上我們可以看見,Message提供了三個構造函數,參數body表示我們的用戶數據,當我們在構造函數中傳入,數據最終會被賦值給其同名屬性body,參數formatter對應同名屬性Formatter,它是一個序列化器,當我們的用戶數據是一個復雜類型,比如類的時候Message會自動采用該序列化器將我們的復雜類型序列化。message支持3種序列化對象:
-- XMLMessageFormatter對象----MessageQueue組件的默認格式化程序設置。
-- BinaryMessageFormatter對象;
-- ActiveXMessageFormatter對象;
由於后兩者格式化后的消息通常不能為人閱讀,所以我們經常用到的是XMLMessageFormatter對象。該對象構造方法有三種重載:
public XmlMessageFormatter(); public XmlMessageFormatter(string[] targetTypeNames); public XmlMessageFormatter(Type[] targetTypes);
MSMQ隊列
消息(Message)需要保存在msmq隊列中,.net中采用System.Messaging.MessageQueue來管理MSMQ隊列,它提供能操作MSMQ的絕大多數API,比如
1.判斷指定路徑的隊列是否存在。其中path代表隊列的路徑,表示形式為"主機名\隊列名稱",例如:".\private$\myQueue",其中"."代表本地主機,"\private$\myQueue"則代表隊列的名稱,"private$"表示我們創建的是專用隊列,在網絡上我們可以通過路徑來唯一確定一個隊列。
public static bool Exists(string path);
2.創建隊列。path代表隊列的路徑,transactional表示是否創建事務隊列,默認為fasle。關於事務隊列我在上一篇做了詳細的論述,這里不在重復。
public static MessageQueue Create(string path); public static MessageQueue Create(string path, bool transactional);
3.刪除隊列
public static void Delete(string path);
4.發送消息到MSMQ。obj代表我們的用戶數據,transation表示將我們的發送操作納入事務當中。在前面我們說過MSMQ接收的是Message,但是在這里我們看到Send操作並未強制要求我們采用Message類型參數。這是因為當我傳入一個Object參數數據時,在Send操作的內部自動的給我們創建了一個Message消息對象,並且將我們的傳入的Object參數采用默認的序列化器序列化,然后裝入Message的body屬性當中,如果我們在Send方法中指定label屬性,它將被賦值給Message的同名Label屬性。當然我們完全可以自定義一個message對象傳入Send方法中
public void Send(object obj); public void Send(object obj, MessageQueueTransaction transaction); public void Send(object obj, string label);
5.接收消息。同理接收消息也可以被納入事務當中,采用Receive方法在取MSMQ的消息時,如果成功,會把MSMQ的對應消息給刪除掉,並且只能取到消息隊里中的排隊頭的消息。
public Message Receive(); public Message Receive(MessageQueueTransaction transaction); public Message Receive(TimeSpan timeout);
如果我們想取指定標識的消息,就的采用如下的方法了,id代表消息的唯一標示。
public Message ReceiveById(string id); public Message ReceiveById(string id, MessageQueueTransaction transaction);
如果我們在接收消息的后,不想把MSMQ隊列中的消息刪除怎么辦呢?那么采用下面的方法吧,因為這兩個方法接收MSMQ的消息,不會刪除MSMQ中對應的消息,所以他們不支持事務,即沒有提供事務的參數。
public Message Peek(); public Message PeekById(string id);
我們也可以一次性吧隊列里面的所有消息取出來
public Message[] GetAllMessages();
實例
說了這么多,下面讓我們來代碼實戰一下,我們采用控制台程序做測試,我把MSMQ隊列做了簡單的封裝,如下

using System; using System.Collections.Generic; using System.Linq; using System.Messaging; using System.Text; using System.Threading.Tasks; namespace Test { public class QueueManger { /// <summary> /// 創建MSMQ隊列 /// </summary> /// <param name="queuePath">隊列路徑</param> /// <param name="transactional">是否事務隊列</param> public static void Createqueue(string queuePath, bool transactional = false) { try { //判斷隊列是否存在 if (!MessageQueue.Exists(queuePath)) { MessageQueue.Create(queuePath); Console.WriteLine(queuePath + "已成功創建!"); } else { Console.WriteLine(queuePath + "已經存在!"); } } catch (MessageQueueException e) { Console.WriteLine(e.Message); } } /// <summary> /// 刪除隊列 /// </summary> /// <param name="queuePath"></param> public static void Deletequeue(string queuePath) { try { //判斷隊列是否存在 if (MessageQueue.Exists(queuePath)) { MessageQueue.Delete(@".\private$\myQueue"); Console.WriteLine(queuePath + "已刪除!"); } else { Console.WriteLine(queuePath + "不存在!"); } } catch (MessageQueueException e) { Console.WriteLine(e.Message); } } /// <summary> /// 發送消息 /// </summary> /// <typeparam name="T">用戶數據類型</typeparam> /// <param name="target">用戶數據</param> /// <param name="queuePath">隊列名稱</param> /// <param name="tran"></param> /// <returns></returns> public static bool SendMessage<T>(T target, string queuePath, MessageQueueTransaction tran = null) { try { //連接到本地的隊列 MessageQueue myQueue = new MessageQueue(queuePath); System.Messaging.Message myMessage = new System.Messaging.Message(); myMessage.Body = target; myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); //發送消息到隊列中 if (tran == null) { myQueue.Send(myMessage); } else { myQueue.Send(myMessage, tran); } Console.WriteLine("消息已成功發送到"+queuePath + "隊列!"); return true; } catch (ArgumentException e) { Console.WriteLine(e.Message); return false; } } /// <summary> /// 接收消息 /// </summary> /// <typeparam name="T">用戶的數據類型</typeparam> /// <param name="queuePath">消息路徑</param> /// <returns>用戶填充在消息當中的數據</returns> public static T ReceiveMessage<T>(string queuePath,MessageQueueTransaction tran=null) { //連接到本地隊列 MessageQueue myQueue = new MessageQueue(queuePath); myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); try { //從隊列中接收消息 System.Messaging.Message myMessage = tran == null ? myQueue.Receive() : myQueue.Receive(tran); return (T)myMessage.Body; //獲取消息的內容 } catch (MessageQueueException e) { Console.WriteLine(e.Message); } catch (InvalidCastException e) { Console.WriteLine(e.Message); } return default(T); } /// <summary> /// 采用Peek方法接收消息 /// </summary> /// <typeparam name="T">用戶數據類型</typeparam> /// <param name="queuePath">隊列路徑</param> /// <returns>用戶數據</returns> public static T ReceiveMessageByPeek<T>(string queuePath) { //連接到本地隊列 MessageQueue myQueue = new MessageQueue(queuePath); myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); try { //從隊列中接收消息 System.Messaging.Message myMessage = myQueue.Peek(); return (T)myMessage.Body; //獲取消息的內容 } catch (MessageQueueException e) { Console.WriteLine(e.Message); } catch (InvalidCastException e) { Console.WriteLine(e.Message); } return default(T); } /// <summary> /// 獲取隊列中的所有消息 /// </summary> /// <typeparam name="T">用戶數據類型</typeparam> /// <param name="queuePath">隊列路徑</param> /// <returns>用戶數據集合</returns> public static List<T> GetAllMessage<T>(string queuePath) { MessageQueue myQueue = new MessageQueue(queuePath); myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); try { Message[] msgArr= myQueue.GetAllMessages(); List<T> list=new List<T>(); msgArr.ToList().ForEach((o) => { list.Add((T)o.Body); }); return list; } catch(Exception e) { Console.WriteLine(e.Message); } return null; } } }
我們的用戶實體也很簡單,如下

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Test { public class Student { /// <summary> /// 年齡 /// </summary> public int Age { get; set; } /// <summary> /// 姓名 /// </summary> public string Name { get; set; } } }
下面我們來創建一個隊列,如圖我們成功的創建了"myqueue"隊列

using System; using System.Collections.Generic; using System.Linq; using System.Messaging; using System.Text; using System.Threading.Tasks; namespace Test { class Program { static void Main(string[] args) { string queuepath = @".\private$\myQueue"; QueueManger.Createqueue(queuepath); Console.ReadKey(); } } }
下面我們向隊列中發送消息。如圖,從圖右邊可以看到消息成功被加入到隊列中

using System; using System.Collections.Generic; using System.Linq; using System.Messaging; using System.Text; using System.Threading.Tasks; namespace Test { class Program { static void Main(string[] args) { string queuepath = @".\private$\myQueue"; //QueueManger.Createqueue(queuepath); Student stu = new Student() { Name="shaoshun",Age=18}; QueueManger.SendMessage<Student>(stu, queuepath); Console.ReadKey(); } } }
接着我們采用Peek方法接收消息(即不移除MSMQ的對應消息),很顯然圖中的Message依然存在MSMQ隊列中

using System; using System.Collections.Generic; using System.Linq; using System.Messaging; using System.Text; using System.Threading.Tasks; namespace Test { class Program { static void Main(string[] args) { string queuepath = @".\private$\myQueue"; //QueueManger.Createqueue(queuepath); //Student stu = new Student() { Name="shaoshun",Age=18}; //QueueManger.SendMessage<Student>(stu, queuepath); Student stu= QueueManger.ReceiveMessageByPeek<Student>(queuepath); Console.WriteLine(stu.Name); Console.ReadKey(); } } }
接着我們采用Receive方法來接收消息。這個時候我們可以很明顯的看見MSMQ原來對應的消息被刪除了

using System; using System.Collections.Generic; using System.Linq; using System.Messaging; using System.Text; using System.Threading.Tasks; namespace Test { class Program { static void Main(string[] args) { string queuepath = @".\private$\myQueue"; //QueueManger.Createqueue(queuepath); //Student stu = new Student() { Name="shaoshun",Age=18}; //QueueManger.SendMessage<Student>(stu, queuepath); //Student stu= QueueManger.ReceiveMessageByPeek<Student>(queuepath); Student stu = QueueManger.ReceiveMessage<Student>(queuepath); Console.WriteLine(stu.Name); Console.ReadKey(); } } }
最后讓我來測試,MSMQ的事務性。我們先刪除我們的隊列,在重新創建。我們連續向隊列中插入5個消息,但是在插入第5個消息的時候我們拋出異常,如果MSMQ支持事務的話那么前面發送的4個Message將被回滾掉,MSMQ隊列中應該為0個消息

using System; using System.Collections.Generic; using System.Linq; using System.Messaging; using System.Text; using System.Threading.Tasks; namespace Test { class Program { static void Main(string[] args) { string queuepath = @".\private$\myQueue"; //QueueManger.Createqueue(queuepath); //Student stu = new Student() { Name="shaoshun",Age=18}; //QueueManger.SendMessage<Student>(stu, queuepath); //Student stu= QueueManger.ReceiveMessageByPeek<Student>(queuepath); //Student stu = QueueManger.ReceiveMessage<Student>(queuepath); //Console.WriteLine(stu.Name); QueueManger.Deletequeue(queuepath); QueueManger.Createqueue(queuepath); MessageQueueTransaction tran = new MessageQueueTransaction(); tran.Begin(); try { Student stu; for (int i = 0; i < 4; i++) { stu=new Student(){Name="shaoshun"+i,Age=i}; QueueManger.SendMessage<Student>(stu, queuepath, tran); if (i == 3) { throw new Exception(); } } tran.Commit(); } catch { tran.Abort(); } Console.ReadKey(); } } }
另外值得注意的是,MSMQ的消息發送與接收,采用的是同步的方式。這樣假如我們的消息隊列中一個消息都沒有,我們調用Receive()去接收該隊列的消息會怎么樣呢? 程序會被阻塞在這里,直到消息隊列中有消息,程序才會接着往下走。碰到這種情況是很要命的,但是不怕MSMQ支持異步消息,由於篇幅有限我就不在多少,這里我給出一個異步操作的鏈接有興趣的朋友可以去研究下,點擊 這里