.net core集成使用EasyNetQ來使用rabbitmq


  之前有寫到一篇介紹EasyNetQ的博文(C# .net 使用rabbitmq消息隊列——EasyNetQ插件介紹 ),所以本文從.net core的角度去繼承使用EasyNetQ,而用法類似於之前集成使用rabbitmq的博文:.net core使用rabbitmq消息隊列 (二)

  國際慣例,先上代碼,但是代碼比較多,所有又放gitee了:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/EasyNetQ

  

  消息發布(AspNetCore.WebApi.Producer)

  Demo中這個項目是消息的發布程序,在Startup中添加服務:    

  
    public void ConfigureServices(IServiceCollection services)
    {
        var connectionString = "host=192.168.209.133;virtualHost=/;username=admin;password=123456;timeout=60";
        string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
        ushort port = 5672;
        string userName = "admin";
        string password = "123456";
        string virtualHost = "/";
        var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

        #region 訂閱發布

        services.AddEasyNetQProducer("Publish", options =>
        {
            //options.ConnectionString = connectionString;
            options.Hosts = hosts;
            options.Port = port;
            options.Password = password;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.PersistentMessages = true;
            options.Priority = 1;
        });

        #endregion
        #region 請求響應

        services.AddEasyNetQProducer("Request", options =>
        {
            //options.ConnectionString = connectionString;
            options.Hosts = hosts;
            options.Port = port;
            options.Password = password;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.PersistentMessages = true;
            options.Priority = 3;
        });

        #endregion
        #region 發送接收

        services.AddEasyNetQProducer("Send", options =>
        {
            //options.ConnectionString = connectionString;
            options.Hosts = hosts;
            options.Port = port;
            options.Password = password;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Priority = 4;
            options.Queue = "send-recieve";
        });

        #endregion
        ......
    }
ConfigureServices

  添加相關服務使用AddEasyNetQProducer方法,可以指定一個名稱,在創建生產者時可以提供指定的名稱。熟悉EasyNetQ的朋友應該知道它提供三種消息模式:Publish/Subscribe, Request/Response和 Send/Receive,正是上面的三種申明方式。

  使用時,需要先注入IBusClientFactory對象,使用它的Create方法創建生產者對象,然后使用這個對象的方法操作消息(Publish方法、Request方法、Send方法分別對應上面的三種模式)。

  另外,EasyNetQ的消息都是一些自定的實體類,因此我們發送消息需要自定創建實體類,比如發布訂閱消息時創建的實體類Subscriber:  

    public class Subscriber
    {
        public string Message { get; set; }
    }

   使用時:  

    /// <summary>
    /// 發布訂閱模式
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    [HttpGet("Publish")]
    public string Publish(string message)
    {
        message = message ?? "";
        var bus = busFactory.Create("Publish");
        bus.Publish(new Subscriber() { Message = message });

        return "success";
    }

  

  消息消費(AspNetCore.WebApi.Consumer)

   首先,在Startup中添加服務:  

  
    public void ConfigureServices(IServiceCollection services)
    {
        var connectionString = "host=192.168.209.133;virtualHost=/;username=admin;password=123456;timeout=60";
        string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
        ushort port = 5672;
        string userName = "admin";
        string password = "123456";
        string virtualHost = "/";
        var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

        #region 訂閱發布

        services.AddEasyNetQConsumer(options =>
        {
            //options.ConnectionString = connectionString;
            options.Hosts = hosts;
            options.Port = port;
            options.Password = password;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.AutoDelete = true;
            options.Durable = true;
            options.PrefetchCount = 1;
            options.Priority = 2;
        })
        .AddSubscriber("PubSub1",typeof(EasyNetQSubscriber))
        .AddSubscriber<Subscriber>("PubSub2", r =>
        {
            Console.WriteLine("PubSub:" + r.Message);
        });

        #endregion
        #region 請求響應

        services.AddEasyNetQConsumer(options =>
        {
            //options.ConnectionString = connectionString;
            options.Hosts = hosts;
            options.Port = port;
            options.Password = password;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Durable = true;
            options.PrefetchCount = 2;
        })
        .AddResponder(typeof(EasyNetQResponder))
        .AddResponder<Requester, Responder>(request =>
        {
            Console.WriteLine("Rpc:" + request.Data);
            return new Responder() { Result = "Rpc:" + request.Data };
        });

        #endregion
        #region 發送接收

        services.AddEasyNetQConsumer(options =>
        {
            //options.ConnectionString = connectionString;
            options.Hosts = hosts;
            options.Port = port;
            options.Password = password;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Priority = 5;
            options.PrefetchCount = 5;
            options.Exclusive = false;
            options.Arguments = arguments;
            options.Queue = "send-recieve";
        })
        .AddReceiver(typeof(EasyNetQReceiver<Reciever1>))
        .AddReceiver(typeof(EasyNetQReceiver<Reciever2>));
        //.AddReceiver<Reciever1>(r =>
        //{
        //    Console.WriteLine("Reciever1:" + r.Message);
        //})
        //.AddReceiver<Reciever2>(r =>
        //{
        //    Console.WriteLine("Reciever2:" + r.Message);
        //});

        #endregion

        ......
    }
ConfigureServices

  這里先使用AddEasyNetQConsumer方法獲得一個消費者建造者,然后使用它的AddSubscriber方法、AddResponder方法、AddReceiver方法添加消費消息的處理過程,當然這三個方法分別也是對應上面的三種模式。

  另外,這三個方法添加的消息處理程序可以使用Lambda表達式實現,也可以通過響應的接口實現,比如AddSubscriber方法添加的處理程序可通過實現了IEasyNetQSubscriber<T>接口的類來替代,比如Demo中的EasyNetQSubscriber:  

    public class EasyNetQSubscriber : IEasyNetQSubscriber<Subscriber>
    {
        public void Subscribe(Subscriber message)
        {
            Console.WriteLine("EasyNetQSubscriber:" + message.Message);
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM