消費者消息確認分兩種:自動確認、手動確認。
自動確認,消費者消費消息時,只要收到消息就回饋rabbitmq服務,
並且消費成功一條消息后,rabbitmq會認為所有消息全部成功消費,隊列中移除所有消息,會導致消息的丟失;
手動確認,消費一條消息,回饋rabbitmq服務,rabbitmq只移除隊列中消費了的消息;
1. 生產者
using RabbitMQMsgProducer.MessageProducer; using Microsoft.Extensions.Configuration; using System; using System.IO; using RabbitMQMsgProducer.ExchangeDemo; using RabbitMQMsgProducer.MessageConfirm; namespace RabbitMQMsgProducer { class Program { static void Main(string[] args) { try { { // 消費者 確認消息 ConsumerMsgConfirm.Send(); } Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } }
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQMsgProducer.MessageConfirm { public class ConsumerMsgConfirm { public static void Send() { var factory = new ConnectionFactory(); factory.HostName = "localhost";//服務地址 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 string queueName = "ConsumerMsgConfirmQueue"; string exchangeName = "ConsumerMsgConfirmExchange"; string routingKeyName = "ConsumerMsgConfirmRoutingKey"; using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { // 聲明隊列 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 聲明交換機exchange channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); // 綁定exchange和queue channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName); for (int i = 1; i <= 100; i++) { string message = $"the message is : {i} ."; channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: null, body: Encoding.UTF8.GetBytes(message)); Thread.Sleep(300); Console.WriteLine($"the message is : {i} . is send ."); } Console.Read(); } } } } }
2. 消費者
using RabbitMQMsgConsumer001.ExchangeDemo; using RabbitMQMsgConsumer001.MessageConfirm; using RabbitMQMsgConsumer001.MessageConsumer; using System; using System.Threading.Tasks; namespace RabbitMQMsgConsumer001 { class Program { static void Main(string[] args) { try { { // 消費者 確認消息 ConsumerMsgConfirm.Receive(); } Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } }
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQMsgConsumer001.MessageConfirm { public class ConsumerMsgConfirm { public static void Receive() { var factory = new ConnectionFactory(); factory.HostName = "localhost";//服務地址 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 string queueName = "ConsumerMsgConfirmQueue"; string exchangeName = "ConsumerMsgConfirmExchange"; string routingKeyName = "ConsumerMsgConfirmRoutingKey"; using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { Console.WriteLine("the consumer is ready !"); // 聲明隊列 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 聲明交換機exchange channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); // 綁定exchange和queue channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); int i = 0; consumer.Received += (model, ea) => { var body = ea.Body; var msg = Encoding.UTF8.GetString(body.ToArray()); #region 自動確認 // 調試運行 消費第一條消息時,rabbitmq中的隊列已經顯示全部消費了 // Console.WriteLine($"the consumer received : {msg} over."); #endregion #region 手動確認 if (i < 20) { // 手動確認 消息已消費,通知broker 可從隊列中移除這條消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); Console.WriteLine($"the consumer received : {msg} over."); } else { // 模擬未消費此消息或消費失敗,通知broker; // requeue: true 重新寫入隊列; false 不重新寫入,直接移除掉此消息 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true); } #endregion i++; }; { // 1.消費者自動確認 autoAck:true //channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); } { // 2.消費者手動確認 autoAck:false channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); } Console.Read(); } } } } }
3. 結果
生產者,停止生產, 重新啟動消費者,將繼續消費20條消息: