Tip: 此篇已加入.NET Core微服務基礎系列文章索引
一、消息隊列與RabbitMQ
1.1 消息隊列
“消息”是在兩台計算機間傳送的數據單位。消息可以非常簡單,例如只包含文本字符串;也可以更復雜,可能包含嵌入對象。消息被發送到隊列中,“消息隊列”是在消息的傳輸過程中保存消息的容器。

消息隊列(Message Queue),是分布式系統中重要的組件,其通用的使用場景可以簡單地描述為:
當不需要立即獲得結果,但是並發量又需要進行控制的時候,差不多就是需要使用消息隊列的時候。
消息隊列主要解決了應用耦合、異步處理、流量削鋒等問題。當前使用較多的消息隊列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分數據庫如Redis、Mysql以及phxsql也可實現消息隊列的功能。更多詳細內容請參考:《消息隊列及其應用場景介紹》
我也在前幾年寫過一篇基於Redis做消息隊列的文章,對消息隊列的一個應用場景做了介紹,沒有了解過的童鞋可以看看。
1.2 RabbitMQ

RabbitMQ是一款基於AMQP(高級消息隊列協議),由Erlang開發的開源消息隊列組件。是一款優秀的消息隊列組件,他由兩部分組成:服務端和客戶端,客戶端支持多種語言的驅動,如:.Net、JAVA、Erlang等。
網上有很多性能比較的文章,例如在1百萬條1k的消息下,每秒種的收發情況如下圖所示:

這里不過多介紹RabbitMQ,有關RabbitMQ的一些需要了解的概念你可以通過下面的文章了解:
顏聖傑,《RabbitMQ知多少》
如果你想了解RabbitMQ與Kafka的對比,可以閱讀這篇文章:《開源軟件成熟度評測報告-分布式消息中間件》
而EasyNetQ呢,它是一款基於RabbitMQ.Client封裝的API庫,正如其名,使用起來比較Easy,它把原RabbitMQ.Client中的很多操作都進行了再次封裝,讓開發人員減少了很多工作量。
二、RabbitMQ的安裝
2.1 Linux下的安裝
這里不演示如何在Linux下安裝,但推薦生產環境使用Linux,下面是一些參考資料:
mcgrady,《Linux下RabbitMQ的安裝》
曉晨Master,《.NET Core使用RabbitMQ》
牛頭人,《Linux安裝RabbitMQ》
一只豬兒蟲,《RabbitMQ Linux安裝》
2.2 Windows下的安裝
開發環境下,我一般使用Windows Server虛擬機,所以這里說明下如何在Windows下安裝:
(1)下載Erlang和RabbitMQ (這里我選則的並非最新版本,而是etp20.3和rabbitmq3.7.5)

(2)首先安裝Erlang,然后添加環境變量(如果添加了,則skip這一步)並加到PATH中

(3)其次安裝RabbitMQ,一路Next,安裝完成后也為其添加環境變量並添加到PATH中


(4)檢查是否安裝成功:rabbitmqctl status
這里我碰到了如下的錯誤:

解決方法:
更正erlang.cookie文件,詳情請參考:https://blog.csdn.net/u012637358/article/details/80078610
最終狀態:

檢查Windows服務,發現已經自動注冊了一個服務:

