php 操作RabbitMQ


基本流程圖

如果exchange 沒有綁定queue,則消息將會被丟棄
如果創建exchange,queue,並且已經綁定了,則可以直接使用
為了防止腳本出問題 可以配合supervisor

安裝

  • 從網站 https://packagist.org 搜索rabbitmq插件
  • 使用composer安裝插件composer require php-amqplib/php-amqplib

使用

  • 1.連接RabbitMQ服務器
  • 2.開始一個新的 channel
  • 3.新建一個exchange
  • 4.新建一個queue
  • 5.綁定queue和exchange
  • 6.發布一個消息
  • 7.建立一個消費者並注冊一個回調函數
  • 8.監聽數據

新建連接和channel

    <?php
    require "./vendor/autoload.php";
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    
    $host   = "192.168.110.134";
    $port   = 5672;
    $user   = "test";
    $pass   = "test";
    
    $vhost  = "/";
    
    try{
        $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
    }catch (Exception $e){
        echo 'Caught exception: ',  $e->getMessage(), "\n";die;
    }
    
    $channel = $connection->channel();

新建一個exchange

/*
    name: $exchange
    type: fanout
    passive: false // don't check is an exchange with the same name exists
    durable: false // the exchange won't survive server restarts
    auto_delete: true //the exchange will be deleted once the channel is closed.
*/
try{
    $name = 'example_direct_exchange';
    $type = "direct";
    $passive = false;
    $durable = true;
    $auto_delete = true;
    $channel->exchange_declare($name, $type, $passive, $durable, $auto_delete);

}catch (Exception $e){
    echo 'Caught exception: ',  $e->getMessage(), "\n";die;
}

參數 name

exchange名稱

參數 type

exchange類型
fanout 是廣播類型的消息 會給所有綁定的queue發送數據

參數 passive

true    

1.如果exchange已存在 則直接連接 並且不檢查配置 比如已存在的exchange是fanout,新需要建立的是direct,也不會報錯;

2.如果exchange不存在 則直接報錯

false
1.如果exchange不存在 則創建新的exchange

2.如果exchange已存在 則判斷配置是否相同。如果配置不相同 則直接報錯。比如已存在的exchange是fanout,新需要建立的是direct,會報錯。

參數 auto_delete

true 
當最后一個消費者取消訂閱之后 exchange會被自動刪除 一般用於臨時exchange

新建一個queue

/*
    name: $queue    // should be unique in fanout exchange.
    passive: false  // don't check if a queue with the same name exists
    durable: false // the queue will not survive server restarts
    exclusive: false // the queue might be accessed by other channels
    auto_delete: true //the queue will be deleted once the channel is closed.
*/
$queue1 = 'example_direct_queue_1';

$channel->queue_declare($queue1, false, true, false, false);

將queue和exchange綁定起來

    $queue1 = 'example_direct_queue_1';
    $exchange_name = 'example_direct_exchange';
    
    $channel->queue_bind($queue1, $exchange_name);

發布一個消息

$exchange_name = 'example_direct_exchange';
$messageBody = array(
    'example_direct_value'=>date('Y-m-d H:i:s'),
);
$message = new AMQPMessage(json_encode($messageBody));
$channel->basic_publish($message, $exchange_name);

建立一個消費者並注冊一個回調函數

/*
    queue: Queue from where to get the messages
    consumer_tag: Consumer identifier
    no_local: Don't receive messages published by this consumer.
    no_ack: Tells the server if the consumer will acknowledge the messages.
    exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
    nowait:
    callback: A PHP Callback
*/

$consumerTag = 'consumer';

$queue  = 'example_direct_queue_1';

$channel->basic_consume($queue, "", false, false, false, false,function($msg){


    $message = json_decode($msg->body, true);

    file_put_contents("./mq.log", $message,FILE_APPEND);

    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
});

參數no_ack

true
消息只有在返回一個ack之后,才會被刪除

false
消息被取出之后 會被立即刪除

監聽數據

try {
    while (count($channel->callbacks)) {
        $channel->wait();
    }
}
catch(\PhpAmqpLib\Exception\AMQPTimeoutException $e){
    $channel->close();
    $channel->close();
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM