開發中關鍵的Class和Interface有Channel、Connection、ConnectionFactory、Consumer等,與RabbitMQ相關的開發工作,基本上是圍繞Connection和Channel這兩個類展開的。
連接RabbitMQ
一個Connection可以創建多個Channel實例,但Channel實例不能在線程間共享,應用程序應該為每一個線程開辟一個Channel。
Channel或者Connection中有個isOpen方法可以用來檢測其是否已處於開啟狀態。但並不推薦使用,這個方法的返回值依賴於shutdownCause的存在,有可能會產生競爭。更多的是捕獲ShutdownSignalException,IOException或SocketException等異常判斷RabbitMq的連接狀態。
實際操作過程中遇到BrokerUnreachableException異常
因為我使用的賬號是guest,guest賬號默認是不支持遠程連接,需要在http://localhost:15672(前提是安裝了web插件)的Admin選項卡中添加一個新用戶(或者使用命令行添加)。
安裝web插件
rabbitmq-plugins enable rabbitmq_management
添加新用戶:
sudo rabbitmqctl add_user user_admin passwd_admin
如上圖所示,新添加的用戶沒有任何權限,需要點擊用戶名設置權限。
示例代碼:
var factory = new ConnectionFactory { HostName = "localhost", //主機名 UserName = "mymq", //默認用戶名 Password = "123456", //默認密碼 RequestedHeartbeat = TimeSpan.FromSeconds(30) }; using (var connection = factory.CreateConnection())//連接服務器 { //創建一個通道 using (var channel = connection.CreateModel()) { channel.QueueDeclare("stacking", false, false, false, null);//創建消息隊列 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 1; string message = "RabbitMQ Test"; //傳遞的消息內容 channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生產消息 Console.WriteLine($"Send:{message}"); } }
在管理界面處看到消息插入成功
使用新加的賬號鏈接MQ還會提示BrokerUnreachableException異常,很納悶。折騰了半天把WSL升級到WSL2就鏈接成功。
交換器和隊列
交換器和隊列是應用層面的構建模塊,使用前應對其進行聲明確保其存在。
var exchangeName = "exchange_name"; channel.ExchangeDeclare(exchangeName, "direct", true);//創建一個持久化的、非自動刪除的、綁定類型為direct的交換器 var queueName = channel.QueueDeclare().QueueName; //創建一個非持久化的、排他的、自動刪除的隊列(隊列名由RabbitMQ自動生成) channel.QueueBind(queueName, exchangeName, "routing_key"); //使用路由鍵(routing_key)將隊列和交換器綁定 channel.QueueDeclare("queue_name", true); // QueueDeclare擁有多個重載
ExchangeDeclare方法詳解
各個參數詳細說明如下:
exchange:交換器的名稱。
type:交換器的類型,常見的如fanout、direct、topic。
durable:設置是否持久化。durable設置為true表示持久化,反之是非持久化。持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關信息。
autoDelete:設置是否自動刪除。autoDelete設置為true則表示自動刪除。自動刪除的前提是至少有一個隊列或者交換器與這個交換器綁定,之后所有與這個交換器綁定的隊列或者交換器都與此解綁才會刪除。
internal:設置是否是內置的。如果設置為true,則表示是內置的交換器,客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式。
argument:其他一些結構化參數
QueueDeclareNoWait方法實現設置了一個nowait參數(AMQP中Exchange.Declare命令的參數),意思是不需要等待服務區返回結果。
ExchangeDeclarePassive方法用來檢測相應的交換器是否存在。如果存在則正常返回;如果不存在則拋出異常。
QueueDeclare方法詳解
方法的參數詳細說明如下:
queue:隊列的名稱。
durable:設置是否持久化。為true則設置隊列為持久化。持久化的隊列會存盤,在服務器重啟的時候可以保證不丟失相關信息。
exclusive:設置是否排他。為true則設置隊列為排他的。
autoDelete:設置是否自動刪除。為true則設置隊列為自動刪除。自動刪除的前提是:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。
arguments:設置隊列的其他一些參數,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority等。
如果一個隊列被聲明為排他隊列,則該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。需要注意:排他隊列是基於連接(Connection)可見的,同一個連接的不同信道(Channel)可以訪問同一連接創建的排他隊列;“首次”是指如果一個連接已經聲明了一個排他隊列,其他連接不允許再建立同名的排他隊列;即使該隊列是持久化的,一旦連接關閉或客戶端退出,該排他隊列都會被自動刪除,這種隊列適用於一個客戶端同時發送和讀取消息的應用場景。
QueueDeclareNoWait方法實現設置了一個nowait參數,意思是不需要等待服務區返回結果。
QueueDeclarePassive方法用來檢測相應的隊列是否存在。如果存在則正常返回;如果不存在則拋出異常。
QueueBind方法詳解
方法中涉及的參數:
queue:隊列名稱;
exchange:交換器的名稱;
routingKey:用來綁定隊列和交換器的路由鍵;
argument:定義綁定的一些參數。
ExchangeBind方法詳解
不僅可以將交換器與隊列綁定,也可以將交換器與交換器綁定。綁定之后,消息從source交換器轉發到destination交換器
方法中涉及的參數:
destination:目的交換器名;
source:源交換器的名稱;
routingKey:用來綁定隊列和交換器的路由鍵;
argument:定義綁定的一些參數。
交換器的使用並不會真正耗費服務器的性能,而隊列會。要衡量RabbitMQ當前的QPS只需看隊列的即可。
發送消息
BasicPublish方法用來發送一條消息到。為了更好地控制發送,可以使用mandatory這個參數
對應的具體參數解釋如下:
exchange:交換器的名稱,指明消息需要發送到哪個交換器中。如果設置為空字符串,則消息會被發送到RabbitMQ默認的交換器中。
routingKey:路由鍵,交換器根據路由鍵將消息存儲到相應的隊列之中。
basicProperties:消息的基本屬性集,其包含14個屬性成員,分別有contentType、contentEncoding、headers(Map<;String,Object>;)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。
byte[] body:消息體(payload),真正需要發送的消息。
mandatory:設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,會調用basic.return方法將消息返還給生產者;設為false時,出現上述情形broker會直接將消息扔掉。
豐富了一下第一部分的代碼:
var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 1; properties.Priority = 2; properties.ContentType = "text/plain"; properties.Expiration = "60000"; string message = "RabbitMQ Test"; //傳遞的消息內容 channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生產消息
消費消息
RabbitMQ的消費模式分兩種:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume進行消費,而拉模式則是調用Basic.Get進行消費。
推模式
推模式接收消息需要實例化一個EventingBasicConsumer類,訂閱Received事件來接收消息。EventingBasicConsumer實現了DefaultBasicConsumer類,實際使用中如果不滿足需求可以繼承該類。
示例代碼:
var consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) => { var body = ea.Body.ToArray(); Console.WriteLine($"Received:{Encoding.UTF8.GetString(body)}"); channel.BasicAck(ea.DeliveryTag, false); }; var consumerTag = channel.BasicConsume("stacking", false, consumer);
BasicConsume方法對應的參數說明如下:
queue:隊列的名稱;
autoAck:設置是否自動確認。建議設成false,即不自動確認;
consumerTag:消費者標簽,用來區分多個消費者;
arguments:設置消費者的其他參數;
callback:設置消費者的回調函數。
BasicConsume返回字符串類型consumerTag,可以通過調用channel.BasicCancel(consumerTag)顯式地取消一個消費者的訂閱。BasicCancel方法會首先觸發HandleConsumerOk方法,之后觸發HandleDelivery方法,最后才觸發HandleCancelOk方法.
拉模式
拉模式通過channel.BasicGet方法可以單條地獲取消息。
示例代碼:
var result = channel.BasicGet("stacking",false); Console.WriteLine($"Received:{Encoding.UTF8.GetString(result.Body.ToArray())}"); channel.BasicAck(result.DeliveryTag, false);
Basic.Consume將信道(Channel)置為接收模式,直到取消隊列的訂閱,RabbitMQ會不斷地推送消息給消費者,當然推送消息的個數還是會受到Basic.Qos的限制。如果只想從隊列獲得單條消息而不是持續訂閱,建議使用Basic.Get進行消費。但是不能將Basic.Get放在一個循環里來代替Basic.Consume,這樣做會嚴重影響RabbitMQ的性能。
消費端的確認與拒絕
為了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認機制(message acknowledgement)。消費者在訂閱隊列時指定autoAck參數,當autoAck為false時,RabbitMQ會等待消費者顯式地回復確認信號后才從內存(或者磁盤)中移去消息(實質上是先打上刪除標記,之后再刪除)。當autoAck為true時,RabbitMQ會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息。
當autoAck參數設置為false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題。
當autoAck參數設置為false,對於RabbitMQ服務端而言,隊列中的消息分為兩部分:一部分是等待投遞給消費者的消息;一部分是已投遞給消費者,但是還沒有收到消費者確認信號的消息。如果RabbitMQ一直沒有收到消費者的確認信號,並且此消息的消費者斷開連接,則RabbitMQ會安排該消息重新進入隊列,等待投遞給下一個消費者(可能還是原來的那個消費者 ),並且RabbitMQ不會為未確認的消息設置過期時間。
消費消息時autoAck參數設置為false需要主動調用channel.BasicAck對消息進行確認,以便RabbitMQ刪除消息,對應的也可以調用channel.BasicReject方法拒絕消息,由其他消費端處理或者丟棄。
deliveryTag可以看作消息的編號。如果requeue參數設置為true,則RabbitMQ會重新將這條消息存入隊列;如果requeue參數設置為false,則RabbitMQ立即會把消息從隊列中移除,不會把它發送給新的消費者。
Basic.Reject命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack這個命令,對應的實現方法為channel.BasicNack.
其中deliveryTag和requeue的含義可以參考BasicReject方法。multiple參數設置為false則表示僅拒絕編號為deliveryTag的單條消息;multiple參數設置為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的消息。
channel.BasicReject或者channel.BasicNack中的requeue設置為false,可以啟用“死信隊列”的功能。死信隊列可以通過檢測被拒絕或者未送達的消息來追蹤問題。
關閉連接
可以顯示的調用Connection和Channel的Close方法來關閉連接,也可以借助using來管理連接。
Connection和Channel所具備的生命周期如下:
Open:開啟狀態,代表當前對象可以使用。
Closing:正在關閉狀態。當前對象被顯式地通知調用關閉方法(shutdown),這樣就產生了一個關閉請求讓其內部對象進行相應的操作,並等待這些關閉操作的完成。
Closed:已經關閉狀態。當前對象已經接收到所有的內部對象已完成關閉動作的通知,並且其也關閉了自身。
在Connection和Channel中都定義了對應實現監聽狀態的改變。
Connection
Channel
Github
示例代碼地址:https://github.com/MayueCif/RabbitMQ