RabbitMQ在.NetCore中的基礎應用


一、Rabbit MQ 簡介

        RabbitMQ是一個接收、轉發消息的消息中間件。你可以把它想象成一個郵局:當你把郵件放假郵箱后時,你能夠確保郵遞員最終會將你的郵件送到收件人手里。在這個比喻中,RabbitMQ是一個郵箱、一個郵局、一個郵遞員。RabbitMQ與郵局最大的不同是它接收、存儲和轉發二進制數據塊--消息,而不是處理紙質信件。

RabbitMQ與其他消息中間件通常使用的一些名詞:

  • Producing:指發送,一個發送消息的程序叫做Producer

  • Queue:在RabbitMQ中,類似與郵局中的郵箱,用於存放二進制數據塊。雖然消息經過RabbitMQ和你的應用程序,但是它們只能存在queue里面。queue本質上是一個大的消息緩沖區,只受主機的內存與硬盤的限制。多個Producer可以發送消息至一個queue,多個Consumer可以從一個queue中接收消息。如下圖所示:

  • Consuming:至接收消息,一個等待接收消息的程序叫Consumer

Producer、COnsumer、Broker沒有必要安裝在同一台主機上;實際上,在大多應用程序中都是分開安裝。一個應用程序可以同時是Producer和Consumer。

RabbitMQ使用了AMQP網絡通信協議。可以點擊深入了解RabbitMQ的工作原理。

二、Rabbit MQ 安裝

安裝RabbitMQ服務端(當前最新v 3.8.5下載地址 )注意:安裝前先安裝Erlang(建議opt 23.0)

如遇上圖錯誤,是因為ERLANG_HOME路徑配置有誤,修改環境變量ERLANG_HOME=erlang安裝目錄中Bin的上一級:

這個時候運行服務端,如果提示則百度安裝一下即可。

注意:MSVCR120.dll要下載與系統對應的版本,不然會很難受。

三、RabbitMQ示例--"HELLO WORLD"

下面我們以發送、接收"Hello World"的示例,先簡單了解RabbitMQ的開發(以C#代碼為示例):

首先創建兩個控制台應用程序:RabbitMQ.Producer(消息生產者)、RabbitMQ.Consumer(消息消費者)

添加nuget引用-->>RabbitMQ.Client

RabbitMQ.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) => {
                    channel.BasicPublish(exchange: "",
                                        routingKey: "hello",
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                };
                channel.BasicConsume(queue: "hello",
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

執行效果,每隔一秒發送一次消息:

四、隊列

任務隊列最大的目的是為了避免立即處理一下大資源任務(比如一些文件數據)而不得不等待它完成后才能繼續處理后續消息。取而代之的是我們計划稍后處理這些大資源的任務。我們將任務封裝為消息並將其發送到隊列。在后台運行的工作進程將彈出任務並最終執行該作業。當運行多個工作進程時,任務將在他們之間共享。

Round-robin dispatching-消息分發機制

上面的代碼示例簡單的展現了RabbitMQ的運行方式,下面的代碼將體現隊列的工作過程。以及一些代碼中的參數配置。

  • 利用Threading.Sleep來假裝處理大資源任務,修改Consumer中的代碼,並修改Producer中的代碼,增加消息隨機數

RabbitMQ.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) => {
                    string message = "Hello World!" + rd.Next(1000, 10000);
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "",
                                        routingKey: "hello",
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                    Thread.Sleep(3000);
                };
                channel.BasicConsume(queue: "hello",
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code
  • 運行一個Producer和兩個(或多個)Consumer,觀察兩個Consumer中接收到的消息

        觀察發現,Producer一共發送了14條消息,兩個Consumer分別收到了7條,這個就是RabbitMQ round-robin消息分發機制,當接收消息處理需要耗費大量時間時(如業務邏輯復雜、處理文件流信息等),可以同時開啟多個Consumer去同時接收消息並處理,這樣就可以並行處理堆積在隊列中的消息。

Message acknowledgment-消息確認機制

        解決了因消費消息時間過長導致隊列中消息堆積過多的問題之后,你可能會有這樣的疑問,當Consumer接收到一條消息之后,任務處理到一半Consumer掛掉之后,這條消息按照RabbitMQ的機制,一旦發送給Consumer就會被標記為刪除,而且接下來發送到這個Consumer的消息都會丟失,但這並不是你想要的結果。當一個Consumer掛掉后,應該把消息轉發給另一個Consumer。

        為了解決上述問題,確保消息用於不會丟失,RabbitMQ提供了消息確認機制(message acknowledgments)。在Consumer成功接收並處理一筆消息后,會返回一個確認信息給RabbitMQ,告訴他指定的消息已經成功接收並處理,可以進行刪除。如果Consumer因掛掉了(通道關閉、連接關閉、網絡連接中斷等)而未返回確認信息,RabbitMQ會理解消息沒有完全被處理並把它重新排入隊列之中。如果同時有另一個Consumer在線,它會快速的把消息重新發送給另一個Consumer。通過這種方式,即使Consumer偶然掛掉,也不會有消息丟失。

Consumer程序中,需要調整部分代碼,當消息處理完成后,程序手動返回一個確認給RabbitMQ。

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                    Thread.Sleep(3000);
                    channel.BasicAck(ea.DeliveryTag, false);// Consumer返回消息處理完成確認信息
                    Console.WriteLine("完成確認信息返回...");
                };
                channel.BasicConsume(queue: "hello",
                                     autoAck: false,        // Consumer返回消息處理完成確認信息
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

        發布完Consumer程序后,打開兩個Consumer,在其中一個接收到消息的時候,立即Ctrl+C,停止其運行,觀察消息是否轉發到另一個Consumer。

        在上動圖中,左邊的Comsumer在接收到消息9856時,我們立馬終止其操作,然后發現右邊的Consumer接收到了消息9856,並進行了處理確認。

        在使用BasicAck方法時,要確認接收消息與返回確認消息的channel是同一個,不然會拋出channel-level protocol exception的錯誤。而且要注意,當確定返回確認消息給RabbitMQ時(即設置 autoAck = false),一定不要忘記BasicAck方法,不然后果很嚴重。RabbitMQ一直接收不到確認消息,就會一直轉發消息(隨機轉發),當你的Consumer全部關閉后,RabbitMQ將會消耗掉越來越多的內存,因為它無法釋放未確認的消息。為了調試這種錯誤,你可以使用 rabbitmqctl

在Windows中,執行下面命令,可以查詢未確認消息的數量

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Message durability-消息持久機制

我們已經了解了,如何在Consumer掛掉的情況下確保消息任務不會丟失,但如果RabbitMQ服務器掛掉了,消息任務仍然會丟失。

當RabbitMQ服務器關閉會崩潰是,它將丟失隊列及消息,除非你告訴它不要這么做。為了確保隊列及消息不會丟失,有兩件事需要我們去完成:我們需要同時給隊列和消息打上持久的標記。

首先,我們需要確保隊列在RabbitMQ節點重啟隊列中存在,對此可以作一下聲明:

channel.QueueDeclare(queue: "hello",
                     durable: true,// 聲明durable = true
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

上面的聲明看起來沒啥毛病,但是真正運行起來並沒有任何效果。那是因為我們已經定義過了隊列 "hello",且durable = false。RabbitMQ不允許重新用不同的參數去定義一個已經存在的隊列,所以我們需要重新聲明一個隊列,例如 queue_task,同時設置消息的持久性(Persistent = true).

RabbitMQ.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;       // 設置消息持久性
                channel.QueueDeclare(queue: "queue_task",
                                     durable: true,  // 設置隊列持久性
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) => {
                    string message = "Hello World!" + " -- " + rd.Next(1000, 10000);
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "",
                                        routingKey: "queue_task",
                                        basicProperties: properties,// 設置消息持久性
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "queue_task",
                                     durable: true,  // 設置隊列持久性
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                    Thread.Sleep(3000);
                    channel.BasicAck(ea.DeliveryTag, false);// Consumer返回消息處理完成確認信息
                    Console.WriteLine("完成確認信息返回...");
                };
                channel.BasicConsume(queue: "queue_task",
                                     autoAck: false,        // Consumer返回消息處理完成確認信息
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code  

        調整完之后編譯發布后,執行Producer,在執行過程中關閉RabbitMQ服務后再開啟,執行Consumer觀察是否可以接收到消息。你會發現重啟RabbitMQ服務后,Consumer會正常接收到消息。

        由上圖所示,當關閉RabbitMQ后,Producer不再發送消息,打開RabbitMQ,Producer繼續發送消息,Consumer從第一條消息4204開始接收處理。(中間報錯是因為服務器反應有點慢)

注意:設置消息Persistent的屬性並能夠完全確保消息不會丟失。盡管告訴了RabbitMQ需要保存消息至硬盤,但是仍然會有一個RabbitMQ接收消息后還未保存的時間點。而且RabbitMQ不能同時保存所有的消息至硬盤,有時往往只會寫入內存之中。Persistent雖然不夠健壯,但是一般的隊列任務都是沒問題的,如果需要更加安全的消息保護機制,可以點擊了解

Fair Dispatch-公平分發機制

        2013年的某個夏天,小明與小剛相約去搬磚,有兩種類型運磚車,A類車的磚頭搬完需要2個小時,B類車的磚頭搬完需要3個小時,工頭小華和小明是表兄弟,所以一直讓小明負責A類車,這個時候問題就來了,小明一直搬A類車的磚,所以經常出現,小剛在累死累活的時候,小明在放空。這個時候被老板看見了,重新分配了一下,小剛當工頭,小明和小華搬磚。當小明搬完A類車時,如果小華正在搬磚,這個時候不管來的是A類車還是B類車,小明都需要接着搬,要保證工作飽和。

        上面提到的 round-robin dispatching是RabbitMQ默認的消息分發機制:兩個Consumer,奇數消息發送給A,偶數消息發送給B。看似沒啥問題,但仔細一品,如果奇數類消息處理時間很長而偶數類消息處理時間很短,那么Consumer A總是忙個不停,而Consumer B卻總是很閑,但是RabbitMQ並不知道這些,還是繼續平均分發消息。

        發生上述問題的原因是RabbitMQ只會分發進入隊列中的消息,它不會考慮每台Consumer還剩多少未確認(消息正在處理,並未返回確認)的消息,只會盲目的把第幾條消息分發給第幾台Consumer。為了解決這個問題,可以在Consumer程序中使用BasicQos方法,設置prefetchCount=1。這個會告訴RabbitMQ在同一時間不能發送超過一條消息給Consumer,換句話說,只有在Consumer處理完上一條消息並返回確認后才可以分發消息給這個Consumer。取而代之,會把消息發送給另一台空閑的Consumer。

 channel.BasicQos(0, 1, false);                                                                                                                     

注意:這樣的設置需要注意一個問題,如果多個Consumer都很忙的時候,那么消息會一直堆在Queue中,你需要考慮到Queue的容量,多增加幾台Consumer或者采用其他一些策略。

代碼整合

RabbitMQ.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;       // 設置消息持久性
                channel.QueueDeclare(queue: "queue_task",
                                     durable: true,  // 設置隊列持久性
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) => {
                    string message = "Hello World!" + " -- " + rd.Next(1000, 10000);
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "",
                                        routingKey: "queue_task",
                                        basicProperties: properties,// 設置消息持久性
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Consumer.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "queue_task",
                                     durable: true,  // 設置隊列持久性
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                // 設置每次最多可以接收多少條消息,prefetchCount=1時,表明只有消息處理完成且返回確認后方可接收新消息
                channel.BasicQos(0, 1, false);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Received {message}");
                    Thread.Sleep(3000);
                    // Consumer返回消息處理完成確認信息
                    channel.BasicAck(ea.DeliveryTag, false);
                    Console.WriteLine("完成確認信息返回...");
                };
                channel.BasicConsume(queue: "queue_task",
                                     autoAck: false,        // Consumer返回消息處理完成確認信息
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

在Producer程序中,我們設置了隊列及消息的持久性,在Consumer程序中,我們設置了消息確認機制及消息公平分發機制。

如果想多了解channel的屬性,即IModel和IBasicProperties可以點擊RabbitMQ .NET client API reference online進行深入學習。

五、發布與訂閱

        在上個章節,我們創建了一個工作隊列,他主要的職責就是接收、分發消息,每一條消息准確的分發給某一台Consumer。有時,這種分發方式並不滿足於業務需求,例如泰康的客戶信息更改,需要保證所有系統的客戶信息一致,這時由客戶中心分發更新后的客戶信息,多個系統進行接收並同步更新。這一章節,我們將會討論如果將一條消息同事分發給多個Consumer。這中模式被稱為“發布/訂閱”。

        在這個章節中,我們會創建一個簡單的日志系統。首先建一個LogProducer程序用於發送日志消息,再建兩個LogConsumer程序:LogConsumerWrite與LogConsumerPrint,前者用於把=將日志寫入硬盤,后者用於將日志打印出來。在這個日志系統中,所有運行的LogConsumer程序都會接收到LogProducer發送的所有日志消息。本質上,就是LogProducer發送日志消息給所有運行的LogConsumer。

Exchanges-交換機

讓我們重新回顧一下上幾個章節中的名詞:

  • Producer是一個發送消息的應用程序;
  • Queue是一個存儲消息的緩沖區;
  • Consumer是一個接收消息的應用程序;

        在上幾章類容,表面上我們看到的是Queue去發送與接收消息。下面介紹一下RabbitMQ中的完整消息模型。這種消息模型的核心思想是Producer從來不直接發送消息給Queue。事實上,通常情況下Producer甚至不知道消息是否發送至Queue。取而代之,Producer只能發送消息給交換機。

        交換機是一個非常簡單的東西。它從一邊接受來自Producer發送的消息,另一邊把消息塞進Queue。但交換機必須得完全明白他接收的消息是干嘛用的。這條消息是加入指定的隊列還是加入多個隊列?或者這條消息是否丟棄?這些規則由交換機類型去定義。

交換機類型分為:direct,topic,headers,fanout。這次的日志系統以fanout為例,字面理解就是廣播所有他接收到的消息給他所知道的隊列。

我們可以使用以下命令行去獲取服務器上交換機的列表:

sudo rabbitmqctl list_exchanges

        你會發現有很多amq.*的交換機,這些是默認創建的,暫時用不到。其中第一個名稱為空的交換機,其實就是上面我們演示的Hellow World默認創建的交換機,因為Producer不能直接發送消息給Queue,必須通過交換機進行消息處理。

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "hello",
                     basicProperties: null,
                     body: body);

        上面的代碼是之前的例子,exchange: "",表示這是一個默認的或無名的交換機。routingKey: "hello",表示消息將會發送到指定的隊列-hello。

Temporary queues-臨時隊列

        還記得上面我們定義的hello和queue_task兩個隊列嗎?定義個隊列的名稱是非常重要的,我們需要指出在同一隊列中的Consumer。當你需要在Producer和Consumer中分享一個隊列時,隊列的名稱就格外的重要,而且隊列不能重復申明。但是這樣就不符合接下來的日志系統的需求了,我們希望所有隊列都能夠接收到日志消息,而且接收到的是最新的消息。為了滿足日志系統的要求,需要做下面兩件事:

  • 第一,無論何時連接RabbitMQ我們都需要一個新的、空的隊列。因此我們創建隊列時需要一個隨機的名稱,最好系統可以挑選一個隨機的名稱給我們。
  • 第二,一旦Consumer斷開聯機,Queue就應該自動刪除

在RabbitMQ.Client中提供了一個無參申明隊列的方法,我們可以創建一個非持久、獨特的、自動刪除、自動命名的隊列。

var queueName = channel.QueueDeclare().QueueName;

Bindings-綁定

了解如何創建交換機和隊列后。我們現在需要告訴交換機去發送消息給隊列,兩者之間的關系叫做 binding。

 channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); 

代碼整合

        下面是示例代碼,Producer程序看起來與之前的沒啥不同。最大的不同是我們給exchange定義了一個名稱,並清空了routingKey,因為在fanout交換機中,它是可以忽略的。

RabbitMQ.Exchange.Producer.Program.cs:

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.Exchange.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) =>
                {
                    string message = "Log Infoooo!" + " -- " + rd.Next(1000, 10000);
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "logs",
                                        routingKey: "",
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Exchange.Consumer_LogDisk.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.IO;
using System.Text;

namespace RabbitMQ.Exchange.Consumer_LogDisk
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs",
                                  routingKey: "");
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    using(var file = File.CreateText("loginfo.txt"))
                    {
                        file.Write(message);
                    }
                    // 記錄消息
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,       
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.Exchange.Consumer_LogPrint.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQ.Exchange.Consumer_LogPrint
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs",
                                  routingKey: "");
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    // 打印消息
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Print {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

        需要注意的是,如果沒有隊列與交換機綁定,那么消息將會丟失。但在日志系統中,這個沒啥問題,如果沒有Consumer接收消息,可以將其丟棄,如果需要確保消息不能丟失的業務請慎用。

        在上面動圖中,先查詢Queue列表信息,打開Consumer_Save,再查詢Queue列表,發現多了一個amq-gen-隨機字符串的Queue。打開Producer開始發送消息,再打開Consumer_Print,再查詢Queue列表,又多了一個。這個時候發現Consumer_Print接收的消息是最新發送的消息,Producer發送完消息就立馬丟棄消息。最后關閉Consumer,再查詢Queue列表,發現創建的隨機Queue自動銷毀。

六、路由

        第五章的內容,用了一個日志系統的例子簡單了解了如果同時廣播消息個多個Consumer。在這章內容中,我們將添加一個特性去實現Consumer只訂閱消息中的一部分。例如,我們只需要將嚴重的錯誤消息記錄在硬盤中,其他所有消息依然都需要打印在控制台上。

Bindings

上幾個例子,我們已經了解了如何創建Bindings:

 channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); 

Binding是交換機與隊列之間的關聯。通俗來說,隊列對來自交換機的消息感興趣。Bindings可以傳遞一個額外的routingKey參數。為了避免與BasicPublish方法中的參數混淆,我們可以叫他 binding key。

 channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "black"); 

Binding的意義取決於交換機的類型,扇形交換機完全忽略了他的價值。

Direct exchange

        上面日志系統的示例將發送所有日志給所有Consumer。我們對它進行擴展,可以基於日志的嚴重程度去過濾日志消息。例如,我們想只要接收到嚴重錯誤日志是,才會記錄至硬盤,不會浪費硬盤空間去記錄哪些信息、警告啥的。

       為了實現上述需求,我們將用direct交換機去替代fanout交換機。隱藏在direct交換機后的路由算法程序是比較簡單的,消息中的routing key與隊列中的binding key匹配時,該消息將會被塞進該隊列中。

        上圖所示,direct交換機與兩個Queue綁定,routing key=organge月Q1綁定,routing key=black、green與Q2綁定,C1與Q1綁定,只接收organge消息,C2與Q2綁定,可以接收到black、green消息。

Multiple bindings

多重綁定,多個Queue綁定同一個路由key是完全沒有問題的。在上上張圖中,我們把通過key black將X與Q2綁定一起,這張圖通過key black 將X 與 Q1、Q2綁定在一起,實現了類似於fanout交換機的功能,廣播消息給所有的隊列。

Emitting logs

我們用direct交換機來改進日志分發系統,將日志嚴重等級作為routing key,將direct exchage與Queue綁定一起。

首先,創建一個交換機

 channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); 

發送消息

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);

為了簡化程序,我們假設日志嚴重等級為:info、warning、error

Subscribing--訂閱

var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity);
}

示例代碼如下:

RabbitMQ.DirectExchange.Producer.Program.cs

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.DirectExchange.Producer
{
    class Program
    {
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn","Error"};
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_direct", type: ExchangeType.Direct);
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) =>
                {
                    string loglevel = LogLevel[rd.Next(0, 4)];
                    string message = $"[{loglevel}] -- Log Infoooo!";
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "logs_direct",
                                        routingKey: loglevel,
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.DirectExchange.Consumer_LogDisk.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.IO;
using System.Text;

namespace RabbitMQ.DirectExchange.Consumer_LogDisk
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_direct", type: ExchangeType.Direct);
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs_direct",
                                  routingKey: "Error");// 只接收Error級別的消息,並寫入硬盤
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    using (var file = File.CreateText("loginfo.txt"))
                    {
                        file.Write(message);
                    }
                    // 記錄消息
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.DirectExchange.Consumer_LogPrint.Program.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQ.DirectExchange.Consumer_LogPrint
{
    class Program
    {
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn", "Error" };

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_direct", type: ExchangeType.Direct);
                var queueName = channel.QueueDeclare().QueueName;
                foreach (var item in LogLevel)
                {
                    channel.QueueBind(queue: queueName,
                                  exchange: "logs_direct",
                                  routingKey: item);// 接收所有級別的消息,並打印出來
                }
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

執行上面的代碼:

兩個隊列,一個只接收Error類型的日志寫入硬盤,另一個接收所有類型的日志,打印在控制台。

七、主題

        上一章節我們優化了日志記錄系統。我們用direct exchange取代了fanout exchange(只能進行虛擬廣播)實現了選擇性接收日志消息。但是它仍然有一定的局限性-他不能基於多個條件進行路由選擇。

        在我們的日志系統,我們可能不僅僅基於日志嚴重程度去進行訂閱消息,還需要根據日志的來源進行分發日志消息。比如不同系統發布的日志需要進行不同的操作。例如,我們只想監聽CRM所有的日志信息,Portal的只監聽錯誤日志信息。為了實現上述需求,我們需要了解更加復雜的交換機 - topic exchange。

Topic exchange

        發送給topic交換機的消息的routing-key有特殊的格式要求,必需是以 點 分隔的字符串。字符串內容可以隨便定義,但通常都是以消息的特性去定義。例如泰康項目中,客戶中心MQ消息:q.tkzj-ykcrm.khzx1。routing-key可以定義的很長,最長為255個字節。binding-key必須也要用同樣一種格式。topic交換機的邏輯與direct交換機類似--使用特定的routing-key發送的消息將被傳送到使用匹配binding-key的所有隊列之中。下面是binding-key的兩種重要的匹配符:

  • *(星號)可以匹配一個單詞。
  • #(井號)可以匹配0個或多個單詞。

如下圖所示:

        如上圖所示,我們將要發送描述動物的消息。routing-key的格式為 <speed>.<color>.<species>。我們創建三個綁定,Q1的bingding-key為*.orange.* 。Q2的bingding-key為 *.*.rabbit 與l azy.#。

總結一下,Q1將接收到所有顏色為橙色的動物消息。Q2將接收到所有兔子類和遲鈍的動物消息。

routing-key為 quick.orange.rabbit 的消息,將會同時發送給Q1與Q2。routing-key為 lazy.orange.elephant 也會同時發送給Q1與Q2。routing-key為 quick.roange.fox的消息,將只會發送給Q1。routing-key為 lazy.brown.fox的消息只會發送給Q2。

如果我們將routing-key設置為 quick.orange.male.rabbit 將匹配不到任何綁定,該消息將會丟失。

代碼整合

其實出去名稱與binding-key不同,topic交換機與direct交換機非常類似,如果binding-key中除去 * 與 #,那topic與direct就完全一樣了。

下面我們將用topic交換機去改進日志系統。我們假設日志消息的routing-key有兩個詞 <facility>.<severity>。

RabbitMQ.DirectExchange.Producer.Program.cs:

using RabbitMQ.Client;
using System;
using System.Text;
using System.Timers;

namespace RabbitMQ.DirectExchange.Producer
{
    class Program
    {
        private static readonly string[] Source = new string[] { "Crm", "Portal", "Interface" };
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn", "Error" };
        static void Main(string[] args)
        {
            Random rd = new Random();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(
                    exchange: "logs_topic",
                    type: ExchangeType.Topic
                    );
                Timer timer = new Timer();
                timer.Elapsed += new ElapsedEventHandler((object sender, ElapsedEventArgs e) =>
                {
                    string source = Source[rd.Next(0, 3)];
                    string loglevel = LogLevel[rd.Next(0, 4)];
                    string message = $"[{source}][{loglevel}] -- Log Infoooo!";
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "logs_topic",
                                        // routingKey根據排列組合可以有12種
                                        routingKey: string.Concat(source, '.', loglevel),
                                        basicProperties: null,
                                        body: body);
                    Console.WriteLine($"[{DateTime.Now}][Producer Test] Sent {message}");
                });
                timer.Interval = 1000;
                timer.Enabled = true;
                timer.Start();
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.TopicExchange.Consumer_LogPrint.Program.cs:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQ.TopicExchange.Consumer_LogPrint
{
    class Program
    {
        private static readonly string[] Source = new string[] { "Crm", "Portal", "Interface" };
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn", "Error" };

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_topic", type: ExchangeType.Topic);
                var queueName = channel.QueueDeclare().QueueName;
                // 打印Crm所有消息
                channel.QueueBind(queue: queueName, exchange: "logs_topic", routingKey: "Crm.*");
                // 打印Portal、接口錯誤消息
                channel.QueueBind(queue: queueName, exchange: "logs_topic", routingKey: "*.Error");
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

RabbitMQ.TopicExchange.Consumer_LogDisk.Program.cs:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.IO;
using System.Text;

namespace RabbitMQ.TopicExchange.Consumer_LogDisk
{
    class Program
    {
        private static readonly string[] Source = new string[] { "Crm", "Portal", "Interface" };
        private static readonly string[] LogLevel = new string[] { "Info", "Debug", "Warn", "Error" };
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs_topic", type: ExchangeType.Topic);
                var queueName = channel.QueueDeclare().QueueName;
                // 存儲所有系統的錯誤消息
                channel.QueueBind(queue: queueName, exchange: "logs_topic", routingKey: "#.Error");
                Console.WriteLine(" [*] Waiting for logs.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    using (var file = File.CreateText("loginfo.txt"))
                    {
                        file.Write(message);
                    }
                    // 記錄消息
                    Console.WriteLine($"[{DateTime.Now}][Consumer Test] Save {message}");
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
View Code

        上述動圖,我們可以看出控制台打印出所有Crm系統所有的消息以及Portal與接口的錯誤消息;寫入硬盤的Customer接收到了所有系統的錯誤消息。

        上面幾章節內容的示例都是以每個新概念編寫的簡單的代碼,例如direct、topic交換機的代碼中省略了連接管理、錯誤處理、連接恢復、並發性的處理方式,謹記在上生產前應考慮周全。

八、Publisher Confirms--發布者確認

        發布者確認是RabbitMQ的一種去提高發布消息可靠性的擴展。當在通道上啟用發布者確認時,客戶端發布的消息將由代理異步確認,這意味着它們已在服務器端得到處理。在這一章節中我們將使用發布者確認機制去保證發布的消息能夠安全的到達Consumer。這將涉及到幾種發布者確認的策略以及他們的優缺點。

Enabling Publisher Confirms on a Channel--在Channel上開啟發布者確認

發布者確認機制是AMQP協議的一種擴展,所以他不會默認開啟。使用ConfirmSelect方法,啟用channel上的發布者確認機制:

 var channel = connection.CreateModel(); channel.ConfirmSelect(); 

開啟發布者確認機制的方法只使用一次,不需要每條發送的消息都使用。

策略一:單獨發布消息確認

讓我們從使用confirms發布消息的最簡單方法開始,即發布消息並同步等待消息的確認:

while (ThereAreMessagesToPublish())
{
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.BasicPublish(exchange, queue, properties, body);
    // uses a 5 second timeout
    channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
}

        在上面的示例中,我們像往常一樣發布消息,並使用WaitForConfirmsOrDie方法等待消息的確認。消息確認后,該方法立即返回。如果消息在超時時間內沒有得到確認,或者消息是nack-ed(這意味着代理由於某種原因無法處理它),那么該方法將拋出一個異常。異常的處理通常包括記錄錯誤消息和/或重試發送消息。

        這種確認方式十分直接了當,但也有一個缺點:他在等待的同時阻止了后續消息的發布,大大降低了消息發布效率。這種方法不適用於一秒鍾可以發幾百條消息的情景。但是如果消息發布頻率低的話,這種方法還是足夠好的。

策略二:批量發布消息確認

為了優化上面的示例,我們可以按批次去確認消息發布情況。下面以100條一驗證:

var batchSize = 100;
var outstandingMessageCount = 0;
while (ThereAreMessagesToPublish())
{
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.BasicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;
    if (outstandingMessageCount == batchSize)
    {
        channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
        outstandingMessageCount = 0;
    }
}
if (outstandingMessageCount > 0)
{
    channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
}

        與等待單個消息的確認相比,等待一批消息被確認大大提高了吞吐量(對於遠程RabbitMQ節點,可以達到20-30倍)。但有一個缺點是,我們不知道在失敗的情況下到底出了什么問題,所以我們可能必須在內存中保留一整批數據,以便記錄有意義的內容或重新發布消息。而且這個解決方案仍然是同步的,所以它阻止了消息的發布。

策略三:異步處理發布消息確認

代理以異步方式確認已發布的消息,只需在客戶端上注冊一個回調即可收到這些確認的通知:

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, ea) =>
{
  // code when message is confirmed
};
channel.BasicNacks += (sender, ea) =>
{
  //code when message is nack-ed
};

有兩個回調:一個用於確認消息,一個用於nack-ed消息(代理可以認為丟失的消息)。兩個回調都有一個對應的EventArgs參數(ea),其中包含:

  • delivery tag:識別已確認或未經確認的報文的序列號。我們將很快看到如何將它與發布的消息關聯起來。
  • multiple:這是一個布爾值。如果為false,則只有一條消息被確認/nack-ed;如果為true,則序列號較低或相等的所有消息都被確認/nack-ed。

在發布消息前,可以通過Channel中的NextPublishSeqNo去獲取消息的序列號:

 var sequenceNumber = channel.NextPublishSeqNo; channel.BasicPublish(exchange, queue, properties, body); 

用Dictionary將消息去序列號關聯起來。我們假設發布的是字符串因為他們容易轉換成字節數組。:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange, queue, properties, Encoding.UTF8.GetBytes(body));

發布代碼現在使用字典跟蹤出站消息。當確認到達時,我們需要清理此字典,並在消息被取消時記錄警告:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

void cleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{
    if (multiple)
    {
        var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
        foreach (var entry in confirmed)
        {
            outstandingConfirms.TryRemove(entry.Key, out _);
        }
    }
    else
    {
        outstandingConfirms.TryRemove(sequenceNumber, out _);
    }
}

channel.BasicAcks += (sender, ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender, ea) =>
{
    outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
    Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
    cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};
// ... publishing code

        上述示例包含一個回調,當確認到達時,該回調將清理字典。注意這個回調同時處理單個和多個確認。當確認到達時使用此回調(Channel.BasicAcks)。nack-ed消息的回調將檢索消息正文並發出警告。然后,它重新使用先前的回調來清除字典中未完成的確認(無論消息是確認的還是nack的,它們在字典中的對應條目都必須被刪除)。

總結一下,異步發布確認通常需要一下幾個步驟:

  • 提供一個序列號與消息關聯起來:例如使用 ConcurrentDictionary
  • 在Channel上注冊一個確認監聽事件,當發布的消息到達時,無論是ack或者nack-ed,都會觀察到並作出合適的操作,例如記錄nack-ed消息或者重新發布nack-ed消息,同時清理ConsucurrentDictionary中序列號與消息的關聯。
  • 在消息發布之前追蹤序列號。

代碼整合

using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text;
using System.Linq;
using System.Threading;

class PublisherConfirms
{
    private const int MESSAGE_COUNT = 50_000;

        public static void Main()
        {
            PublishMessagesIndividually();
            PublishMessagesInBatch();
            HandlePublishConfirmsAsynchronously();
        }

        private static IConnection CreateConnection()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            return factory.CreateConnection();
        }

        private static void PublishMessagesIndividually()
        {
            using (var connection = CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var queueName = channel.QueueDeclare().QueueName;
                channel.ConfirmSelect();

                var timer = new Stopwatch();
                timer.Start();
                for (int i = 0; i < MESSAGE_COUNT; i++)
                {
                    var body = Encoding.UTF8.GetBytes(i.ToString());
                    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                    channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
                }
                timer.Stop();
                Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages individually in {timer.ElapsedMilliseconds:N0} ms");
            }
        }

        private static void PublishMessagesInBatch()
        {
            using (var connection = CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var queueName = channel.QueueDeclare().QueueName;
                channel.ConfirmSelect();

                var batchSize = 100;
                var outstandingMessageCount = 0;
                var timer = new Stopwatch();
                timer.Start();
                for (int i = 0; i < MESSAGE_COUNT; i++)
                {
                    var body = Encoding.UTF8.GetBytes(i.ToString());
                    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                    outstandingMessageCount++;

                    if (outstandingMessageCount == batchSize)
                    {
                        channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
                        outstandingMessageCount = 0;
                    }
                }

                if (outstandingMessageCount > 0)
                    channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));

                timer.Stop();
                Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages in batch in {timer.ElapsedMilliseconds:N0} ms");
            }
        }

        private static void HandlePublishConfirmsAsynchronously()
        {
            using (var connection = CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var queueName = channel.QueueDeclare().QueueName;
                channel.ConfirmSelect();

                var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

                void cleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
                {
                    if (multiple)
                    {
                        var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
                        foreach (var entry in confirmed)
                            outstandingConfirms.TryRemove(entry.Key, out _);
                    }
                    else
                        outstandingConfirms.TryRemove(sequenceNumber, out _);
                }

                channel.BasicAcks += (sender, ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                channel.BasicNacks += (sender, ea) =>
                {
                    outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
                    Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
                    cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                };

                var timer = new Stopwatch();
                timer.Start();
                for (int i = 0; i < MESSAGE_COUNT; i++)
                {
                    var body = i.ToString();
                    outstandingConfirms.TryAdd(channel.NextPublishSeqNo, i.ToString());
                    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: Encoding.UTF8.GetBytes(body));
                }

                if (!WaitUntil(60, () => outstandingConfirms.IsEmpty))
                    throw new Exception("All messages could not be confirmed in 60 seconds");

                timer.Stop();
                Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {timer.ElapsedMilliseconds:N0} ms");
            }
        }

        private static bool WaitUntil(int numberOfSeconds, Func<bool> condition)
        {
            int waited = 0;
            while(!condition() && waited < numberOfSeconds * 1000)
            {
                Thread.Sleep(100);
                waited += 100;
            }

            return condition();
        }
}
View Code

九、擴展

UI管理工具

該工具是包含在RabbitMQ中插件中的一種,使用前需要開啟:

 rabbitmq-plugins enable rabbitmq_management 

開啟后訪問 http://localhost:15672/

這個時候需要用命令行去創建一個管理員賬號:

# 創建用戶
rabbitmqctl add_user <account> <password>
# 給用戶一個管理員角色
rabbitmqctl set_user_tags <account> administrator

權限管理:

Tag Capabilities
(None) 沒有權限使用管理插件
management 用戶通過消息傳遞協議可以進行任何查找:
  • 列出所有他們可以通過AMQP登錄的虛擬機
  • 查看他們虛擬機中所有的Queue、Exchange、Binding
  • 查看和關閉他們自己的Channel、Connection
  • 查看涵蓋其所有虛擬主機的“全局”統計信息,包括其中其他用戶的活動
policymaker 在management權限的基礎上增加:
  • 查看、創建、刪除他們可以通過AMQP登錄的虛擬機的策略與參數
monitoring 在management權限的基礎上增加:
  • 列出所有虛擬主機,包括無法使用消息傳遞協議訪問的虛擬主機
  • 查看其它用戶的Connection、Channel
  • 查看節點級數據,如內存使用和集群
  • 查看所有虛擬機的全局信息
administrator 在policymaker與monitoring權限的基礎上增加:
  • 查看、創建、刪除虛擬機
  • 查看、創建、刪除用戶
  • 查看、創建、刪除權限
  • 關閉其它用戶的連接

至於UI管理的運用,可以自己動手試一試。

配置文件

由於RabbitMQ安裝包中不包含配置文件,我們可以從官方下載---點擊下載 ,下載后命名為 rabbitmq.conf。

將配置文件放入下圖位置:

修改配置文件后需要重新啟動RabbitMQ節點。

控制命令行

有興趣的可以自行查看他們的功能,例如rabbitmqctl -help,會列出其所有的命令行:

rabbitmqctl add_user <account> <password>   增加用戶

rabbitmqctl set_user_tags <account> <tag> 賦角色

rabbitmqctl delete_user <account> 刪除用戶

rabbitmqctl list_exchanges 列出所有交換機

......

RabbitMQ Management HTTP API

RabbitMQ UI 站點還提供了API的信息 可以訪問 http://localhost:15672/api/index.html 里面包含了 api 的信息及其應用示例。

現在我們簡單的了解一個發送消息的接口:

注釋掉之前RabbitMQ.Exchange.Producer.Program.cs中自動發消息的代碼,由於我們增加了用戶,所以可以通過賬號密碼進行連接(注意:賬號區分大小寫!!!)。

同時運行,Producer、Consumer,打開Manager UI,觀察變化:

點擊 queue_task 進去隊列詳情,可以發送消息,(測試、補發 使用):

 

准備開始調用接口,安裝postman工具,獲取接口授權,授權方式是HTTP基本身份驗證:

輸入參數點擊發送

        雖然API中有發布消息的接口,但RabbitMQ官方不建議這種發布消息的方式,對於高頻率的消息推送,每次都創建一條TCP連接,對性能有不小的影響。當然除此之外還有很多其他的API,大家有興趣可以去看一哈。

 

 

 

 

 

文檔參考 www.rabbitmq.com

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM