工作隊列

在第一篇教程中,我們已經寫了一個從已知隊列中發送和獲取消息的程序。在這篇教程中,我們將創建一個工作隊列(Work Queue),它會發送一些耗時的任務給多個工作者(Works )。
工作隊列(又稱:任務隊列——Task Queues)是為了避免等待一些占用大量資源、時間的操作。當我們把任務(Task)當作消息發送到隊列中,一個運行在后台的工作者(worker)進程就會取出任務然后處理。當你運行多個工作者(workers),任務就會在它們之間共享。
這個概念在網絡應用中是非常有用的,它可以在短暫的HTTP請求中處理一些復雜的任務。
准備
之前的教程中,我們發送了一個包含“Hello World!”的字符串消息。現在,我們將發送一些字符串,把這些字符串當作復雜的任務。我們沒有真是的例子,例如圖片縮放、pdf文件轉換。所以使用 sleep()函數來模擬這種情況。我們在字符串中加上點號(.)來表示任務的復雜程度,一個點(.)將會耗時1秒鍾。比 如”Hello…”就會耗時3秒鍾。
我們對之前教程的send.php做些簡單的調整,以便可以發送隨意的消息。這個程序會按照計划發送任務到我們的工作隊列中。我們把它命名為new_task.php:
$message = empty($argv[1]) ? 'Hello World!' : ' '.$argv[1]; $exchange->publish($message, $routeKey); var_dump("[x] Sent $message");
我們的舊腳本(receive.php)同樣需要做一些改動:它需要為消息體中每一個點號(.)模擬1秒鍾的操作。它會從隊列中獲取消息並執行,我們把它命名為worker.php:
function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); sleep(substr_count($msg,'.')); $queue->ack($envelope->getDeliveryTag()); }
輪詢分發
使用工作隊列的一個好處就是它能夠並行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了,擴展很簡單。
首先,我們先同時運行兩個worker.php腳本,它們都會從隊列中獲取消息,到底是不是這樣呢?我們看看。
你需要打開三個終端,兩個用來運行worker.php腳本,這兩個終端就是我們的兩個消費者(consumers)—— C1 和 C2。
shell1
$php worker.php
[*] Waiting for messages. To exit press CTRL+C
shell2
$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
第三個終端,我們用來發布新任務。你可以發送一些消息給消費者(consumers):
shell3
$ php new_task.php First message.
shell3
$ php new_task.php Second message..
shell3
$ php new_task.php Third message...
shell3
$ php new_task.php Fourth message....
shell3
$ php new_task.php Fifth message.....
看看到底發送了什么給我們的工作者(workers):
shell1
$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
shell2
$ php worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
默認來說,RabbitMQ會按順序得把消息發送給每個消費者(consumer)。平均每個消費者都會收到同等數量得消息。這種發送消息得方式叫做——輪詢(round-robin)。試着添加三個或更多得工作者(workers)。
消息響應
當處理一個比較耗時得任務的時候,你也許想知道消費者(consumers)是否運行到一半就掛掉。當前的代碼中,當消息被RabbitMQ發送給 消費者(consumers)之后,馬上就會在內存中移除。這種情況,你只要把一個工作者(worker)停止,正在處理的消息就會丟失。同時,所有發送 到這個工作者的還沒有處理的消息都會丟失。
我們不想丟失任何任務消息。如果一個工作者(worker)掛掉了,我們希望任務會重新發送給其他的工作者(worker)。
為了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會通過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,然后RabbitMQ就會釋放並刪除這條消息。
如果消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認為消息沒有被完全處理,然后重新發送給其他消費者(consumer)。這樣,及時工作者(workers)偶爾的掛掉,也不會丟失消息。
消息是沒有超時這個概念的;當工作者與它斷開連的時候,RabbitMQ會重新發送消息。這樣在處理一個耗時非常長的消息任務的時候就不會出問題了。
消息是沒有超時這個概念的;當工作者與它斷開連的時候,RabbitMQ會重新發送消息。這樣在處理一個耗時非常長的消息任務的時候就不會出問題了。 之前的例子中我們使用$queue->ack()。當工作者(worker)完成了任務,就發送一個響應。
function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); sleep(substr_count($msg,'.')); $queue->ack($envelope->getDeliveryTag()); } $queue->consume('callback');
運行上面的代碼,我們發現即使使用CTRL+C殺掉了一個工作者(worker)進程,消息也不會丟失。當工作者(worker)掛掉這后,所有沒有響應的消息都會重新發送。
忘了響應
一個很容易犯的錯誤就是忘了basic_ack,后果很嚴重。消息在你的程序退出之后就會重新發送,如果它不能夠釋放沒響應的消息,RabbitMQ就會占用越來越多的內存。
為了排除這種錯誤,你可以使用rabbitmqctl命令,輸出messages_unacknowledged字段:
``` $ sudo rabbitmqctl listqueues name messagesready messages_unacknowledged Listing queues ... hello 0 0 ...done.
```
消息持久化
如果你沒有特意告訴RabbitMQ,那么在它退出或者崩潰的時候,它將會流失所有的隊列和消息。為了確保信息不會丟失,有兩個事情是需要注意的:我們必須把“隊列”和“消息”設為持久化。
首先,為了不讓隊列丟失,需要把它聲明為持久化(durable):
$queue->setFlags(AMQP_DURABLE);
盡管這行代碼本身是正確的,但是仍然不會正確運行。因為我們已經定義過一個叫hello的非持久化隊列。RabbitMq不允許你使用不同的參數重新定義一個隊列,它會返回一個錯誤。但我們現在使用一個快捷的解決方法——用不同的名字,例如task_queue。
$queue->setName('task_queue'); $queue->setFlags(AMQP_DURABLE); $queue->declare();
這個$queue->declare();必須在生產者(producer)和消費者(consumer)對應的代碼中修改。
這時候,我們就可以確保在RabbitMq重啟之后queue_declare隊列不會丟失。
注意:消息持久化
將消息設為持久化並不能完全保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間還是有一 個很小的間隔時間。因為RabbitMq並不是所有的消息都使用fsync(2)——它有可能只是保存到緩存中,並不一定會寫到硬盤中。並不能保證真正的 持久化,但已經足夠應付我們的簡單工作隊列。如果你一定要保證持久化,你需要改寫你的代碼來支持事務(transaction)。
公平分發
你應該已經發現,它仍舊沒有按照我們期望的那樣進行分發。比如有兩個工作者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕松。然而RabbitMQ並不知道這些,它仍然一如既往的派發消息。
這時因為RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有作出響應。它盲目的把第n-th條消息發給第n-th個消費者。

我們可以使用$channel->qos();方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工作者(worker),直到它已經處理了上一條消息並且作出了響應。這樣,RabbitMQ就會把消息分發給下一個空閑的工作者(worker)。
$channel->qos(0,1);
關於隊列大小
如果所有的工作者都處理繁忙狀態,你的隊列就會被填滿。你需要留意這個問題,要么添加更多的工作者(workers),要么使用其他策略。
整合
new_task.py的完整代碼:
<?php /** * PHP amqp(RabbitMQ) Demo-2 * @author yuansir <yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'task_queue'; $routeKey = 'task_queue'; $message = empty($argv[1]) ? 'Hello World!' : ' '.$argv[1]; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declare(); $exchange->publish($message, $routeKey); var_dump("[x] Sent $message"); $connection->disconnect();
我們的worker:
<?php /** * PHP amqp(RabbitMQ) Demo-2 * @author yuansir <yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'task_queue'; $routeKey = 'task_queue'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declare(); $queue->bind($exchangeName, $routeKey); var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); $channel->qos(0,1); } $connection->disconnect(); function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); sleep(substr_count($msg,'.')); $queue->ack($envelope->getDeliveryTag()); }
使用消息響應和prefetch_count你就可以搭建起一個工作隊列了。這些持久化的選項使得在RabbitMQ重啟之后仍然能夠恢復。
現在我們可以移步教程3學習如何發送相同的消息給多個消費者(consumers)
