1 消息確認
在一些場合,如轉賬、付費時每一條消息都必須保證成功的被處理。AMQP是金融級的消息隊列協議,有很高的可靠性,這里介紹在使用RabbitMQ時怎么保證消息被成功處理的。消息確認可以分為兩種:一種是生產者發送消息到Broke時,Broker給生產者發送確認回執,用於告訴生產者消息已被成功發送到Broker;一種是消費者接收到Broker發送的消息時,消費者給Broker發送確認回執,用於通知消息已成功被消費者接收。
下邊分別介紹生產者端和消費者端的消息確認方法。准備條件:使用Web管理工具添加exchange、queue並綁定,bindingKey為“mykey”,如下所示:
1 生產者端消息確認(tx機制和Confirm模式)
生產者端的消息確認:當生產者將消息發送給Broker,Broker接收到消息給生產者發送確認回執。生產者端的消息確認有兩種方式:tx機制和Confirm模式。
1.tx機制
tx機制可以叫做事務機制,RabbitMQ中有三個與tx機制的方法:txSelect(), txCommit()和txRollback()。 channel.txSelect() 用於將當前channel設置成transaction模式, channel.txCommit() 提交事務, channel.txRollback() 回滾事務。使用tx機制,我們首先要通過txSelect方法開啟事務,然后發布消息給broker服務器了,如果txCommit提交成功了,則說明消息成功被broker接收了;如果在txCommit執行之前broker異常崩潰或者由於其他原因拋出異常,這個時候我們可以捕獲異常,通過txRollback回滾事務。看一個tx機制的簡單實現:
var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這里就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //創建連接connection using (var connection = factory.CreateConnection()) { //創建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者准備就緒...."); string message = ""; //發送消息 //在控制台輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); try { //開啟事務機制 channel.TxSelect(); //發送消息 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); //事務提交 channel.TxCommit(); Console.WriteLine($"【{message}】發送到Broke成功!"); } catch (Exception) { Console.WriteLine($"【{message}】發送到Broker失敗!"); channel.TxRollback(); } } } } Console.ReadKey(); }
程序運行結果如下:
2 Confirm模式
C#的RabbitMQ API中,有三個與Confirm相關的方法:ConfirmSelect(),WaitForConfirms()和WaitForConfirmOrDie。 channel.ConfirmSelect() 表示開啟Confirm模式; channel.WaitForConfirms() 等待所有消息確認,如果所有的消息都被服務端成功接收返回true,只要有一條沒有被成功接收就返回false。 channel.WaitForConfirmsOrDie() 和WaitForConfirms作用類型,也是等待所有消息確認,區別在於該方法沒有返回值(Void),如果有任意一條消息沒有被成功接收,該方法會立即拋出一個OperationInterrupedException類型異常。看一個Confirm模式的簡單實現:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這里就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //創建連接connection using (var connection = factory.CreateConnection()) { //創建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者准備就緒...."); string message = ""; //在控制台輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //開啟Confirm模式 channel.ConfirmSelect(); //發送消息 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); //WaitForConfirms確認消息(可以同時確認多條消息)是否發送成功 if (channel.WaitForConfirms()) { Console.WriteLine($"【{message}】發送到Broke成功!"); } } } } Console.ReadKey(); }
程序運行結果:
2 消費者端消息確認(自動確認和顯示確認)
從Broke發送到消費者時,RabbitMQ提供了兩種消息確認的方式:自動確認和顯示確認。
1 自動確認
自動確認:當RabbbitMQ將消息發送給消費者后,消費者端接收到消息后,不等待消息處理結束,立即自動回送一個確認回執。自動確認的用法十分簡單,設置消費方法的參數autoAck為true即可,我們前邊的例子都是使用的自動確認,這里不再詳細演示,如下:
channel.BasicConsume(queue: "myqueue",autoAck: true, consumer: consumer);
注意:Broker會在接收到確認回執時刪除消息,如果消費者接收到消息並返回了確認回執,然后這個消費者在處理消息時掛了,那么這條消息就再也找不回來了。
2 顯示確認
我們知道自動確認可能會出現消息丟失的問題,我們不免會想到:Broker收到回執后才刪除消息,如果可以讓消費者在接收消息時不立即返回確認回執,等到消息處理完成后(或者完成一部分的邏輯)再返回確認回執,這樣就保證消費端不會丟失消息了!這正是顯式確認的思路。使用顯示確認也比較簡單,首先將Resume方法的參數autoAck設置為false,然后在消費端使用代碼 channel.BasicAck()/BasicReject()等方法 來確認和拒絕消息。看一個栗子:
生產者代碼如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這里就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //創建連接connection using (var connection = factory.CreateConnection()) { //創建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者准備就緒...."); string message = ""; //發送消息 //在控制台輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //基本發布 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); Console.WriteLine($"消息【{message}】已發送到隊列"); } } } Console.ReadKey(); }
消費者代碼如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這里就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { string message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"接受到消息【{message}】"); //以news開頭表示是新聞類型,處理完成后確認消息 if (message.StartsWith("news")) { //這里處理消息balabala Console.WriteLine($"【{message}】是新聞消息,處理消息並確認"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } //不以news開頭表示不是新聞類型,不進行處理,把消息退回到queue中 else { Console.WriteLine($"【{message}】不是新聞類型,拒絕處理"); channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); } }; Console.WriteLine("消費者准備就緒...."); //第五步:處理消息 channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); } } }
介紹一下代碼中標紅的兩個方法: channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 方法用於確認消息,deliveryTag參數是分發的標記,multiple表示是否確認多條。 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 方法用於拒絕消息,deliveryTag也是指分發的標記,requeue表示消息被拒絕后是否重新放回queue中,true表示放回queue中,false表示直接丟棄。
運行這兩個應用程序,通過生產者發送兩條消息,效果如下:
一些意外的情況:使用顯式確認時,如果消費者處理完消息不發送確認回執,那么消息不會被刪除,消息的狀態一直是Unacked,這條消息也不會再發送給其他消費者。如果一個消費者在處理消息時尚未發送確認回執的情況下掛掉了,那么消息會被重新放入隊列(狀態從Unacked變成Ready),有其他消費者存時,消息會發送給其他消費者。
2 消息持久化/優先級
1 消息持久化(Persistent)
在前邊已經介紹了exchange和queue的持久化,把exchange和queue的durable屬性設置為true,重啟rabbitmq服務時( 重啟命令:rabbitmqctl stop_app ;rabbitmqctl start_app ),exchange和queue也會恢復。我們需要注意的是:如果queue設置durable=true,rabbitmq服務重啟后隊列雖然會存在,但是隊列內的消息會丟全部丟失。那么怎么實現消息的持久化呢?實現的方法很簡單:將exchange和queue都設置durable=true,然后在消息發布的時候設置persistent=true即可。看一個栗子:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這里就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //創建連接connection using (var connection = factory.CreateConnection()) { //創建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者准備就緒...."); string message = ""; //在控制台輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //設置消息持久化 var props = channel.CreateBasicProperties(); props.Persistent = true; channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: props, body: body); Console.WriteLine($"【{message}】發送到Broke成功!"); } } } Console.ReadKey(); }
聲明exchange和queue時設置durable=true,然后執行上邊的代碼,傳入一條消息。重啟rabbitmq后,exchange,queue和消息都會恢復。我們也可以在web管理界面設置消息持久化,如下:
2 消息優先級(Priority)
我們知道queue是先進先出的,即先發送的消息,先被消費。但是在具體業務中可能會遇到要提前處理某些消息的需求,如一個常見的需求:普通客戶的消息按先進先出的順序處理,Vip客戶的消息要提前處理。消息實現優先級控制的實現方式是:首先在聲明queue是設置隊列的x-max-priority屬性,然后在publish消息時,設置消息的優先級等級即可。為了演示方便,約定所有vip客戶的信息都以vip開頭,看一下代碼實現:
生產者代碼:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這里就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //創建連接connection using (var connection = factory.CreateConnection()) { //創建通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //聲明隊列queue channel.QueueDeclare(queue: "myqueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { //隊列優先級最高為10,不加x-max-priority的話,計算發布時設置了消息的優先級也不會生效 {"x-max-priority",10 } }); //綁定exchange和queue channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey"); Console.WriteLine("生產者准備就緒...."); //一些待發送的消息 string[] msgs = { "vip1", "hello2", "world3","common4", "vip5" }; //設置消息優先級 var props = channel.CreateBasicProperties(); foreach (string msg in msgs) { //vip開頭的消息,優先級設置為9 if (msg.StartsWith("vip")) { props.Priority = 9; channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: props, body: Encoding.UTF8.GetBytes(msg)); } //其他消息的優先級為1 else { props.Priority = 1; channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: props, body: Encoding.UTF8.GetBytes(msg)); } } } } Console.ReadKey(); }
消費者,不需要對消費者做額外的配置,代碼如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這里就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region EventingBasicConsumer //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine(Encoding.UTF8.GetString(ea.Body)); }; Console.WriteLine("消費者准備就緒...."); //處理消息 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer); Console.ReadKey(); #endregion } } }
運行程序,結果如下,我們看到vip開頭的消息被率先處理了,證明優先級是生效的
3 小結
本節簡單介紹了Rabbitmq中的消息確認,消息持久化,消息優先級的實現方式,這幾個功能在開發中會經常用到,RabbitMQ還有一些其他有用的功能,如Lazy queue模式,dead letter處理,queue的消息條數、字節數限制等,這里沒有具體演示,有興趣的園友可以自己研究一下。