一、發布/訂閱
(使用 php-amqplib)
在前一篇教程中我們建立了一個工作隊列。工作隊列假設每個任務被精確的發送給一個工作者。在這部分,我們將做一些完全不同的事情——我們將發送一條消息給多個消費者。這種模式被稱為“發布/訂閱”。
為了說明這種模式,我們將建立一個簡單的日志系統。它將有兩個程序組成——第一個將發送日志信息而第二個將接收並打印它們。
在我們的日志系統中,每個正在運行的接受者程序的副本都將收到消息。那樣我們就能運行一個接收者並將日志寫入磁盤;與此同時我們將運行另一個接收者輸出日志到屏幕。
實際上,被發布的日志消息將被廣播到所有的接收者。
二、交換
在前面的教程中我們發送消息到隊列或從隊列中獲取消息。現在是時候介紹在Rabbit中完整的消息模型了。
讓我們快速重溫一下我們在前面的教程中學到的知識:
生產者:一個發送消息的用戶應用。
隊列:一個存儲消息的緩沖區。
消費者:一個接收消息的用戶應用。
在RabbitMQ里消息模型的核心思想就是生產者絕不直接發送任何消息給隊列。實際上,通常生產者甚至根本不知道一條消息是否將被傳遞到任何隊列中。
相反,生產者僅能發送消息到一個交換。交換是一個非常簡單的事物。一方面它接收來自於生產者的消息,另一方面它推送消息給隊列。交換必須確切地知道如何處理它接收到的消息。它應該被添加到一個特定的隊列里么?它應該被添加到很多隊列里么?還是它應該被丟棄。這種規則由交換類型來定義。
有幾個可用的交換類型:direct,topic,headers和fanout。我們將看最后一個——fanout。讓我們建立一個這種類型的交換,並稱之為logs:
$channel->exchange_declare('logs', 'fanout', false, false, false);
fanout交換非常簡單。或許你能從名字中猜出來,它僅廣播所有的它收到的消息到所有它知道的隊列中。這正是我們的日志系統需要的。
列出交換
你可以運行永遠都很有用的rabbitmqctl,列出服務器端的交換:
sudo rabbitmqctl list_exchanges在這個列表里有一些amq.*交換和默認(未命名)交換。這些被默認建立,但是此時你還不可能會需要使用它們。
默認交換
在前面部分的教程中我們還不知道關於交換的任何事情,但是仍然能發送消息到隊列。因為我們正在使用默認的交換,我們通過空字符串("")來指定,所以這是可能的。
回想一下之前我們是如何發布一條消息的:
$channel->basic_publish($msg, '', 'hello');這里我們使用默認或無名的交換:消息被發到由routing_key指定的隊列中,如果它存在。路由鍵是basic_public的第三個參數
現在,我們能夠發布消息到我們命名的交換:
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
三、臨時隊列
也許你還記得先前我們使用的隊列有一個指定的名字(記着hello和task_queue么?)。對於我們來說能命名一個隊列是至關重要的——我們需要將工作者指給相同的隊列。當你想要在生產者和消費者之間分享隊列時,給隊列一個名字是重要的。
但是我們的日志系統不是這樣的情況。我們想要監聽所有的日志消息,而不僅是它們中的子集。我們對只對當前流轉的消息感興趣而不是舊的。解決這個我們需要兩件東西。
第一,無論何時我們連接到Rabbit,我們都需要一個新的,空的隊列。做這個我們可以建立一個隨機名字的隊列,或者,更好的——讓服務器為我們選一個隨機的隊列名。
第二,一旦我們斷開了消費者,隊列應該自動被刪除。
在php-amqplib客戶端,當我們提供隊列名一個空字符串時,我們建立了一個非持久的,被生成名稱的隊列。
list($queue_name, ,) = $channel->queue_declare("");
當方法返回時,變量$queue_name包含一個被RabbitMQ生成的隨機隊列名。例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
當聲明它的連接關閉時,隊列將被刪除,因為它被定義為排他的。你可以在隊列向導中學習更多的關於排他標記和其它隊列屬性。
四、綁定
我們已經建立了一個fanout類型的交換和一個隊列。現在我們需要讓交換發送消息到我們的隊列。交換隊列之間的關系被稱為綁定。
$channel->queue_bind($queue_name, 'logs');
從現在開始日志交換將添加消息到我們的隊列。
列出綁定
你可以列出已存在的綁定,使用……,你猜到了
rabbitmqctl list_bindings
五、放在一起
生產者程序,發送日志消息,看起來和前一篇教程中的沒有多少不同。最重要的改變是我們現在想要發布消息到我們的日志交換而不是未命名的那個。這里給出emit_log.php的編碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
?>
正如你所看到的,在建立連接之后我們定義交換。這一步是必要的,因為不允許發送消息到一個不存在的交換。
如果還沒有隊列綁定到交換,消息將丟失,但是這對我們沒什么。如果還沒有消費者在監聽,我們可以安全的丟棄這條消息。
receive_logs.php的代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
如果你想保存日志為一個文件,只需打開終端輸入:
php receive_logs.php > logs_from_rabbit.log
如果你想在屏幕上看日志,打開一個新的終端並運行:
php receive_logs.php
當然,發送日志,輸入:
php emit_log.php
使用rabbitmqctl list_bindings你可以驗證代碼實際上像我們想的那樣建立了綁定和隊列。運行兩個receive_logs.php程序時,你應該看到如下所示:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
結果的解釋是明確的:來自於交換的日志數據進入了兩個由服務器分配的名字的隊列。這正是我們想要的。
要了解怎樣監聽消息的子集,讓我們繼續看第四課。
原文:http://www.rabbitmq.com/tutorials/tutorial-three-php.html
