快速掌握RabbitMQ(三)——消息確認、持久化、優先級的C#實現


 1 消息確認

  在一些場合,如轉賬、付費時每一條消息都必須保證成功的被處理。AMQP是金融級的消息隊列協議,有很高的可靠性,這里介紹在使用RabbitMQ時怎么保證消息被成功處理的。消息確認可以分為兩種:一種是生產者發送消息到Broke時,Broker給生產者發送確認回執,用於告訴生產者消息已被成功發送到Broker;一種是消費者接收到Broker發送的消息時,消費者給Broker發送確認回執,用於通知消息已成功被消費者接收。

  下邊分別介紹生產者端和消費者端的消息確認方法。准備條件:使用Web管理工具添加exchange、queue並綁定,bindingKey為“mykey”,如下所示:

1 生產者端消息確認(tx機制和Confirm模式)

  生產者端的消息確認:當生產者將消息發送給Broker,Broker接收到消息給生產者發送確認回執。生產者端的消息確認有兩種方式:tx機制和Confirm模式。

1.tx機制

  tx機制可以叫做事務機制,RabbitMQ中有三個與tx機制的方法:txSelect(), txCommit()和txRollback()。 channel.txSelect() 用於將當前channel設置成transaction模式, channel.txCommit() 提交事務, channel.txRollback() 回滾事務。使用tx機制,我們首先要通過txSelect方法開啟事務,然后發布消息給broker服務器了,如果txCommit提交成功了,則說明消息成功被broker接收了;如果在txCommit執行之前broker異常崩潰或者由於其他原因拋出異常,這個時候我們可以捕獲異常,通過txRollback回滾事務。看一個tx機制的簡單實現:

            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生產者准備就緒....");
                    string message = "";
                    //發送消息
                    //在控制台輸入消息,按enter鍵發送消息
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            //開啟事務機制
 channel.TxSelect(); //發送消息
                            channel.BasicPublish(exchange: "myexchange",
                                                 routingKey: "mykey",
                                                 basicProperties: null,
                                                 body: body);
                            //事務提交
 channel.TxCommit();
                            Console.WriteLine($"【{message}】發送到Broke成功!");
                        }
                        catch (Exception)
                        {
                            Console.WriteLine($"【{message}】發送到Broker失敗!");
                            channel.TxRollback();
                        }                      
                    }
                }
            }
            Console.ReadKey();
        }

   程序運行結果如下:

 

2 Confirm模式

  C#的RabbitMQ API中,有三個與Confirm相關的方法:ConfirmSelect(),WaitForConfirms()和WaitForConfirmOrDie。 channel.ConfirmSelect() 表示開啟Confirm模式; channel.WaitForConfirms() 等待所有消息確認,如果所有的消息都被服務端成功接收返回true,只要有一條沒有被成功接收就返回false。 channel.WaitForConfirmsOrDie() 和WaitForConfirms作用類型,也是等待所有消息確認,區別在於該方法沒有返回值(Void),如果有任意一條消息沒有被成功接收,該方法會立即拋出一個OperationInterrupedException類型異常。看一個Confirm模式的簡單實現:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生產者准備就緒....");
                    string message = "";
                    //在控制台輸入消息,按enter鍵發送消息
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);

                        //開啟Confirm模式
 channel.ConfirmSelect(); //發送消息
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: null,
                                             body: body);
                        //WaitForConfirms確認消息(可以同時確認多條消息)是否發送成功
                        if (channel.WaitForConfirms())
                        {
                            Console.WriteLine($"【{message}】發送到Broke成功!");
                        }
                    }
                }
            }
            Console.ReadKey();
        }

  程序運行結果:

2 消費者端消息確認(自動確認和顯示確認)

  從Broke發送到消費者時,RabbitMQ提供了兩種消息確認的方式:自動確認和顯示確認。

1 自動確認

  自動確認:當RabbbitMQ將消息發送給消費者后,消費者端接收到消息后,不等待消息處理結束,立即自動回送一個確認回執。自動確認的用法十分簡單,設置消費方法的參數autoAck為true即可,我們前邊的例子都是使用的自動確認,這里不再詳細演示,如下:

 channel.BasicConsume(queue: "myqueue",autoAck: true, consumer: consumer);

注意:Broker會在接收到確認回執時刪除消息,如果消費者接收到消息並返回了確認回執,然后這個消費者在處理消息時掛了,那么這條消息就再也找不回來了。

2 顯示確認

  我們知道自動確認可能會出現消息丟失的問題,我們不免會想到:Broker收到回執后才刪除消息,如果可以讓消費者在接收消息時不立即返回確認回執,等到消息處理完成后(或者完成一部分的邏輯)再返回確認回執,這樣就保證消費端不會丟失消息了!這正是顯式確認的思路。使用顯示確認也比較簡單,首先將Resume方法的參數autoAck設置為false,然后在消費端使用代碼 channel.BasicAck()/BasicReject()等方法 來確認和拒絕消息。看一個栗子:

生產者代碼如下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生產者准備就緒....");
                    string message = "";
                    //發送消息
                    //在控制台輸入消息,按enter鍵發送消息
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        //基本發布
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine($"消息【{message}】已發送到隊列");
                    }
                }
            }
            Console.ReadKey();
        }

消費者代碼如下:

       static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        string message = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine($"接受到消息【{message}】");
                        //以news開頭表示是新聞類型,處理完成后確認消息
                        if (message.StartsWith("news"))
                        {
                            //這里處理消息balabala
                            Console.WriteLine($"【{message}】是新聞消息,處理消息並確認");
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                        //不以news開頭表示不是新聞類型,不進行處理,把消息退回到queue中
                        else
                        {
                            Console.WriteLine($"【{message}】不是新聞類型,拒絕處理");
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                        }
                    };
                    Console.WriteLine("消費者准備就緒....");
                    //第五步:處理消息
                    channel.BasicConsume(queue: "myqueue",
                                           autoAck: false,
                                           consumer: consumer);
                    Console.ReadKey();
                }
            }
        }

  介紹一下代碼中標紅的兩個方法: channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 方法用於確認消息,deliveryTag參數是分發的標記,multiple表示是否確認多條。 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 方法用於拒絕消息,deliveryTag也是指分發的標記,requeue表示消息被拒絕后是否重新放回queue中,true表示放回queue中,false表示直接丟棄。

  運行這兩個應用程序,通過生產者發送兩條消息,效果如下:

  一些意外的情況:使用顯式確認時,如果消費者處理完消息不發送確認回執,那么消息不會被刪除,消息的狀態一直是Unacked,這條消息也不會再發送給其他消費者。如果一個消費者在處理消息時尚未發送確認回執的情況下掛掉了,那么消息會被重新放入隊列(狀態從Unacked變成Ready),有其他消費者存時,消息會發送給其他消費者。

2 消息持久化/優先級

1 消息持久化(Persistent)

  在前邊已經介紹了exchange和queue的持久化,把exchange和queue的durable屬性設置為true,重啟rabbitmq服務時( 重啟命令:rabbitmqctl stop_app ;rabbitmqctl start_app ),exchange和queue也會恢復。我們需要注意的是:如果queue設置durable=true,rabbitmq服務重啟后隊列雖然會存在,但是隊列內的消息會丟全部丟失。那么怎么實現消息的持久化呢?實現的方法很簡單:將exchange和queue都設置durable=true,然后在消息發布的時候設置persistent=true即可。看一個栗子:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生產者准備就緒....");
                    string message = "";
                    //在控制台輸入消息,按enter鍵發送消息
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        //設置消息持久化
                        var props = channel.CreateBasicProperties(); props.Persistent = true;
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: props,
                                             body: body);
                        Console.WriteLine($"【{message}】發送到Broke成功!");
                    }
                }
            }
            Console.ReadKey();
        }

  聲明exchange和queue時設置durable=true,然后執行上邊的代碼,傳入一條消息。重啟rabbitmq后,exchange,queue和消息都會恢復。我們也可以在web管理界面設置消息持久化,如下:

 2 消息優先級(Priority)

  我們知道queue是先進先出的,即先發送的消息,先被消費。但是在具體業務中可能會遇到要提前處理某些消息的需求,如一個常見的需求:普通客戶的消息按先進先出的順序處理,Vip客戶的消息要提前處理。消息實現優先級控制的實現方式是:首先在聲明queue是設置隊列的x-max-priority屬性,然后在publish消息時,設置消息的優先級等級即可。為了演示方便,約定所有vip客戶的信息都以vip開頭,看一下代碼實現:

  生產者代碼

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "myexchange",
                                            type: ExchangeType.Direct,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明隊列queue
                    channel.QueueDeclare(queue: "myqueue",
                                       durable: true,
                                       exclusive: false,
                                       autoDelete: false,
                                       arguments: new Dictionary<string, object>() {
                                           //隊列優先級最高為10,不加x-max-priority的話,計算發布時設置了消息的優先級也不會生效
                                            {"x-max-priority",10 }
                                       });
                    //綁定exchange和queue
                    channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey");
                    Console.WriteLine("生產者准備就緒....");
                    //一些待發送的消息
                    string[] msgs = { "vip1", "hello2", "world3","common4", "vip5" };
                    //設置消息優先級
                    var props = channel.CreateBasicProperties(); foreach (string msg in msgs)
                    {
                        //vip開頭的消息,優先級設置為9
                        if (msg.StartsWith("vip"))
                        {
                            props.Priority = 9;
                            channel.BasicPublish(exchange: "myexchange",
                                                 routingKey: "mykey",
                                                 basicProperties: props,
                                                 body: Encoding.UTF8.GetBytes(msg));
                        }
                        //其他消息的優先級為1
                        else
                        {
                            props.Priority = 1;
                            channel.BasicPublish(exchange: "myexchange",
                                                 routingKey: "mykey",
                                                 basicProperties: props,
                                                 body: Encoding.UTF8.GetBytes(msg));
                        }
                       
                    }
                }
            }
            Console.ReadKey();
        }

   消費者,不需要對消費者做額外的配置,代碼如下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    #region EventingBasicConsumer
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Console.WriteLine(Encoding.UTF8.GetString(ea.Body));
                    };
                    Console.WriteLine("消費者准備就緒....");
                    //處理消息
                    channel.BasicConsume(queue: "myqueue",
                                           autoAck: true,
                                           consumer: consumer);
                    Console.ReadKey();
                    #endregion
                }
            }
        }

  運行程序,結果如下,我們看到vip開頭的消息被率先處理了,證明優先級是生效的

3 小結

  本節簡單介紹了Rabbitmq中的消息確認,消息持久化,消息優先級的實現方式,這幾個功能在開發中會經常用到,RabbitMQ還有一些其他有用的功能,如Lazy queue模式,dead letter處理,queue的消息條數、字節數限制等,這里沒有具體演示,有興趣的園友可以自己研究一下。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM