NETCORE - RabbitMQ的使用
NET Core微服務之基於EasyNetQ使用RabbitMQ消息隊列
安裝部署RabbbitMQ:https://www.cnblogs.com/1285026182YUAN/p/12896851.html
NETCORE中的訂閱模式
一、控制台項目
創建三個項目:
- NETCORE.RabbitMQ.Publisher:Console項目
- NETCORE.RabbitMQ.Subscriber1:Console項目
- NETCORE.RabbitMQ.Messages:類庫

Publisher 與 Subscriber1 項目安裝 EasyNetQ 插件
NuGet>Install-Package EasyNetQ
Messages 項目,新增一個類
public class TextMessage { public string Text { get; set; } }
Publisher項目,添加以下代碼
using EasyNetQ; using NETCORE.RabbitMQ.Messages; using System; namespace NETCORE.RabbitMQ.Publisher { class Program { static void Main(string[] args) { var connStr = "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456"; using (var bus = RabbitHutch.CreateBus(connStr)) { var input = ""; Console.WriteLine("I'm Publisher"); Console.WriteLine("Please enter a message. 'Quit' to quit."); while ((input = Console.ReadLine()) != "Quit") { bus.Publish(new TextMessage { Text = input }); } } Console.WriteLine("Hello World!"); } } }
可以看到,我們在其中使用EasyNetQ高度封裝的接口創建了一個IBus接口的實例,通過這個IBus實例我們可以通過一個超級Easy的Publish接口進行發布消息。這里主要是讀取用戶在控制台中輸入的消息字符串進行發送。實際中,發送的一般都是一個或多個復雜的實體對象。
Subscriber項目,添加以下代碼
using System; using EasyNetQ; using NETCORE.RabbitMQ.Messages; namespace NETCORE.RabbitMQ.Subscriber1 { class Program { static void Main(string[] args) { var connStr = "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456"; using (var bus = RabbitHutch.CreateBus(connStr)) { bus.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage); Console.WriteLine("I'm Subscriber1"); Console.WriteLine("Listening for messages. Hit <return> to quit."); Console.ReadLine(); } Console.WriteLine("Hello World!"); } public static void HandleTextMessage(TextMessage textMessage) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("Got message: {0}", textMessage.Text); Console.ResetColor(); } } }
這里主要是通過IBus實例去訂閱消息(這里是除非用戶關閉程序否則一直處於監聽狀態),當發布者發布了指定類型的消息之后,這里就把它打印出來(紅色字體顯示)。
測試:
通過控制台信息查看結果:



二、 WebApi項目
創建上面提到的這幾個項目,這里我選擇ASP.NET Core WebAPI類型。
分別為這三個項目通過NuGet安裝EasyNetQ組件

PM> Install-Package EasyNetQ
在messages 項目中 下面是這個demo用到的一個消息對象實體:通過標簽聲明隊列名稱。
using System; using System.Collections.Generic; using System.Text; using EasyNetQ; namespace NETCORE.RabbitMQ.Messages { [Queue("Qka.Client", ExchangeName = "Qka.Client")] public class ClientMessage { public int ClientId { get; set; } public string ClientName { get; set; } public string Sex { get; set; } public int Age { get; set; } // N: Non-Smoker, S: Smoker public string SmokerCode { get; set; } // Bachelor, Master, Doctor public string Education { get; set; } public decimal YearIncome { get; set; } } }
此外,為了充分簡化代碼量,EasyNetQ提供了一個AutoSubscriber的方式,可以通過接口和標簽快速地讓一個類成為Consumer。詳細內容參考:https://github.com/EasyNetQ/EasyNetQ/wiki/Auto-Subscriber
這里為了快速的在項目中使用Subscriber,添加一個擴展方法,它會從注入的服務中取出IBus實例對象,並自動幫我們進行Subscriber(那些實現了IConsume接口的類)的注冊。具體用法見后面的介紹。
Publisher項目:ClientService
ClientService作為發布者,這里假設我們在API中處理完業務代碼后,將message發布給RabbitMQ:
appsettings.json 配置文件
{ "Logging": { "LogLevel": { "Default": "Warning" } }, "ConnectionStrings": { "RabbitMQ_Conn": "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456" }, "AllowedHosts": "*" }
Startup.cs 文件,通過以下代碼注入統一的IBus實例對象:
public void ConfigureServices(IServiceCollection services) { services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2); services.AddSingleton(RabbitHutch.CreateBus(Configuration.GetConnectionString("RabbitMQ_Conn"))); }
創建控制器 ClientController
using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using EasyNetQ; using Microsoft.AspNetCore.Mvc; using NETCORE.RabbitMQ.Messages; namespace NETCORE.RabbitMQ.API.Publisher.Controllers { [Produces("application/json")] [Route("api/Client")] public class ClientController : Controller { private readonly IBus bus; public ClientController( IBus _bus) { bus = _bus; } [Route("GetVs")] [HttpGet] public async Task<string> GetVs() { // Business Logic here... // eg.Add new client to your service databases via EF // Sample Publish ClientMessage message = new ClientMessage { ClientId = 12, ClientName = "lihongyuan", Sex ="sss", Age = 29, SmokerCode = "N", Education = "Master", YearIncome = 100000 }; await bus.PublishAsync(message); return "Add Client Success! You will receive some letter later."; } } }
當然,你可以使用同步方法:bus.Publish(message);
Subscriber項目:
訂閱者接收消息。
appsettings.json 配置文件
{
"Logging": {
"LogLevel": {
"Default": "Warning"
}
},
"ConnectionStrings": {
"RabbitMQ_Conn": "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456"
},
"AllowedHosts": "*"
}
Startup.cs 文件,通過以下代碼注入統一的IBus實例對象:
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
services.AddSingleton(RabbitHutch.CreateBus(Configuration.GetConnectionString("RabbitMQ_Conn")));
}
創建 AppBuilderExtension.cs 類
using EasyNetQ; using EasyNetQ.AutoSubscribe; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using System; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Threading.Tasks; namespace NETCORE.RabbitMQ.API.Subscriber { public static class AppBuilderExtension { public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly) { var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider; var lifeTime = services.GetService<IApplicationLifetime>(); var bus = services.GetService<IBus>(); lifeTime.ApplicationStarted.Register(() => { var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix); subscriber.Subscribe(assembly); subscriber.SubscribeAsync(assembly); }); lifeTime.ApplicationStopped.Register(() => bus.Dispose()); return appBuilder; } } }
startup.css 文件中
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { ...... // easyNetQ app.UseSubscribe("ClientMessageService", Assembly.GetExecutingAssembly()); }
增加訂閱類:
using EasyNetQ.AutoSubscribe; using NETCORE.RabbitMQ.Messages; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace NETCORE.RabbitMQ.API.Subscriber.Consumer { public class ClientMessageConsumer : IConsumeAsync<ClientMessage> { [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.ZapQuestion")] public Task ConsumeAsync(ClientMessage message) { // Your business logic code here // eg.Generate one ZAP question records into database and send to client // Sample console code System.Console.ForegroundColor = System.ConsoleColor.Red; System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will generate one ZAP question list to client", message.ClientName); System.Console.ResetColor(); return Task.CompletedTask; } } }
啟動后,可查看效果

引用:https://www.cnblogs.com/edisonchou/p/aspnetcore_easynetq_basicdemo_foundation.html
