RabbitMQ Tutorial (translation), Chapter 2: Work Queues


The original text comes from the official RabbitMQ English tutorial (2. Work Queues); its sample code is written in .NET C#.


In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.


The main idea behind Work Queues (also known as Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to be done later: we encapsulate the task as a message and send it to a queue. A worker process running in the background pops the tasks off the queue and eventually executes them. When you run many workers, the tasks are shared between them.
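The produce/consume idea itself can be tried without a broker. The sketch below is only an in-process analogy, not RabbitMQ: it assumes .NET's BlockingCollection<T> as the queue, and the InProcessWorkQueue class and its Run method are hypothetical names. Unlike a RabbitMQ queue, this one lives inside a single process and disappears when the process exits.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// In-process analogy of a work queue (NOT RabbitMQ): a producer adds task
// descriptions to a shared queue and several background workers take them
// off and "execute" them.
public static class InProcessWorkQueue
{
    // Runs `workerCount` workers over `tasks` and returns, per worker id,
    // the task descriptions that worker processed.
    public static Dictionary<int, List<string>> Run(IEnumerable<string> tasks, int workerCount)
    {
        using var queue = new BlockingCollection<string>();
        var results = new ConcurrentDictionary<int, List<string>>();

        var workers = Enumerable.Range(1, workerCount).Select(id => Task.Run(() =>
        {
            var done = new List<string>();
            // Blocks until an item is available; ends once CompleteAdding
            // has been called and the queue is empty.
            foreach (var task in queue.GetConsumingEnumerable())
            {
                done.Add(task); // a real worker would do the slow work here
            }
            results[id] = done;
        })).ToArray();

        foreach (var t in tasks) queue.Add(t); // the "producer"
        queue.CompleteAdding();                // no more tasks will arrive
        Task.WaitAll(workers);

        return new Dictionary<int, List<string>>(results);
    }
}
```

Each task is taken by exactly one worker; with a workload this tiny, one worker may happen to grab most of the items before the other starts.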

This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.

Preparation

In the previous part of this tutorial we sent a message containing "Hello World!". Now we'll be sending strings that stand for complex tasks. We don't have a real-world task, such as images to be resized or PDF files to be rendered, so let's fake it by pretending we're busy, using the Thread.Sleep() method (you will need to add using System.Threading; near the top of the file to get access to the threading APIs). We'll take the number of dots in the string as its complexity; every dot will account for one second of "work". For example, a fake task described by Hello... will take three seconds.

We will slightly modify the Send program from our previous example to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let's name it NewTask.

As in tutorial one, we need to generate two projects:

dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
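cd NewTask
# Pin the 6.x client: the synchronous API used in this tutorial
# (CreateModel, BasicPublish) is not available in 7.x, which is async-only.
dotnet add package RabbitMQ.Client --version 6.8.1
dotnet restore
cd ../Worker
dotnet add package RabbitMQ.Client --version 6.8.1
dotnet restore

In NewTask.cs, the core of the program builds the message from the command line and publishes it:

var message = GetMessage(args);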
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);

A small helper gets the message from the command-line arguments:

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

Our old Receive.cs program also requires some changes: it needs to fake a second of work for every dot in the message body. It will handle messages delivered by RabbitMQ and perform the task, so let's copy it into the Worker project (as Worker.cs) and modify it:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    // In RabbitMQ.Client 6.x, ea.Body is a ReadOnlyMemory<byte>; copy it to a byte[]
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);

Our fake task to simulate execution time:

int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
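Both helpers are plain string logic, so they can be checked without a broker. The sketch below copies them into a hypothetical TaskText class; FakeWorkDuration is an illustrative name that returns the sleep time as a TimeSpan instead of calling Thread.Sleep.

```csharp
using System;

public static class TaskText
{
    // Same logic as GetMessage in NewTask: join the command-line arguments
    // with spaces, or fall back to "Hello World!" when there are none.
    public static string GetMessage(string[] args)
    {
        return args.Length > 0 ? string.Join(" ", args) : "Hello World!";
    }

    // Same logic as the Worker: one second of fake work per '.' character.
    public static TimeSpan FakeWorkDuration(string message)
    {
        int dots = message.Split('.').Length - 1;
        return TimeSpan.FromSeconds(dots);
    }
}
```

Because GetMessage joins all arguments with spaces, dotnet run First message. and dotnet run "First message." yield the same message, which takes one second of fake work.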

Round-robin dispatching

One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers; that way, the system scales easily.

First, let's try to run two Worker instances at the same time. They will both get messages from the queue, but how exactly? Let's see.

You need three consoles open. Two will run the Worker program. These consoles will be our two consumers, C1 and C2.

# shell 1
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C

In the third one we'll publish new tasks. Once you've started the consumers, you can publish a few messages:

# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."

Let's see what is delivered to our workers:

# shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

By default, RabbitMQ sends each message to the next consumer, in sequence. On average, every consumer gets the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
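The dispatch order shown above can be reproduced with a tiny simulation. This is a toy model, not RabbitMQ code, assuming all five messages are already queued and both consumers are connected; RoundRobin.Dispatch is a hypothetical helper.

```csharp
using System.Collections.Generic;

// Toy model of round-robin dispatch (not RabbitMQ code): message i goes to
// consumer i mod n, regardless of how busy that consumer is.
public static class RoundRobin
{
    public static List<string>[] Dispatch(IReadOnlyList<string> messages, int consumerCount)
    {
        var perConsumer = new List<string>[consumerCount];
        for (int c = 0; c < consumerCount; c++) perConsumer[c] = new List<string>();

        for (int i = 0; i < messages.Count; i++)
        {
            perConsumer[i % consumerCount].Add(messages[i]);
        }
        return perConsumer;
    }
}
```

With the five messages from shell 3 and two consumers, index 0 (C1) receives the first, third and fifth messages and index 1 (C2) the second and fourth, matching the output above.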

Message acknowledgment

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer, it immediately marks it for deletion. In this case, if you kill a worker, we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, its connection is closed, or the TCP connection is lost) without sending an ack, RabbitMQ will understand that the message wasn't fully processed and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
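To make this bookkeeping concrete, here is a toy, in-memory model of what the broker tracks. It is an illustration only, not the RabbitMQ client API or the broker's real implementation; ToyBroker and its methods are hypothetical names, and requeued messages are simply appended to the back, so ordering is not modeled faithfully.

```csharp
using System.Collections.Generic;

// Toy model of manual acknowledgements (NOT the RabbitMQ client API):
// a delivered message stays "unacked" until the consumer acks it; if the
// consumer dies first, everything it had not acked is requeued.
public sealed class ToyBroker
{
    private readonly Queue<string> _ready = new Queue<string>();
    private readonly Dictionary<string, List<string>> _unacked =
        new Dictionary<string, List<string>>();

    // Roughly what the messages_ready column of rabbitmqctl reports.
    public int ReadyCount => _ready.Count;

    public void Publish(string message) => _ready.Enqueue(message);

    // Hands the next ready message to `consumer` (null if none) and
    // remembers it as unacknowledged for that consumer.
    public string? Deliver(string consumer)
    {
        if (_ready.Count == 0) return null;
        string message = _ready.Dequeue();
        if (!_unacked.TryGetValue(consumer, out List<string>? pending))
        {
            pending = new List<string>();
            _unacked[consumer] = pending;
        }
        pending.Add(message);
        return message;
    }

    // The task is done: the broker is free to forget the message.
    public void Ack(string consumer, string message) => _unacked[consumer].Remove(message);

    // Channel or connection closed without acks: requeue what was outstanding.
    public void ConsumerDied(string consumer)
    {
        if (_unacked.TryGetValue(consumer, out List<string>? pending))
        {
            foreach (string m in pending) _ready.Enqueue(m);
            _unacked.Remove(consumer);
        }
    }

    // Roughly what messages_unacknowledged reports, per consumer.
    public int UnackedCount(string consumer) =>
        _unacked.TryGetValue(consumer, out List<string>? pending) ? pending.Count : 0;
}
```

If a consumer in this model keeps calling Deliver but never Ack, UnackedCount grows without bound, which is the forgotten-acknowledgment problem discussed later.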

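RabbitMQ redelivers the message when the consumer dies, so it's fine even if processing a message takes a very, very long time. Note, however, that current RabbitMQ versions do enforce a timeout on consumer delivery acknowledgement (30 minutes by default) to detect stuck consumers that never acknowledge; it can be increased if your tasks legitimately take longer.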

Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off by setting the autoAck ("automatic acknowledgement mode") parameter to true. It's time to set this flag to false and manually send a proper acknowledgment from the worker once we're done with a task.

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    // In RabbitMQ.Client 6.x, ea.Body is a ReadOnlyMemory<byte>; copy it to a byte[]
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine(" [x] Done");

    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

With this code, even if you kill a worker with CTRL+C while it is processing a message, nothing will be lost. Soon after the worker dies, all unacknowledged messages will be redelivered.

Forgotten acknowledgment

It's a common mistake to miss the BasicAck. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.

In order to debug this kind of mistake, you can use rabbitmqctl to print the messages_unacknowledged field:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

On Windows, drop the sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Message durability

We have learned how to make sure that even if the consumer dies, the task isn't lost. But our tasks will still be lost if the RabbitMQ server stops.

When RabbitMQ quits or crashes, it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark the queue as durable and the messages as persistent.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

channel.QueueDeclare(queue: "hello",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

Although this command is correct by itself, it won't work in our present setup. That's because we've already defined a queue called hello which is not durable (as noted in Chapter 1, declaring a queue is idempotent: it is only created if it doesn't already exist). RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround: let's declare a queue with a different name, for example task_queue:

channel.QueueDeclare(queue: "task_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

This QueueDeclare change needs to be applied to both the producer and consumer code. (As mentioned in the Receiving section of Chapter 1, the consumer declares the queue too: we can't be sure which program will be started first, and the queue must exist before we try to consume from it.)

At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent, by setting IBasicProperties.Persistent to true:

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

Note on message persistence

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message; it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but they are more than enough for our simple task queue. If you need a stronger guarantee, you can use publisher confirms.

Fair dispatch

You might have noticed that the dispatching still doesn't work exactly as we want. For example, in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.


In order to change this behavior, we can use the BasicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. In other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, RabbitMQ will dispatch the message to the next worker that is not still busy, which gives us fair dispatch.

channel.BasicQos(0, 1, false);
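To see why prefetchCount = 1 helps, the sketch below compares the two policies on the heavy-odd, light-even workload described above. It is a toy simulation, not RabbitMQ code: it assumes every message is queued at time 0, ignores network and ack latency, and breaks ties toward the lower-numbered worker; DispatchSimulation and its methods are hypothetical names.

```csharp
// Toy comparison of dispatch policies (not RabbitMQ code). Durations are
// in seconds; the result is the time at which each worker finishes.
public static class DispatchSimulation
{
    // Round-robin: message i goes to worker i % n no matter how busy it is.
    // All messages are queued at time 0, so a worker's finish time is the
    // sum of the durations it was given.
    public static int[] RoundRobinFinishTimes(int[] durations, int workers)
    {
        var finish = new int[workers];
        for (int i = 0; i < durations.Length; i++)
            finish[i % workers] += durations[i];
        return finish;
    }

    // prefetchCount = 1: a worker holds at most one unacked message, so the
    // next message goes to whichever worker becomes free first (ties go to
    // the lower-numbered worker in this model).
    public static int[] PrefetchOneFinishTimes(int[] durations, int workers)
    {
        var freeAt = new int[workers];
        foreach (var d in durations)
        {
            int next = 0;
            for (int w = 1; w < workers; w++)
                if (freeAt[w] < freeAt[next]) next = w;
            freeAt[next] += d;
        }
        return freeAt;
    }
}
```

With durations {5, 1, 5, 1, 5, 1} and two workers, round-robin finishes at 15 and 3 seconds, while the prefetch-one policy finishes at 11 and 7, so the last task completes 4 seconds earlier.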

Note about queue size

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
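One rough way to decide how many workers to add is a steady-state throughput estimate. The sketch assumes tasks arrive at a constant average rate and each takes a fixed average time, and it ignores bursts and prefetch effects; the Capacity class and its methods are hypothetical. You can compare its prediction with the messages_ready column of rabbitmqctl list_queues.

```csharp
using System;

public static class Capacity
{
    // Each worker completes 1 / secondsPerTask tasks per second, so keeping
    // up with `arrivalsPerSecond` needs at least this many workers.
    public static int MinWorkers(double arrivalsPerSecond, double secondsPerTask) =>
        (int)Math.Ceiling(arrivalsPerSecond * secondsPerTask);

    // Net change in queue length per second with the given worker count;
    // a positive value means the backlog keeps growing.
    public static double BacklogGrowthPerSecond(double arrivalsPerSecond, double secondsPerTask, int workers) =>
        arrivalsPerSecond - workers / secondsPerTask;
}
```

For example, 3 tasks per second at 2 seconds each need at least 6 workers; with only 4 workers the backlog grows by about 1 message per second.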

Putting it all together

Final code of our NewTask.cs class:

using System;
using RabbitMQ.Client;
using System.Text;

class NewTask
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "",
                                 routingKey: "task_queue",
                                 basicProperties: properties,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
    }
}

(NewTask.cs source)

And our Worker.cs:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;

class Worker
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // In RabbitMQ.Client 6.x, ea.Body is a ReadOnlyMemory<byte>; copy it to a byte[]
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine(" [x] Done");

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: "task_queue",
                                 autoAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

(Worker.cs source)

Using message acknowledgments and BasicQos, you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.