(5)激活Web管理插件,然后檢查是否可見(http://127.0.0.1:15672)


2.3 一些必要的配置
(1)使用默認賬號:guest/guest登錄進去,添加一個新用戶(Administrator權限),並設置其Permission

(2)添加新的虛擬機(默認為/,這里我添加一個名為EDCVHOST的虛擬機)

(3)綁定新添加的用戶到新的虛擬機上,接下來在我們的程序中就主要使用admin這個用戶和EDCVHOST這個虛擬機

*.當然,為了安全考慮,你也可以把guest用戶remove掉
三、Quick Start:第一個消息隊列
3.1 項目准備
這里為了快速的演示如何使用EasyNetQ,我們來一個QuickStart,准備三個項目:兩個Console程序和一個Class Library。

其中,對Publisher和Subscriber項目安裝EasyNetQ:
NuGet>Install-Package EasyNetQ
針對Messages類庫,新增一個class如下:
public class TextMessage { public string Text { get; set; } }
3.2 我是Publisher
添加以下代碼:
public class Program { public static void Main(string[] args) { var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison"; using (var bus = RabbitHutch.CreateBus(connStr)) { var input = ""; Console.WriteLine("Please enter a message. 'Quit' to quit."); while ((input = Console.ReadLine()) != "Quit") { bus.Publish(new TextMessage { Text = input }); } } } }
可以看到,我們在其中使用EasyNetQ高度封裝的接口創建了一個IBus接口的實例,通過這個IBus實例我們可以通過一個超級Easy的Publish接口進行發布消息。這里主要是讀取用戶在控制台中輸入的消息字符串進行發送。實際中,發送的一般都是一個或多個復雜的實體對象。
3.3 我是Subscriber
添加如下所示代碼:
public class Program { public static void Main(string[] args) { var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison"; using (var bus = RabbitHutch.CreateBus(connStr)) { bus.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage); Console.WriteLine("Listening for messages. Hit <return> to quit."); Console.ReadLine(); } } public static void HandleTextMessage(TextMessage textMessage) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("Got message: {0}", textMessage.Text); Console.ResetColor(); } }
這里主要是通過IBus實例去訂閱消息(這里是除非用戶關閉程序否則一直處於監聽狀態),當發布者發布了指定類型的消息之后,這里就把它打印出來(紅色字體顯示)。
3.4 簡單測試
通過控制台信息查看結果:

通過RabbitMQ管理界面查看:
(1)通過Connections Tab可以發現我們的兩個客戶端都在Running中

(2)通過Queues Tab查看目前已有的隊列=>可以看到目前我們只注冊了一個隊列

四、在ASP.NET Core中的使用
4.1 案例結構與說明

這里假設有這樣一個場景,客戶通過瀏覽器提交了一個保單,這個保單中包含一些客戶信息,ClientService將這些信息處理后發送一個消息到RabbitMQ中,NoticeService和ZAPEngineService訂閱了這個消息。NoticeService會將客戶信息取出來並獲取一些更多信息為客戶發送Email,而ZAPEngineService則會根據客戶的一些關鍵信息(比如:年齡,是否吸煙,學歷,年收入等等)去數據庫讀取一些規則來生成一份Question List並存入數據庫。
4.2 項目准備工作
創建上面提到的這幾個項目,這里我選擇ASP.NET Core WebAPI類型。
分別為這幾個項目通過NuGet安裝EasyNetQ組件,並且通過以下代碼注入統一的IBus實例對象:
public IServiceProvider ConfigureServices(IServiceCollection services) { // IoC - EventBus services.AddSingleton(RabbitHutch.CreateBus(Configuration["MQ:Dev"])); ...... }
這里我將連接字符串寫到了配置文件中,請參考上面的QuickStart中的內容。
下面是這個demo用到的一個消息對象實體:通過標簽聲明隊列名稱。
[Queue("Qka.Client", ExchangeName = "Qka.Client")] public class ClientMessage { public int ClientId { get; set; } public string ClientName { get; set; } public string Sex { get; set; } public int Age { get; set; } // N: Non-Smoker, S: Smoker public string SmokerCode { get; set; } // Bachelor, Master, Doctor public string Education { get; set; } public decimal YearIncome { get; set; } }
此外,為了充分簡化代碼量,EasyNetQ提供了一個AutoSubscriber的方式,可以通過接口和標簽快速地讓一個類成為Consumer。詳細內容參考:https://github.com/EasyNetQ/EasyNetQ/wiki/Auto-Subscriber
這里為了快速的在項目中使用Subscriber,添加一個擴展方法,它會從注入的服務中取出IBus實例對象,並自動幫我們進行Subscriber(那些實現了IConsume接口的類)的注冊。具體用法見后面的介紹。
public static class AppBuilderExtension { public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly) { var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider; var lifeTime = services.GetService<IApplicationLifetime>(); var bus = services.GetService<IBus>(); lifeTime.ApplicationStarted.Register(() => { var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix); subscriber.Subscribe(assembly); subscriber.SubscribeAsync(assembly); }); lifeTime.ApplicationStopped.Register(() => bus.Dispose()); return appBuilder; } }
4.3 Publisher:ClientService
ClientService作為發布者,這里假設我們在API中處理完業務代碼后,將message發布給RabbitMQ:
[Produces("application/json")] [Route("api/Client")] public class ClientController : Controller { private readonly IClientService clientService; private readonly IBus bus; public ClientController(IClientService _clientService, IBus _bus) { clientService = _clientService; bus = _bus; } ...... [HttpPost] public async Task<string> Post([FromBody]ClientDTO clientDto) { // Business Logic here... // eg.Add new client to your service databases via EF // Sample Publish ClientMessage message = new ClientMessage { ClientId = clientDto.Id.Value, ClientName = clientDto.Name, Sex = clientDto.Sex, Age = 29, SmokerCode = "N", Education = "Master", YearIncome = 100000 }; await bus.PublishAsync(message); return "Add Client Success! You will receive some letter later."; } }
當然,你可以使用同步方法:bus.Publish(message);
4.4 Subscriber: NoticeService & ZAPEngineService
(1)NoticeService:新增一個實現IConsume接口的Consumer類
public class ClientMessageConsumer: IConsumeAsync<ClientMessage> { [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.Notice")] public Task ConsumeAsync(ClientMessage message) { // Your business logic code here // eg.Build one email to client via SMTP service // Sample console code System.Console.ForegroundColor = System.ConsoleColor.Red; System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will send one email to client.", message.ClientName); System.Console.ResetColor(); return Task.CompletedTask; } }
這里為了演示效果,增加了一些輸出信息的代碼,下面的ZAPEngineService也是一樣,不再贅述。
(2)ZAPEngineService:新增一個實現IConsume接口的Consumer類
public class ClientMessageConsumer : IConsumeAsync<ClientMessage> { [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.ZapQuestion")] public Task ConsumeAsync(ClientMessage message) { // Your business logic code here // eg.Generate one ZAP question records into database and send to client // Sample console code System.Console.ForegroundColor = System.ConsoleColor.Red; System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will generate one ZAP question list to client", message.ClientName); System.Console.ResetColor(); return Task.CompletedTask; } }
注意兩個Consumer的SubscriptionId不能一樣,否則無法接受到消息。
(3)為兩個Consumer使用擴展方法:UseSubscribe
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { ...... // easyNetQ app.UseSubscribe("ClientMessageService", Assembly.GetExecutingAssembly()); }
4.5 簡單測試
(1)借助Postman向ClientService發起Post請求

(2)查看NoticeService的日志信息

(3)查看ZAPEngineService的日志信息

(4)查看RabbitMQ的管理控制台:

五、小結
本篇超級簡單地介紹了一下消息隊列與RabbitMQ,通過使用EasyNetQ這個基於RabbitMQ.Client的客戶端做了一個QuickStart演示了在.NET Core環境下如何進行消息的發布與訂閱,並通過一個微服務的小案例演示了如何在ASP.NET Core環境下如何基於EasyNetQ完成消息的發布與訂閱,看起來就像一個類似於簡單的事件總線。當然,本篇的內容都十分基礎,如果要應用好RabbitMQ,還得把那些基礎概念(如:Channel,Exchange等)弄清楚,然后去理解一下事件總線的概念,實際中還得考慮數據一致性等等,路途漫漫,繼續加油吧!
示例代碼
Click Here => 點我下載
參考資料
EasyNetQ官方文檔:https://github.com/EasyNetQ/EasyNetQ/wiki/Introduction
focus-lei,《.net core使用EasyNetQ做EventBus》
常山造紙農,《RabbitMQ安裝配置和基於EasyNetQ驅動的基礎使用》
