.net core使用rabbitmq消息隊列 (二)


  之前有寫過.net core集成使用rabbitmq的博文,見.net core使用rabbitmq消息隊列,但是里面的使用很簡單,而且還有幾個bug,想改下,但是后來想了想,還是算了,之前使用的是.net core 2.x,還是重新整理一遍吧!

  由於代碼比較多,我把代碼傳到gitee上了,地址見:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Rabbitmq

  這是一個Demo項目,介紹.net core集成使用rabbitmq消息隊列,使用的.net core 3.1,這里簡單介紹:

  生產者(AspNetCore.WebApi.Producer)

  在Startup的ConfigureServices方法中添加相關rabbitmq的服務:  

  
    public void ConfigureServices(IServiceCollection services)
    {
        string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
        int port = 5672;
        string userName = "admin";
        string password = "123456";
        string virtualHost = "/";
        var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

        #region 日志記錄

        services.AddLogging(builder =>
        {
            builder.SetMinimumLevel(LogLevel.Trace);
        });
        services.AddRabbitLogger(options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.Category = "Home";
            options.MinLevel = LogLevel.Trace;
            options.ApplicationName = "AspNetCore.WebApi.Producer";

            //隊列模式
            options.Queue = "queue.logger";

            //交換機模式
            //options.Exchange = "exchange.logger"; 
            //options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.logger", Route = "#" } };
            //options.Type = RabbitExchangeType.Topic;
        });

        #endregion

        #region 普通模式

        services.AddRabbitProducer("SimplePattern", options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.InitializeCount = 3;
            options.Queues = new string[] { "queue.simple" };
        });

        #endregion

        #region 工作模式

        services.AddRabbitProducer("WorkerPattern", options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.InitializeCount = 3;
            options.Queues = new string[] { "queue.worker" };
        });

        #endregion

        #region 發布訂閱模式 

        services.AddRabbitProducer("FanoutPattern", options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.InitializeCount = 3;
            options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.fanout1" }, new RouteQueue() { Queue = "queue.fanout2" } };
            options.Type = RabbitExchangeType.Fanout;
            options.Exchange = "exchange.fanout";
        });

        #endregion

        #region 路由模式 

        services.AddRabbitProducer("DirectPattern", options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.InitializeCount = 5;
            options.Exchange = "exchange.direct";
            options.Type = RabbitExchangeType.Direct;
            options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.direct1", Route = "direct1" }, new RouteQueue() { Queue = "queue.direct2", Route = "direct2" } };
        });

        #endregion

        #region 主題模式

        services.AddRabbitProducer("TopicPattern", options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.InitializeCount = 5;
            options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.topic1", Route = "topic1.#" }, new RouteQueue() { Queue = "queue.topic2", Route = "topic2.#" } };
            options.Type = RabbitExchangeType.Topic;
            options.Exchange = "exchange.topic";
        });

        #endregion

        ......
    }
ConfigureServices

  里面介紹了6中集成方式:

  使用AddRabbitLogger方法添加日志相關的服務,需要注意的是,數據是以json格式發送到rabbitmq中去的,具體可以參考RabbitLoggerMessage<T>類,最好是自己發布測試就可以了,當然讀者可以安裝自己的需求修改RabbitLogger類中的發布邏輯。

  使用AddRabbitProducer方法添加一個發布者,可以指定名稱,這個名稱是獲取發布者對象時使用。這個方法添加的發布者可以滿足rabbitmq的五種使用方式(普通模式,工作模式,發布訂閱模式,路由模式,主題模式),具體由RabbitProducerOptions配置指定。

  服務配置好,具體使用可以參考HomeController,日志記錄可以注入ILogger<T>對象,或者注入ILoggerFactory對象,然后獲取ILogger<T>對象,直接使用ILogger<T>對象的方法就是發布消息了:  

    /// <summary>
    /// 日志
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    [HttpGet]
    public string Get(string message)
    {
        logger.LogTrace($"Trace:{message}");
        logger.LogDebug($"Debug:{message}");
        logger.LogInformation($"Information:{message}");
        logger.LogWarning($"Warning:{message}");
        logger.LogError($"Error:{message}");
        logger.LogCritical($"Critical:{message}");

        return "success";

  至於另外五種模式,我們需要注入IRabbitProducerFactory對象,然后使用Create方法創建指定名稱的發布者,然后調用Publish或者PublishAsync方法發布消息,而且他們都有幾個重載。

  需要注意的是,不同類型的生產者應該使用不同的Publish或者PublishAsync方法,比如普通模式和工作模式,因為他們沒有路由參數,因此需要使用無路由參數的Publish方法,如:  

    /// <summary>
    /// Simple
    /// </summary>
    /// <returns></returns>
    [HttpGet("Simple")]
    public string Simple(string message = "Simple")
    {
        var producer = rabbitProducerFactory.Create("SimplePattern");
        producer.Publish(message);

        return "success";
    }
    /// <summary>
    /// Worker
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    [HttpGet("Worker")]
    public string Worker(string message = "Worker")
    {
        var producer = rabbitProducerFactory.Create("WorkerPattern");
        int count = 10;
        while (count-- > 0)
        {
            producer.Publish(message);
        }

        return "success";
    }

  而發布訂閱模式、路由模式、主題模式都是有路由的(發布訂閱模式的路由可以認為是空值),因此需要使用帶路由參數的Publish方法:  

    /// <summary>
    /// Direct
    /// </summary>
    /// <param name="route"></param>
    /// <param name="message"></param>
    /// <returns></returns>
    [HttpGet("Direct")]
    public string Direct(string route = "direct1", string message = "Direct")
    {
        var producer = rabbitProducerFactory.Create("DirectPattern");
        producer.Publish(route, message);

        return "success";
    }
    /// <summary>
    /// Fanout
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    [HttpGet("Fanout")]
    public string Fanout(string message = "Fanout")
    {
        var producer = rabbitProducerFactory.Create("FanoutPattern");
        producer.Publish("", message);//fanout模式路由為空值

        return "success";
    }
    /// <summary>
    /// Topic
    /// </summary>
    /// <param name="route"></param>
    /// <param name="message"></param>
    /// <returns></returns>
    [HttpGet("Topic")]
    public string Topic(string route = "topic1.a", string message = "Topic")
    {
        var producer = rabbitProducerFactory.Create("TopicPattern");
        producer.Publish(route, message);

        return "success";
    }

 

  消費者(AspNetCore.WebApi.Consumer)

  生產者和消費者不在同一個項目中,同樣的,需要先在Startup的ConfigureServices方法中添加服務:  

  
    public void ConfigureServices(IServiceCollection services)
    {
        string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
        int port = 5672;
        string userName = "admin";
        string password = "123456";
        string virtualHost = "/";
        var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

        #region 日志記錄

        services.AddRabbitConsumer(options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.AutoAck = true;

            //options.FetchCount = 10;
            //options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.logger", Route = "#" } };//交換機模式
            //options.Type = RabbitExchangeType.Topic;//交換機模式
        })

        //.AddListener("queue.logger", result =>
        //{
        //    Console.WriteLine("Message From queue.logger:" + result.Body);
        //});

        .AddListener<RabbitConsumerListener>("queue.logger");

        //.AddListener("exchange.logger", "queue.logger", result =>
        //{
        //    Console.WriteLine("Message From queue.logger:" + result.Body);
        //});//交換機模式

        #endregion

        #region 普通模式

        services.AddRabbitConsumer(options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            //options.FetchCount = 1;
            options.AutoAck = false;
        }).AddListener("queue.simple", result =>
        {
            Console.WriteLine("Message From queue.simple:" + result.Body);
            result.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
        });
        #endregion

        #region 工作模式

        services.AddRabbitConsumer(options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.FetchCount = 2;
            options.AutoAck = false;
        }).AddListener("queue.worker", result =>
        {
            Console.WriteLine("Message From queue.worker1:" + result.Body);
            result.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
        }).AddListener("queue.worker", result =>
        {
            Console.WriteLine("Message From queue.worker2:" + result.Body);
            result.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
        });
        #endregion

        #region 發布訂閱模式 

        services.AddRabbitConsumer(options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.FetchCount = 2;
            options.AutoAck = false;
            options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.fanout1" }, new RouteQueue() { Queue = "queue.fanout2" } };
            options.Type = RabbitExchangeType.Fanout;
        }).AddListener("exchange.fanout", "queue.fanout1", result =>
        {
            Console.WriteLine("Message From queue.fanout1:" + result.Body);
            result.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
        }).AddListener("exchange.fanout", "queue.fanout2", result =>
        {
            Console.WriteLine("Message From queue.fanout2:" + result.Body);
            result.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
        });

        #endregion

        #region 路由模式 

        services.AddRabbitConsumer(options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.AutoAck = false;
            options.FetchCount = 2;
            options.Type = RabbitExchangeType.Direct;
            options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.direct1", Route = "direct1" }, new RouteQueue() { Queue = "queue.direct2", Route = "direct2" } };
        }).AddListener("exchange.direct", "queue.direct1", result =>
        {
            Console.WriteLine("Message From queue.direct1:" + result.Body);
            result.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
        }).AddListener("exchange.direct", "queue.direct2", result =>
        {
            Console.WriteLine("Message From queue.direct2:" + result.Body);
            result.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
        });

        #endregion

        #region 主題模式

        services.AddRabbitConsumer(options =>
        {
            options.Hosts = hosts;
            options.Password = password;
            options.Port = port;
            options.UserName = userName;
            options.VirtualHost = virtualHost;

            options.Arguments = arguments;
            options.Durable = true;
            options.AutoDelete = true;

            options.FetchCount = 2;
            options.AutoAck = false;
            options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.topic1", Route = "topic1.#" }, new RouteQueue() { Queue = "queue.topic2", Route = "topic2.#" } };
            options.Type = RabbitExchangeType.Topic;
        }).AddListener("exchange.topic", "queue.topic1", result =>
        {
            Console.WriteLine("Message From queue.topic1:" + result.Body);
            result.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
        }).AddListener("exchange.topic", "queue.topic2", result =>
        {
            Console.WriteLine("Message From queue.topic2:" + result.Body);
            result.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
        });

        #endregion

        ......
    }
ConfigureServices

  無論是日志的消費,還是其他五種模式的消費,都是先使用AddRabbitConsumer方法獲取到一個IRabbitConsumerBuilder消費者構造對象,然后它的通過AddListener方法添加消息處理程序。

  同樣的,需要注意的是,普通模式和工作模式是不基於交換機的策略模式,因此需要使用不包含交換機參數的AddListener方法:  

    #region 普通模式

    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;

        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        //options.FetchCount = 1;
        options.AutoAck = false;
    }).AddListener("queue.simple", result =>
    {
        Console.WriteLine("Message From queue.simple:" + result.Body);
        result.Commit();
        //result.RollBack();//回滾,參數表示是否重新消費
    });
    #endregion

    #region 工作模式

    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;

        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.FetchCount = 2;
        options.AutoAck = false;
    }).AddListener("queue.worker", result =>
    {
        Console.WriteLine("Message From queue.worker1:" + result.Body);
        result.Commit();
        //result.RollBack();//回滾,參數表示是否重新消費
    }).AddListener("queue.worker", result =>
    {
        Console.WriteLine("Message From queue.worker2:" + result.Body);
        result.Commit();
        //result.RollBack();//回滾,參數表示是否重新消費
    });
    #endregion

  而發布訂閱模式、路由模式和主題模式都是基於交換機的策略模式,因此使用需要交換機參數的AddListener方法:  

    #region 發布訂閱模式 

    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;

        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.FetchCount = 2;
        options.AutoAck = false;
        options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.fanout1" }, new RouteQueue() { Queue = "queue.fanout2" } };
        options.Type = RabbitExchangeType.Fanout;
    }).AddListener("exchange.fanout", "queue.fanout1", result =>
    {
        Console.WriteLine("Message From queue.fanout1:" + result.Body);
        result.Commit();
        //result.RollBack();//回滾,參數表示是否重新消費
    }).AddListener("exchange.fanout", "queue.fanout2", result =>
    {
        Console.WriteLine("Message From queue.fanout2:" + result.Body);
        result.Commit();
        //result.RollBack();//回滾,參數表示是否重新消費
    });

    #endregion

    #region 路由模式 

    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;

        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.AutoAck = false;
        options.FetchCount = 2;
        options.Type = RabbitExchangeType.Direct;
        options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.direct1", Route = "direct1" }, new RouteQueue() { Queue = "queue.direct2", Route = "direct2" } };
    }).AddListener("exchange.direct", "queue.direct1", result =>
    {
        Console.WriteLine("Message From queue.direct1:" + result.Body);
        result.Commit();
        //result.RollBack();//回滾,參數表示是否重新消費
    }).AddListener("exchange.direct", "queue.direct2", result =>
    {
        Console.WriteLine("Message From queue.direct2:" + result.Body);
        result.Commit();
        //result.RollBack();//回滾,參數表示是否重新消費
    });

    #endregion

    #region 主題模式

    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;

        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.FetchCount = 2;
        options.AutoAck = false;
        options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.topic1", Route = "topic1.#" }, new RouteQueue() { Queue = "queue.topic2", Route = "topic2.#" } };
        options.Type = RabbitExchangeType.Topic;
    }).AddListener("exchange.topic", "queue.topic1", result =>
    {
        Console.WriteLine("Message From queue.topic1:" + result.Body);
        result.Commit();
        //result.RollBack();//回滾,參數表示是否重新消費
    }).AddListener("exchange.topic", "queue.topic2", result =>
    {
        Console.WriteLine("Message From queue.topic2:" + result.Body);
        result.Commit();
        //result.RollBack();//回滾,參數表示是否重新消費
    });

    #endregion

  另外,AddListener中的消息處理委托可以使用一個實現了IRabbitConsumerListener接口的類代替,如Demo中的RabbitConsumerListener:  

    public class RabbitConsumerListener : IRabbitConsumerListener
    {
        public Task ConsumeAsync(RecieveResult recieveResult)
        {
            Console.WriteLine("RabbitConsumerListener:" + recieveResult.Body);
            recieveResult.Commit();
            //result.RollBack();//回滾,參數表示是否重新消費
            return Task.CompletedTask;
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM