在一個大型的分布式系統中,消息隊列是不可缺少的中間件,能很好的解決異步消息、應用解耦、均衡並發等問題。在.net中,偶然發現一個效率不錯、安全可靠、功能齊全的消息組件,忍不住翻譯過來,供大家快速預覽。
注:原作者用windows服務啟動消息隊列服務,但是本人在win10上測試出錯,可自行改成控制台啟動消息隊列服務,然后用第三方工具注冊服務(如:SrvanyUI)
原文:http://www.codeproject.com/Articles/193611/DotNetMQ-A-Complete-Message-Queue-System-for-NET
正文:
一個新的、獨立的、開源的,完全基於C#和.NET Framework3.5的消息隊列系統
文章概要
- 介紹
- 什么是消息傳遞?
- 什么是DotNetMQ?
- 為什么要一個新的消息代理?
- 消息代理的必要性
- 現有的消息代理
- 安裝、運行DotNetMQ
- 第一個DotNetMQ程序
- 注冊應用程序到DotNetMQ
- 開發Application1
- 開發Application2
- 消息屬性:傳送規則(Transmit Rule)
- 客戶端屬性:通訊方式(CommunicationWay)
- 客戶端屬性:出錯時重新連接服務器(ReConnectServerOnError)
- 客戶端屬性:自動確認消息(AutoAcknowledgeMessages)
- 配置DotNetMQ
- 服務端
- 應用程序
- 路由/負載均衡
- 其他設置
- 網絡傳輸消息
- 一個簡單的應用程序
- 一個真實的案例:分布式短信處理器(Distributed SMS Processor)
- 請求/應答式通信
- 面向服務架構的DotNetMQ
- 簡單應用程序:短息/郵件發送器
- 服務端
- 客戶端
- Web服務支持
- 簡單應用程序:短息/郵件發送器
- DotNetMQ性能
- 歷史
- 引用
介紹
在這篇文章中,我將介紹一個新的、獨立的、開源的,完全基於C#和.NET Framework3.5的消息隊列系統,DotNetMQ是一個消息代理,它包括確保傳輸,路由,負載均衡,服務器圖等等多項功能。我將從解釋消息的概念和消息代理的必要性講起,然后,我會說明什么是DotNetMQ,以及如何使用它。
什么是消息傳遞
消息傳遞是一種異步通信方式,具體就是在同一個或不同的機器上運行的多個應用程序之間可靠的消息傳遞。應用程序通過發送一種叫消息的數據包和其他應用程序通信。
一個消息可以是一個字符串,一個字節數組,一個對象等等。通常情況下,一個發送者(生產者)程序創建一個消息,並將其推送到一個消息隊列,然后一個接受者(消費者)程序從隊列中獲取這個消息並處理它。發送程序和接受程序不需要同時運行,因為消息傳遞是一個異步過程。這就是所謂的松耦合通信。
另一方面,Web服務方法調用(遠程方法調用)是一種緊耦合的同步通信(這兩個應用程序在整個通信的過程中都必須是運行着並且可用,如果Web服務脫機或在方法調用期間發生錯誤,那么客戶端應用程序將得到一個異常)。

圖 - 1:兩個應用程序間最簡單的消息傳遞。
在上圖中,兩個應用程序通過消息隊列進行松散耦合方式通信。如果接受者處理消息的速度慢於發送者產生消息的速度,那么隊列里的消息數就會增加。此外,在發送者發送消息的過程中,接受者可能是離線的。在這種情況下,當接收者上線后,它會從隊列中得到消息(當它開始並加入這個隊列時)。
消息隊列通常由消息代理提供。消息代理是一個獨立的應用程序(一個服務),其他應用程序通過連接它發送、接收消息。在消息被接收者接收之前,消息代理負責存儲消息。消息代理可以通過路由多台機器把消息傳送給目標應用程序,在消息被接收者正確處理之前,消息代理會一直嘗試傳送它。有時候消息代理也被稱為面向消息的中間件(Message-Oriented-Middleware MOM)或者簡單的叫消息隊列(Message Queue MQ).
什么是DotNetMQ?
DotNetMQ是一個開源的消息代理,它有以下幾個特點:
- 持久和非持久的消息發送。
- 即使在系統崩潰時,也會保證持久消息的傳送。
- 可在一個機器圖里自動和手動設置消息的路由。
- 支持多種數據庫(MS SQL Server,MySQL,SQLite,和一些現有的基於內存的存儲)
- 支持不存儲,直接發送及時消息。
- 支持請求/應答式的消息。
- 用客戶端類庫和DotNetMQ消息代理通信很方便
- 內置的框架,可以輕松地在消息隊列上構建RMI服務。
- 支持把消息傳送給ASP.NET Web服務。
- 基於圖形界面的管理和監控工具。
- 易於安裝,管理和使用。
- 完全由C#開發(使用.NET Framework 3.5)。
在開始創建它的時候,我更喜歡叫它為MDS(消息傳送系統 Message Delivery System)。因為它不僅是一個消息隊列,而且還是一個直接傳送消息到應用程序的系統和一個提供了建立應用服務框架的環境。我把它叫做DotNetMQ,是因為它完全由.NET開發,而且這個名字也更好記。所以它原來的名字是MDS,以至於源碼里有許多以MDS為前綴的類。
為什么要一個新的消息代理?
消息代理的必要性
首先,我將演示一個需要消息代理的簡單情況。
在我的業務經歷中,我見到過一些非常糟糕且不尋常的異步企業應用集成解決方案。通常是運行在一台服務器上的一個程序執行一些任務,並且產生一些數據,然后將結果數據發送到另一台服務器上的另一個程序。第二個應用在數據上執行其他任務或計算結果(這台服務器在同一網絡中或是通過互聯網連接)。另外,消息數據必須是持久的。即使遠程程序沒有工作或網絡不可用,消息必須第一時間發送過去。
讓我們來看看下面的設計圖:

圖 - 2:一個糟糕的集成應用程序解決方案。
Application -1 和Application -2是可執行程序(或是Windows服務),Sender Service是一個Windows服務。Application -1執行一些任務,產生數據,並調用Server-B服務器上的Remote Web Service方法來傳輸數據。這個web服務將數據插入到數據表。Application -2定期檢查數據表來獲得新的數據行並處理它們(然后從表中刪除它們,或將其標記為已處理,避免處理重復數據)。
如果在調用Web服務時或Web服務處理數據時出錯,數據不能丟失,並且稍后必須重發。但是,Application -1有其他任務要做,所以它不能一次又一次的嘗試重發數據。它只是將數據插入到數據表。另一個Windows服務(如果Application -1是一直運行的,也可以使里的一個線程)定期檢查這個表,並嘗試將數據發送到Web服務,直到數據成功發送。
這個解決方案的確是可靠的(消息確保傳送了),但它不是兩個應用程序之間通信的有效方式。該解決方案有一些非常關鍵的問題:
- 需要很長的開發時間(去編碼)。
- 要定制所有的消息類型(或遠程方法調用),對於一個新的Web服務方法調用,你必須改變所有的服務、應用程序和數據表。
- 對每一個相似的服務,必須開發基本上一樣的軟件和結構(或復制,然后修改)。
- 編碼后需要對服務、程序、數據庫做太多的測試和維護。
- 一些程序和服務在沒有新消息的時候,還是會定期檢查數據庫(如果數據庫沒有很好的索引和優化,這可能會嚴重消耗系統資源)。
現在用消息代理來做這所有的事情,用最有效的方式負責將消息傳送給遠程應用。同一應用程序集成用上DotNetMQ展示於下圖。

圖 - 3:使用DotNetMQ的簡單消息傳遞。
DotNetMQ是一個獨立的Windows服務,分別運行在Server-A和Server-B服務器上。因此,你只需編寫代碼和DotNetMQ通信。使用DotNetMQ客戶端類庫,和DotNetMQ服務發送、接收信息是非常容易和快速的。Application -1准備消息,設置目標,並將消息傳遞給DotNetMQ代理。DotNetMQ代理將以最有效和最快的方式傳遞給Application -2。
現有的消息代理
很顯然,在集成應用程序中消息代理是有必要的。我網上搜索,查找書籍,想找一個免費的(最好也是開源的)而且是.Net用起來很容易的消息代理。讓我們看看我找到了什么:
- Apache ActiveMQ(http://activemq.apache.org):它是開源的,並且實現了JMS(Java Message Service,java消息服務在java世界里是一個標准的消息傳輸API)。它也有一個.NET客戶端類庫。我為了更加了解,讀完了“ActiveMQ in Action”整本書,並且開發了一些簡單的應用。即使我通讀了這本書,我沒有看到一個簡單可靠的方式來構建一個共同合作和路有消息的ActiveMQ服務圖。我也沒有看到如何給一個消息設置目標服務器。它自動路由消息,但我不能有效的控制路由的路徑。我的理解是,它通常和Apache Camel(http://camel.apache.org)一起使用來實現常見的應用集成模式。Apache Camel也是另一個需要去了解的領域,更糟的是,它只使用Java。最后,我認為它不夠簡單易用,特別是配置,監控和管理。於是我放棄了對ActiveMQ的研究。
- MSMQ(http://msdn.microsoft.com/en-us/library/ms711472(VS.85).aspx):這是來自微軟的解決方案,是.NET應用程序最合適的框架。它很容易使用和學習,而且它有工具看檢測隊列和消息。它尤其適用於那些運行在同一台機器上,或可以直接連接到同一台機器的應用程序間的異步通信。但我無法找到一個內置的解決方案,構建一個MSMQ服務器圖來路由消息。因為路由是我的出發點,所以我只好淘汰掉這個消息代理。
- RabbitMQ(http://www.rabbitmq.com):它是由Erlang(有愛立信開發的一種編程語言)開發的。你需要先安裝Erlang。我花了很多時間來安裝,配置,並寫了一個示例程序。它有一個.NET客戶端,但當我試圖開發並運行一個簡單的程序是,出現很多錯誤。很難安裝,很難使不同服務器上的兩個RabbitMQ協同工作。過了幾天,我就放棄了,因為我覺得學習並開始開發程序不應該那么難。
- OpenAMQ(http://www.openamq.org),ZeroMQ(http://www.zeromq.org):我總體研究了這兩個消息代理,但我發現我不能輕易做我想用.NET想做的事。
- 其他:我還發現了一些其他的項目,但它們缺失一些重要的功能如路由,持久消息傳遞,請求/應答消息...等。
如你所見,在上面的列表中沒有哪一個消息代理是完全由.NET開發的。
從用戶角度來看,我只是想通過“消息數據,目標服務器和應用程序名稱”來定位我的代理。其他的我都不關心。他將會根據需要在網絡上多次路由一個消息,最后發送到目標服務器的目標程序上。我的消息傳送系統必須為我提供這個便利。這是我的出發點。我根據這一點大概設計了消息代理的結構。下圖顯示了我想要的。

圖 - 4:自動路由消息的消息代理服務器圖。
Application -1 傳遞一個消息到本地服務器(Server-A)上的消息代理:
- 目標服務器:Server-D
- 目標應用程序:Application -2
- 消息數據:應用程序特定的數據
Server-A沒有直接和Server-D連接。因此,消息代理在服務器間轉發消息(這個消息依次通過Server-A,Server-B,Server-C,Server-D),消息最后到達Server-D上的消息代理,然后傳遞給Application -2。注意在Server-E上也有一個Application-2在運行,但是它不會收到這個消息,因為消息的目標服務器是Server-D。
DotNetMQ提供了這種功能和便利。它在服務器圖上找到最佳的(最短的)路徑把消息從原服務器轉發到目標服務器。
經過這種全面的介紹會,讓我們看看如果在實踐中使用DotNetMQ。
安裝、運行DotNetMQ
現在還沒有實現自動安裝,不過安裝DotNetMQ是非常容易的。下載並解壓文章開始提供的二進制文件。只需將所有的東西復制到C:\Progame Files\DotNetMQ\下,然后運行INSTALL_x86.bat(如果你用的是64位系統,那么將執行INSTALL_x64)。
你可以檢查Windows服務,看看DotNetMQ是否已經安裝並正常工作。
第一個DotNetMQ程序
讓我們看看實際中的DotNetMQ。為了使第一個程序足夠簡單,我假設是同一台機器上的兩個控制台應用程序(實際上,就像我們待會在文章中看到的那個,和在兩台機器上的兩個應用程序是沒什么顯著差異的,只是需要設置一下消息的目標服務器名字而已)。
- Application1:從用戶輸入那里得到一個字符串消息,並將其發送到Application2.
- Application2:在控制台上打印出傳入的消息。
注冊應用程序到DotNetMQ
我們的應用程序為了使用DotNetMQ,要先注冊一下,只需操作一次,是一個非常簡單的過程。運行DotNetMQ管理器(DotNETMQ文件夾下的MDSManager.exe,如上所訴,默認是在C:\Programe Files\DotNetMQ\文件夾下),並在Applications菜單中打開Application類表。點擊Add New Appliction按鈕,輸入應用程序名稱。
如上所述,添加Application1和Application2到DotNetMQ。最后,你的應用程序列表應該像下面這樣。

圖 - 5:DotNetMQ管理工具的應用程序列表界面。
開發Application1
在Visual Studio中創建一個名稱為Application1的控制台應用程序,並添加MDSCommonLib.dll引用,這個dll文件里提供了連接到DotNetMQ必需的一些類。然后在Program.cs文件中寫上下面的代碼:
using System; using System.Text; using MDS.Client; namespace Application1 { class Program { static void Main(string[] args) { //Create MDSClient object to connect to DotNetMQ //Name of this application: Application1 var mdsClient = new MDSClient("Application1"); //Connect to DotNetMQ server mdsClient.Connect(); Console.WriteLine("Write a text and press enter to send " + "to Application2. Write 'exit' to stop application."); while (true) { //Get a message from user var messageText = Console.ReadLine(); if (string.IsNullOrEmpty(messageText) || messageText == "exit") { break; } //Create a DotNetMQ Message to send to Application2 var message = mdsClient.CreateMessage(); //Set destination application name message.DestinationApplicationName = "Application2"; //Set message data message.MessageData = Encoding.UTF8.GetBytes(messageText); //Send message message.Send(); } //Disconnect from DotNetMQ server mdsClient.Disconnect(); } } }
在創建MDSClient對象時,我們把要連接的應用程序名稱傳給構造函數,用這個構造函數,我們將用默認端口(10905)連接本地服務器(127.0.0.1)上的DotNetMQ。重載的構造函數可以用於連接其他服務器和端口。
MDSClient的CreateMessage方法返回一個IOutgoingMessage的對象。對象的MessageData屬性是實際發送給目標應用程序的數據,它是一個字節數組。我們使用UTF8編碼把用戶輸入的文本轉換成字節數組。對象的DestinationApplicationName和DestinationServerName屬性是用於設置消息的目標地址。如果我們沒有指定目標服務器,默認就是本地服務器。最后,我們發送這個消息對象。
開發Application2
在Visual Studio里創建一個新的控制台應用程序,命名為Application2,添加MDSCommonLib.dll並寫下以下代碼:
using System; using System.Text; using MDS.Client; namespace Application2 { class Program { static void Main(string[] args) { //Create MDSClient object to connect to DotNetMQ //Name of this application: Application2 var mdsClient = new MDSClient("Application2"); //Register to MessageReceived event to get messages. mdsClient.MessageReceived += MDSClient_MessageReceived; //Connect to DotNetMQ server mdsClient.Connect(); //Wait user to press enter to terminate application Console.WriteLine("Press enter to exit..."); Console.ReadLine(); //Disconnect from DotNetMQ server mdsClient.Disconnect(); } /// <summary> /// This method handles received messages from other applications via DotNetMQ. /// </summary> /// <param name="sender"></param> /// <param name="e">Message parameters</param> static void MDSClient_MessageReceived(object sender, MessageReceivedEventArgs e) { //Get message var messageText = Encoding.UTF8.GetString(e.Message.MessageData); //Process message Console.WriteLine(); Console.WriteLine("Text message received : " + messageText); Console.WriteLine("Source application : " + e.Message.SourceApplicationName); //Acknowledge that message is properly handled //and processed. So, it will be deleted from queue. e.Message.Acknowledge(); } } }
我們用和Application1相似的方法創建一個MDSClient對象,不同的就是連接應用程序的名稱是Application2。為了接收消息,需要給MDSClient對象注冊MessageReceived事件。然后我們連接DotNetMQ,直到用戶輸入Enter才斷開。
當一個消息發送給Application2是,MDSClient_MessageReceived方法就會被調用來處理消息。我們從MessageReceivedEventArgs參數對象的Message屬性可以得到發送過來的消息。這個消息的類型是IIncomingMessage。IIncomingMessage對象的MessageData屬性實際包含了由Application1發送的消息數據。由於它是一個字節數組,我們用UTF8編碼把它轉換成字符串。然后把文本消息打印到控制台上。

圖 - 6:Application1通過DotNetMQ發送兩個消息到Application2。
處理傳入消息之后,還需要來確認這個消息。這表示消息已經正確接收並處理。然后DotNetMQ將從消息隊列中把消息刪除。我們也可以用Reject方法拒絕一個消息(如果在出錯的情況下我們不能處理這個消息)。在這種情況下,該消息將回到消息隊列,稍后再試着發到目標應用程序(如果在同一個服務器上存在另一個Application2的實體,也可能發到另一個上)。這是DotNetMQ系統的一個強大機制。因此,可以確保消息不會丟失並絕對可以被處理。如果你不確認或拒絕一個消息,系統假設是被拒絕的。所以,即使你的應用程序崩潰了,在你的應用程序正常運行后,還是會收到消息的。
如果你在同一台服務器上運行多個Application2的實例,哪一個會收到消息呢?在這種情況下,DotNetMQ會把消息順序地發給這多個實例。所以你可以創建多發送/接收的系統。一個消息只能被一個實例接收(實例接收相互不同的消息)。DotNetMQ提供這所有功能和同步。
消息屬性:傳送規則(Transmit Rule)
在發送一個消息之前,你可以像這樣設置一個消息的Transmit Rule屬性:
message.TransmitRule = MessageTransmitRules.NonPersistent;
傳送規則有三種類型:
- StoreAndForward:這個是默認傳送規則,消息是持久的,不會丟失的,並且使確保傳送的。如果Send方法沒有拋出異常,就表明消息已被DotNetMQ接收,而且存儲到了數據庫。直到目標應用程序接收並確認了它,這個消息會一直存儲在數據庫里。
- NonPersistent:消息不會存儲到數據庫,這是發送消息最快的方式。僅在DotNetMQ服務停止工作,消息才會丟失。
- DirectlySend:這個是DotNetMQ獨有的功能。這種類型的消息直接發送給目標應用程序。在接收者確認一個消息之前,發送者程序是一直被阻塞的。所以,如果發送者在調用Send方法的過程中沒有發生異常,就意味着該消息被接受者正確接收並確認。如果在傳送消息時發生錯誤,或接受者處於脫機狀態,或者接受者拒絕了消息,發送者在調用Send方法時都會得到一個異常。即使應用程序是在不同的服務器上(更即使在應用程序之間有許多服務器要路由),這個規則依然能正常工作。
由於默認的傳送規則是StoreAndForward,讓我們試試下面這些:
- 運行Application1(這時Application2沒有運行),輸入一些消息,然后關閉程序。
- 運行Application2,你將看到消息沒有丟失,而是被Application2接收了。
即使在Application1發送過消息后,你停止了DotNetMQ服務,你的消息也是不會丟失的,這就叫持久化。
客戶端屬性:通訊方式(CommunicationWay)
默認情況下,一個應用程序可以通過MDSClient發送和接收消息(CommunicationWays.SendAndReceive)。如果一個應用程序不需要接收消息,可以設置MDSClient的CommunicationWay為CommunicationWays.Send。這個屬性在連接DotNetMQ之前或在和DotNetMQ通信中都可以改變。
客戶端屬性:出錯時重新連接服務器(ReConnectServerOnError)
默認情況下,MDSClient由於某種原因斷開DotNetMQ時會自動重連。所以,即使你重啟DotNetMQ服務,也不用重啟你的應用程序。你可以把ReconnectServerOnError設置為false來禁用自動重連。
客戶端屬性:自動確認消息(AutoAcknowledgeMessages)
默認情況下,你必須在MessageReceived事件中顯式的確認消息。否則,系統將認為消息是被拒絕了。如果你想改變這種行為,你需要把AutoAcknowledgeMessages屬性設為true。在這種情況下,如果你的MessageReceived事件處理程序沒有拋出異常,你也沒有顯式確認和拒絕一個消息,系統將自動確認該消息(如果拋出異常,該消息將被拒絕)。
配置DotNetMQ
有兩種方式可以配置DotNetMQ:通過XML配置文件或用DotNetMQ管理工具(一個Windows Forms程序),這里我分別演示這兩種方法,有些配置是及時生效的,而有些則需要重啟DotNetMQ。
服務端
你可以只在一台服務器上運行DotNetMQ,在這種情況下,是不需要為服務器配置任何東西的。但如果你想在多台服務器上運行DotNetMQ並使它們相互通信,你就需要定義服務器圖了。
一個服務器圖包含兩個或更多個節點,每一個節點都是一個具有IP地址和TCP端口(被DotNetMQ用的那個)的服務器。你可以用DotNetMQ管理器配置/設計一個服務器圖。

圖 - 8:DotNetMQ服務器圖管理。
在上圖中,你看到了一個包含5個節點的服務器圖。紅色節點表示當前服務器(當前服務器就是你用DotNetMQ管理器連接的那個)。直線表示兩個節點(它們互為相鄰節點)是可連接的(它們可以發送/接收消息)。服務器/節點圖形中的名稱是很重要的,它被用來向該服務器發送消息。
你可以雙擊圖形中的一個服務器來編輯它的屬性。為了連接兩個服務器,你要按住Ctrl鍵,點擊第一個再點擊第二個(斷開連接也是相同的操作)。你可以通過點擊右鍵,選擇Set as this server來設置管理器連接該服務器。你可以從圖中刪除一個服務器或通過右鍵菜單添加一個新的服務器。最后,你可以通過拖拽添加或移除服務器。
當你設計好服務器圖之后,你必須點擊Save & Update Graph按鈕來保存這些修改。這些修改將保存在DotNetMQ安裝目錄的MDSSettings.xml文件里。你必須重啟DotNetMQ才能應用這些修改。
對於上面的服務器圖,對應的MDSSettings.xml設置如下:
<?xml version="1.0" encoding="utf-8"?> <MDSConfiguration> <Settings> ... </Settings> <Servers> <Server Name="halil_pc" IpAddress="192.168.10.105" Port="10099" Adjacents="emre_pc" /> <Server Name="emre_pc" IpAddress="192.168.10.244" Port="10099" Adjacents="halil_pc,out_server,webserver1,webserver2" /> <Server Name="out_server" IpAddress="85.19.100.185" Port="10099" Adjacents="emre_pc" /> <Server Name="webserver1" IpAddress="192.168.10.263" Port="10099" Adjacents="emre_pc,webserver2" /> <Server Name="webserver2" IpAddress="192.168.10.44" Port="10099" Adjacents="emre_pc,webserver1" /> </Servers> <Applications> ... </Applications> <Routes> ... </Routes> </MDSConfiguration>
當然,這個配置是要根據你實際的網絡進行的。你必須在圖中所有服務器上安裝DotNetMQ。此外,還必須在所有服務器上配置相同的服務器圖(你可以很容易地從XML文件復制服務器節點到其他服務器上)。
DotNetMQ采用段路徑算法發送消息(沒有在XML配置文件里手動定義路由的情況下)。考慮這個情景,運行在halil_pc的Application A發送一個消息到webserver2上的Application B,路徑是很簡單的:Application A -> halil_pc -> emre_pc -> webserver2 -> Application B。halil_pc通過服務器圖定義知道下一個要轉發到的服務器(emre_pc)。
最后,MDSSettings.design.xml包含了服務器圖的設計信息(節點在屏幕上的位置)。這個文件只是用於DotNetMQ管理器的服務器圖窗體,運行時的DotNetMQ服務是不需要的。
應用程序
就像圖 - 5顯示的那樣,你可以把和DotNetMQ關聯的應用程序作為消息代理來添加/刪除。對於這些修改是不需要重啟DotNetMQ的。應用程序的配置也保存在MDSSettings.xml文件里,就像下面這樣:
<?xml version="1.0" encoding="utf-8"?> <MDSConfiguration> ... <Applications> <Application Name="Application1" /> <Application Name="Application2" /> </Applications> ... </MDSConfiguration>
一個應用程序必須在這個列表里才能和DotNetMQ連接。如果你直接修改xml文件,你必須重啟DotNetMQ服務才能生效。
路由/負載均衡
DotNetMQ的有一個路由功能。現在路由設置只能通過MDSSettings.xml設置。你可以看到下面文件里有兩種路由設置:
<?xml version="1.0" encoding="utf-8" ?> <MDSConfiguration> ... <Routes> <Route Name="Route-App2" DistributionType="Sequential" > <Filters> <Filter DestinationServer="this" DestinationApplication="Application1" /> </Filters> <Destinations> <Destination Server="Server-A" Application="Application1" RouteFactor="1" /> <Destination Server="Server-B" Application="Application1" RouteFactor="1" /> <Destination Server="Server-C" Application="Application1" RouteFactor="1" /> </Destinations> </Route> <Route Name="Route-App2" DistributionType="Random" > <Filters> <Filter DestinationServer="this" DestinationApplication="Application2" /> <Filter SourceApplication="Application2" TransmitRule="StoreAndForward" /> </Filters> <Destinations> <Destination Server="Server-A" Application="Application2" RouteFactor="1" /> <Destination Server="Server-B" Application="Application2" RouteFactor="3" /> </Destinations> </Route> </Routes> ... </MDSConfiguration>
每個路由節點有兩個屬性:Name屬性是對用戶友好的顯示(不影響路由功能),DistributionType是路由的策略。這里有兩種類型的路由策略:
- Sequential:消息依次順序的路由到目標服務器。Destination的RouteFactor是分發因子。
- Random:消息隨機的路由到目標服務器。選擇Server-A服務器的概率是:(Server-A的RouteFactor)/(Destinations里所有RouteFactor的總和)。
Filters用於決定消息使用哪個路由。如果一個消息的屬性和其中一個過濾器匹配,該消息就會被路由。這有5個條件(XML的5個屬性)來定義一個過濾器:
- SourceServer:消息的第一個源服務器,可以用this表示當前服務器。
- SourceApplication:發現消息的應用程序。
- DestinationServer:消息的最終目標服務器,可以用this表示當前服務器。
- DestinationApplication:接收消息的應用程序。
- TransmitRule:消息傳送規則的一種(StoreAndForward,DirectlySend,NonPersistent)。
過濾消息時,不會考慮沒有定義的條件。所以,如果所有的條件都是空的(或直接沒定義),那么所有的消息都適合這個過濾器。只有所有的條件都匹配時,一個過濾器才適合這個消息。如果一個消息正確匹配(至少是過濾器定義的都匹配)一個路由中的一個過濾器,那么這個路由將被選擇並使用。
Destinations是用來將消息路由到其他服務器用的。一個目標服務器被選中是根據Route節點的DistributionType屬性(前面解釋過)決定的。一個destination節點必須定義三個屬性:
- Server:目標服務器,可以用this表示當前服務器。
- Application:目標應用程序,目標應用程序通常和消息的原目標程序是一樣的,不過這里你可以重定向到另一個應用程序。
- RouteFactor:這個屬性用於表明一個目標服務器被選中的相對比率,可以用來做負載均衡。如果你想把消息平均分發到所有服務器上,你可以把所有目標服務器的FouteFactor屬性都設為1。但是如果你有兩台服務器,其中一台比另一台性能強大的多,你可以通過設置這個路由因子來達到選擇第一台服務器的概率是第二台的兩倍以上。
修改路由配置,必須重啟DotNetMQ才會生效。
其他設置
目前DotNetMQ支持3中存儲類型:SQLite(默認),MySQL和內存(譯者注:根據下面內容,還支持MSSQL)。你可以在MDSSettings.xml修改存儲類型。
- SQLite:使用SQLite數據庫系統。這個是默認存儲類型,使用(DotNetMQ安裝目錄\SqliteDB\MDS.s3db)文件作為數據庫。
- MSSQL:使用微軟SQL Server數據庫,你需要提供ConnectionString屬性作為連接字符串(下面會說到)。
- MySQL-ODBC:通過ODBC使用MySQL數據庫,你需要提供ConnectionString數據作為連接字符串。
- MySQL-Net:通過.NET Adapter(.NET適配器)使用MySQL數據庫,你需要提供ConnectionString數據作為連接字符串。
- Memory:使用內存作為存儲設備。在這種情況下,如果DotNetMQ停止了,持久性消息會丟失。
下面是一個使用MySQL-ODBC作為存儲的簡單配置:
<Settings> <Setting Key="ThisServerName" Value="halil_pc" /> <Setting Key="StorageType" Value="MySQL-ODBC" /> <Setting Key="ConnectionString" Value="uid=root;server=localhost;driver={MySQL ODBC 3.51 Driver};database=mds" /> </Settings>
你可以在Setup\Databases文件夾(這個文件夾在DotNetMQ的安裝目錄)找到所需的文件,然后創建數據庫和數據表,以供DotNetMQ使用。如果你有什么問題,可以隨時問我。
還有一個設置是定義"current/this"這個名稱代表哪台服務器的,這個值必須是Servers節點里的一個服務器名。如果你用DotNetMQ管理器編輯服務器圖,這個值是自動設置的。
網絡傳輸消息
向一個網絡服務器的應用程序發消息是和向同一個服務器的應用程序發消息一樣簡單的。
一個簡單的應用程序
讓我們考慮下面這個網絡:

圖 - 8:兩個應用程序通過DotNetMQ在網絡上通信。
運行在ServerA上的Application1想發消息到ServerC上的Application2,由於防火牆的規則,ServerA和ServerC不能直接連接。讓我們修改一下在第一個DotNetMQ程序里開發的程序。
Application2甚至一點有不用修改,只要把Application2上ServerC上運行並等待傳入的消息即可。
Application1只是在如何發消息的地方稍微改動一點,就是設置DestinationServerName(目標服務器名)為ServerC。
var message = mdsClient.CreateMessage(); message.DestinationServerName = "ServerC"; //Set destination server name here! message.DestinationApplicationName = "Application2"; message.MessageData = Encoding.UTF8.GetBytes(messageText); message.Send();
就這樣,就完事兒了。你不需要知道ServerC在哪里,也不需要直接連接ServerC...這些全部定義在DotNetMQ設置里。注意:如果你不給一個消息設置DestinationServerName,系統假設目標服務器就是"current/this"指定的那台服務器,DotNetMQ也將把消息發送到同一台服務器上的應用程序。另外,如果你定義了必要的路由,你就不必設置目標服務器了,DotNetMQ會自動地路由消息。
當然,DotNetMQ的設置必須根據服務器間的連接(服務器圖)來設置,並且Application1和Application2必須像配置DotNetMQ部分說的那樣注冊到DotNetMQ服務器。
一個真實的案例:分布式短信處理器(Distributed SMS Processor)
正如你已看到的那樣,DotNetMQ可以用於構建分布式,負載均衡應用系統。在本節中,我將討論一個生活中真實的場景:一個分布式消息處理系統。
假定有一個用於音樂比賽投票的短消息(MSM)服務。所有競賽者唱過他們的歌曲后,觀眾給他們最喜歡的歌手投票,會發一條像"VOTE 103"這樣的短信到我們的短息服務器。並假定這次投票會在短短的30分鍾完成,大約有五百萬人發短息到我們的服務。
我們將會接收每一條短息,處理它(格式化短息文本,修改數據庫,以便增加選手的票數),並要發送確認消息給發送者。我們從兩台服務器接收消息,在四台服務器上處理消息,然后從兩台服務器上發送確認消息。我們總共有八台服務器。讓我們看看完整的系統示意圖:

圖 - 9:分布式短信處理系統
這里有三種類型的應用:接受者,處理器,和發送者。在這種情況下,你就可以使用DotNetMQ作為消息隊列和負載均衡器,通過配置服務器圖和路由(就像配置DotNetMQ小節中描述的那樣),來構建一個分布式的,可擴展的消息處理系統。
請求/應答式通信
在許多情況下,一個應用發一個消息到另一個應用,然后得到一個應答消息。DotNetMQ對這種通信方式有內置的支持。考慮這樣一個服務:用於查詢庫存的狀態。這里有兩種消息類型:
[Serializable] public class StockQueryMessage { public string StockCode { get; set; } } [Serializable] public class StockQueryResultMessage { public string StockCode { get; set; } public int ReservedStockCount { get; set; } public int TotalStockCount { get; set; } }
下面展示了一個簡單的庫存服務。
using System; using MDS; using MDS.Client; using StockCommonLib; namespace StockServer { class Program { static void Main(string[] args) { var mdsClient = new MDSClient("StockServer"); mdsClient.MessageReceived += MDSClient_MessageReceived; mdsClient.Connect(); Console.WriteLine("Press enter to exit..."); Console.ReadLine(); mdsClient.Disconnect(); } static void MDSClient_MessageReceived(object sender, MessageReceivedEventArgs e) { //Get message var stockQueryMessage = GeneralHelper.DeserializeObject(e.Message.MessageData) as StockQueryMessage; if (stockQueryMessage == null) { return; } //Write message content Console.WriteLine("Stock Query Message for: " + stockQueryMessage.StockCode); //Get stock counts from a database... int reservedStockCount; int totalStockCount; switch (stockQueryMessage.StockCode) { case "S01": reservedStockCount = 14; totalStockCount = 80; break; case "S02": reservedStockCount = 0; totalStockCount = 25; break; default: //Stock does not exists! reservedStockCount = -1; totalStockCount = -1; break; } //Create a reply message for stock query var stockQueryResult = new StockQueryResultMessage { StockCode = stockQueryMessage.StockCode, ReservedStockCount = reservedStockCount, TotalStockCount = totalStockCount }; //Create a MDS response message to send to client var responseMessage = e.Message.CreateResponseMessage(); responseMessage.MessageData = GeneralHelper.SerializeObject(stockQueryResult); //Send message responseMessage.Send(); //Acknowledge the original request message. //So, it will be deleted from queue. e.Message.Acknowledge(); } } }
這個庫存服務監聽進來的StockQueryMessage消息對象,然后把StockQueryResultMessage消息對象發送給查詢者。為了簡單起見,我沒有從數據庫查詢庫存。應答消息對象是由傳入消息對象的CreateResponseMessage()方法創建的。最后,發出回應消息后要確認進入的消息。現在,我展示一個簡單的庫存客戶端從服務器查詢庫存的示例:
using System; using MDS; using MDS.Client; using MDS.Communication.Messages; using StockCommonLib; namespace StockApplication { class Program { static void Main(string[] args) { Console.WriteLine("Press enter to query a stock status"); Console.ReadLine(); //Connect to DotNetMQ var mdsClient = new MDSClient("StockClient"); mdsClient.MessageReceived += mdsClient_MessageReceived; mdsClient.Connect(); //Create a stock request message var stockQueryMessage = new StockQueryMessage { StockCode = "S01" }; //Create a MDS message var requestMessage = mdsClient.CreateMessage(); requestMessage.DestinationApplicationName = "StockServer"; requestMessage.TransmitRule = MessageTransmitRules.NonPersistent; requestMessage.MessageData = GeneralHelper.SerializeObject(stockQueryMessage); //Send message and get response var responseMessage = requestMessage.SendAndGetResponse(); //Get stock query result message from response message var stockResult = (StockQueryResultMessage) GeneralHelper.DeserializeObject(responseMessage.MessageData); //Write stock query result Console.WriteLine("StockCode = " + stockResult.StockCode); Console.WriteLine("ReservedStockCount = " + stockResult.ReservedStockCount); Console.WriteLine("TotalStockCount = " + stockResult.TotalStockCount); //Acknowledge received message responseMessage.Acknowledge(); Console.ReadLine(); //Disconnect from DotNetMQ server. mdsClient.Disconnect(); } static void mdsClient_MessageReceived(object sender, MessageReceivedEventArgs e) { //Simply acknowledge other received messages e.Message.Acknowledge(); } } }
在上面的示例中,為了演示目的TransmitRule設置成了NonPersistent(非持久)。當然,你可以發送StoreAndForward(持久性)消息。這個是程序運行的截圖:

圖 - 10:請求/應答式的通信應用。
面向服務架構的DotNetMQ
SOA(面向服務的架構)是以個流行多年的概念了。Web服務和WCF是兩個主要的SOA解決方案。一般情況下,一個消息隊列系統是不會預期支持SOA的。同時,消息通信是異步的,松耦合的過程,而Web服務方法調用則通常是同步的,緊耦合的。即使(正如你在前面示例程序中看到的那樣)消息通信並不如調用一個遠程方法一樣簡單,但是當你的消息數增加,你的應用變復雜以至於難以維護時就不一樣了。DotNetMQ支持持久性和非持久性的遠程調用機制,所有你可以異步地調用一個遠程方法,DotNetMQ會確保調用成功。
簡單應用程序:短息/郵件發送器
在這里我們將開發一個簡單的服務,可用於發送短信和郵件。也許沒有必要專門寫一個服務來發送短信和郵件,這些功能都可以在應用自身實現,但是想象一下你有很多應用都要發郵件,在發送時如果郵件服務出問題了怎么辦?在可以成功發送郵件之前,應用程序必須一直嘗試。所以你必須在你的應用程序中建立一個郵件隊列機制,用於一次又一次的嘗試發送。在最壞的情況下,你的應用程序可能只運行很短的時間(如Web服務)或者必須在發送完郵件前關閉。但是在郵件服務器上線后,你還必須發送,不允許郵件丟失。
在這種情況下,你可以開發一個單獨的郵件/短信服務,它將嘗試發送直到成功。你可以通過DotNetMQ開發一個郵件服務,僅當郵件發送成功時確認請求,如果發送失敗,只要不確認(或拒絕)消息就行了,它稍后會重試。
服務端
首先,我們開發短信/郵件的服務部分。為了實現這個,我們必須定義一個派生自MDSService的類型:
using System; using MDS.Client.MDSServices; namespace SmsMailServer { [MDSService(Description = "This service is a " + "sample mail/sms service.", Version = "1.0.0.0")] public class MyMailSmsService : MDSService { //All parameters and return values can be defined. [MDSServiceMethod(Description = "This method is used send an SMS.")] public void SendSms( [MDSServiceMethodParameter("Phone number to send SMS.")] string phone, [MDSServiceMethodParameter("SMS text to be sent.")] string smsText) { //Process SMS Console.WriteLine("Sending SMS to phone: " + phone); Console.WriteLine("Sms Text: " + smsText); //Acknowledge the message IncomingMessage.Acknowledge(); } //You do not have to define any parameters [MDSServiceMethod] public void SendEmail(string emailAddress, string header, string body) { //Process email Console.WriteLine("Sending an email to " + emailAddress); Console.WriteLine("Header: " + header); Console.WriteLine("Body : " + body); //Acknowledge the message IncomingMessage.Acknowledge(); } // A simple method just to show return values. [MDSServiceMethod] [return: MDSServiceMethodParameter("True, if phone number is valid.")] public bool IsValidPhone([MDSServiceMethodParameter( "Phone number to send SMS.")] string phone) { //Acknowledge the message IncomingMessage.Acknowledge(); //Return result return (phone.Length == 10); } } }
如你所見,它只是一個帶有特性(Attribute)的一個常規C#類。MDSService和MDSServiceMethod兩個特性是必須的,其他的特性是可選的(不過寫上去是最好了,你將很快會看到什么會用這些特性)。你提供服務的方法必須有MDSServiceMehod特性,如果你不想公開一些方法,只要不加MDSServiceMethod特性就行了。
你還必須在你的服務方法中確認消息,否則,這個消息(引起這個服務方法調用的那個)就不會從消息隊列中刪除,而我們的服務方法將會被再次調用。如果我們不能處理這個消息(比如,如果郵件服務沒有工作,我們沒辦法發送時)我們也可以拒絕它。如果我們拒絕了這個消息,它稍后還會發送給我們(很可靠)。你可以通過MDSService類的IncomingMessage屬性得到原消息,另外,你也可以通過RemoteApplication屬性得到遠程應用程序的信息。
創建了正確的服務類后,我們必須創建一個應用來運行它,下面是用一個簡單的控制台程序運行我們的MyMailSmsService服務:
using System; using MDS.Client.MDSServices; namespace SmsMailServer { class Program { static void Main(string[] args) { using (var service = new MDSServiceApplication("MyMailSmsService")) { service.AddService(new MyMailSmsService()); service.Connect(); Console.WriteLine("Press any key to stop service"); Console.ReadLine(); } } } }
如你所見,只需要3行代碼就可以創建並運行服務,由於MDSService是可銷毀的,所以你可以uing語句,另外,你也可以使用MDSServiceApplication的Disconnect方法手動關閉服務。你可以通過AddService方法在一個MDSServiceApplication中運行多個服務。
客戶端
為了開發一個使用DotNetMQ服務的應用,你必須創建一個服務代理(就像Web服務和WCF那樣)。為了創建代理,你可以用MDSServiceProxyGenerator工具。首先,編譯你的服務項目,然后運行MDSServiceProxyGenerator.exe(在DotNetMQ安裝目錄).

圖 - 11:為DotNetMQ服務生成代理類。
選擇你的服務程序集(在這個簡單的例子中是指SmsMailServer.exe)。你可以選擇服務類或生成這個程序集里所有服務的代理。輸入一個命名空間和一個目標文件夾,然后生成代理類。生成玩后,你就可以把它加到你的項目里了。
我就不展示這個代理類了,但你必須了解它(你可以看源碼,它是一個很簡單的類)。你方法/參數上的特性用來生成這個代理類的注釋。
在我們的項目里添加這個代理類后,我們就可以想簡單方法調用那樣向服務發消息了。
using System; using MDS.Client; using MDS.Client.MDSServices; using SampleService; namespace SmsMailClient { class Program { static void Main(string[] args) { Console.WriteLine("Press enter to test SendSms method"); Console.ReadLine(); //Application3 is name of an application that sends sms/email. using (var serviceConsumer = new MDSServiceConsumer("Application3")) { //Connect to DotNetMQ server serviceConsumer.Connect(); //Create service proxy to call remote methods var service = new MyMailSmsServiceProxy(serviceConsumer, new MDSRemoteAppEndPoint("MyMailSmsService")); //Call SendSms method service.SendSms("3221234567", "Hello service!"); } } } }
你也可以調用服務的其他方法,會得到像常規方法那樣的返回值。實際上,你的方法調用被轉換成了可靠的消息,比如,即使你的遠程應用程序(MyMailSmsService)在方法調用時沒有運行,在服務啟動后也會被調用,所以你的方法調用是一定會被調用的。
你可以通過改變服務代理的TransmitRule屬性來改變消息傳輸的規則。如果服務方法返回void,那么他的默認傳輸規則是StoreAndForward。如果服務方法有個一返回值,那么方法調用將會不可靠(因為方法調用時同步的,要等待一個結果的),它的規則是DiretlySend。你可以選擇任何類型作為方法的參數,如果參數類型是基元類型(string,int,byte...),就不需要附加的設置,但是如果你想用你自定義的類型作為方法參數,這個類型必須標記為Serializable,因為DotNetMQ會用二進制序列化參數。
注意:你在運行這個例子前必須在DotNetMQ里注冊MyMailSmsService和Application3。
Web服務支持
當然,你可以在Web服務里連接DotNetMQ,因為把本身還是一個.Net應用程序。但是,為什么你要寫一個ASP.NET Web方法為應用程序處理消息(而且可以在同一個上下文中回復消息)呢?Web服務更適合這樣請求/應答式的方法調用。
DotNetMQ支持ASP.NET web服務並可以傳遞消息到web服務。這里有個web服務的模板樣品(在下載文件中)來實現這一目標。它的定義如下:
using System; using System.Web.Services; using MDS.Client.WebServices; [WebService(Namespace = "http://www.dotnetmq.com/mds")] [WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)] public class MDSAppService : WebService { /// <summary> /// MDS server sends messages to this method. /// </summary> /// <param name="bytesOfMessage">Byte array form of message</param> /// <returns>Response message to incoming message</returns> [WebMethod(Description = "Receives incoming messages to this web service.")] public byte[] ReceiveMDSMessage(byte[] bytesOfMessage) { var message = WebServiceHelper.DeserializeMessage(bytesOfMessage); try { var response = ProcessMDSMessage(message); return WebServiceHelper.SerializeMessage(response); } catch (Exception ex) { var response = message.CreateResponseMessage(); response.Result.Success = false; response.Result.ResultText = "Error in ProcessMDSMessage method: " + ex.Message; return WebServiceHelper.SerializeMessage(response); } } /// <summary> /// Processes incoming messages to this web service. /// </summary> /// <param name="message">Message to process</param> /// <returns>Response Message</returns> private IWebServiceResponseMessage ProcessMDSMessage(IWebServiceIncomingMessage message) { //Process message //Send response/result var response = message.CreateResponseMessage(); response.Result.Success = true; return response; } }
如上所述,你不需要改變ReceiveMDSMessage方法,而且必須在ProcessMDSMessage方法里處理消息。另外,你需要向下面這樣在MDSSettings.xml里定義你的web服務地址,你也可以用DotNetMQ管理工具添加web服務。
... <Applications> <Application Name="SampleWebServiceApp"> <Communication Type="WebService" Url="http://localhost/SampleWebApplication/SampleService.asmx" /> </Application> </Applications> ...
DotNetMQ的性能
這是一些通過DotNetMQ傳送消息的測試結果:
消息傳送:
- 持久地 10,000個消息大約需要25秒(約每秒400個消息)。
- 非持久地 10,000個消息大約需要3.5秒(約每秒2850個消息)。
方法調用(在DotNetMQ服務里)
- 持久地 10,000個方法調用大約需要25秒(約每秒400個)。
- 非持久地 10,000個方法調用大約需要8.7秒(約每秒1150個)。
測試平台:Intel Core 2 Duo 3,00 GHZ CPU.2 GB RAM PC。消息傳送和方法調用是在同一台電腦上的兩個應用程序之間進行的。
引用
書籍:Enterprise Integration Patterns: Designing,Building,and Deploying Messaging Solutions .作者 Gregor Hohpe,Bobby Woolf(艾迪生韋斯利出版,2003年)。
歷史
- 2011-05-23(DotNetMQ v0.9.1.0)
- 添加對微軟SQL Server數據庫的支持。
- 把MySQLConnectionString設置改成ConnectionString。
- 修改源碼。
- 根據修改更新了文章。
- 2011-05-16 (DotNetMQ v0.9.0.0)
- 添加web服務模板的下載。
- 對文章做了一些修改和添加。
- 2011-05-09(DotNetMQ v0.9.0.0)
- 第一次發布。
