使用NServiceBus開發分布式應用
系列主題:基於消息的軟件架構模型演變
NServiceBus 是一個.Net平台下開源的消息服務框架,這類產品有時也被稱作ESB(Enterprise Service Bus)——企業服務總線。
NServicebus官方地址:http://particular.net/
git: https://github.com/Particular/NServiceBus
NServiceBus原作者Udi Dahan,該產品最早於2006年發行了第一個版本,這是一個企業級的開源產品,企業開發需要購買License,參照:http://particular.net/licensing。
一、NServiceBus的特性
1、高性能和可擴展性
可以廣泛應用於許多業務領域,可擴展性和性能都經過了實戰檢驗。
2、具有自動重試的可靠性集成
通過配置機制提供基於消息通訊的的最佳實踐方案,能夠識別錯誤響應並自動重試。
3、工作流和后台任務調度
通過Saga來完成長時間運行的流程定義和管理功能,提供強大而靈活的工作流功能。
4、消息的集中審核流程
很容易將整個分布式系統聚集到一個中心位置配置消息審核。
5、通過發布/訂閱來減少耦合
提供了發布/訂閱機制。可擴展、可配置、易於理解和易於使用。
6、易於擴展和配置
多個靈活的擴展點和配置選項,NServieBus可以根據用戶需求對各個特性進行自定義配置。
7、支持廣泛的消息傳輸技術
提供了MSMQ, RabbitMQ, SQL Server, Windows Azure Queues,Windows AzureService Bus消息傳輸機制,當然你也可以自定義或者選擇由社區開發的消息傳輸方案。
二、.NET平台下其他ESB介紹
1、Biztalk
在微軟的世界里,BizTalk Server一直被用來解決異構平台上應用程序之間數據交換的復雜集成問題。BizTalk同樣提供了發布/訂閱模式實現松耦合的架構。有時候你需要將現有的代碼和一個運行在不同技術和協議下的歷史遺留程序集成,這是一個經典的企業應用程序集成(Enterprise Application Integration-EAI)的場景。在這種場景之下,可以在業務服務之間使用NServiceBus,在這些服務的邊界之內,你可以使用BizTalk與現有的歷史遺留應用進行集成。
如您所見,服務邊界之后的BizTalk是一個對異構應用的整合。
2、MassTransit
MassTransit是一個.NET平台下用來創建分布式應用程序的輕量級開源消息總線。
官網:http://masstransit-project.com/
git: https://github.com/MassTransit/MassTransit
MassTransit的第一個版本開發於2007年,作者Chris Patterson 和Dru Sellers 在一個會議中偶然相識,他們覺得當時.Net平台下沒有一個他們想要的服務總線框架,而那時NServiceBus也剛剛發布,很多功能都不完善,並且也沒有很好的社區支持。所以他倆開發了自己的ESB產品——MassTransit,目前最新的MassTransit基於NET4.0中的異步支持重寫了所有代碼。MassTransit的目標並不是要在分布式領域面面俱到從而適應大型的企業級開發,而是能實現一個強壯的輕量級消息總線。
三、從Hello World開始
分布式應用開發是一個比較復雜的過程,無論從涉及的技術知識體系還是開發,調試,部署都會帶來很多挑戰,我希望通過這個簡單的例子展示分布式開發中的基本思想。
1、准備工作
安裝MSMQ服務,NServiceBus默認使用MSMQ服務,所以在開始這個例子之前確保已安裝MSMQ服務。
2、 新建一個類型為Console Application的客戶端:NBus.Practice.GreetingClient,客戶端會命令服務端輸出“Hello World”。
安裝nuget包:
1
|
Install-Package NServiceBus
|
3、 初始化一個Bus,然后給服務端發出命令。
NServiceBus提供了多個Host方案,應用程序自己Host或者使用NServiceBus.Host程序來Host應用程序。當然你還可以將NServicBus程序Host在一個Windows服務中。這個客戶端我們選擇Host在客戶端自身當中。
我們只是用幾個必要的選項來配置bus。重點是此段代碼配置了一個EndPoint:“Nbus.Practice.HelloWorld.Client”,這個名稱代表了了此客戶端的網絡地址。
有了這些配置,我們就可以利用這些配置來創建一個bus出來。
1
2
3
4
|
using
(IBus bus = Bus.Create(busConfiguration).Start())
{
SendGreetingCommand(bus);
}
|
接下來的代碼通過bus.Send<TCommand>(TCommand cmd)方法給服務端發送一個命令:
1
2
3
4
5
6
7
8
9
10
11
12
|
private
static
void
SendGreetingCommand(IBus bus)
{
Console.WriteLine(
"Press 'Enter' to send a message.To exit, Ctrl + C"
);
var
i = 0;
while
(Console.ReadLine() !=
null
)
{
var
id = Guid.NewGuid();
bus.Send(
new
GreetingCommand() { Id = id, Times = i });
i++;
Console.WriteLine(
"Send a new GreetingCommand message with id: {0}"
, id.ToString(
"N"
));
}
}
|
GreetingCommand就是一個消息,在消息總線中,一切交流都是通過消息來實現的。
4、建立一個公共的類庫:NBus.Practice.GreetingMessage來定義消息
在NServiceBus中,命令類型要繼承於ICommand,事件類型要繼承於IEvent,消息是一個由屬性構成的簡單類型,最終需要序列化並可以在網絡中傳播。
1
2
3
4
5
|
public
class
GreetingCommand:ICommand
{
public
Guid Id {
get
;
set
; }
public
int
Times {
get
;
set
; }
}
|
5、這個客戶端幾乎要完成了,我們創建了個bus,然后發送了一條消息。現在的問題在於這條息發送給誰呢?發送給誰這件事通過配置文件來完成。
1
2
3
|
<
MessageEndpointMappings
>
<
add
Messages="NBus.Practice.GreetingMessage" Endpoint="Nbus.Practice.HelloWorld.Server" />
</
MessageEndpointMappings
>
|
這個配置的含義是:將定義在程序集NBus.Practice.GreetingMessage中的消息發送到EndPoint為Nbus.Practice.HelloWorld.Server的程序中。
6、顯然我們需要一個EndPoint為Nbus.Practice.HelloWorld.Server的服務端,新建一個類型為Console Application類型的服務端:NBus.Practice.GreetingServer,並且用同樣的方法創建一個bus並啟動。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
static
void
Main(
string
[] args)
{
var
busConfiguration =
new
BusConfiguration();
busConfiguration.EndpointName(
"Nbus.Practice.HelloWorld.Server"
);
busConfiguration.UseSerialization<JsonSerializer>();
busConfiguration.UsePersistence<InMemoryPersistence>();
using
(IBus bus = Bus.Create(busConfiguration).Start())
{
Console.WriteLine(
"Press any key to exit"
);
Console.ReadKey();
}
}
|
為了處理GreetingCommand,新建一個GreetingHandler.cs的類,只需要繼承IHandleMessages<GreetingCommand>即可表明該類會處理類型為GreetingCommand的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public
class
GreetingHandler:IHandleMessages<GreetingCommand>
{
private
readonly
IBus _bus;
public
GreetingHandler(IBus bus)
{
_bus = bus;
}
public
void
Handle(GreetingCommand message)
{
Console.WriteLine(
"Received greetingCommand:{0}, times:{1}, Hello world"
,message.Id,message.Times);
_bus.Publish(
new
GreetingEvent(){Id = message.Id,Times = message.Times});
}
}
|
方法public void Handle(GreetingCommand message)描述了當收到GreetingCommand消息會輸出了字符串“Hello World”。另外代碼最后發布了一個類型為GreetingEvent的事件,所有對此事件感興趣的訂閱者都可以訂閱此事件。在NServiceBus中發布一個事件采用bus.Publish<TEvent>(TEvent event)方法。你會注意到我們通過構造器注入的方式來獲取bus實例。
也許你會很關心此服務端中的配置文件如何配置呢?此服務端收到了別的程序發送的消息,具體誰發送的他並不知情。另外發布了一個事件,但是具體誰來訂閱該事件,作為事件的發布者並不知道,所以該項目的消息路由配置為空:
1
|
<
MessageEndpointMappings
></
MessageEndpointMappings
>
|
7、截至目前,我們已經完成了顯示hello world的任務,讓我們運行起來看看吧:
在項目配置中將NBus.Practice.GreetingClient和NBus.Practice.GreetingServer同時設置為啟動項。CTRL+F5
運行結果:當我們在客戶端中按"Enter"鍵,服務端會收到消息並輸出“Hello World”。
此時如果我們關閉服務端,並在客戶端中多敲幾次回車鍵發送GreetingCommand消息會怎么樣?讓我么來模擬真實場景下服務端應用由於未知原因宕機,會出現什么情況呢?
此時如果我們打開MSMQ管理工具就會發現服務端未處理的消息存儲在隊列中,直到服務端再次上線重新處理這些消息,從而保證了分布式應用中數據的最終一致性。
8、剛才服務端處理完GreetingCommand並發布了GreetingEvent事件,我們接下來新建一個類型為Console Application的事件訂閱者:NBus.Practice.GreetingSubscriber。
事件訂閱者跟之前一樣需要創建並啟動一個bus。
9、新建一個Congratulation.cs來處理GreetingEvent消息:
1
2
3
4
5
6
7
|
public
class
Congratulation:IHandleMessages<GreetingEvent>
{
public
void
Handle(GreetingEvent message)
{
Console.WriteLine(
"Received greeting event id:{0}, time{1}, congratulations, you have learned NServiceBus."
,message.Id,message.Times);
}
}
|
10、訂閱者的消息路由如何配置?
1
2
3
|
<
MessageEndpointMappings
>
<
add
Messages="NBus.Practice.GreetingMessage" Endpoint="Nbus.Practice.HelloWorld.Server" />
</
MessageEndpointMappings
>
|
這句配置的含義是:訂閱EndPoint為Nbus.Practice.HelloWorld.Server且消息定義在程序集為NBus.Practice.GreetingMessage中的消息。
由此可見根據程序的角色不同,配置文件的配置具有不同的含義。
11、在事件的發布/訂閱模式中,訂閱者可以是一個或多個,我們將新建第二個訂閱者來展示此功能。新建一個類型為Class library的程序集: NBus.Practice.GreetingAnotherSubscriber。
這次的程序之所以要換成Class library是因為我們本次要使用NServiceBus.Host來Host此程序。
1
|
Install-Package NServiceBus.Host
|
NServiceBus會為項目自動添加一個EndpointConfig.cs文件,我們將在此文件中配置bus:
1
2
3
4
5
6
7
8
9
|
public
class
EndpointConfig : IConfigureThisEndpoint
{
public
void
Customize(BusConfiguration configuration)
{
configuration.EndpointName(
"Nbus.Practice.HelloWorld.AnotherSubscriber"
);
configuration.UseSerialization<JsonSerializer>();
configuration.UsePersistence<InMemoryPersistence>();
}
}
|
同時添加一個Handler來處理GreetingEvent消息。
1
2
3
4
5
6
7
|
public
class
GreetingLogger:IHandleMessages<GreetingEvent>
{
public
void
Handle(GreetingEvent message)
{
Console.WriteLine(
"Received greeting event id:{0}, time{1}, I will log it."
,message.Id,message.Times);
}
}
|
通過下面的方式使用NServiceBus.Host.exe來Host本程序。
11、 最后把本教程中建立的四個程序全都跑起來看看效果:
四、總結
本文通過一個簡單的實例展示了如何在服務總線中發送命令,如何使用發布/訂閱基本思想來實現一個Hello world。這個例子很簡單,但是隱隱之中展現了CQRS的基本思想:Client發送一個Command,DomainHandler收到Command后會調用Domain邏輯,此時Domain會發布領域事件,Query分支會訂閱領域事件來更新Query數據庫,同時還有緩存、搜索引擎、其他服務也會訂閱此領域事件。各個服務之間構成了松耦合的分布式應用程序。
當然NServiceBus還有更多高級主題例如:持久化、Saga、單元測試、二級重試、依賴注入、負載均衡、更換消息隊列等內容等着我們去一探究竟。
整個例子的代碼:https://git.oschina.net/richieyangs/NServiceBusPractice,以后會將所有NServiceBus相關的例子放在這個項目中。