【RabbitMQ 實戰指南】一 RabbitMQ 開發


1、RabbitMQ 安裝

RabbitMQ 的安裝可以參考官方文檔:https://www.rabbitmq.com/download.html

2、管理頁面

rabbitmq-management插件提供基於HTTP的API方式管理和監控你的RabbitMQ服務器。

2.1、開啟 rabbitmq_management 插件

rabbitmq-plugins enable rabbitmq_management

windows 下運行如下命令

rabbitmq-plugins.bat enable rabbitmq_management

2.2、登錄管理頁面

瀏覽器輸入: http://localhost:15672 , 輸入用戶名和密碼(默認為guest)。管理頁面如下圖:

 

3、php 安裝 amqp 擴展

3.1、 下載對應的擴展

下載地址: https://pecl.php.net/package/amqp

假如需要安裝 amqp-1.9.4

wget http://pecl.php.net/get/amqp-1.9.4.tgz

3.2、安裝

tar -zxvf amqp-1.9.4.tgz
cd cd amqp-1.9.4/
./configure --with-php-config=/path/to/php-config
make && make install
vim /path/to/php.ini
extension=amqp.so

3.3、驗證

php -m | grep amqp

4、開發

項目采用 composer 管理依賴, 對應代碼地址:https://github.com/SevenParadise/php-examples/tree/master/mq/rabbitmq/sample

4.1、安裝php-amqplib依賴

composer require php-amqplib/php-amqplib

4.2、生產者代碼

<?php 
require __DIR__ . '/../../../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// todo 換成自己的配置
$host = '192.168.33.1';
$port = 5672;
$username = 'zhangcs';
$password = 'zhangcs';
$vhost = '/';

// 1、連接到 RabbitMQ Broker,建立一個連接
$connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost);

// 2、開啟一個通道
$channel = $connection->channel();

$exchange = 'test_exchange';
$queue = 'test_queue';
// 3、聲明一個交換器,並且設置相關屬性
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);

// 4、聲明一個隊列, 並且設置相關屬性
$channel->queue_declare($queue, false, true, false, false);

// 5、通過路由鍵將交換器和隊列綁定起來
$channel->queue_bind($queue, $exchange);

$body = $argv[1] ?? 'Hello RabbitMQ';

// 6、初始化消息,並且持久化消息
$message = new AMQPMessage($body, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);

// 7、將消息發送到 RabbitMQ Broker
$channel->basic_publish($message, $exchange);

// 8、關閉通道
$channel->close();
// 9、關閉連接
$connection->close();

4.3、消費者代碼

<?php 
require __DIR__ . '/../../../vendor/autoload.php';

use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// todo 換成自己的配置
$host = '192.168.33.1';
$port = 5672;
$username = 'zhangcs';
$password = 'zhangcs';
$vhost = '/';

// 1、連接到 RabbitMQ Broker,建立一個連接
$connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost);

// 2、開啟一個通道
$channel = $connection->channel();

$exchange = 'test_exchange';
$queue = 'test_queue';

// 3、聲明一個交換器,並且設置相關屬性
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);

// 4、聲明一個隊列, 並且設置相關屬性
$channel->queue_declare($queue, false, true, false, false);

// 5、通過路由鍵將交換器和隊列綁定起來
$channel->queue_bind($queue, $exchange);

function process_message($message)
{
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    // Send a message with the string "quit" to cancel the consumer.
    if ($message->body === 'quit') {
        $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
    }
}

// 6、消費消息,並且設置回調函數為 process_message
$channel->basic_consume($queue, 'hahah', false, false, false, false, 'process_message');

// 7、注冊終止函數,關閉通道,關閉連接
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);

// 8、一直阻塞消費數據
while ($channel ->is_consuming()) {
    $channel->wait();
}

4.4、運行

運行消費者

$ php mq/rabbitmq/sample/consumer.php

運行生產者

$ php mq/rabbitmq/sample/producer.php 'message'
# 關閉消費者
$ php mq/rabbitmq/sample/producer.php 'quit'

消費結果如下:

5、方法詳解

5.1、exchange_declare 方法

聲明交換器方法,函數聲明如下

public function exchange_declare(
    $exchange,
    $type,
    $passive = false,
    $durable = false,
    $auto_delete = true,
    $internal = false,
    $nowait = false,
    $arguments = array(),
    $ticket = null
)

 

  • $exchange: 交換器名稱
  • $type: 交換器類型,常見的如 fanout、direct、topic,詳情見:【RabbitMQ 實戰指南】一 RabbitMQ入門
  • $passive: 判斷交換器是否存在,當設置為true是,然后交換器不存在時,會拋出異常
  • $durable: 設置是否持久化。durable 設置為true表示持久化。持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關消息。
  • $auto_delete: 設置是否自動刪除,auto_delete 設置為 true 則表示自動刪除。自動 刪除的前提是至少有一個隊列或者交換器與這個交換器綁定 ,之后所有與這個交換器綁 定的隊列或者交換器都與 此解綁。注意不能錯誤地把這個參數理解為 : "當與此交換器 連接的客戶端都斷開時 , RabbitMQ 會自動 刪 除本交換器 "。 
  • $internal: 設置是否是內置的。如果設置為 true,則表示是內置的交換器,客戶端程 序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式。 
  • $nowait: 設置是否需要等待服務器返回回執消息,默認為false

5.2、queue_declare 方法

聲明隊列方法,函數聲明如下

public function queue_declare(
    $queue = '',
    $passive = false,
    $durable = false,
    $exclusive = false,
    $auto_delete = true,
    $nowait = false,
    $arguments = array(),
    $ticket = null
) 
  • $queue: 隊列名稱
  • $passive: 判斷隊列是否存在,當設置為true是,然后隊列不存在時,會拋出異常
  • $durable: 設置是否持久化。durable 設置為true表示持久化。持久化可以將隊列存盤,在服務器重啟的時候不會丟失相關消息。
  • $exclusive: 設置為排他。為 true 則設置隊列為排他的。如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:排他隊列是基於連接(Connection)可見的,同一個連接的不同信道(Channel)是可以同時訪問統一連接創建的排他隊列;"首次" 是指如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列適用與一個客戶端同時發送和讀取消息的應用場景。
  • $auto_delete: 設置是否自動刪除。為true則設置隊列為自動刪除。自動刪除的前提是: 至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會 自動刪除。不能把這個參數錯誤地理解為: "當連接到此隊列的所有客戶端斷開時,這 個隊列自動刪除",因為生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊 列連接時,都不會自動刪除這個隊列。 
  • $nowait: 設置是否需要等待服務器返回回執消息,默認為false

5.3、queue_bind 方法

綁定隊列和交換器方法,函數聲明如下

public function queue_bind(
    $queue,
    $exchange,
    $routing_key = '',
    $nowait = false,
    $arguments = array(),
    $ticket = null
)
  • $queue: 隊列名稱
  • $exchange: 交換器名稱
  • $routing_key : 用來綁定隊列和交換器的路由鍵
  • $nowait: 設置是否需要等待服務器返回回執消息,默認為false

5.4、exchange_bind 方法

我們不僅可以將隊列和交換器綁定,也可以將交換器和交換器綁定,函數聲明如下

public function exchange_bind(
    $destination,
    $source,
    $routing_key = '',
    $nowait = false,
    $arguments = array(),
    $ticket = null
)
  • $destination: 目標交換器名稱
  • $source: 源交換器名稱
  • $routing_key : 用來源交換器和目標交換器的路由鍵
  • $nowait: 設置是否需要等待服務器返回回執消息,默認為false

生產者發送消息至交換器 source 中, 交換器 source 根據路由鍵找到與其匹配的另一個交換器 destination, 並把消息轉發到 destination 中,進而存儲在 destination 綁定的 queue 中,如下圖:

5.5、basic_publish

發送消息方法,函數聲明如下

public function basic_publish(
    $msg,
    $exchange = '',
    $routing_key = '',
    $mandatory = false,
    $immediate = false,
    $ticket = null
)
  • $msg: 需要發送的消息,PhpAmqpLib\Message\AMQPMessage 對象,可以設置特定屬性,比如消息是否持久化,消息的優先級
  • $exchange: 交換器的名稱,指明消息需要發送到哪個交換器中,如果設置為空字符串,會發送給 RabbitMQ 默認的交換器 " AMQP default " 中
  • $routing_key: 路由鍵
  •  $mandatory: 當 mandatory 設置為true 時,交換器無法通過自身的類型和路由鍵找到一個符合條件的隊列,那么 RabbitMQ 會調用 Basic.Return 命令將消息返回給生產者 。當 mandatory 參 數設置為 false 時,出現上述情形,則消息直接被丟棄 。 生產者可以通過添加一個監聽器監聽消息是否正確路由到隊列中
  • $immediate: 當 imrnediate 參數設為 true 時,如果交換器在將消息路由到隊列時發現隊列上並不存在 任何消費者,那么這條消息將不會存入隊列中。當與路由鍵匹配 的所有隊列都沒有消費者時 , 該消息會至生產者。 

5.6、basic_consume

消費消息方法,函數聲明如下

public function basic_consume(
    $queue = '',
    $consumer_tag = '',
    $no_local = false,
    $no_ack = false,
    $exclusive = false,
    $nowait = false,
    $callback = null,
    $ticket = null,
    $arguments = array()
) 
  • $queue: 隊列名稱
  • $consumer_tag: 消費者標簽,用來區分多個消費者
  • $no_local: 設置為 true 則表示不能將同一個 Connection 中生產者發送的消息傳送給這個 Connection 中的消費者
  • $no_ack: 設置為自動確認,詳細可參考連接: RabbitMQ 之 no_ack 分析
  • $exclusive: 是否設置為排他
  • $callback: 設置消費者的回調函數。用於處理 RabbitMQ 推送過來的消息

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM