十五、.net core(.NET 6)搭建RabbitMQ消息隊列生產者和消費者的簡單方法


 搭建RabbitMQ簡單通用的直連方法

 

如果還沒有MQ環境,可以參考上一篇的博客

https://www.cnblogs.com/weskynet/p/14877932.html

 

接下來開始.net core操作Rabbitmq有關的內容。我打算使用比較簡單的單機的direct直連模式,來演示一下有關操作,基本套路差不多。

首先,我在我的package包項目上面,添加對RabbitMQ.Client的引用:

 

 

Common文件夾下,新建類庫項目 Wsk.Core.RabbitMQ,並且引用package項目:

 

 

在啟動項目下的appsettings配置文件里面,新增一個訪問RabbitMQ的配置信息:

 

 

配置部分代碼:

"MQ": [
    {
      "Host": "127.0.0.1", // MQ安裝的實際服務器IP地址
      "Port": 5672, // 服務端口號
      "User": "wesky", // 用戶名
      "Password": "wesky123", // 密碼
      "ExchangeName": "WeskyExchange", // 設定一個Exchange名稱,
      "Durable": true // 是否啟用持久化
    }
  ]

 

然后,在實體類項目下,新建實體類MqConfigInfo,用於把讀取的配置信息賦值到該實體類下:

 

實體類代碼:

public class MqConfigInfo
    {
        public string Host { get; set; }
        public int Port { get; set; }
        public string User { get; set; }
        public string Password { get; set; }
        public string ExchangeName { get; set; }
        public bool Durable { get; set; }
    }
View Code

 

在剛剛新建的RabbitMQ類庫項目下面,引用該實體類庫項目,以及APppSettings項目。然后新建一個類,叫做ReadMqConfigHelper,以及它的interface接口,並且提供一個方法,叫ReadMqConfig,用來進行讀取配置信息使用:

讀取配置信息類代碼:

public class ReadMqConfigHelper:IReadMqConfigHelper
    {
        private readonly ILogger<ReadMqConfigHelper> _logger;
        public ReadMqConfigHelper(ILogger<ReadMqConfigHelper>  logger)
        {
            _logger = logger;
        }
        public List<MqConfigInfo> ReadMqConfig()
        {
            try
            {
                List<MqConfigInfo> config = AppHelper.ReadAppSettings<MqConfigInfo>(new string[] { "MQ" }); // 讀取MQ配置信息
                if (config.Any())
                {
                    return config;
                }
                _logger.LogError($"獲取MQ配置信息失敗:沒有可用數據集");
                return null;
            }
            catch (Exception ex)
            {
                _logger.LogError($"獲取MQ配置信息失敗:{ex.Message}");
                return null;
            }
        }
    }
View Code

 

接着,新建類MqConnectionHelper以及接口IMqConnectionHelper,用於做MQ連接、創建生產者和消費者等有關操作:

 

 

然后,新增一系列創建連接所需要的靜態變量:

 

 

然后,設置兩個消費者隊列,用來測試。以及添加生產者連接有關的配置和操作:

 

 

然后,創建消費者連接方法:

 

 

其中,StartListener下面提供了事件,用於手動確認消息接收。如果設置為自動,有可能導致消息丟失:

 

 

然后,添加消息發布方法:

 

 

interface接口里面,添加有關的接口,用於等下依賴注入使用:

 

 

連接類部分的代碼:

  public class MqConnectionHelper:IMqConnectionHelper
    {

        private readonly ILogger<MqConnectionHelper> _logger;
        public MqConnectionHelper(ILogger<MqConnectionHelper> logger)
        {
            _logger = logger;

            _connectionReceiveFactory = new IConnectionFactory[_costomerCount];
            _connectionReceive = new IConnection[_costomerCount];
            _modelReceive = new IModel[_costomerCount];
            _basicConsumer = new EventingBasicConsumer[_costomerCount];

        }


        /*
         備注:使用數組的部分,是給消費端用的。目前生產者只設置了一個,消費者可能存在多個。
                     當然,有條件的還可以上RabbitMQ集群進行處理,會更好玩一點。
         */
        private static IConnectionFactory _connectionSendFactory;  //RabbitMQ工廠 發送端
        private static IConnectionFactory[] _connectionReceiveFactory; //RabbitMQ工廠 接收端  

        private static IConnection _connectionSend; //連接 發送端
        private static IConnection[] _connectionReceive; //連接 消費端

        public static List<MqConfigInfo> _mqConfig; // 配置信息

        private static IModel _modelSend;  //通道  發送端
        private static IModel[] _modelReceive; //通道  消費端

        private static EventingBasicConsumer[] _basicConsumer;  // 事件

        /* 設置兩個routingKey 和 隊列名稱,用來做測試使用*/
        public static int _costomerCount = 2;
        public static string[] _routingKey = new string[] {"WeskyNet001","WeskyNet002" };
        public static string[] _queueName = new string[] { "Queue001", "Queue002" };

        /// <summary>
        /// 生產者初始化連接配置
        /// </summary>
        public void SendFactoryConnectionInit()
        {
            _connectionSendFactory = new ConnectionFactory
            {
                HostName = _mqConfig.FirstOrDefault().Host,
                Port = _mqConfig.FirstOrDefault().Port,
                UserName = _mqConfig.FirstOrDefault().User,
                Password = _mqConfig.FirstOrDefault().Password
            };
        }

        /// <summary>
        /// 生產者連接
        /// </summary>
        public void SendFactoryConnection()
        {

            if (null != _connectionSend && _connectionSend.IsOpen)
            {
                return; // 已有連接
            }
            _connectionSend = _connectionSendFactory.CreateConnection(); // 創建生產者連接

            if (null != _modelSend && _modelSend.IsOpen)
            {
                return; // 已有通道
            }
            _modelSend = _connectionSend.CreateModel(); // 創建生產者通道

            _modelSend.ExchangeDeclare(_mqConfig.FirstOrDefault().ExchangeName, ExchangeType.Direct); // 定義交換機名稱和類型(direct)

        }

        /// <summary>
        /// 消費者初始化連接配置
        /// </summary>
        public void ReceiveFactoryConnectionInit()
        {
            var factories = new ConnectionFactory
            {
                HostName = _mqConfig.FirstOrDefault().Host,
                Port = _mqConfig.FirstOrDefault().Port,
                UserName = _mqConfig.FirstOrDefault().User,
                Password = _mqConfig.FirstOrDefault().Password
            };

            for (int i = 0; i < _costomerCount; i++)
            {
                _connectionReceiveFactory[i] = factories;  // 給每個消費者綁定一個連接工廠
            }
        }

        /// <summary>
        /// 消費者連接
        /// </summary>
        /// <param name="consumeIndex"></param>
        /// <param name="exchangeName"></param>
        /// <param name="routeKey"></param>
        /// <param name="queueName"></param>
        public void ConnectionReceive(int consumeIndex, string exchangeName, string routeKey, string queueName)
        {
            _logger.LogInformation($"開始連接RabbitMQ消費者:{routeKey}");

            if (null != _connectionReceive[consumeIndex] && _connectionReceive[consumeIndex].IsOpen)
            {
                return;
            }
            _connectionReceive[consumeIndex] = _connectionReceiveFactory[consumeIndex].CreateConnection(); // 創建消費者連接

            if (null != _modelReceive[consumeIndex] && _modelReceive[consumeIndex].IsOpen)
            {
                return;
            }
            _modelReceive[consumeIndex] = _connectionReceive[consumeIndex].CreateModel();  // 創建消費者通道

            _basicConsumer[consumeIndex] = new EventingBasicConsumer(_modelReceive[consumeIndex]);

            _modelReceive[consumeIndex].ExchangeDeclare(exchangeName, ExchangeType.Direct); // 定義交換機名稱和類型  與生產者保持一致

            _modelReceive[consumeIndex].QueueDeclare(
                         queue: queueName, //消息隊列名稱
                         durable: _mqConfig.FirstOrDefault().Durable, // 是否可持久化,此處配置在文件中,默認全局持久化(true),也可以自定義更改
                         exclusive: false,
                         autoDelete: false,
                         arguments: null
           );  // 定義消費者隊列

            
            _modelReceive[consumeIndex].QueueBind(queueName, exchangeName, routeKey); // 隊列綁定給指定的交換機

            _modelReceive[consumeIndex].BasicQos(0, 1, false); // 設置消費者每次只接收一條消息

            StartListener((model, ea) =>
            {
                byte[] message = ea.Body.ToArray(); // 接收到的消息

                string msg = Encoding.UTF8.GetString(message);

                _logger.LogInformation($"隊列{queueName}接收到消息:{msg}");
                Thread.Sleep(2000);

                _modelReceive[consumeIndex].BasicAck(ea.DeliveryTag, true);
            }, queueName, consumeIndex);

        }

        /// <summary>
        /// 消費者接收消息的確認機制
        /// </summary>
        /// <param name="basicDeliverEventArgs"></param>
        /// <param name="queueName"></param>
        /// <param name="consumeIndex"></param>
        private static void StartListener(EventHandler<BasicDeliverEventArgs> basicDeliverEventArgs, string queueName, int consumeIndex)
        {
            _basicConsumer[consumeIndex].Received += basicDeliverEventArgs;
            _modelReceive[consumeIndex].BasicConsume(queue: queueName, autoAck: false, consumer: _basicConsumer[consumeIndex]); // 設置手動確認。

        }

        /// <summary>
        /// 消息發布
        /// </summary>
        /// <param name="message"></param>
        /// <param name="exchangeName"></param>
        /// <param name="routingKey"></param>
        public static void PublishExchange(string message, string exchangeName, string routingKey = "")
        {
            byte[] body = Encoding.UTF8.GetBytes(message);
            _modelSend.BasicPublish(exchangeName, routingKey, null, body);
        }


    }
View Code

 

現在,我把整個Wsk.Core.RabbitMQ項目進行添加到依賴注入:

 

 

然后,在啟動項目里面的初始化服務里面,添加對MQ連接的初始化以及連接,並且發送兩條消息進行測試:

 

 

 

啟用程序,提示發送成功:

 

 

打開RabbitMQ頁面客戶端,可以看見新增了一個交換機WeskyExchange

 

 

點進去可以看見對應的流量走勢:

 

 

 

 

關閉程序,現在添加消費者的初始化和連接,然后重新發送:

 

 

可見發送消息成功,並且消費者也成功接收到了消息。打開客戶端查看一下:

 

 

WeskyExchange交換機下,多了兩個隊列,以及隊列歸屬的RoutingKey分別是WeskyNet001WeskyNet002。以及在Queue目錄下,多了兩個隊列的監控信息:

 

為了看出點效果,我們批量發消息試一下:

 

 

然后啟動項目,我們看一下監控效果。先是交換機頁面的監控:

 

 

然后是隊列1的監控:

 

 

現在換一種寫法,在消費者那邊加個延遲:

 

 

並且生產者的延遲解除:

 

 

再啟動一下看看效果:

 

 

會發現隊列消息被堵塞,必須在執行完成以后,才可以解鎖。而且生產者這邊並不需要等待,可以看見消息一次性全發出去了,可以繼續執行后續操作:

 

 

以上就是關於使用Direct模式進行RabbitMQ收發消息的內容,發送消息可以在其他類里面或者方法里面,直接通過靜態方法進行發送;接收消息,啟動了監聽,就可以一直存活。如果有興趣,也可以自己嘗試FanoutTopic等不同的模式進行測試,以及可以根據不同的機器,進行配置成收發到不同服務器上面進行通信。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM