RabbitMQ官方中文入門教程(PHP版) 第一部分:Hello World


RabbitMQ是一個消息代理。它的核心原理非常簡單:接收和發送消息。你可以把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ是一個郵箱、郵局、郵遞員。RabbitMQ和郵局的主要區別是,它處理的不是紙,而是接收、存儲和發送二進制的數據——消息。一般提到RabbitMQ和消息,都用到一些專有名詞。

  • 生產(Producing)意思就是發送。發送消息的程序就是一個生產者(producer)。我們一般用”P”來表示:
  • 隊列(queue)就是郵箱的名稱。消息通過你的應用程序和RabbitMQ進行傳輸,它們能夠只存儲在一個隊列(queue)中。 隊列(queue)沒有任何限制,你要存儲多少消息都可以——基本上是一個無限的緩沖。多個生產者(producers)能夠把消息發送給同一個隊列,同樣,多個消費者(consumers)也能攻從一個隊列(queue)中獲取數據。隊列可以化城這樣(圖上是隊列的名稱):
  • 消費(Consuming)和獲取消息是一樣的意思。一個消費者(consumer)就是一個等待獲取消息的程序。我們把它畫作”C”:

Hello World!

(使用pika 0.9.5 Python客戶端)

我們的“Hello world”不會很復雜——僅僅發送一個消息,然后獲取它並輸出到屏幕。這樣以來我們需要兩個程序,一個用作發送消息,另一個接受消息並打印消息內容

我們大體的設計是這樣的:

生產者(Producer)把消息發送到一個名為“hello”的隊列中。消費者(consumer)從這個隊列中獲取消息。

RabbitMQ庫

RabbitMQ使用的是AMQP協議。要使用她你就必須需要一個使用同樣協議的庫。幾乎所有的編程語言都有可選擇的庫。python也是一樣,可以從以下幾個庫中選擇:

  • py-amqplib
  • txAMQP
  • pika

在這一系列教程中,我們打算使用PHP 的AMQP擴展。詳細教程請查看:

mac os 下RabbitMq 以及 PHP amqp擴展安裝記錄

發送消息

我們第一個程序send.php會發送一個消息到隊列中。首先要做的事情就是建立一個到RabbitMQ服務器的連接。

$connection = new AMQPConnection(array('host' =>'127.0.0.1', 'port' =>'5672', 'vhost' =>'/', 'login' =>'guest', 'password' => 'guest')); 

現在我們已經連接上服務器了,那么,在發送消息之前我們需要確認隊列是存在的。如果我們把消息發送到一個不存在的隊列,RabbitMQ會丟棄這條消息。我門先創建一個名為hello的隊列,然后把消息發送到這個隊列中。

$queue = new AMQPQueue($channel); $queue->setName($queueName); 

這時候我們就可以發送消息了,我們第一條消息只包含了 Hello World!字符串,我們打算把它發送到我們的hello隊列。

在RabbitMQ中,消息是不能直接發送到隊列,它需要發送到交換器(exchange)中。我們不打算在這里深入討論它——你可以通過教程的第三部分了解更多。現在我們所需要了解的是如何使用默認的交換器(exchange),它使用一個空字符串來標識。交換器允許我們指定某條消息需要投遞到哪個隊列,$$routeKey參數必須指定為隊列的名稱:

$exchange->publish($message, $routeKey);
var_dump("[x] Sent 'Hello World!'");

在退出程序之前,我們需要確認網絡緩沖已經被刷寫、消息已經投遞到RabbitMQ。完成這些事情(正確的關閉連接)是很簡單的。

$connection->disconnect();

獲取數據

我們的第二個程序receive.php,將會從隊列中獲取消息並打印消息。

這次我們還是先要連接到RabbitMQ服務器。連接服務器的代碼和之前是一樣的。

下一步也和之前一樣,我們需要確認隊列是存在的。使用$queue->declare()創建一個隊列——我們可以運行這個命令很多次,但是只有一個隊列會創建。

$queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->declare(); 

你也許要問為什么重復聲明了隊列——我們已經在前面的代碼中聲明了它。如果我們確定了隊列是已經存在的,那么我們可以不這么做。比如先運行send.php程序。可是我們並不確定哪個程序先運行,這種情況的話再程序中重復聲明是好的做法。

列出所有隊列

你也許希望查看RabbitMQ由哪些隊列、有多少消息在隊列中。你可以使用rabbitmqctl工具(使用有權限的用戶):

``` $ sudo rabbitmqctl list_queues Listing queues ... hello 0 ...done.

```

(omit sudo on Windows)

(在Windows中不需要sudo命令)

從隊列中獲取消息相對來說稍顯復雜。需要為隊列定義一個回調(callback)函數。當我們獲取到消息的時候,Pika庫就會調用這個回調(callback)函數。我們的這個回調函數將會但因消息的內容到屏幕上。

function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); $queue->nack($envelope->getDeliveryTag()); } 

下一步,我們需要告訴RabbitMQ這個回調函數將會從hello隊列中接收消息:

$queue->consume('callback'); 

要成功運行這些命令,我們必須保證隊列是存在的,我們已經能夠保證——我們之前已經使用創建了一個隊列queue_declare。

$queue->nack()//函數稍后會介紹。 

最后,我們輸入一個無限循環來等待消息數據並確運行回調函數。

var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); } 

整合

send.php的全部代碼:

<?php /** * PHP amqp(RabbitMQ) Demo-1 * @author yuansir &/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'hello'; $routeKey = 'hello'; $message = 'Hello World!'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); try { $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $queue = new AMQPQueue($channel); $queue->setName($queueName); $exchange->publish($message, $routeKey); var_dump("[x] Sent 'Hello World!'"); } catch (AMQPConnectionException $e) { var_dump($e); exit(); } $connection->disconnect(); 

receive.py的全部代碼:

<?php /** * PHP amqp(RabbitMQ) Demo-1 * @author yuansir &/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'hello'; $routeKey = 'hello'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->declare(); $queue->bind($exchangeName, $routeKey); var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); } $connection->disconnect(); function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); $queue->nack($envelope->getDeliveryTag()); } 

現在就可以在終端中運行我們的程序了。首先,用send.php重續發送一條消息:

php send.php  
string(23) "[x] Sent 'Hello World!'"</pre> 

生產者(producer)程序send.php每次運行之后就會停止。現在我們就來接收消息:

php receive.php  
string(46) "[*] Waiting for messages. To exit press CTRL+C"  
string(26) " [x] Received:Hello World!"</pre> 

成功了!我們已經通過RabbitMQ發送第一條消息。你也許已經注意到了,receive.py程序並沒有退出。它一直在准備獲取消息,你可以通過Ctrl-C來終端它。

試下在新的終端中再次運行send.php。

我們已經學會如何發送消息到一個已知隊列中並接收消息。是時候移步到第二部分了,我們將會建立一個簡單的工作隊列(work queue)

轉載自Ryan是菜鳥 | LNMP技術棧筆記


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM