RabbitMQ中文文檔PHP版本(五)--主題


2019年12月10日10:05:11

原文:https://www.rabbitmq.com/tutorials/tutorial-five-php.html

 

話題

(使用php-amqplib

 

先決條件

本教程假定RabbitMQ 在標准端口(5672)的本地主機安裝並運行如果您使用其他主機,端口或憑據,則連接設置需要進行調整。

在哪里獲得幫助

如果您在閱讀本教程時遇到困難,可以 通過郵件列表與我們聯系。

 

在上一教程中,我們改進了日志記錄系統。我們沒有使用只能進行虛擬廣播扇出交換機,而是使用直接交換機,並有可能選擇性地接收日志。

盡管使用直接交換改進了我們的系統,但它仍然存在局限性-它不能基於多個條件進行路由。

在我們的日志記錄系統中,我們可能不僅要根據嚴重性訂閱日志,還要根據發出日志的源訂閱日志。您可能從syslog unix工具中了解了這個概念,該 工具根據嚴重性(info / warn / crit ...)和工具(auth / cron / kern ...)路由日志。

這將為我們提供很大的靈活性-我們可能只想聽來自“ cron”的嚴重錯誤,也可以聽“ kern”的所有日志。

為了在日志系統中實現這一點,我們需要學習更復雜的主題交換。

話題交流

發送到主題交換的消息不能具有任意的 routing_key-它必須是單詞列表,以點分隔。這些詞可以是任何東西,但是通常它們指定與消息相關的某些功能。一些有效的路由關鍵示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由密鑰中可以包含任意多個單詞,最多255個字節。

綁定密鑰也必須采用相同的形式。主題交換背后的邏輯類似於直接交換的邏輯 -使用特定路由密鑰發送的消息將被傳遞到所有使用匹配綁定密鑰綁定的隊列。但是,綁定鍵有兩個重要的特殊情況:

  • *(星號)可以代替一個單詞。
  • (哈希)可以替代零個或多個單詞。

在一個示例中最容易解釋這一點:

在此示例中,我們將發送所有描述動物的消息。將使用包含三個詞(兩個點)的路由密鑰發送消息。路由鍵中的第一個單詞將描述速度,第二個將描述顏色,第三個將描述物種:“ <speed>。<color>。<species> ”。

我們創建了三個綁定:Q1與綁定鍵“ * .orange。* ” 綁定,Q2與“ *。*。rabbit ”和“ lazy。# ” 綁定

這些綁定可以總結為:

  • Q1對所有橙色動物都感興趣。
  • 第2季想聽聽有關兔子的一切,以及有關懶惰動物的一切。

路由鍵設置為“ quick.orange.rabbit ”的消息將傳遞到兩個隊列。消息“ lazy.orange.elephant ”也將發送給他們兩個。另一方面,“ quick.orange.fox ”只會進入第一個隊列,而“ lazy.brown.fox ”只會進入第二個隊列“ lazy.pink.rabbit ”將被傳遞到第二隊只有一次,即使兩個綁定匹配。“ quick.brown.fox ”與任何綁定都不匹配,因此將被丟棄。

如果我們違反合同並發送一個或四個單詞的消息,例如“ 橙色 ”或“ quick.orange.male.rabbit ”,會發生什么?好吧,這些消息將不匹配任何綁定,並且將會丟失。

另一方面,“ lazy.orange.male.rabbit ”即使有四個單詞,也將匹配最后一個綁定,並將其傳送到第二個隊列。

話題交流

主題交流功能強大,可以像其他交流一樣進行。

當隊列用“  ”(哈希)綁定鍵綁定時,它將接收所有消息,而與路由鍵無關,就像在扇出交換中一樣。

在綁定中不使用特殊字符“ * ”(星號)和“  ”(哈希)時,主題交換的行為就像直接的一樣

放在一起

我們將在日志記錄系統中使用主題交換。我們將從一個可行的假設開始,即日志的路由鍵將包含兩個詞:“ <facility>。<severity> ”。

該代碼與上一教程中的代碼幾乎相同 

的代碼emit_log_topic.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs_topic.php的代碼

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

接收所有日志:

php receive_logs_topic.php "#"

要從設施“ kern ” 接收所有日志

php receive_logs_topic.php "kern.*"

或者,如果您只想聽聽“ 關鍵 ”日志:

php receive_logs_topic.php "*.critical"

您可以創建多個綁定:

php receive_logs_topic.php "kern.*" "*.critical"

並發出帶有路由鍵“ kern.critical ” 的日志類型:

php emit_log_topic.php "kern.critical" "A critical kernel error"

玩這些程序玩得開心。請注意,該代碼未對路由鍵或綁定鍵進行任何假設,您可能要使用兩個以上的路由鍵參數。

embed_log_topic.php 和receive_logs_topic.php的完整源代碼

接下來,在教程6中找出如何做往返消息作為遠程過程調用


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM