DotNet Core中使用RabbitMQ


  上一篇隨筆記錄到RabbitMQ的安裝,安裝完成,我們就開始使用吧。

RabbitMQ簡介

  AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

  AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

  RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分布式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。

DotNet Core使用RabbitMQ

通過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/

定義生產者:

//創建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",//用戶名
    Password = "guest",//密碼
    HostName = "127.0.0.1"//rabbitmq ip
};

//創建連接
var connection = factory.CreateConnection();
//創建通道
var channel = connection.CreateModel();
//聲明一個隊列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("\nRabbitMQ連接成功,請輸入消息,輸入exit退出!");

string input;
do
{
    input = Console.ReadLine();

    var sendBytes = Encoding.UTF8.GetBytes(input);
    //發布消息
    channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

定義消費者:

//創建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",//用戶名
    Password = "guest",//密碼
    HostName = "127.0.0.1"//rabbitmq ip
};

//創建連接
var connection = factory.CreateConnection();
//創建通道
var channel = connection.CreateModel();

//事件基本消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

//接收到消息事件
consumer.Received += (ch, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    Console.WriteLine($"收到消息: {message}");
    //確認該消息已被消費
    channel.BasicAck(ea.DeliveryTag, false);
};
//啟動消費者 設置為手動應答消息
channel.BasicConsume("hello", false, consumer);
Console.WriteLine("消費者已啟動");
Console.ReadKey();
channel.Dispose();
connection.Close();

演示如下:

啟動了一個生產者,兩個消費者,可以看見兩個消費者都能接收到消息,消息投遞到哪個消費者是由RabbitMQ決定的。

RabbitMQ消費失敗的處理

  RabbitMQ采用消息應答機制,即消費者收到一個消息之后,需要發送一個應答,然后RabbitMQ才會將這個消息從隊列中刪除,如果消費者在消費過程中出現異常,斷開連接切沒有發送應答,那么RabbitMQ會將這個消息重新投遞。

我們來修改一下消費者的代碼:

 //接收到消息事件
 consumer.Received += (ch, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);

     Console.WriteLine($"收到消息: {message}");

     Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲10s發送回執");
     Thread.Sleep(10000);
     //確認該消息已被消費
     channel.BasicAck(ea.DeliveryTag, false);
     Console.WriteLine($"已發送回執[{ea.DeliveryTag}]");
 };

演示如下:

從圖中可以看出,設置了消息應答延遲10s,如果在這10s中,該消費者斷開了連接,那么消息會被RabbitMQ重新投遞。

使用RabbitMQ的Exchange

前面的例子,我們可以看到生產者將消息投遞到Queue中,實際上這種方式在RabbitMQ中永遠都不會發生的。實際的情況是,生產者將消息發送到Exchange(交換器),下圖中的X,由Exchange(交換器)將消息路由到一個或多個Queue中(或者丟棄)。

 

AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然后Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就類似於一個交換機,轉發各個消息分發到相應的隊列中。

Exchange Types(交換器類型)

RabbitMQ常用的Exchange Type有Fanout、Direct、Topic、Headers這四種

1、Fanout:

  這種類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中,這時Routing key不起作用

 

 

 

Fanout Exchange 不需要處理RouteKey 。只需要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的所有隊列上。類似子網廣播,每台子網內的主機都獲得了一份復制的消息。

所以,Fanout Exchange 轉發消息是最快的。

為了演示效果,定義了兩個隊列,分別為hello1,hello2,每個隊列都擁有一個消費者。

static void Main(string[] args)
{
    string exchangeName = "TestFanoutChange";
    string queueName1 = "hello1";
    string queueName2 = "hello2";
    string routeKey = "";

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用戶名
        Password = "guest",//密碼
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //定義一個Direct類型交換機
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

    //定義隊列1
    channel.QueueDeclare(queueName1, false, false, false, null);
    //定義隊列2
    channel.QueueDeclare(queueName2, false, false, false, null);

    //將隊列綁定到交換機
    channel.QueueBind(queueName1, exchangeName, routeKey, null);
    channel.QueueBind(queueName2, exchangeName, routeKey, null);

    //生成兩個隊列的消費者
    ConsumerGenerator(queueName1);
    ConsumerGenerator(queueName2);


    Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發布消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}
 /// <summary>
 /// 根據隊列名稱生成消費者
 /// </summary>
 /// <param name="queueName"></param>
 static void ConsumerGenerator(string queueName)
 {
     //創建連接工廠
     ConnectionFactory factory = new ConnectionFactory
     {
         UserName = "guest",//用戶名
         Password = "guest",//密碼
         HostName = "127.0.0.1"//rabbitmq ip
     };

     //創建連接
     var connection = factory.CreateConnection();
     //創建通道
     var channel = connection.CreateModel();

     //事件基本消費者
     EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

     //接收到消息事件
     consumer.Received += (ch, ea) =>
     {
         var message = Encoding.UTF8.GetString(ea.Body);

         Console.WriteLine($"Queue:{queueName}收到消息: {message}");
         //確認該消息已被消費
         channel.BasicAck(ea.DeliveryTag, false);
     };
     //啟動消費者 設置為手動應答消息
     channel.BasicConsume(queueName, false, consumer);
     Console.WriteLine($"Queue:{queueName},消費者已啟動");
 }

運行效果如下:

2、Direct

  這種類型的Exchange路由規則也很簡單,它會把消息路由到哪些binding key與routingkey完全匹配的Queue中。

 

   Direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何綁定(binding)操作 。消息傳遞時,RouteKey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。

static void Main(string[] args)
{
    string exchangeName = "TestChange";
    string queueName = "hello";
    string routeKey = "helloRouteKey";

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用戶名
        Password = "guest",//密碼
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //定義一個Direct類型交換機
    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

    //定義一個隊列
    channel.QueueDeclare(queueName, false, false, false, null);

    //將隊列綁定到交換機
    channel.QueueBind(queueName, exchangeName, routeKey, null);

    Console.WriteLine($"\nRabbitMQ連接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發布消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();

運行效果如下:

3、Topic

  這種類型的Exchange的路由規則支持 binding key 和 routing key 的模糊匹配,會把消息路由到滿足條件的Queue。 binding key 中可以存在兩種特殊字符 *與 #,用於做模糊匹配,其中 * 用於匹配一個單詞,# 用於匹配0個或多個單詞,單詞以符號“.”為分隔符。

  以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1與Q2,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。

  所以,Topic Exchange使用非常靈活。
static void Main(string[] args)
{
    string exchangeName = "TestTopicChange";
    string queueName = "hello";
    string routeKey = "TestRouteKey.*";

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用戶名
        Password = "guest",//密碼
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //定義一個Direct類型交換機
    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);

    //定義隊列1
    channel.QueueDeclare(queueName, false, false, false, null);

    //將隊列綁定到交換機
    channel.QueueBind(queueName, exchangeName, routeKey, null);



    Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發布消息
        channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}

運行效果如下:

 4、Headers

  這種類型的Exchange不依賴於 routing key 與 binding key 的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。

參考:

  官網:https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

    https://www.cnblogs.com/stulzq/p/7551819.html

    https://www.jianshu.com/p/e55e971aebd8


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM