RabbitMQ指南(C#)(二)工作隊列


上一節我們實現了向指定的隊列發送和接收消息。這一節,我們主要講工作隊列,用於在多個消費者之間分配置實時任務。

工作隊列方式主要是為了防止在執行一個耗費資源的任務時,要等待其結束才能處理其它事情。我們將任務的執行延遲,將其封裝成一個消息,然后發送給一個列隊。后台再運行一個程序從隊列里取出消息,然后執行任務。如果有多個消費者,還可以分享任務。

對於Web應用程序來說,這樣就可以使用Http的短請求來處理復雜的業務。

准備

我們發送一個字符串來代表復雜的任務,然后用Thread.Sleep()來模擬耗時操作,使用點的個數來代表任務的復雜度,一個點代表一秒種的工作,例如Hello…代表一個耗時三秒的任務。

修改上一節示例中Send.cs的代碼,允許從命令輸入任意消息,然后發送到工作隊列,我們將其命名為NewTask.cs。

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);

GetMessage方法獲取輸入的消息

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

修改上一節中的Receive.cs,按每個點代表一個秒模擬耗時任務,處理來自RabbitMQ的消息並執行任務,我們將其命名為Worker.cs

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);

模擬任務執行的時間

int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);

編譯運行

循環調度

使用工作隊列的好處是很容易實現平分工作。如果要執行的工作會造成積壓,只需要運行多個工作者就可以了。

首先,運行兩個控制台程序執行Worker.cs,他們都從隊列里取消息。這兩個控制台程序就是消息者,分別是C1,C2。

再運行一個控制台程序執行NewTask.cs發布任務。啟動消費者之后,在控制輸入以下消息內容發到隊列:

shell3$ NewTask.exe First message.
shell3$ NewTask.exe Second message..
shell3$ NewTask.exe Third message...
shell3$ NewTask.exe Fourth message....
shell3$ NewTask.exe Fifth message.....

然后看看工作都的接收內容:

C1

[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

C2

[*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

RabbitMQ默認按順序發送消息到每一個消費者,每個消費者會平均取到相同數量的消息,這種分布式消息方式稱做輪循調度。也可以添加更多的消費者。

消息確認

執行一個任務需要花費一些時間,那么你可能會考慮如果一個消費者正執行一個耗時的任務時突然崩潰了怎么辦。我們目前的代碼,一旦RabbitMQ發送消息給消費者,消費會立即被刪除。在這種情況下,如果中止一個工作者,就會丟失正在處理的消息,還有當前消費者已接收但還沒有被處理的消息也會丟失。

但是我們不想丟失任何任務,如果某一個工作者停止了,我們希望任務會被發送到其它工作者。

為了保證消息不會丟失,RabbitMQ提供了消息確認機制(acknowledgments)。當消息被接收並處理之后,消費者發送一個ack到RabbitMQ,告訴它可以自由刪除該消息了。

如果消費者停止了,沒有發送ack確認,那么RabbitMQ會認為這個消息沒有被處理,它會將這個消費重新入隊。如果還有其它的消費者的話,它就會立馬把這個消息發送給另一個消費者去處理。這樣即使消費者偶爾中止了,那么也不會造成消息丟失。

任何消息都不會超時,如果消費者崩潰了,RabbitMQ會重新發送消息,即使消息處理要發花費很長時間也不會有問題。

消息確認機制默認是開着的。前邊的例子都通過設置noAck為true來關閉了消息確認。現在設置noAck為false,當任務執行完成后,工作者發送確認消息。

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);
 
    Console.WriteLine(" [x] Done");

    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);

上邊的代碼可以確保工作者正在處理消息時崩潰的話,也不會丟失任何消息。所有未經確認的消息都會重新發送。

消息持久化

我們已經知道如何確認工作者崩潰時,任務也不會丟失。但是如查RabbitMQ也停止的話,任務同樣會丟失。

如果RabbitMQ退出或崩潰了,默認就會清除隊列和消息。要保證消息不丟失,需要配置隊列和消息都要持久化。

首先要保證RabbitMQ不會丟失隊列,這需要聲明隊列為持久隊列

channel.QueueDeclare(queue: "hello",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

從語法上講,上邊的代碼沒有什么問題,但是這樣並不會起作用,因為我們這前已經聲明了名為hello的隊列,RabbitMQ不允許以不同的參數重新定義已經存在的隊列,這樣會拋出異常。所以要換個隊列名稱,如task_queue

channel.QueueDeclare(queue: "task_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

queueDeclare 的修改要在生產者和消費者兩邊都進配置。

設置隊列持久化之后,需要再設置消息為持久化。

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

平衡調度

你可能注意到,目前的分發機制仍然不理想。例如,如果有兩個工作者,當奇數的消息十分復雜,而偶數的消息很簡單時,一個工作者就會非常繁忙,而另一個工作者幾乎沒有任務可做。但RabbitMQ不知道這種情況,仍然會平均分配任務。

因為只要有消息進入隊列,RabbitMQ就會分發消息。它不會檢查每一個消費者未確認的消息個數。它只是盲目的將第N個消息發送給第N個消費者。

為了防止這種情況,要使用basicQos 方法,並設置prefetchCount 參數的值為1。這樣RabbitMQ就不會同時發送多個消息給同一個工作者。也就是說,在工作者處理並確認前一個消息之前,不會分發新的消息給工作者。它會把消息發送給下一個不忙的工作者。

channel.BasicQos(0, 1, false);

完整代碼

NewTask.cs代碼

using System;
using RabbitMQ.Client;
using System.Text;

class NewTask
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.SetPersistent(true);

            channel.BasicPublish(exchange: "",
                                 routingKey: "task_queue",
                                 basicProperties: properties,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
    }
}

Worker.cs代碼

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;

class Worker
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine(" [x] Done");

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: "task_queue",
                                 noAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

使用消息確認機制和BasicQos 就可以建立工作隊列,用持久化選項可以保證即使RabbitMQ重啟也不會丟失任務。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM