第四節:RabbitMq剖析生產者、消費者的幾種消息確認機制(Conform、事務、自動、手動)


一. 生產者-確認機制

1. Confirm模式

(1). 含義:就是應答模式,生產者發送一條消息之后,Rabbitmq服務器做了個響應,表示收到了。

(2). 特點:異步模式,在應之前,可以繼續發送消息,單條消息、批量消息均可繼續發送。

(3). 核心代碼:單條消息確認: channel.waitForConfirms()

         批量消息確認: channel.waitForConfirmsOrDie()

         異步監聽消息確認:channel.addConfirmListener()

PS: 大致流程:channel.ConfirmSelect();開啟確認模式→發送消息→提供一個回執方法WaitForConfirms(); 返回一個bool 值

代碼分享:

    /// <summary>
    /// 生產者-Confirm模式
    /// </summary>
    public class ProductionConfirm
    {
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Red;

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("-------------------------生產者准備就緒-----------------------------");
                    channel.QueueDeclare(queue: "ConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.ExchangeDeclare(exchange: "ConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    channel.QueueBind(queue: "ConfirmQueue", exchange: "ConfirmExchange", routingKey: "ConfirmSelectKey");
                    string message = "";
                    //在控制台輸入消息,按enter鍵發送消息              
                    while (!message.Equals("stop", StringComparison.CurrentCultureIgnoreCase))
                    {
                        Console.WriteLine("請輸入要發送的消息:");
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            //開啟消息確認模式
                            channel.ConfirmSelect();
                            //發送消息
                            channel.BasicPublish(exchange: "ConfirmExchange", routingKey: "ConfirmSelectKey", basicProperties: null, body: body);
                            if (channel.WaitForConfirms())   //單條消息確認
                            {
                                //表示消息發送成功(已經存入隊列)
                                Console.WriteLine($"【{message}】發送到Broke成功!");
                            }
                            else
                            { 
                                //表示消息發送失敗
                            }
                            //channel.WaitForConfirmsOrDie();//如果所有消息發送成功 就正常執行, 如果有消息發送失敗;就拋出異常;
                        }
                        catch (Exception)
                        {
                            //表示消息發送失敗
                            Console.WriteLine($"【{message}】發送到Broker失敗!");
                        }
                    }                   
                }
            }
            Console.ReadKey();
        }
    }
View Code

運行效果:

2. TX事務模式

(1). 含義:基於AMPQ協議;可以讓信道設置成一個帶事務的信道,分為三步:開啟事務、提交事務、事務回滾

(2). 特點:同步模式,在事務提交之前不能繼續發送消息,該模式相比Confirm模式效率差一點。

(3). 核心代碼:channel.TxSelect(); 開啟一個事務

         channel.TxCommit();提交事務, 這一步成功后,消息才真正的寫入隊列

       channel.TxRollback(); //事務回滾

代碼分享:

  /// <summary>
    ///生產者-事務模式
    /// </summary>
    public class ProductionTx
    {
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Red;

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            { 
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("---------------生產者准備就緒-------------------");
                    channel.QueueDeclare(queue: "MessageTxQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "MessageTxQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "MessageTxQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    //綁定exchange和queue
                    channel.QueueBind(queue: "MessageTxQueue01", exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey01");
                    channel.QueueBind(queue: "MessageTxQueue02", exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey02");
                    string message = "";
                    //發送消息,在控制台輸入消息,按enter鍵發送消息
                    while (!message.Equals("stop", StringComparison.CurrentCultureIgnoreCase))
                    {
                        Console.WriteLine("請輸入要發送的消息:");
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            //① 開啟事務機制
                            channel.TxSelect(); //事務是協議支持的
                            //發送消息,同時給多個隊列發送消息;要么都成功;要么都失敗;
                            channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey01", basicProperties: null, body: body);
                            channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey02", basicProperties: null, body: body);

                            //int.Parse("dfdsfdf"); //模擬錯誤,消息發送不成功,進入catch,事務回滾

                            //② 事務提交
                            channel.TxCommit(); //只有事務提交成功以后,才會真正的寫入到隊列里面去
                            Console.WriteLine($"【{message}】發送到Broke成功!");
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($"【{message}】發送到Broker失敗!{ex.Message}");
                            //③ 事務回滾
                            channel.TxRollback(); 
                        }
                    }
                    Console.Read();
                }
            }
        }
    }
View Code

運行效果:

 

 

 

二. 消費者-確認機制

1. 自動確認

(1) 含義:是消費消息的時候,只有收到消息,就直接給RabbitMQ應答,直接總覽該隊列中所有消息了

(2) 特點:只是消費成功了一條消息,RabbitMQ也會認為你是全部成功了,會把該隊列中所有消息從隊列中移除,這樣會導致消息的丟失

(3) 核心代碼:autoAck: true ,表示自動確認

(4) 案例演示:生產者先啟動,3s后啟動消費者,然后消費者消費1條消息的時候,進行停頓一段時間,發現隊列中不止少了1條消息,少了很多,說明第1次應答,將當時隊列中的消息全部刪除了。

配套生產者代碼: 

    /// <summary>
    /// 普通的生產者
    /// (用於配合演示消費者的兩種應答機制)
    /// </summary>
    public class ProductionConfirmPh
    {
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Red;

            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    Console.WriteLine("---------------------生產者准備就緒---------------------------------");
                    channel.QueueDeclare(queue: "ConsumerConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "ConsumerConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    //綁定exchange和queue
                    channel.QueueBind(queue: "ConsumerConfirmQueue", exchange: "ConsumerConfirmExchange", routingKey: "ConsumerConfirmKey");

                    for (int i = 1; i <= 1000; i++)
                    {
                        string message = $"消息{i}";
                        channel.BasicPublish(exchange: "ConsumerConfirmExchange",
                                         routingKey: "ConsumerConfirmKey",
                                         basicProperties: null,
                                         body: Encoding.UTF8.GetBytes(message));                      
                        Console.WriteLine($"【{message}】 已發送~~~");
                        Thread.Sleep(500);
                    }
                    Console.Read();
                }
            }
        }
    }
View Code

消費者代碼

/// <summary>
    /// 消費者-自動確認
    /// (需要配合生產者,讓生產者03/ProductionConfirmPh先啟動)
    /// </summary>
    public class ConsumerAutoConfirm
    {
        public static void Show1()
        {
            Console.ForegroundColor = ConsoleColor.Green;

            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {                   
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Console.WriteLine(Encoding.UTF8.GetString(ea.Body.ToArray()));
                    };
                    Console.WriteLine("消費者准備就緒....");
                    //autoAck: true 自動確認
                    channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }


        public static void Show2()
        {
            Console.ForegroundColor = ConsoleColor.Green;
            var factory = new ConnectionFactory
            {
                HostName = "localhost",//RabbitMQ服務在本地運行
                UserName = "guest",//用戶名
                Password = "guest"//密碼 
            };
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    int i = 0;
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        if (i < 50)
                        {
                            Console.WriteLine($"第{i+1}次消費,內容為:{message}");
                            Thread.Sleep(10000000);
                        }
                        else
                        {
                            Console.WriteLine($"第{i + 1}次消費,內容為:{message}");
                        }
                        i++;
                    };
                    Console.WriteLine("------------------消費者准備就緒-------------------------");

                    //autoAck: true 自動確認
                    channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: true, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
View Code

結果現象截圖

2. 顯式確認(手動確認)

(1) 含義:消費者消費一條,回執給RabbitMq一條消息,Rabbitmq 只刪除當前這一條消息;相當於是一條消費了,刪除一條消息,性能稍微低一些;

(2) 特點:消費1條應答一次,可以告訴RabbitMq消費成功or失敗,消費成功,服務器刪除該條消息,消費失敗,可以刪除也可以重新寫入。

(3) 核心代碼:autoAck: false,表示不自動確認

      然后:hannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 表示消費成功

      channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 表示消費失敗, 可以配置:requeue: true:重新寫入到隊列里去; false: 刪除消息

(4) 案例演示:生產者先啟動,3s后啟動消費者,然后消費者消費1條消息的時候,進行停頓一段時間,發現隊列中僅少了1條消息,說明應答一次,刪除1條。

配套生產者代碼: 

    /// <summary>
    /// 普通的生產者
    /// (用於配合演示消費者的兩種應答機制)
    /// </summary>
    public class ProductionConfirmPh
    {
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Red;

            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    Console.WriteLine("---------------------生產者准備就緒---------------------------------");
                    channel.QueueDeclare(queue: "ConsumerConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "ConsumerConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    //綁定exchange和queue
                    channel.QueueBind(queue: "ConsumerConfirmQueue", exchange: "ConsumerConfirmExchange", routingKey: "ConsumerConfirmKey");

                    for (int i = 1; i <= 1000; i++)
                    {
                        string message = $"消息{i}";
                        channel.BasicPublish(exchange: "ConsumerConfirmExchange",
                                         routingKey: "ConsumerConfirmKey",
                                         basicProperties: null,
                                         body: Encoding.UTF8.GetBytes(message));                      
                        Console.WriteLine($"【{message}】 已發送~~~");
                        Thread.Sleep(500);
                    }
                    Console.Read();
                }
            }
        }
    }
View Code

消費者代碼

/// <summary>
    /// 消費者-手動確認
    /// (需要配合生產者,讓生產者03/ProductionConfirmPh先啟動)
    /// </summary>
    public class ConsumerNoAutoConfirm
    {
        public static void Show1()
        {
            Console.ForegroundColor = ConsoleColor.Green;
            var factory = new ConnectionFactory
            {
                HostName = "localhost",//RabbitMQ服務在本地運行
                UserName = "guest",//用戶名
                Password = "guest"//密碼 
            };
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    int i = 0;
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        if (i < 50)
                        {
                            //手動確認  (下面模擬消息正常消費,告訴rabbitmq,可以刪除該條消息)
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                            Console.WriteLine(message);
                        }
                        else
                        {
                            //手動確認  (下面模擬消息異常消費,告訴rabbitmq,可以刪除該條消息,也可以重新寫入隊列)
                            // requeue: true:重新寫入到隊列里去; false: 刪除消息
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                        }
                        i++;
                    };
                    Console.WriteLine("------------------消費者准備就緒-------------------------");

                    //處理消息 autoAck: false  顯示確認(手動確認) 
                    channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }


        public static void Show2()
        {
            Console.ForegroundColor = ConsoleColor.Green;
            var factory = new ConnectionFactory
            {
                HostName = "localhost",//RabbitMQ服務在本地運行
                UserName = "guest",//用戶名
                Password = "guest"//密碼 
            };
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    int i = 0;
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        if (i < 50)
                        {
                            //手動確認  (下面模擬消息正常消費,告訴rabbitmq,可以刪除該條消息)
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                            Console.WriteLine($"第{i + 1}次消費,內容為:{message}");

                            Thread.Sleep(10000000);
                        }
                        else
                        {
                            //手動確認  (下面模擬消息異常消費,告訴rabbitmq,可以刪除該條消息,也可以重新寫入隊列)
                            // requeue: true:重新寫入到隊列里去; false: 刪除消息
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                        }
                        i++;
                    };
                    Console.WriteLine("------------------消費者准備就緒-------------------------");

                    //處理消息 autoAck: false  顯示確認(手動確認) 
                    channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
View Code

結果現象截圖

 

 

 

 

 

 

 

!

  • 作       者 : Yaopengfei(姚鵬飛)
  • 博客地址 : http://www.cnblogs.com/yaopengfei/
  • 聲     明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
  • 聲     明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM