MSMQ理論+實踐(上)


關於MSMQ使用的文章,網上一搜一大把,為什么還要寫呢?因為別人的終究是別人的,看一遍,只是過眼雲煙罷了,還是要自己動手實踐一下,才能把別人的變成自己的。再者就是網上大都是在一台電腦上寫的demo,我還是想知道在不同的電腦上是怎么使用的,畢竟要應用到項目中的話,消息中間件肯定是應用於分布式系統中傳遞消息了,所以,不管三七二十一,先來實踐一把。

MSMQ簡介:(摘自百度百科)

MicroSoft Message Queuing(微軟消息隊列)是在多個不同的應用之間實現相互通信的一種異步傳輸模式,相互通信的應用可以分布於同一台機器上,也可以分布於相連的網絡空間中的任一位置。它的實現原理是:消息的發送者把自己想要發送的信息放入一個容器中(我們稱之為Message),然后把它保存至一個系統公用空間的消息隊列(Message Queue)中;本地或者是異地的消息接收程序再從該隊列中取出發給它的消息進行處理。

第一步:安裝,因為MSMQ是Windows自帶的消息中間件,所以不需要下載,只要在Windows程序打開關閉新功能中啟用MSMQ服務即可。

 等待完成后,在計算機管理中,可以看到消息隊列菜單選項

第二步,新建解決方案,三層,分別是對消息操作的封裝,以及消息的消費者與消息的生產者

Common中需要引用System.Messaging.dll,值得注意的是MSMQProducer中也需要引用System.Messaging.dll,為什么呢?

因為QueueManger這個幫助類中SendMessage方法帶了一個默認值為null(MessageQueueTransaction tran = null)的參數,所以也需要引用這個dll。當然可以改造這個方法,就不必引用了,只是尊重原作者,我就啥也沒改,添加一下引用,能跑起來就行。

QueueManger這個類源碼取自https://www.cnblogs.com/shaoshun/p/3800208.html這篇博文,寫得很好,就直接拿來用了,感謝博主!

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;
namespace Common
{
    public class QueueManger
    {
        /// <summary>
        /// 創建MSMQ隊列
        /// </summary>
        /// <param name="queuePath">隊列路徑</param>
        /// <param name="transactional">是否事務隊列</param>
        public static void Createqueue(string queuePath, bool transactional = false)
        {
            try
            {
                //判斷隊列是否存在
                if (!MessageQueue.Exists(queuePath))
                {
                    MessageQueue.Create(queuePath);
                    Console.WriteLine(queuePath + "已成功創建!");
                }
                else
                {
                    Console.WriteLine(queuePath + "已經存在!");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }
        /// <summary>
        /// 刪除隊列
        /// </summary>
        /// <param name="queuePath"></param>
        public static void Deletequeue(string queuePath)
        {
            try
            {
                //判斷隊列是否存在
                if (MessageQueue.Exists(queuePath))
                {
                    MessageQueue.Delete(queuePath);
                    Console.WriteLine(queuePath + "已刪除!");
                }
                else
                {
                    Console.WriteLine(queuePath + "不存在!");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }
        /// <summary>
        /// 發送消息
        /// </summary>
        /// <typeparam name="T">用戶數據類型</typeparam>
        /// <param name="target">用戶數據</param>
        /// <param name="queuePath">隊列名稱</param>
        /// <param name="tran"></param>
        /// <returns></returns>
        public static bool SendMessage<T>(T target, string queuePath, MessageQueueTransaction tran = null)
        {
            try
            {
                //連接到本地的隊列
                MessageQueue myQueue = new MessageQueue(queuePath);
                System.Messaging.Message myMessage = new System.Messaging.Message();
                myMessage.Body = target;
                myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                //發送消息到隊列中
                if (tran == null)
                {
                    myQueue.Send(myMessage);
                }
                else
                {
                    myQueue.Send(myMessage, tran);
                }
                Console.WriteLine("消息已成功發送到" + queuePath + "隊列!");
                return true;
            }
            catch (ArgumentException e)
            {
                Console.WriteLine(e.Message);
                return false;
            }
        }
        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T">用戶的數據類型</typeparam>
        /// <param name="queuePath">消息路徑</param>
        /// <returns>用戶填充在消息當中的數據</returns>
        public static T ReceiveMessage<T>(string queuePath, MessageQueueTransaction tran = null)
        {
            //連接到本地隊列
            MessageQueue myQueue = new MessageQueue(queuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                //從隊列中接收消息
                System.Messaging.Message myMessage = tran == null ? myQueue.Receive() : myQueue.Receive(tran);
                return (T)myMessage.Body; //獲取消息的內容
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
            catch (InvalidCastException e)
            {
                Console.WriteLine(e.Message);
            }
            return default(T);
        }
        /// <summary>
        /// 采用Peek方法接收消息
        /// </summary>
        /// <typeparam name="T">用戶數據類型</typeparam>
        /// <param name="queuePath">隊列路徑</param>
        /// <returns>用戶數據</returns>
        public static T ReceiveMessageByPeek<T>(string queuePath)
        {
            //連接到本地隊列
            MessageQueue myQueue = new MessageQueue(queuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                //從隊列中接收消息
                System.Messaging.Message myMessage = myQueue.Peek();
                return (T)myMessage.Body; //獲取消息的內容
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
            catch (InvalidCastException e)
            {
                Console.WriteLine(e.Message);
            }
            return default(T);
        }
        /// <summary>
        /// 獲取隊列中的所有消息
        /// </summary>
        /// <typeparam name="T">用戶數據類型</typeparam>
        /// <param name="queuePath">隊列路徑</param>
        /// <returns>用戶數據集合</returns>
        public static List<T> GetAllMessage<T>(string queuePath)
        {
            MessageQueue myQueue = new MessageQueue(queuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                Message[] msgArr = myQueue.GetAllMessages();
                List<T> list = new List<T>();
                msgArr.ToList().ForEach((o) =>
                {
                    list.Add((T)o.Body);
                });
                return list;
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
            return null;
        }
    }
}
View Code

生產者代碼:

class Program
    {
        static void Main(string[] args)
        {
            string queuepath = @".\private$\myQueue";
            QueueManger.Createqueue(queuepath);
            Student stu = new Student() { Name = "dengwei", Age = 18 };
            QueueManger.SendMessage<Student>(stu, queuepath);
            Console.ReadKey();
        }
    }

消費者代碼:注意與生產者queuePath的區別,因為我需要模擬的是遠程訪問消息隊列

TCP:FormatName:Direct=TCP:IP\\private$\\queueName

HTTP:FormatName:Direct=http://IP/msmq/private$/queueName(沒測試,下一篇測試)

class Program
    {
        static void Main(string[] args)
        {
            string queuepath = "FormatName:Direct=TCP:10.101.98.197\\private$\\myQueue";
            Student stu = QueueManger.ReceiveMessageByPeek<Student>(queuepath);
            Console.WriteLine(stu.Name);
            Console.ReadKey();
        }
    }

編譯完成之后,把消費者的可執行文件拷貝到另一台電腦,注意,這台電腦也必須啟用MSMQ才能正常通訊,執行程序,拋出異常了,訪問不了。

 於是又查了一番資料,改注冊表,分配權限等,都做了一遍,還是訪問不了,最后才發現還要消息隊列屬性中去掉禁用未經身份驗證的RPC調用。默認是勾上的

配置完成之后,再次調用,訪問成功。

 

 

每天,進步一點點...

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM