1、RabbitMQ 安裝
RabbitMQ 的安裝可以參考官方文檔:https://www.rabbitmq.com/download.html
2、管理頁面
rabbitmq-management插件提供基於HTTP的API方式管理和監控你的RabbitMQ服務器。
2.1、開啟 rabbitmq_management 插件
rabbitmq-plugins enable rabbitmq_management
windows 下運行如下命令
rabbitmq-plugins.bat enable rabbitmq_management
2.2、登錄管理頁面
瀏覽器輸入: http://localhost:15672 , 輸入用戶名和密碼(默認為guest)。管理頁面如下圖:
3、php 安裝 amqp 擴展
3.1、 下載對應的擴展
假如需要安裝 amqp-1.9.4
wget http://pecl.php.net/get/amqp-1.9.4.tgz
3.2、安裝
tar -zxvf amqp-1.9.4.tgz cd cd amqp-1.9.4/ ./configure --with-php-config=/path/to/php-config make && make install vim /path/to/php.ini extension=amqp.so
3.3、驗證
php -m | grep amqp
4、開發
項目采用 composer 管理依賴, 對應代碼地址:https://github.com/SevenParadise/php-examples/tree/master/mq/rabbitmq/sample
4.1、安裝php-amqplib依賴
composer require php-amqplib/php-amqplib
4.2、生產者代碼
<?php require __DIR__ . '/../../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 換成自己的配置 $host = '192.168.33.1'; $port = 5672; $username = 'zhangcs'; $password = 'zhangcs'; $vhost = '/'; // 1、連接到 RabbitMQ Broker,建立一個連接 $connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost); // 2、開啟一個通道 $channel = $connection->channel(); $exchange = 'test_exchange'; $queue = 'test_queue'; // 3、聲明一個交換器,並且設置相關屬性 $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); // 4、聲明一個隊列, 並且設置相關屬性 $channel->queue_declare($queue, false, true, false, false); // 5、通過路由鍵將交換器和隊列綁定起來 $channel->queue_bind($queue, $exchange); $body = $argv[1] ?? 'Hello RabbitMQ'; // 6、初始化消息,並且持久化消息 $message = new AMQPMessage($body, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); // 7、將消息發送到 RabbitMQ Broker $channel->basic_publish($message, $exchange); // 8、關閉通道 $channel->close(); // 9、關閉連接 $connection->close();
4.3、消費者代碼
<?php require __DIR__ . '/../../../vendor/autoload.php'; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 換成自己的配置 $host = '192.168.33.1'; $port = 5672; $username = 'zhangcs'; $password = 'zhangcs'; $vhost = '/'; // 1、連接到 RabbitMQ Broker,建立一個連接 $connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost); // 2、開啟一個通道 $channel = $connection->channel(); $exchange = 'test_exchange'; $queue = 'test_queue'; // 3、聲明一個交換器,並且設置相關屬性 $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); // 4、聲明一個隊列, 並且設置相關屬性 $channel->queue_declare($queue, false, true, false, false); // 5、通過路由鍵將交換器和隊列綁定起來 $channel->queue_bind($queue, $exchange); function process_message($message) { echo "\n--------\n"; echo $message->body; echo "\n--------\n"; $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // Send a message with the string "quit" to cancel the consumer. if ($message->body === 'quit') { $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); } } // 6、消費消息,並且設置回調函數為 process_message $channel->basic_consume($queue, 'hahah', false, false, false, false, 'process_message'); // 7、注冊終止函數,關閉通道,關閉連接 function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $connection); // 8、一直阻塞消費數據 while ($channel ->is_consuming()) { $channel->wait(); }
4.4、運行
運行消費者
$ php mq/rabbitmq/sample/consumer.php
運行生產者
$ php mq/rabbitmq/sample/producer.php 'message' # 關閉消費者 $ php mq/rabbitmq/sample/producer.php 'quit'
消費結果如下:
5、方法詳解
5.1、exchange_declare 方法
聲明交換器方法,函數聲明如下
public function exchange_declare( $exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null )
- $exchange: 交換器名稱
- $type: 交換器類型,常見的如 fanout、direct、topic,詳情見:【RabbitMQ 實戰指南】一 RabbitMQ入門
- $passive: 判斷交換器是否存在,當設置為true是,然后交換器不存在時,會拋出異常
- $durable: 設置是否持久化。durable 設置為true表示持久化。持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關消息。
- $auto_delete: 設置是否自動刪除,auto_delete 設置為 true 則表示自動刪除。自動 刪除的前提是至少有一個隊列或者交換器與這個交換器綁定 ,之后所有與這個交換器綁 定的隊列或者交換器都與 此解綁。注意不能錯誤地把這個參數理解為 : "當與此交換器 連接的客戶端都斷開時 , RabbitMQ 會自動 刪 除本交換器 "。
- $internal: 設置是否是內置的。如果設置為 true,則表示是內置的交換器,客戶端程 序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式。
- $nowait: 設置是否需要等待服務器返回回執消息,默認為false
5.2、queue_declare 方法
聲明隊列方法,函數聲明如下
public function queue_declare( $queue = '', $passive = false, $durable = false, $exclusive = false, $auto_delete = true, $nowait = false, $arguments = array(), $ticket = null )
- $queue: 隊列名稱
- $passive: 判斷隊列是否存在,當設置為true是,然后隊列不存在時,會拋出異常
- $durable: 設置是否持久化。durable 設置為true表示持久化。持久化可以將隊列存盤,在服務器重啟的時候不會丟失相關消息。
- $exclusive: 設置為排他。為 true 則設置隊列為排他的。如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:排他隊列是基於連接(Connection)可見的,同一個連接的不同信道(Channel)是可以同時訪問統一連接創建的排他隊列;"首次" 是指如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列適用與一個客戶端同時發送和讀取消息的應用場景。
- $auto_delete: 設置是否自動刪除。為true則設置隊列為自動刪除。自動刪除的前提是: 至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會 自動刪除。不能把這個參數錯誤地理解為: "當連接到此隊列的所有客戶端斷開時,這 個隊列自動刪除",因為生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊 列連接時,都不會自動刪除這個隊列。
- $nowait: 設置是否需要等待服務器返回回執消息,默認為false
5.3、queue_bind 方法
綁定隊列和交換器方法,函數聲明如下
public function queue_bind( $queue, $exchange, $routing_key = '', $nowait = false, $arguments = array(), $ticket = null )
- $queue: 隊列名稱
- $exchange: 交換器名稱
- $routing_key : 用來綁定隊列和交換器的路由鍵
- $nowait: 設置是否需要等待服務器返回回執消息,默認為false
5.4、exchange_bind 方法
我們不僅可以將隊列和交換器綁定,也可以將交換器和交換器綁定,函數聲明如下
public function exchange_bind( $destination, $source, $routing_key = '', $nowait = false, $arguments = array(), $ticket = null )
- $destination: 目標交換器名稱
- $source: 源交換器名稱
- $routing_key : 用來源交換器和目標交換器的路由鍵
- $nowait: 設置是否需要等待服務器返回回執消息,默認為false
生產者發送消息至交換器 source 中, 交換器 source 根據路由鍵找到與其匹配的另一個交換器 destination, 並把消息轉發到 destination 中,進而存儲在 destination 綁定的 queue 中,如下圖:
5.5、basic_publish
發送消息方法,函數聲明如下
public function basic_publish( $msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null )
- $msg: 需要發送的消息,PhpAmqpLib\Message\AMQPMessage 對象,可以設置特定屬性,比如消息是否持久化,消息的優先級
- $exchange: 交換器的名稱,指明消息需要發送到哪個交換器中,如果設置為空字符串,會發送給 RabbitMQ 默認的交換器 " AMQP default " 中
- $routing_key: 路由鍵
- $mandatory: 當 mandatory 設置為true 時,交換器無法通過自身的類型和路由鍵找到一個符合條件的隊列,那么 RabbitMQ 會調用 Basic.Return 命令將消息返回給生產者 。當 mandatory 參 數設置為 false 時,出現上述情形,則消息直接被丟棄 。 生產者可以通過添加一個監聽器監聽消息是否正確路由到隊列中
- $immediate: 當 imrnediate 參數設為 true 時,如果交換器在將消息路由到隊列時發現隊列上並不存在 任何消費者,那么這條消息將不會存入隊列中。當與路由鍵匹配 的所有隊列都沒有消費者時 , 該消息會至生產者。
5.6、basic_consume
消費消息方法,函數聲明如下
public function basic_consume( $queue = '', $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array() )
- $queue: 隊列名稱
- $consumer_tag: 消費者標簽,用來區分多個消費者
- $no_local: 設置為 true 則表示不能將同一個 Connection 中生產者發送的消息傳送給這個 Connection 中的消費者
- $no_ack: 設置為自動確認,詳細可參考連接: RabbitMQ 之 no_ack 分析
- $exclusive: 是否設置為排他
- $callback: 設置消費者的回調函數。用於處理 RabbitMQ 推送過來的消息