.Net RabbitMQ實戰指南——客戶端開發


開發中關鍵的Class和Interface有Channel、Connection、ConnectionFactory、Consumer等,與RabbitMQ相關的開發工作,基本上是圍繞Connection和Channel這兩個類展開的。

連接RabbitMQ

一個Connection可以創建多個Channel實例,但Channel實例不能在線程間共享,應用程序應該為每一個線程開辟一個Channel。

 

Channel或者Connection中有個isOpen方法可以用來檢測其是否已處於開啟狀態。但並不推薦使用,這個方法的返回值依賴於shutdownCause的存在,有可能會產生競爭。更多的是捕獲ShutdownSignalException,IOException或SocketException等異常判斷RabbitMq的連接狀態。

實際操作過程中遇到BrokerUnreachableException異常

 

 因為我使用的賬號是guest,guest賬號默認是不支持遠程連接,需要在http://localhost:15672(前提是安裝了web插件)的Admin選項卡中添加一個新用戶(或者使用命令行添加)。

安裝web插件

rabbitmq-plugins enable rabbitmq_management

添加新用戶:

sudo rabbitmqctl add_user  user_admin  passwd_admin

 

如上圖所示,新添加的用戶沒有任何權限,需要點擊用戶名設置權限。

示例代碼:

var factory = new ConnectionFactory {
    HostName = "localhost",   //主機名
    UserName = "mymq",        //默認用戶名
    Password = "123456",      //默認密碼
    RequestedHeartbeat = TimeSpan.FromSeconds(30)
};

using (var connection = factory.CreateConnection())//連接服務器
{
    //創建一個通道
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("stacking", false, false, false, null);//創建消息隊列
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 1;
        string message = "RabbitMQ Test"; //傳遞的消息內容
        channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生產消息
        Console.WriteLine($"Send:{message}");
    }
}

在管理界面處看到消息插入成功

 

 使用新加的賬號鏈接MQ還會提示BrokerUnreachableException異常,很納悶。折騰了半天把WSL升級到WSL2就鏈接成功。

交換器和隊列

交換器和隊列是應用層面的構建模塊,使用前應對其進行聲明確保其存在。

 var exchangeName = "exchange_name";
 channel.ExchangeDeclare(exchangeName, "direct", true);//創建一個持久化的、非自動刪除的、綁定類型為direct的交換器
 var queueName = channel.QueueDeclare().QueueName;   //創建一個非持久化的、排他的、自動刪除的隊列(隊列名由RabbitMQ自動生成)
 channel.QueueBind(queueName, exchangeName, "routing_key");  //使用路由鍵(routing_key)將隊列和交換器綁定

 channel.QueueDeclare("queue_name", true);   // QueueDeclare擁有多個重載

ExchangeDeclare方法詳解

各個參數詳細說明如下:

exchange:交換器的名稱。

type:交換器的類型,常見的如fanout、direct、topic。

durable:設置是否持久化。durable設置為true表示持久化,反之是非持久化。持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關信息。

autoDelete:設置是否自動刪除。autoDelete設置為true則表示自動刪除。自動刪除的前提是至少有一個隊列或者交換器與這個交換器綁定,之后所有與這個交換器綁定的隊列或者交換器都與此解綁才會刪除。

internal:設置是否是內置的。如果設置為true,則表示是內置的交換器,客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式。

argument:其他一些結構化參數

QueueDeclareNoWait方法實現設置了一個nowait參數(AMQP中Exchange.Declare命令的參數),意思是不需要等待服務區返回結果。

ExchangeDeclarePassive方法用來檢測相應的交換器是否存在。如果存在則正常返回;如果不存在則拋出異常。

QueueDeclare方法詳解

方法的參數詳細說明如下:

queue:隊列的名稱。

durable:設置是否持久化。為true則設置隊列為持久化。持久化的隊列會存盤,在服務器重啟的時候可以保證不丟失相關信息。

exclusive:設置是否排他。為true則設置隊列為排他的。

autoDelete:設置是否自動刪除。為true則設置隊列為自動刪除。自動刪除的前提是:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。

arguments:設置隊列的其他一些參數,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority等。

如果一個隊列被聲明為排他隊列,則該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。需要注意:排他隊列是基於連接(Connection)可見的,同一個連接的不同信道(Channel)可以訪問同一連接創建的排他隊列;“首次”是指如果一個連接已經聲明了一個排他隊列,其他連接不允許再建立同名的排他隊列;即使該隊列是持久化的,一旦連接關閉或客戶端退出,該排他隊列都會被自動刪除,這種隊列適用於一個客戶端同時發送和讀取消息的應用場景。

QueueDeclareNoWait方法實現設置了一個nowait參數,意思是不需要等待服務區返回結果。

QueueDeclarePassive方法用來檢測相應的隊列是否存在。如果存在則正常返回;如果不存在則拋出異常。

 QueueBind方法詳解

方法中涉及的參數:

queue:隊列名稱;

exchange:交換器的名稱;

routingKey:用來綁定隊列和交換器的路由鍵;

argument:定義綁定的一些參數。

ExchangeBind方法詳解

不僅可以將交換器與隊列綁定,也可以將交換器與交換器綁定。綁定之后,消息從source交換器轉發到destination交換器

方法中涉及的參數:

destination:目的交換器名;

source:源交換器的名稱;

routingKey:用來綁定隊列和交換器的路由鍵;

argument:定義綁定的一些參數。

交換器的使用並不會真正耗費服務器的性能,而隊列會。要衡量RabbitMQ當前的QPS只需看隊列的即可。

