NETCORE - RabbitMQ的使用


NETCORE - RabbitMQ的使用

 NET Core微服務之基於EasyNetQ使用RabbitMQ消息隊列

 

安裝部署RabbbitMQ:https://www.cnblogs.com/1285026182YUAN/p/12896851.html 

NETCORE中的訂閱模式

 

 

一、控制台項目

創建三個項目:

  • NETCORE.RabbitMQ.Publisher:Console項目
  • NETCORE.RabbitMQ.Subscriber1:Console項目
  • NETCORE.RabbitMQ.Messages:類庫

 

 

 

 

 

 Publisher 與 Subscriber1 項目安裝 EasyNetQ 插件

NuGet>Install-Package EasyNetQ  

 

 

Messages 項目,新增一個類

    public class TextMessage
    {
        public string Text { get; set; }
    }

 

 

Publisher項目,添加以下代碼

using EasyNetQ;
using NETCORE.RabbitMQ.Messages;
using System;

namespace NETCORE.RabbitMQ.Publisher
{
    class Program
    {
        static void Main(string[] args)
        {
            var connStr = "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456";

            using (var bus = RabbitHutch.CreateBus(connStr))
            {
                var input = "";

                Console.WriteLine("I'm Publisher");
                Console.WriteLine("Please enter a message. 'Quit' to quit.");

                while ((input = Console.ReadLine()) != "Quit")
                {
                    bus.Publish(new TextMessage
                    {
                        Text = input
                    });
                }
            }
 

            Console.WriteLine("Hello World!");
        }
    }
}
View Code

 

 

可以看到,我們在其中使用EasyNetQ高度封裝的接口創建了一個IBus接口的實例,通過這個IBus實例我們可以通過一個超級Easy的Publish接口進行發布消息。這里主要是讀取用戶在控制台中輸入的消息字符串進行發送。實際中,發送的一般都是一個或多個復雜的實體對象。

 

Subscriber項目,添加以下代碼

using System;
using EasyNetQ;
using NETCORE.RabbitMQ.Messages;

namespace NETCORE.RabbitMQ.Subscriber1
{
    class Program
    {
        static void Main(string[] args)
        {
            var connStr = "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456";

            using (var bus = RabbitHutch.CreateBus(connStr))
            {
                bus.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage);


                Console.WriteLine("I'm Subscriber1");
                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
             
            Console.WriteLine("Hello World!");
        }

        public static void HandleTextMessage(TextMessage textMessage)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("Got message: {0}", textMessage.Text);
            Console.ResetColor();
        }
    }
}
View Code

這里主要是通過IBus實例去訂閱消息(這里是除非用戶關閉程序否則一直處於監聽狀態),當發布者發布了指定類型的消息之后,這里就把它打印出來(紅色字體顯示)。

 

測試:

通過控制台信息查看結果:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

二、 WebApi項目

  創建上面提到的這幾個項目,這里我選擇ASP.NET Core WebAPI類型。

  分別為這三個項目通過NuGet安裝EasyNetQ組件

 

 

 

PM> Install-Package EasyNetQ

 

 

在messages 項目中 下面是這個demo用到的一個消息對象實體:通過標簽聲明隊列名稱。

using System;
using System.Collections.Generic;
using System.Text;
using EasyNetQ;

namespace NETCORE.RabbitMQ.Messages
{

    [Queue("Qka.Client", ExchangeName = "Qka.Client")]
    public class ClientMessage
    {
        public int ClientId { get; set; }
        public string ClientName { get; set; }
        public string Sex { get; set; }
        public int Age { get; set; }
        // N: Non-Smoker, S: Smoker
        public string SmokerCode { get; set; }
        // Bachelor, Master, Doctor
        public string Education { get; set; }
        public decimal YearIncome { get; set; }
    }
}

 此外,為了充分簡化代碼量,EasyNetQ提供了一個AutoSubscriber的方式,可以通過接口和標簽快速地讓一個類成為Consumer。詳細內容參考:https://github.com/EasyNetQ/EasyNetQ/wiki/Auto-Subscriber

  這里為了快速的在項目中使用Subscriber,添加一個擴展方法,它會從注入的服務中取出IBus實例對象,並自動幫我們進行Subscriber(那些實現了IConsume接口的類)的注冊。具體用法見后面的介紹。

 

Publisher項目:ClientService

  ClientService作為發布者,這里假設我們在API中處理完業務代碼后,將message發布給RabbitMQ:

 

appsettings.json 配置文件

{
  "Logging": {
    "LogLevel": {
      "Default": "Warning"
    }
  },
  "ConnectionStrings": {
    "RabbitMQ_Conn": "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456"
  },
  "AllowedHosts": "*"
}

 

 

 

Startup.cs 文件,通過以下代碼注入統一的IBus實例對象:

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);

            services.AddSingleton(RabbitHutch.CreateBus(Configuration.GetConnectionString("RabbitMQ_Conn")));
        }

 

創建控制器 ClientController 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using EasyNetQ;
using Microsoft.AspNetCore.Mvc;
using NETCORE.RabbitMQ.Messages;

namespace NETCORE.RabbitMQ.API.Publisher.Controllers
{
    [Produces("application/json")]
    [Route("api/Client")]
    public class ClientController : Controller
    { 
        private readonly IBus bus;

        public ClientController(  IBus _bus)
        { 
            bus = _bus;
        }
         
        [Route("GetVs")]
        [HttpGet]
        public async Task<string> GetVs()
        {
            // Business Logic here...
            // eg.Add new client to your service databases via EF
            // Sample Publish
            ClientMessage message = new ClientMessage
            {
                ClientId = 12,
                ClientName = "lihongyuan",
                Sex ="sss",
                Age = 29,
                SmokerCode = "N",
                Education = "Master",
                YearIncome = 100000
            };
            await bus.PublishAsync(message);

            return "Add Client Success! You will receive some letter later.";
        }
    }
}
View Code

 

當然,你可以使用同步方法:bus.Publish(message);

 

 

 

Subscriber項目:

  訂閱者接收消息。

 

appsettings.json 配置文件

{
  "Logging": {
    "LogLevel": {
      "Default": "Warning"
    }
  },
  "ConnectionStrings": {
    "RabbitMQ_Conn": "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456"
  },
  "AllowedHosts": "*"
}

 

 

 

Startup.cs 文件,通過以下代碼注入統一的IBus實例對象:

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);

            services.AddSingleton(RabbitHutch.CreateBus(Configuration.GetConnectionString("RabbitMQ_Conn")));
        }

 

創建 AppBuilderExtension.cs 類

using EasyNetQ;
using EasyNetQ.AutoSubscribe;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;

namespace NETCORE.RabbitMQ.API.Subscriber
{
    public static class AppBuilderExtension
    {
        public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
        {
            var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;

            var lifeTime = services.GetService<IApplicationLifetime>();
            var bus = services.GetService<IBus>();
            lifeTime.ApplicationStarted.Register(() =>
            {
                var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
                subscriber.Subscribe(assembly);
                subscriber.SubscribeAsync(assembly);
            });

            lifeTime.ApplicationStopped.Register(() => bus.Dispose());

            return appBuilder;
        }
    }
}
View Code

 

 startup.css 文件中 

public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        // easyNetQ
        app.UseSubscribe("ClientMessageService", Assembly.GetExecutingAssembly());
    }

 

 

增加訂閱類:

using EasyNetQ.AutoSubscribe;
using NETCORE.RabbitMQ.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace NETCORE.RabbitMQ.API.Subscriber.Consumer
{
    public class ClientMessageConsumer : IConsumeAsync<ClientMessage>
    {
        [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.ZapQuestion")]
        public Task ConsumeAsync(ClientMessage message)
        {
            // Your business logic code here
            // eg.Generate one ZAP question records into database and send to client
            // Sample console code
            System.Console.ForegroundColor = System.ConsoleColor.Red;
            System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will generate one ZAP question list to client", message.ClientName);
            System.Console.ResetColor();

            return Task.CompletedTask;
        }
    }
}
View Code

 

 

啟動后,可查看效果

 

 

 

引用:https://www.cnblogs.com/edisonchou/p/aspnetcore_easynetq_basicdemo_foundation.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM