一、工作隊列
(使用 php-amqplib)
在第一篇教程中我們寫程序從一個命名隊列中發送和接收消息。在這篇中,我們將建立一個在多個工作者之間用於分發耗時任務的工作隊列。
工作隊列(也稱為:任務隊列)背后的主要思想是避免立即做一項資源密集型任務並且不得不等候它完成。而是我們計划這個任務在稍后被完成。我們封裝一個任務為一條消息並且發送它到一個隊列。一個在后台運行的工作進程將立即獲取這個任務並最終執行它。當你運行多個工作進程時,任務將在它們之間被分配。
這個概念在web應用中尤其有用,在一個短HTTP請求窗口中,處理一項復雜的任務是不太可能的。
准備
在教程的前一篇中,我們發送一條包含“Hello World!”的消息。現在我們將發送字符串來代表復雜任務。我們沒有一個真實的任務,類似於圖片大小被調整或pdf文件被渲染,因此讓我們來假裝這個任務,通過偽裝我們正忙——利用sleep()函數。我們將用字符串中的逗號數量作為它的復雜性。每個逗號將占用一秒的工作。一項被描述為Hello...的假裝的任務將占用三秒鍾。
我們稍微修改一下我們先前例子send.php的代碼,允許從命令行發送任意的消息。這個程序將把任務發送到我們的工作隊列中,所以我們命名它為new_task.php:
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent ", $data, "\n";
我們舊的receive.php腳本也需要一些改變:它需要偽裝在消息體內每一個逗號有一秒鍾的工作。它將從隊列里獲取信息並且執行任務,所以我們稱它為worker.php:
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
注意我們的偽裝任務模擬執行時間。
像在第一篇教程講述的一樣運行它們:
# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."
二、循環調度
使用任務隊列的優點之一就是能夠並行工作。如果我們正在建立一項積壓的工作,我們只要添加更多的工作者就可以輕松地擴大規模。
首先,讓我們試着同時運行兩個worker.php腳本。這兩個都將得到來自隊列里的信息,但具體怎樣?讓我們看看。
你需要打開三個控制台程序。兩個將運行worker.php程序。這些控制台程序將使我們的兩個消費者——C1和C2。
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
在第三個控制台里,我們將發布新的任務。一旦你啟動消費者你就能發布一些信息了:
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
讓我們看看有什么被傳給了我們的工作者:
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默認情況下,RabbitMQ將依次發送每一條消息到下一個消費者,平均每個消費者將得到相同數量的消息。這種分發消息的方式就被稱為輪詢。試試用三個或更多的工作者。
三、消息確認
做一項任務會花幾秒鍾時間。你可能會想如果其中的一個消費者執行一項長時間的任務而只執行了一部分就死掉了那會怎樣。在我們目前的代碼中,一旦RabbitMQ傳遞一條消息給消費者后,它立即就會標記這條消息為刪除。在這種情況下,如果你結束掉一個工作者,我們將丟失它正處理的消息。我們也會丟失被分發到這個工作者而尚未被處理的所有消息。
但是我們不想丟失任何任務。如果一個工作者死掉了,我們會想讓這個任務交給另一個工作者。
為了確保一條消息不會丟失,RabbitMQ支持消息確認機制。一個ack(acknowledgement)被消費者發回以告知RabbitMQ一條特定的消息已被接收並處理,RabbitMQ可以刪除它了。
如果一個消費者死掉(它的通道被關閉,連接也被關閉,或者TCP連接丟失)沒有發送一個ack,RabbitMQ就會知道,一條消息沒有被完全處理,則會將這條消息將重新排入隊列。如果有其它的消費者同時在線,那么它將會迅速的重新傳遞這條消息給另一個消費者。這樣你就可以確信沒有消息被丟失,即使工作者偶爾死掉。
如果沒有任何消息超時,當消費者死掉時,RabbitMQ將重新傳送消息。這樣即使處理一條消息要花很長很長時間也沒事。
消息確認機制默認是被關閉的。現在是時間將它們設置為打開了,通過設置 basic_consume的第四個參數為false(true是不ack),然后,一旦我們完成一項任務就從工作者發送一個合適的確認。
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用這些代碼我們能確定即使在一個工作者正在處理一條消息時,你用CTRL+C結束掉這個進程,也沒有什么丟失。這個工作者死掉后的不久所有未應答的消息將會重新被投遞。
被忘記的確認
忘記確認是一個常見的錯誤。雖然這是一個簡單的錯誤,但是后果是嚴重的。當你的客戶端退出的時候消息將會被重新投遞(這可能看起來像是隨機的重新投遞),但是因為不能釋放任何未被確認的消息,RabbitMQ將會消耗越來越多的內存。
為了調試這種類型的錯誤,你可以使用rabbitmqctl來打印 messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在Windows系統上,去掉sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
四、消息持久化
我們已經學習了怎樣確保即使消費者死亡任務也不會丟失。但是如果RabbitMQ服務停止了,我們的任務將仍然會被丟失。
當RabbitMQ退出或崩潰時它將忘記隊列和消息除非你讓它不要這么做。要確保消息不被丟失,有兩件事被要求去做:我們需要標記隊列和消息為持久的。
第一,我們需要確保RabbitMQ不會丟失我們的隊列。為了這樣,我們需要定義它為持久的。要這么做,我們就需要傳遞第三個參數為true到queue_declare:
$channel->queue_declare('hello', false, true, false, false);
盡管這條命令本身是正確的,但是在我們目前的設置中是不起作用的。這是因為我們已經定義了一個叫做hello的而不是持久的隊列。RabbitMQ不允許你用不同的參數重新定義一個已經存在的隊列,這將返回一個錯誤到任何這么做的程序中。但是有一個快速的解決辦法——我們來用不同的名字定義一個隊列,例如task_queue:
$channel->queue_declare('task_queue', false, true, false, false);
這個設置為true的標志需要應用到生產者和消費者代碼中。
這時我們就能確信即使RabbitMQ重新啟動,task_queue隊列也不會被丟失。現在我們需要標記我們的消息為持久的——通過設置deliver_mode = 2消息屬性,用作AMQPMessage屬性數組的一部分。
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
關於消息持久的注意事項
標記消息為持久也不能完全保證消息不被丟失。盡管這使得RabbitMQ保存消息懂啊磁盤,也仍然會有一個間隙窗口當RabbitMQ已經接收一條消息而尚未保存它時。再者,RabbitMQ不會為每一條消息調用fsync(2) ——它可能僅被保存在緩存中而不是真的寫到磁盤上。雖然持久的保證不是很強,但是對於我們的簡單任務隊列已經是足夠了。如果你需要一個更強的保證,那么你可以使用 publisher confirms。
五、公平調度
你大概已經注意到了調度仍然不像我們想的那樣工作。例如,在有兩個工作者的情況下,當所有的奇數消息是重量級的而偶數消息是輕量級的,一個工作者將一直處於繁忙狀態而另一個將幾乎沒有任何工作。對於這種情況RabbitMQ一無所知,仍然均勻地分發消息。
發生這種情況是因為RabbitMQ只在消息進入隊列時才調度消息。它不看一個消費者未確認的消息數量。它僅僅盲目地分發每一個第n條消息到第n個消費者。

為了避免這種情況,我們可以使用設置prefetch_count = 1 的basic_qos方法 。這會讓RabbitMQ不會一次去分配多余一條消息給工作者。或者,換句話說,不分發一條新的消息給一個工作者直到這個工作者已經處理完並且確認了前一條消息。轉而分發消息到下一個不忙的消費者。
$channel->basic_qos(null, 1, null);
關於隊列大小的注意事項
如果所有工作者都很忙,你的隊列可以填滿。你會想要關注這個,可能添加更多的工作者,或者有一些其它策略。
六、合在一起
我們的new_task.php文件的最終代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
?>
我們的worker.php文件:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
使用消息確認和預取你能設置一個工作隊列。持久選項讓任務存活即使RabbitMQ被重啟。
現在我們能夠前往下一篇文章,學習怎樣傳遞相同的消息給多個消費者。