MSMQ(消息隊列)續


上一篇我簡單介紹了MSMQ的相關概念,本篇將以代碼說明

Message

    Message是MSMQ的數據存儲單元,我們的用戶數據一般也被填充在Message的body當中,因此很重要,讓我們來看一看其在.net中的體現,如圖:

    在圖上我們可以看見,Message提供了三個構造函數,參數body表示我們的用戶數據,當我們在構造函數中傳入,數據最終會被賦值給其同名屬性body,參數formatter對應同名屬性Formatter,它是一個序列化器,當我們的用戶數據是一個復雜類型,比如類的時候Message會自動采用該序列化器將我們的復雜類型序列化。message支持3種序列化對象:

-- XMLMessageFormatter對象----MessageQueue組件的默認格式化程序設置。

-- BinaryMessageFormatter對象;

-- ActiveXMessageFormatter對象;

    由於后兩者格式化后的消息通常不能為人閱讀,所以我們經常用到的是XMLMessageFormatter對象。該對象構造方法有三種重載:

public XmlMessageFormatter(); 
public XmlMessageFormatter(string[] targetTypeNames); 
public XmlMessageFormatter(Type[] targetTypes);

MSMQ隊列

      消息(Message)需要保存在msmq隊列中,.net中采用System.Messaging.MessageQueue來管理MSMQ隊列,它提供能操作MSMQ的絕大多數API,比如

     1.判斷指定路徑的隊列是否存在。其中path代表隊列的路徑,表示形式為"主機名\隊列名稱",例如:".\private$\myQueue",其中"."代表本地主機,"\private$\myQueue"則代表隊列的名稱,"private$"表示我們創建的是專用隊列,在網絡上我們可以通過路徑來唯一確定一個隊列。

public static bool Exists(string path);

     2.創建隊列。path代表隊列的路徑,transactional表示是否創建事務隊列,默認為fasle。關於事務隊列我在上一篇做了詳細的論述,這里不在重復。

public static MessageQueue Create(string path);
public static MessageQueue Create(string path, bool transactional);

    3.刪除隊列

public static void Delete(string path);

    4.發送消息到MSMQ。obj代表我們的用戶數據,transation表示將我們的發送操作納入事務當中。在前面我們說過MSMQ接收的是Message,但是在這里我們看到Send操作並未強制要求我們采用Message類型參數。這是因為當我傳入一個Object參數數據時,在Send操作的內部自動的給我們創建了一個Message消息對象,並且將我們的傳入的Object參數采用默認的序列化器序列化,然后裝入Message的body屬性當中,如果我們在Send方法中指定label屬性,它將被賦值給Message的同名Label屬性。當然我們完全可以自定義一個message對象傳入Send方法中

public void Send(object obj); public void Send(object obj, MessageQueueTransaction transaction); public void Send(object obj, string label); 

    5.接收消息。同理接收消息也可以被納入事務當中,采用Receive方法在取MSMQ的消息時,如果成功,會把MSMQ的對應消息給刪除掉,並且只能取到消息隊里中的排隊頭的消息。

public Message Receive();
public Message Receive(MessageQueueTransaction transaction);
public Message Receive(TimeSpan timeout);

    如果我們想取指定標識的消息,就的采用如下的方法了,id代表消息的唯一標示。

public Message ReceiveById(string id);
public Message ReceiveById(string id, MessageQueueTransaction transaction);

   如果我們在接收消息的后,不想把MSMQ隊列中的消息刪除怎么辦呢?那么采用下面的方法吧,因為這兩個方法接收MSMQ的消息,不會刪除MSMQ中對應的消息,所以他們不支持事務,即沒有提供事務的參數。

public Message Peek();
public Message PeekById(string id);

   我們也可以一次性吧隊列里面的所有消息取出來

public Message[] GetAllMessages();

實例 

    說了這么多,下面讓我們來代碼實戰一下,我們采用控制台程序做測試,我把MSMQ隊列做了簡單的封裝,如下

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;

namespace Test
{
    public class QueueManger
    {
        /// <summary>
        /// 創建MSMQ隊列
        /// </summary>
        /// <param name="queuePath">隊列路徑</param>
        /// <param name="transactional">是否事務隊列</param>
        public static void Createqueue(string queuePath, bool transactional = false)
        {
            try
            {
                //判斷隊列是否存在
                if (!MessageQueue.Exists(queuePath))
                {
                    MessageQueue.Create(queuePath);
                    Console.WriteLine(queuePath + "已成功創建!");
                }
                else
                {
                    Console.WriteLine(queuePath + "已經存在!");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }
        /// <summary>
        /// 刪除隊列
        /// </summary>
        /// <param name="queuePath"></param>
        public static void Deletequeue(string queuePath)
        {
            try
            {
                //判斷隊列是否存在
                if (MessageQueue.Exists(queuePath))
                {
                    MessageQueue.Delete(@".\private$\myQueue");
                    Console.WriteLine(queuePath + "已刪除!");
                }
                else
                {
                    Console.WriteLine(queuePath + "不存在!");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }
        /// <summary>
        /// 發送消息
        /// </summary>
        /// <typeparam name="T">用戶數據類型</typeparam>
        /// <param name="target">用戶數據</param>
        /// <param name="queuePath">隊列名稱</param>
        /// <param name="tran"></param>
        /// <returns></returns>
        public static bool SendMessage<T>(T target, string queuePath, MessageQueueTransaction tran = null)
        {
            try
            {
                //連接到本地的隊列
                MessageQueue myQueue = new MessageQueue(queuePath);
                System.Messaging.Message myMessage = new System.Messaging.Message();
                myMessage.Body = target;
                myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                //發送消息到隊列中
                if (tran == null)
                {
                    myQueue.Send(myMessage);
                }
                else
                {
                    myQueue.Send(myMessage, tran);
                }
                Console.WriteLine("消息已成功發送到"+queuePath + "隊列!");
                return true;
            }
            catch (ArgumentException e)
            {
                Console.WriteLine(e.Message);
                return false;
            }
        }
        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T">用戶的數據類型</typeparam>
        /// <param name="queuePath">消息路徑</param>
        /// <returns>用戶填充在消息當中的數據</returns>
        public static T ReceiveMessage<T>(string queuePath,MessageQueueTransaction tran=null)
        {
            //連接到本地隊列
            MessageQueue myQueue = new MessageQueue(queuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                //從隊列中接收消息
                System.Messaging.Message myMessage = tran == null ? myQueue.Receive() : myQueue.Receive(tran);
                return (T)myMessage.Body; //獲取消息的內容
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
            catch (InvalidCastException e)
            {
                Console.WriteLine(e.Message);
            }
            return default(T);
        }
        /// <summary>
        /// 采用Peek方法接收消息
        /// </summary>
        /// <typeparam name="T">用戶數據類型</typeparam>
        /// <param name="queuePath">隊列路徑</param>
        /// <returns>用戶數據</returns>
        public static T ReceiveMessageByPeek<T>(string queuePath)
        {
            //連接到本地隊列
            MessageQueue myQueue = new MessageQueue(queuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                //從隊列中接收消息
                System.Messaging.Message myMessage = myQueue.Peek();
                return (T)myMessage.Body; //獲取消息的內容
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
            catch (InvalidCastException e)
            {
                Console.WriteLine(e.Message);
            }
            return default(T);
        }
        /// <summary>
        /// 獲取隊列中的所有消息
        /// </summary>
        /// <typeparam name="T">用戶數據類型</typeparam>
        /// <param name="queuePath">隊列路徑</param>
        /// <returns>用戶數據集合</returns>
        public static List<T> GetAllMessage<T>(string queuePath)
        {
            MessageQueue myQueue = new MessageQueue(queuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                Message[] msgArr=  myQueue.GetAllMessages();
                List<T> list=new List<T>();
                msgArr.ToList().ForEach((o) => 
                {
                    list.Add((T)o.Body);
                });
                return list;
            }
            catch(Exception e)
            {
                Console.WriteLine(e.Message);
            }
            return null;
        }
    }
}
View Code

我們的用戶實體也很簡單,如下

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Test
{
    public class Student
    {
        /// <summary>
        /// 年齡
        /// </summary>
        public int Age { get; set; }
        /// <summary>
        /// 姓名
        /// </summary>
        public string Name { get; set; }
    }
}
View Code

下面我們來創建一個隊列,如圖我們成功的創建了"myqueue"隊列

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            string queuepath = @".\private$\myQueue";
            QueueManger.Createqueue(queuepath);
            Console.ReadKey();
        }
    }
}
View Code

下面我們向隊列中發送消息。如圖,從圖右邊可以看到消息成功被加入到隊列中

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            string queuepath = @".\private$\myQueue";
            //QueueManger.Createqueue(queuepath);
            Student stu = new Student() { Name="shaoshun",Age=18};
            QueueManger.SendMessage<Student>(stu, queuepath);
            Console.ReadKey();
        }
    }
}
View Code

接着我們采用Peek方法接收消息(即不移除MSMQ的對應消息),很顯然圖中的Message依然存在MSMQ隊列中

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            string queuepath = @".\private$\myQueue";
            //QueueManger.Createqueue(queuepath);
            //Student stu = new Student() { Name="shaoshun",Age=18};
            //QueueManger.SendMessage<Student>(stu, queuepath);
            Student stu=  QueueManger.ReceiveMessageByPeek<Student>(queuepath);
            Console.WriteLine(stu.Name);
            Console.ReadKey();
        }
    }
}
View Code

接着我們采用Receive方法來接收消息。這個時候我們可以很明顯的看見MSMQ原來對應的消息被刪除了

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            string queuepath = @".\private$\myQueue";
            //QueueManger.Createqueue(queuepath);
            //Student stu = new Student() { Name="shaoshun",Age=18};
            //QueueManger.SendMessage<Student>(stu, queuepath);
            //Student stu=  QueueManger.ReceiveMessageByPeek<Student>(queuepath);
            Student stu = QueueManger.ReceiveMessage<Student>(queuepath);
            Console.WriteLine(stu.Name);
            Console.ReadKey();
        }
    }
}
View Code

    最后讓我來測試,MSMQ的事務性。我們先刪除我們的隊列,在重新創建。我們連續向隊列中插入5個消息,但是在插入第5個消息的時候我們拋出異常,如果MSMQ支持事務的話那么前面發送的4個Message將被回滾掉,MSMQ隊列中應該為0個消息

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            string queuepath = @".\private$\myQueue";
            //QueueManger.Createqueue(queuepath);
            //Student stu = new Student() { Name="shaoshun",Age=18};
            //QueueManger.SendMessage<Student>(stu, queuepath);
            //Student stu=  QueueManger.ReceiveMessageByPeek<Student>(queuepath);
            //Student stu = QueueManger.ReceiveMessage<Student>(queuepath);
            //Console.WriteLine(stu.Name);

            QueueManger.Deletequeue(queuepath);
            QueueManger.Createqueue(queuepath);
            MessageQueueTransaction tran = new MessageQueueTransaction();
            tran.Begin();
            try
            {
                Student stu;
                for (int i = 0; i < 4; i++)
                {
                    stu=new Student(){Name="shaoshun"+i,Age=i};
                    QueueManger.SendMessage<Student>(stu, queuepath, tran);
                    if (i == 3)
                    {
                        throw new Exception();
                    }
                }
                tran.Commit();
            }
            catch
            {
                tran.Abort();
            }
            Console.ReadKey();
        }
    }
}
View Code

 

     另外值得注意的是,MSMQ的消息發送與接收,采用的是同步的方式。這樣假如我們的消息隊列中一個消息都沒有,我們調用Receive()去接收該隊列的消息會怎么樣呢? 程序會被阻塞在這里,直到消息隊列中有消息,程序才會接着往下走。碰到這種情況是很要命的,但是不怕MSMQ支持異步消息,由於篇幅有限我就不在多少,這里我給出一個異步操作的鏈接有興趣的朋友可以去研究下,點擊 這里 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM