我們有一個小說系統,每天會有很多作者發布新的小說內容,而讀者因為個人愛好可能只訂閱他喜歡的類型的小說,比如歷史類、玄幻類小說。小說系統每天會根據用戶的口味推送相關的小說更新消息,這就用到了消息發布和訂閱系統。
本文將結合實例介紹PHP操作RabbitMQ實現消息發布和訂閱功能,本文假設您已經安裝好RabbitMQ,開放了對應的端口,且安裝了php-amqplib。前置文章閱讀:《在CentOS7系統安裝與配置RabbitMQ》、《PHP與RabbitMQ消息隊列》。
概念
我們先來了解幾個概念:
交換器(Exchanges)
RabbitMQ消息傳遞模型的核心思想是,生產者不發送任何信息直接到隊列。事實上,生產者甚至不知道消息是否會發送到任何隊列。生產者只能向交換器發送消息(也叫交換機,默認交換器使用""空字符標記)。交換器需要知道如何處理接收的消息,將消息推入到指定的隊列中,決定消息是否入列和拋棄。如下圖,P表示消息發布者,X表示交換機,Q1和Q2表示不同的隊列。
交換類型
交換機有幾種類型:direct, topic, headers 和 fanout。
fanout:廣播訂閱,向所有的消費者發布消息。每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。
direct:消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的消息,不會轉發“dogA”,也不會轉發“dogB”等等。它是完全匹配、單播的模式。
topic:topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“*”。#匹配0個或多個單詞,*匹配不多不少一個單詞。
headers類型的交換器基本不用,本文忽略。
舉例:以下代碼,發布者向名叫msg的交換器發布廣播消息,全體消費者都能收到相同的消息。
$channel->exchange_declare('msg', 'fanout', false, false, false);
綁定(Bindings)
交換器和隊列之間的對應關系稱為綁定,可以理解為,隊列對來自此交換器的消息感興趣。
以下代碼表示將隊列綁定到名叫article的交換器上。
$channel->queue_bind($queue_name, 'article');
路由鍵
綁定可以采取額外的routing_key參數。避免混淆和$channel::basic_publish參數我們要叫它綁定key。這就是我們如何用鍵創建綁定的原因:
$routerKey = 'abc'; $channel->queue_bind($queueName, $exchange, $routerKey);
消息發布
我們創建發布者文件publish_direct.php,指定交換機為article,類型為direct,我們只允許訂閱了對應類型小說文章的消費者才可以消費對應的小說文章消息。我們將向消費者發布四個類型的小說文章消息:fantasy(玄幻),military(軍事),history(歷史),romance(言情)。
以下代碼模擬了發布者發布100條隨機消息:
<?php /** * @發布消息 * @Author: Helloweba * @publish_direct.php */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $exchange = 'article'; $connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); for ($i = 0; $i < 100; $i++) { $cate = ['fantasy', 'military', 'history', 'romance']; $key = array_rand($cate); $arr = [ 'id' => 'message_' . $i, 'content' => 'helloweba '. $cate[$key] ]; $data = json_encode($arr); $msg = new AMQPMessage($data); $channel->basic_publish($msg, $exchange, $cate[$key]); echo 'Send '.$cate[$key].' message: ' . $data . PHP_EOL; } $channel->close(); $connection->close();
消息訂閱
現在我們建立訂閱者文件subscribe_direct.php,指定交換機為article,路由鍵為fantasy,意為只訂閱玄幻小說類消息,代碼如下:
channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); list($queueName, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queueName, $exchange, $routerKey); echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL; $callback = function ($msg) { //echo " Received message:", $msg->body, PHP_EOL; echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL; sleep(1); //模擬耗時執行 }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
接着再新建訂閱者文件subscribe_direct_2.php,復制粘貼subscribe_direct.php文件的代碼,並將路由鍵改為history,意為只訂閱歷史小說類消息
模擬測試
好了,現在我們打開兩個終端,分別執行兩個訂閱者程序:
php subscribe_direct.php //client1訂閱玄幻小說類消息 php subscribe_direct_2.php //client2訂閱歷史小說類消息
再另開啟一個終端,執行發布者程序:
php publish_direct.php
現在你應該可以看到如圖效果:
client1,只訂閱玄幻類(fantasy)消息:
client2,只訂閱歷史類(history)消息:
publish_direct.php:
<?php /** * 發布消息 * @Author: Helloweba * @Date: 2020-01-01 20:24:22 * @Last Modified by: Helloweba * @Last Modified time: 2020-01-05 20:29:46 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $exchange = 'article'; $connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); //$channel->exchange_declare($exchange, 'fanout', false, false, false); $channel->exchange_declare($exchange, 'direct', false, false, false); for ($i = 0; $i < 100; $i++) { $cate = ['fantasy', 'military', 'history', 'romance']; $key = array_rand($cate); $arr = [ 'id' => 'message_' . $i, 'content' => 'helloweba '. $cate[$key] ]; $data = json_encode($arr); $msg = new AMQPMessage($data); $channel->basic_publish($msg, $exchange, $cate[$key]); echo 'Send '.$cate[$key].' message: ' . $data . PHP_EOL; } $channel->close(); $connection->close();
subscribe_direct.php:
<?php /** * 訂閱消息 * @Author: Helloweba * @Date: 2020-01-01 20:24:57 * @Last Modified by: Helloweba * @Last Modified time: 2020-01-05 20:17:16 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $exchange = 'article'; $routerKey = 'fantasy'; //只消費玄幻類消息 //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $connection = new AMQPStreamConnection('localhost', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); list($queueName, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queueName, $exchange, $routerKey); echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL; $callback = function ($msg) { //echo " Received message:", $msg->body, PHP_EOL; echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL; sleep(1); //模擬耗時執行 }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
subscribe_direct_2.php:
<?php /** * 訂閱消息 * @Author: Helloweba * @Date: 2020-01-01 20:24:57 * @Last Modified by: Helloweba * @Last Modified time: 2020-01-05 20:18:25 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $exchange = 'article'; $routerKey = 'history'; //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $connection = new AMQPStreamConnection('localhost', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); list($queueName, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queueName, $exchange, $routerKey); //$channel->queue_bind($queue_name, $exchange); echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL; $callback = function ($msg) { echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL; sleep(1); //模擬耗時執行 }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();