發送消息

BasicPublish方法用來發送一條消息到。為了更好地控制發送,可以使用mandatory這個參數

對應的具體參數解釋如下:

exchange:交換器的名稱,指明消息需要發送到哪個交換器中。如果設置為空字符串,則消息會被發送到RabbitMQ默認的交換器中。

routingKey:路由鍵,交換器根據路由鍵將消息存儲到相應的隊列之中。

basicProperties:消息的基本屬性集,其包含14個屬性成員,分別有contentType、contentEncoding、headers(Map<String,Object>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。

byte[] body:消息體(payload),真正需要發送的消息。

mandatory:設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,會調用basic.return方法將消息返還給生產者;設為false時,出現上述情形broker會直接將消息扔掉。

豐富了一下第一部分的代碼:

var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1;
properties.Priority = 2;
properties.ContentType = "text/plain";
properties.Expiration = "60000";
string message = "RabbitMQ Test"; //傳遞的消息內容
channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生產消息

消費消息

RabbitMQ的消費模式分兩種:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume進行消費,而拉模式則是調用Basic.Get進行消費。

推模式

推模式接收消息需要實例化一個EventingBasicConsumer類,訂閱Received事件來接收消息。EventingBasicConsumer實現了DefaultBasicConsumer類,實際使用中如果不滿足需求可以繼承該類。

示例代碼:

var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (ch, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        Console.WriteLine($"Received:{Encoding.UTF8.GetString(body)}");
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
                    var consumerTag = channel.BasicConsume("stacking", false, consumer);

BasicConsume方法對應的參數說明如下:

queue:隊列的名稱;

autoAck:設置是否自動確認。建議設成false,即不自動確認;

consumerTag:消費者標簽,用來區分多個消費者;

arguments:設置消費者的其他參數;

callback:設置消費者的回調函數。

BasicConsume返回字符串類型consumerTag,可以通過調用channel.BasicCancel(consumerTag)顯式地取消一個消費者的訂閱。BasicCancel方法會首先觸發HandleConsumerOk方法,之后觸發HandleDelivery方法,最后才觸發HandleCancelOk方法.

拉模式

拉模式通過channel.BasicGet方法可以單條地獲取消息。

示例代碼:

var result = channel.BasicGet("stacking",false);
Console.WriteLine($"Received:{Encoding.UTF8.GetString(result.Body.ToArray())}");
channel.BasicAck(result.DeliveryTag, false);

Basic.Consume將信道(Channel)置為接收模式,直到取消隊列的訂閱,RabbitMQ會不斷地推送消息給消費者,當然推送消息的個數還是會受到Basic.Qos的限制。如果只想從隊列獲得單條消息而不是持續訂閱,建議使用Basic.Get進行消費。但是不能將Basic.Get放在一個循環里來代替Basic.Consume,這樣做會嚴重影響RabbitMQ的性能。

消費端的確認與拒絕

為了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認機制(message acknowledgement)。消費者在訂閱隊列時指定autoAck參數,當autoAck為false時,RabbitMQ會等待消費者顯式地回復確認信號后才從內存(或者磁盤)中移去消息(實質上是先打上刪除標記,之后再刪除)。當autoAck為true時,RabbitMQ會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息。

當autoAck參數設置為false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題。

當autoAck參數設置為false,對於RabbitMQ服務端而言,隊列中的消息分為兩部分:一部分是等待投遞給消費者的消息;一部分是已投遞給消費者,但是還沒有收到消費者確認信號的消息。如果RabbitMQ一直沒有收到消費者的確認信號,並且此消息的消費者斷開連接,則RabbitMQ會安排該消息重新進入隊列,等待投遞給下一個消費者(可能還是原來的那個消費者 ),並且RabbitMQ不會為未確認的消息設置過期時間。

消費消息時autoAck參數設置為false需要主動調用channel.BasicAck對消息進行確認,以便RabbitMQ刪除消息,對應的也可以調用channel.BasicReject方法拒絕消息,由其他消費端處理或者丟棄。

deliveryTag可以看作消息的編號。如果requeue參數設置為true,則RabbitMQ會重新將這條消息存入隊列;如果requeue參數設置為false,則RabbitMQ立即會把消息從隊列中移除,不會把它發送給新的消費者。

Basic.Reject命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack這個命令,對應的實現方法為channel.BasicNack.

其中deliveryTag和requeue的含義可以參考BasicReject方法。multiple參數設置為false則表示僅拒絕編號為deliveryTag的單條消息;multiple參數設置為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的消息。

channel.BasicReject或者channel.BasicNack中的requeue設置為false,可以啟用“死信隊列”的功能。死信隊列可以通過檢測被拒絕或者未送達的消息來追蹤問題。

關閉連接

可以顯示的調用Connection和Channel的Close方法來關閉連接,也可以借助using來管理連接。

Connection和Channel所具備的生命周期如下:

Open:開啟狀態,代表當前對象可以使用。

Closing:正在關閉狀態。當前對象被顯式地通知調用關閉方法(shutdown),這樣就產生了一個關閉請求讓其內部對象進行相應的操作,並等待這些關閉操作的完成。

Closed:已經關閉狀態。當前對象已經接收到所有的內部對象已完成關閉動作的通知,並且其也關閉了自身。

在Connection和Channel中都定義了對應實現監聽狀態的改變。

Connection

 

 Channel

Github

示例代碼地址:https://github.com/MayueCif/RabbitMQ


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM