一. 生產者-確認機制
1. Confirm模式
(1). 含義:就是應答模式,生產者發送一條消息之后,Rabbitmq服務器做了個響應,表示收到了。
(2). 特點:異步模式,在應之前,可以繼續發送消息,單條消息、批量消息均可繼續發送。
(3). 核心代碼:單條消息確認: channel.waitForConfirms()
批量消息確認: channel.waitForConfirmsOrDie()
異步監聽消息確認:channel.addConfirmListener()
PS: 大致流程:channel.ConfirmSelect();開啟確認模式→發送消息→提供一個回執方法WaitForConfirms(); 返回一個bool 值
代碼分享:

/// <summary> /// 生產者-Confirm模式 /// </summary> public class ProductionConfirm { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { //創建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("-------------------------生產者准備就緒-----------------------------"); channel.QueueDeclare(queue: "ConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.ExchangeDeclare(exchange: "ConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueBind(queue: "ConfirmQueue", exchange: "ConfirmExchange", routingKey: "ConfirmSelectKey"); string message = ""; //在控制台輸入消息,按enter鍵發送消息 while (!message.Equals("stop", StringComparison.CurrentCultureIgnoreCase)) { Console.WriteLine("請輸入要發送的消息:"); message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); try { //開啟消息確認模式 channel.ConfirmSelect(); //發送消息 channel.BasicPublish(exchange: "ConfirmExchange", routingKey: "ConfirmSelectKey", basicProperties: null, body: body); if (channel.WaitForConfirms()) //單條消息確認 { //表示消息發送成功(已經存入隊列) Console.WriteLine($"【{message}】發送到Broke成功!"); } else { //表示消息發送失敗 } //channel.WaitForConfirmsOrDie();//如果所有消息發送成功 就正常執行, 如果有消息發送失敗;就拋出異常; } catch (Exception) { //表示消息發送失敗 Console.WriteLine($"【{message}】發送到Broker失敗!"); } } } } Console.ReadKey(); } }
運行效果:
2. TX事務模式
(1). 含義:基於AMPQ協議;可以讓信道設置成一個帶事務的信道,分為三步:開啟事務、提交事務、事務回滾
(2). 特點:同步模式,在事務提交之前不能繼續發送消息,該模式相比Confirm模式效率差一點。
(3). 核心代碼:channel.TxSelect(); 開啟一個事務
channel.TxCommit();提交事務, 這一步成功后,消息才真正的寫入隊列
channel.TxRollback(); //事務回滾
代碼分享:

/// <summary> ///生產者-事務模式 /// </summary> public class ProductionTx { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { //創建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("---------------生產者准備就緒-------------------"); channel.QueueDeclare(queue: "MessageTxQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "MessageTxQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null); //聲明交換機exchang channel.ExchangeDeclare(exchange: "MessageTxQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //綁定exchange和queue channel.QueueBind(queue: "MessageTxQueue01", exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey01"); channel.QueueBind(queue: "MessageTxQueue02", exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey02"); string message = ""; //發送消息,在控制台輸入消息,按enter鍵發送消息 while (!message.Equals("stop", StringComparison.CurrentCultureIgnoreCase)) { Console.WriteLine("請輸入要發送的消息:"); message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); try { //① 開啟事務機制 channel.TxSelect(); //事務是協議支持的 //發送消息,同時給多個隊列發送消息;要么都成功;要么都失敗; channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey01", basicProperties: null, body: body); channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey02", basicProperties: null, body: body); //int.Parse("dfdsfdf"); //模擬錯誤,消息發送不成功,進入catch,事務回滾 //② 事務提交 channel.TxCommit(); //只有事務提交成功以后,才會真正的寫入到隊列里面去 Console.WriteLine($"【{message}】發送到Broke成功!"); } catch (Exception ex) { Console.WriteLine($"【{message}】發送到Broker失敗!{ex.Message}"); //③ 事務回滾 channel.TxRollback(); } } Console.Read(); } } } }
運行效果:
二. 消費者-確認機制
1. 自動確認
(1) 含義:是消費消息的時候,只有收到消息,就直接給RabbitMQ應答,直接總覽該隊列中所有消息了
(2) 特點:只是消費成功了一條消息,RabbitMQ也會認為你是全部成功了,會把該隊列中所有消息從隊列中移除,這樣會導致消息的丟失。
(3) 核心代碼:autoAck: true ,表示自動確認
(4) 案例演示:生產者先啟動,3s后啟動消費者,然后消費者消費1條消息的時候,進行停頓一段時間,發現隊列中不止少了1條消息,少了很多,說明第1次應答,將當時隊列中的消息全部刪除了。
配套生產者代碼:

/// <summary> /// 普通的生產者 /// (用於配合演示消費者的兩種應答機制) /// </summary> public class ProductionConfirmPh { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { Console.WriteLine("---------------------生產者准備就緒---------------------------------"); channel.QueueDeclare(queue: "ConsumerConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //聲明交換機exchang channel.ExchangeDeclare(exchange: "ConsumerConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //綁定exchange和queue channel.QueueBind(queue: "ConsumerConfirmQueue", exchange: "ConsumerConfirmExchange", routingKey: "ConsumerConfirmKey"); for (int i = 1; i <= 1000; i++) { string message = $"消息{i}"; channel.BasicPublish(exchange: "ConsumerConfirmExchange", routingKey: "ConsumerConfirmKey", basicProperties: null, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"【{message}】 已發送~~~"); Thread.Sleep(500); } Console.Read(); } } } }
消費者代碼

/// <summary> /// 消費者-自動確認 /// (需要配合生產者,讓生產者03/ProductionConfirmPh先啟動) /// </summary> public class ConsumerAutoConfirm { public static void Show1() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine(Encoding.UTF8.GetString(ea.Body.ToArray())); }; Console.WriteLine("消費者准備就緒...."); //autoAck: true 自動確認 channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: false, consumer: consumer); Console.ReadKey(); } } } public static void Show2() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory { HostName = "localhost",//RabbitMQ服務在本地運行 UserName = "guest",//用戶名 Password = "guest"//密碼 }; using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); int i = 0; consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); if (i < 50) { Console.WriteLine($"第{i+1}次消費,內容為:{message}"); Thread.Sleep(10000000); } else { Console.WriteLine($"第{i + 1}次消費,內容為:{message}"); } i++; }; Console.WriteLine("------------------消費者准備就緒-------------------------"); //autoAck: true 自動確認 channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: true, consumer: consumer); Console.ReadKey(); } } } }
結果現象截圖
2. 顯式確認(手動確認)
(1) 含義:消費者消費一條,回執給RabbitMq一條消息,Rabbitmq 只刪除當前這一條消息;相當於是一條消費了,刪除一條消息,性能稍微低一些;
(2) 特點:消費1條應答一次,可以告訴RabbitMq消費成功or失敗,消費成功,服務器刪除該條消息,消費失敗,可以刪除也可以重新寫入。
(3) 核心代碼:autoAck: false,表示不自動確認
然后:hannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 表示消費成功
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 表示消費失敗, 可以配置:requeue: true:重新寫入到隊列里去; false: 刪除消息
(4) 案例演示:生產者先啟動,3s后啟動消費者,然后消費者消費1條消息的時候,進行停頓一段時間,發現隊列中僅少了1條消息,說明應答一次,刪除1條。
配套生產者代碼:

/// <summary> /// 普通的生產者 /// (用於配合演示消費者的兩種應答機制) /// </summary> public class ProductionConfirmPh { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { Console.WriteLine("---------------------生產者准備就緒---------------------------------"); channel.QueueDeclare(queue: "ConsumerConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //聲明交換機exchang channel.ExchangeDeclare(exchange: "ConsumerConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //綁定exchange和queue channel.QueueBind(queue: "ConsumerConfirmQueue", exchange: "ConsumerConfirmExchange", routingKey: "ConsumerConfirmKey"); for (int i = 1; i <= 1000; i++) { string message = $"消息{i}"; channel.BasicPublish(exchange: "ConsumerConfirmExchange", routingKey: "ConsumerConfirmKey", basicProperties: null, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"【{message}】 已發送~~~"); Thread.Sleep(500); } Console.Read(); } } } }
消費者代碼

/// <summary> /// 消費者-手動確認 /// (需要配合生產者,讓生產者03/ProductionConfirmPh先啟動) /// </summary> public class ConsumerNoAutoConfirm { public static void Show1() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory { HostName = "localhost",//RabbitMQ服務在本地運行 UserName = "guest",//用戶名 Password = "guest"//密碼 }; using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); int i = 0; consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); if (i < 50) { //手動確認 (下面模擬消息正常消費,告訴rabbitmq,可以刪除該條消息) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); Console.WriteLine(message); } else { //手動確認 (下面模擬消息異常消費,告訴rabbitmq,可以刪除該條消息,也可以重新寫入隊列) // requeue: true:重新寫入到隊列里去; false: 刪除消息 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); } i++; }; Console.WriteLine("------------------消費者准備就緒-------------------------"); //處理消息 autoAck: false 顯示確認(手動確認) channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: false, consumer: consumer); Console.ReadKey(); } } } public static void Show2() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory { HostName = "localhost",//RabbitMQ服務在本地運行 UserName = "guest",//用戶名 Password = "guest"//密碼 }; using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); int i = 0; consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); if (i < 50) { //手動確認 (下面模擬消息正常消費,告訴rabbitmq,可以刪除該條消息) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); Console.WriteLine($"第{i + 1}次消費,內容為:{message}"); Thread.Sleep(10000000); } else { //手動確認 (下面模擬消息異常消費,告訴rabbitmq,可以刪除該條消息,也可以重新寫入隊列) // requeue: true:重新寫入到隊列里去; false: 刪除消息 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); } i++; }; Console.WriteLine("------------------消費者准備就緒-------------------------"); //處理消息 autoAck: false 顯示確認(手動確認) channel.BasicConsume(queue: "ConsumerConfirmQueue", autoAck: false, consumer: consumer); Console.ReadKey(); } } } }
結果現象截圖
!
- 作 者 : Yaopengfei(姚鵬飛)
- 博客地址 : http://www.cnblogs.com/yaopengfei/
- 聲 明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
- 聲 明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。