rabbitmq系列——(5 消息確認 -- 消費者 自動確認和手動確認)


  消費者消息確認分兩種:自動確認、手動確認。

  自動確認,消費者消費消息時,只要收到消息就回饋rabbitmq服務,

    並且消費成功一條消息后,rabbitmq會認為所有消息全部成功消費,隊列中移除所有消息,會導致消息的丟失;

  手動確認,消費一條消息,回饋rabbitmq服務,rabbitmq只移除隊列中消費了的消息;

 

1. 生產者

using RabbitMQMsgProducer.MessageProducer;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;
using RabbitMQMsgProducer.ExchangeDemo;
using RabbitMQMsgProducer.MessageConfirm;

namespace RabbitMQMsgProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                {
                    // 消費者 確認消息
                    ConsumerMsgConfirm.Send();
                }
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQMsgProducer.MessageConfirm
{
    public class ConsumerMsgConfirm
    {
        public static void Send()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//服務地址
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            string queueName = "ConsumerMsgConfirmQueue";
            string exchangeName = "ConsumerMsgConfirmExchange";
            string routingKeyName = "ConsumerMsgConfirmRoutingKey";
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // 聲明隊列
                    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // 聲明交換機exchange
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    // 綁定exchange和queue
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName);

                    for (int i = 1; i <= 100; i++)
                    {
                        string message = $"the message is : {i} .";
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: null, body: Encoding.UTF8.GetBytes(message));

                        Thread.Sleep(300);
                        Console.WriteLine($"the message is : {i} . is send .");
                    }
                    Console.Read();

                }
            }
        }
    }
}

 

2. 消費者

using RabbitMQMsgConsumer001.ExchangeDemo;
using RabbitMQMsgConsumer001.MessageConfirm;
using RabbitMQMsgConsumer001.MessageConsumer;
using System;
using System.Threading.Tasks;

namespace RabbitMQMsgConsumer001
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                {
                    // 消費者 確認消息
                    ConsumerMsgConfirm.Receive();
                }
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgConsumer001.MessageConfirm
{
    public class ConsumerMsgConfirm
    {
        public static void Receive()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//服務地址
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            string queueName = "ConsumerMsgConfirmQueue";
            string exchangeName = "ConsumerMsgConfirmExchange";
            string routingKeyName = "ConsumerMsgConfirmRoutingKey";
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    Console.WriteLine("the consumer is ready !");
                    // 聲明隊列
                    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // 聲明交換機exchange
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    // 綁定exchange和queue
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    int i = 0;
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var msg = Encoding.UTF8.GetString(body.ToArray());
                        #region 自動確認
                        // 調試運行 消費第一條消息時,rabbitmq中的隊列已經顯示全部消費了
                        // Console.WriteLine($"the consumer received : {msg} over.");
                        #endregion
                        #region 手動確認
                        if (i < 20)
                        {
                            // 手動確認 消息已消費,通知broker 可從隊列中移除這條消息
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                            Console.WriteLine($"the consumer received : {msg} over.");
                        }
                        else
                        {
                            // 模擬未消費此消息或消費失敗,通知broker;
                            // requeue: true 重新寫入隊列; false 不重新寫入,直接移除掉此消息
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                        }
                        #endregion
                        i++;
                    };
                    {
                        // 1.消費者自動確認 autoAck:true
                        //channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                    }
                    {
                        // 2.消費者手動確認 autoAck:false
                        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    }

                    Console.Read();
                }
            }
        }
    }
}

 

3. 結果

 

 生產者,停止生產, 重新啟動消費者,將繼續消費20條消息:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM