RabbitMQ消息隊列隨筆


本文權當各位看官對RabbitMQ的基本概念以及使用場景有了一定的了解,如果你還對它所知甚少或者只是停留在僅僅是聽說過,建議你先看看這篇文章,在對RabbitMQ有了基本認識后,我們正式開啟我們的RabbitMQ之旅吧,希望本文能夠幫助大家在實際用到消息隊列時有所幫助,如有表述的不當之處,還望各位看官指正。

一、消息隊列的安裝

1、 RabbitMQ是用Erlang編程語言進行開發,所以首先得在Erlang官網下載Erlang運行的環境,如圖,選擇所需對應文件進行下載,並進行安裝:

2、 設置環境變量,如圖

3、 去RabbitMQ官網下載對應操作系統的安裝文件,並進行安裝如圖:

4、 以管理員方式打開cmd,定位到RabbitMQ安裝目錄sbin文件夾下,依次執行以下命令

(1)     rabbitmq-service install

(2)     rabbitmq-service enable

(3)     rabbitmq-service start

如圖:

到這里我們的RabbitMQ服務已經安裝好了,利用Windows + R鍵,輸入services.msc查看,RabbitMQ服務已經處於運行的狀態了。

5、 到這里不要以為我們的工作就結束了,接下來我們為RabbitMQ設置用戶以及密碼

在cmd中執行 rabbitmqctl list_users 查看RabbitMQ已存在的用戶,如圖

這里發現有兩個賬號 rabbit 是我之前添加的,guest 是RabbitMQ自帶的,

執行以下命令,添加RabbitMQ用戶,並設置相應權限:

rabbitmqctl add_user bestadmin 123456

rabbitmqctl set_permissions  bestadmin ".*"  ".*"  ".*"

rabbitmqctl set_user_tags bestadmin administrator

如圖:

 

 

6、 RabbitMQ有一個可視化界面,進行消息的管理,不過需要用命名rabbitmq-plugins enable rabbitmq_management 命令進行啟動,接下來我們便可以在瀏覽器輸入127.0.0.1:15672中進行查看,如圖:

輸入我們剛設置的用戶名bestuser 密碼123,如圖:

一、消息隊列的使用

我們知道RabbitMQ的Exchange常用交換器類型分為fanout、direct、topic、headers 4種類型,這里我們將對fanout、direct、topic 3種類型以實際代碼的形式進行講解,至於關於交換器對各類型的具體講解,請參照文章開始給出的鏈接進行了解,這里就不再贅述,我們新建了如下圖的解決方案:

1、RabbitMQHelper 幫助類,對常用的消息入隊以及消費消息進行了簡單的封裝:

 /// <summary>
    /// RabbitMQHelper
    /// </summary>
    public class RabbitMQHelper
    {
        private static ConnectionFactory _connectionFactory = null;
        private static readonly JsonSerializerSettings _jsonSettings = new JsonSerializerSettings { Formatting = Formatting.None, NullValueHandling = NullValueHandling.Ignore };

        /// <summary>
        /// 構造函數
        /// </summary>
        static RabbitMQHelper()
        {
            _connectionFactory = new ConnectionFactory();
            _connectionFactory.HostName = ConfigurationManager.AppSettings["HostName"].ToString();
            _connectionFactory.UserName = ConfigurationManager.AppSettings["UserName"].ToString();
            _connectionFactory.Password = ConfigurationManager.AppSettings["Password"].ToString();
            _connectionFactory.AutomaticRecoveryEnabled = true;
        }

        #region 單消息入隊
        /// <summary>
        /// 單消息入隊
        /// </summary>
        /// <param name="exchangeName">交換器名稱</param>
        /// <param name="exchangeType">交換器類型</param>
        /// <param name="routingKey">路由關鍵字</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="message">消息實例</param>
        /// <param name="arguments">消息的參數信息(如: 隊列過期時間:x-expires;消息過期時間:x-message-ttl;過期消息轉向路由:x-dead-letter-exchange;過期消息轉向路由:x-dead-letter-routing-key 等。)</param>
        public static void Enqueue<TItem>(string exchangeName, string exchangeType, string routingKey, string queueName, TItem message, IDictionary<string, object> arguments = null)
        {
            if (message != null)
            {
                using (IConnection connection = _connectionFactory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType, true, false, arguments);
                        channel.QueueDeclare(queueName, true, false, false, arguments);
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        var properties = channel.CreateBasicProperties();
                        properties.Persistent = true; //使消息持久化
                        properties.ContentType = "application/json";
                        string messageString = JsonConvert.SerializeObject(message, _jsonSettings);
                        byte[] body = Encoding.UTF8.GetBytes(messageString);
                        channel.BasicPublish(exchangeName, routingKey, properties, body);
                    }
                }
            }
        }
        #endregion

        #region 單消息入隊(字符串入隊)
        /// <summary>
        /// 單消息入隊(字符串入隊)
        /// </summary>
        /// <param name="exchangeName">交換器名稱</param>
        /// <param name="exchangeType">交換器類型</param>
        /// <param name="routingKey">路由關鍵字</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="message">消息實例</param>
        /// <param name="arguments">消息的參數信息(如: 隊列過期時間:x-expires;消息過期時間:x-message-ttl;過期消息轉向路由:x-dead-letter-exchange;過期消息轉向路由:x-dead-letter-routing-key 等。)</param>
        public static void Enqueue(string exchangeName, string exchangeType, string routingKey, string queueName, string message, IDictionary<string, object> arguments = null)
        {
            if (!string.IsNullOrWhiteSpace(message))
            {
                using (IConnection connection = _connectionFactory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType, true, false, arguments);
                        channel.QueueDeclare(queueName, true, false, false, arguments);
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        var properties = channel.CreateBasicProperties();
                        properties.Persistent = true; //使消息持久化
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey, properties, body);
                    }
                }
            }
        }
        #endregion

        #region 消息批量入隊
        /// <summary>
        /// 消息批量入隊
        /// </summary>
        /// <param name="exchangeName">交換器名稱</param>
        /// <param name="exchangeType">交換器類型</param>
        /// <param name="routingKey">路由關鍵字</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="list">消息集合</param>
        /// <param name="arguments">消息的參數信息(如: 隊列過期時間:x-expires;消息過期時間:x-message-ttl;過期消息轉向路由:x-dead-letter-exchange;過期消息轉向路由:x-dead-letter-routing-key 等。)</param>
        public static void Enqueue<TItem>(string exchangeName, string exchangeType, string routingKey, string queueName, List<TItem> list, IDictionary<string, object> arguments = null)
        {
            if (list != null && list.Count > 0)
            {
                using (IConnection connection = _connectionFactory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        foreach (TItem item in list)
                        {
                            if (item != null)
                            {
                                channel.ExchangeDeclare(exchangeName, exchangeType, true, false, arguments);
                                channel.QueueDeclare(queueName, true, false, false, arguments);
                                channel.QueueBind(queueName, exchangeName, routingKey);
                                string messageString = JsonConvert.SerializeObject(item, _jsonSettings);
                                byte[] body = Encoding.UTF8.GetBytes(messageString);
                                var properties = channel.CreateBasicProperties();//使消息持久化
                                properties.ContentType = "application/json";
                                properties.Persistent = true;
                                channel.BasicPublish(exchangeName, routingKey, properties, body);
                            }
                        }
                    }
                }
            }
        }
        #endregion

        #region 消費消息隊列(舊的方式)
        /// <summary>
        /// 消費消息隊列
        /// </summary>
        /// <typeparam name="TItem">消息對象</typeparam>
        /// <param name="exchangeName">交換器名稱</param>
        /// <param name="exchangeType">交換器類型</param>
        /// <param name="routingKey">路由關鍵字</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="func">消費消息的具體操作</param>
        /// <param name="failFunc">消費消息失敗的具體操作</param>
        /// <param name="tryTimes">消費失敗后,繼續嘗試消費的次數</param>
        /// <param name="arguments">消息的參數信息(如: 隊列過期時間:x-expires;消息過期時間:x-message-ttl;過期消息轉向路由:x-dead-letter-exchange;過期消息轉向路由:x-dead-letter-routing-key 等。)</param>
        /// <param name="isAgain">是否重新入隊</param>
        public static void Consume<TItem>(string exchangeName, string exchangeType, string routingKey, string queueName, Func<TItem, bool> func, Func<TItem, bool> failFunc = null,
            int tryTimes = 5, IDictionary<string, object> arguments = null, bool isAgain = false)
        {
            try
            {
                int consumeCount = 0;//嘗試消費次數
                bool isConsumeSuccess;//是否消費成功
                using (IConnection connection = _connectionFactory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType, true, false, arguments);
                        channel.QueueDeclare(queueName, true, false, false, arguments);
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        channel.BasicConsume(queueName, false, consumer);
                        while (true)
                        {
                            var ea = consumer.Queue.Dequeue();
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            TItem queueMessage = JsonConvert.DeserializeObject<TItem>(message, _jsonSettings);
                            if (queueMessage != null)
                            {
                                consumeCount = 0;
                                while (true)
                                {
                                    consumeCount++;
                                    isConsumeSuccess = func(queueMessage);
                                    if (isConsumeSuccess || consumeCount >= tryTimes)
                                    {
                                        channel.BasicAck(ea.DeliveryTag, false);//將隊列里面的消息進行釋放
                                        if (!isConsumeSuccess && failFunc != null)
                                        {
                                            failFunc(queueMessage);//消費消息失敗的具體操作
                                        }
                                        #region 消息處理失敗后重新入隊
                                        if (!isConsumeSuccess && isAgain)
                                        {
                                            var properties = channel.CreateBasicProperties();
                                            properties.Persistent = true;
                                            channel.BasicPublish(exchangeName, routingKey, properties, body);
                                        }
                                        #endregion
                                        break;
                                    }
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
        #endregion

        #region 消費消息隊列(舊的方式)
        /// <summary>
        /// 消費消息隊列
        /// </summary>
        /// <param name="exchangeName">交換器名稱</param>
        /// <param name="exchangeType">交換器類型</param>
        /// <param name="routingKey">路由關鍵字</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="func">消費消息的具體操作</param>
        /// <param name="failFunc">消費消息失敗的具體操作</param>
        /// <param name="tryTimes">消費失敗后,繼續嘗試消費的次數</param>
        /// <param name="arguments">消息的參數信息(如: 隊列過期時間:x-expires;消息過期時間:x-message-ttl;過期消息轉向路由:x-dead-letter-exchange;過期消息轉向路由:x-dead-letter-routing-key 等。)</param>
        /// <param name="isAgain">是否重新入隊</param>
        public static void ConsumeString(string exchangeName, string exchangeType, string routingKey, string queueName, Func<string, bool> func, Func<string, bool> failFunc = null,
            int tryTimes = 5, IDictionary<string, object> arguments = null, bool isAgain = false)
        {
            try
            {
                int consumeCount = 0;//嘗試消費次數
                using (IConnection connection = _connectionFactory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType, true, false, arguments);
                        channel.QueueDeclare(queueName, true, false, false, arguments);
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        channel.BasicConsume(queueName, false, consumer);
                        while (true)
                        {
                            var ea = consumer.Queue.Dequeue();
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            if (!string.IsNullOrWhiteSpace(message))
                            {
                                consumeCount = 0;
                                while (true)
                                {
                                    consumeCount++;
                                    bool isConsumeSuccess = func(message);
                                    if (isConsumeSuccess || consumeCount >= tryTimes)
                                    {
                                        channel.BasicAck(ea.DeliveryTag, false);//將隊列里面的消息進行釋放
                                        if (!isConsumeSuccess && failFunc != null)
                                        {
                                            failFunc(message);//消費消息失敗的具體操作
                                        }
                                        #region 消息處理失敗后重新入隊
                                        if (!isConsumeSuccess && isAgain)
                                        {
                                            var properties = channel.CreateBasicProperties();
                                            properties.Persistent = true;
                                            channel.BasicPublish(exchangeName, routingKey, properties, body);
                                        }
                                        #endregion
                                        break;
                                    }
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
        #endregion

        #region 消費消息隊列(新的方式)
        /// <summary>
        /// 消費消息隊列
        /// </summary>
        /// <typeparam name="TItem">消息對象</typeparam>
        /// <param name="exchangeName">交換器名稱</param>
        /// <param name="exchangeType">交換器類型</param>
        /// <param name="routingKey">路由關鍵字</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="func">消費消息的具體操作</param>
        /// <param name="tryTimes">消費失敗后,繼續嘗試消費的次數</param>
        /// <param name="arguments">消息的參數信息(如: 隊列過期時間:x-expires;消息過期時間:x-message-ttl;過期消息轉向路由:x-dead-letter-exchange;過期消息轉向路由:x-dead-letter-routing-key 等。)</param>
        public static void NewConsume<TItem>(string exchangeName, string exchangeType, string routingKey, string queueName, Func<TItem, bool> func, int tryTimes = 5, IDictionary<string, object> arguments = null)
        {
            try
            {
                int consumeCount = 0;//嘗試消費次數
                using (IConnection connection = _connectionFactory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType, true, false, arguments);
                        channel.QueueDeclare(queueName, true, false, false, arguments);
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (sender, eventArgs) =>
                        {
                            byte[] body = eventArgs.Body;
                            if (body != null && body.Length > 0)
                            {
                                string message = Encoding.UTF8.GetString(body);
                                if (!string.IsNullOrWhiteSpace(message))
                                {
                                    TItem queueMessage = JsonConvert.DeserializeObject<TItem>(message, _jsonSettings);
                                    if (queueMessage != null)
                                    {
                                        consumeCount = 0;
                                        while (true)
                                        {
                                            consumeCount++;
                                            bool isConsumeSuccess = func(queueMessage);
                                            if (isConsumeSuccess || consumeCount >= tryTimes)
                                            {
                                                channel.BasicAck(eventArgs.DeliveryTag, false);//將隊列里面的消息進行釋放
                                                break;
                                            }
                                        }
                                    }
                                }
                            }
                        };
                        channel.BasicConsume(queueName, false, consumer);
                    }
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
        #endregion

        #region 消費消息隊列(新的方式)
        /// <summary>
        /// 消費消息隊列
        /// </summary>
        /// <param name="exchangeName">交換器名稱</param>
        /// <param name="exchangeType">交換器類型</param>
        /// <param name="routingKey">路由關鍵字</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="func">消費消息的具體操作</param>
        /// <param name="failFunc">消費消息失敗的具體操作</param>
        /// <param name="tryTimes">消費失敗后,繼續嘗試消費的次數</param>
        /// <param name="arguments">消息的參數信息(如: 隊列過期時間:x-expires;消息過期時間:x-message-ttl;過期消息轉向路由:x-dead-letter-exchange;過期消息轉向路由:x-dead-letter-routing-key 等。)</param>
        /// <param name="isAgain">是否重新入隊</param>
        public static void NewConsumeString(string exchangeName, string exchangeType, string routingKey, string queueName, Func<string, bool> func, Func<string, bool> failFunc = null,
            int tryTimes = 5, IDictionary<string, object> arguments = null, bool isAgain = false)
        {
            try
            {
                int consumeCount = 0;//嘗試消費次數
                using (IConnection connection = _connectionFactory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType, true, false, arguments);
                        channel.QueueDeclare(queueName, true, false, false, arguments);
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (sender, eventArgs) =>
                        {
                            byte[] body = eventArgs.Body;
                            if (body != null && body.Length > 0)
                            {
                                string message = Encoding.UTF8.GetString(body);
                                if (!string.IsNullOrWhiteSpace(message))
                                {
                                    consumeCount = 0;
                                    while (true)
                                    {
                                        consumeCount++;
                                        bool isConsumeSuccess = func(message);
                                        if (isConsumeSuccess || consumeCount >= tryTimes)
                                        {
                                            channel.BasicAck(eventArgs.DeliveryTag, false);//將隊列里面的消息進行釋放
                                            if (!isConsumeSuccess && failFunc != null)
                                            {
                                                failFunc(message);//消費消息失敗的具體操作
                                            }
                                            #region 消息處理失敗后重新入隊
                                            if (!isConsumeSuccess && isAgain)
                                            {
                                                var properties = channel.CreateBasicProperties();
                                                properties.Persistent = true;
                                                channel.BasicPublish(exchangeName, routingKey, properties, body);
                                            }
                                            #endregion
                                            break;
                                        }
                                    }
                                }
                            }
                        };
                        channel.BasicConsume(queueName, false, consumer);
                    }
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
        #endregion
    }

 

 備注:需要說明的是這里在對消費消息的方法進行封裝的過程中使用了泛型委托,這樣我們就只需要按照自己的業務需求,對消息進行處理了。

 2、 fanout類型:簡單的說適合這樣的場景,一個生產者產生的消息,需要將該消息發送到多個消息隊列,供多個消費者進行消費。這里,為了對該場景進行還原,所以新建了RabbitMQConsumer,RabbitMQConsumer1兩個消費者。

 生產者代碼:

       public Form1()
        {
            InitializeComponent();
        }

        private void Producer_Click(object sender, EventArgs e)
        {
            RabbitMQHelper.Enqueue("developExchange", "fanout", "", new Developer() { Id = Guid.NewGuid(), Name = "nickdeng", Position = "開發" });
        }

 消費者1代碼:

  class Program
    {
        static void Main(string[] args)
        {
            RabbitMQHelper.Consume<Developer>("developExchange", "fanout", "", "developQueue", ConsumeMessage);
        }

        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="developer">處理對象</param>
        /// <returns>消費結果</returns>
        public static bool ConsumeMessage(Developer developer)
        {
            string message = JsonConvert.SerializeObject(developer);
            Console.Write(message);
            return true;
        }
    }

消費者2代碼:

  class Program
    {
        static void Main(string[] args)
        {
            RabbitMQHelper.Consume<Developer>("developExchange", "fanout", "", "developQueue1", ConsumeMessage);
        }

        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="developer">處理對象</param>
        /// <returns>消費結果</returns>
        public static bool ConsumeMessage(Developer developer)
        {
            string message = JsonConvert.SerializeObject(developer);
            Console.Write(message);
            return true;
        }
    }

消費者1與消費者2的代碼,眨眼一看,不是一樣的嗎?仔細看會發現它們在的消息隊列名稱不一樣,消費者1的隊列名稱是“developQueue”,消息者2的隊列名稱是“developQueue1”,因為這兩個消息隊列都與交換器“developExchange”進行了綁定,所以生產者產生的消息將被推送到這兩個消息隊列。運行代碼,得到如下圖結果:

 3、direct類型:直譯過來就是直接的意思,該類型適用於點對點的使用場景,生產者將消息發送到指定的消息隊列:

  生產者代碼:

      public Form1()
        {
            InitializeComponent();
        }

        private void Producer_Click(object sender, EventArgs e)
        {
            RabbitMQHelper.Enqueue("developExchange1", "direct", "directkey", new Developer() { Id = Guid.NewGuid(), Name = "nickdeng", Position = "開發" });
        }

消費者1代碼:

        static void Main(string[] args)
        {
            RabbitMQHelper.Consume<Developer>("developExchange1", "direct", "directkey", "developQueue", ConsumeMessage);
        }

        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="developer">處理對象</param>
        /// <returns>消費結果</returns>
        public static bool ConsumeMessage(Developer developer)
        {
            string message = JsonConvert.SerializeObject(developer);
            Console.Write(message);
            return true;
        }

消費者2代碼:

        static void Main(string[] args)
        {
            RabbitMQHelper.Consume<Developer>("developExchange1", "direct", "directkey1", "developQueue1", ConsumeMessage);
        }

        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="developer">處理對象</param>
        /// <returns>消費結果</returns>
        public static bool ConsumeMessage(Developer developer)
        {
            string message = JsonConvert.SerializeObject(developer);
            Console.Write(message);
            return true;
        }

生產者的路由關鍵字是“directkey”,消費者1的路由關鍵字為”directkey“,消費者2的路由關鍵字為”directkey1“,僅僅一字相差,生產者產生的消息就只有消費者1能夠收到,運行代碼得到如圖結果:

 至於topic類型,這里就不再以代碼進行講解了,其實大致使用方法與上面的direct類型相似,不同之處在於topic類型可通過路由關鍵字進行模糊匹配,將消息路由到相應隊列,大家可根據自己的實際使用場景,進行類型的選擇。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM