基本流程圖
如果exchange 沒有綁定queue,則消息將會被丟棄
如果創建exchange,queue,並且已經綁定了,則可以直接使用
為了防止腳本出問題 可以配合supervisor
安裝
- 從網站 https://packagist.org 搜索rabbitmq插件
- 使用composer安裝插件composer require php-amqplib/php-amqplib
使用
- 1.連接RabbitMQ服務器
- 2.開始一個新的 channel
- 3.新建一個exchange
- 4.新建一個queue
- 5.綁定queue和exchange
- 6.發布一個消息
- 7.建立一個消費者並注冊一個回調函數
- 8.監聽數據
新建連接和channel
<?php
require "./vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$host = "192.168.110.134";
$port = 5672;
$user = "test";
$pass = "test";
$vhost = "/";
try{
$connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
}catch (Exception $e){
echo 'Caught exception: ', $e->getMessage(), "\n";die;
}
$channel = $connection->channel();
新建一個exchange
/*
name: $exchange
type: fanout
passive: false // don't check is an exchange with the same name exists
durable: false // the exchange won't survive server restarts
auto_delete: true //the exchange will be deleted once the channel is closed.
*/
try{
$name = 'example_direct_exchange';
$type = "direct";
$passive = false;
$durable = true;
$auto_delete = true;
$channel->exchange_declare($name, $type, $passive, $durable, $auto_delete);
}catch (Exception $e){
echo 'Caught exception: ', $e->getMessage(), "\n";die;
}
參數 name
exchange名稱
參數 type
exchange類型
fanout 是廣播類型的消息 會給所有綁定的queue發送數據
參數 passive
true
1.如果exchange已存在 則直接連接 並且不檢查配置 比如已存在的exchange是fanout,新需要建立的是direct,也不會報錯;
2.如果exchange不存在 則直接報錯
false
1.如果exchange不存在 則創建新的exchange
2.如果exchange已存在 則判斷配置是否相同。如果配置不相同 則直接報錯。比如已存在的exchange是fanout,新需要建立的是direct,會報錯。
參數 auto_delete
true
當最后一個消費者取消訂閱之后 exchange會被自動刪除 一般用於臨時exchange
新建一個queue
/*
name: $queue // should be unique in fanout exchange.
passive: false // don't check if a queue with the same name exists
durable: false // the queue will not survive server restarts
exclusive: false // the queue might be accessed by other channels
auto_delete: true //the queue will be deleted once the channel is closed.
*/
$queue1 = 'example_direct_queue_1';
$channel->queue_declare($queue1, false, true, false, false);
將queue和exchange綁定起來
$queue1 = 'example_direct_queue_1';
$exchange_name = 'example_direct_exchange';
$channel->queue_bind($queue1, $exchange_name);
發布一個消息
$exchange_name = 'example_direct_exchange';
$messageBody = array(
'example_direct_value'=>date('Y-m-d H:i:s'),
);
$message = new AMQPMessage(json_encode($messageBody));
$channel->basic_publish($message, $exchange_name);
建立一個消費者並注冊一個回調函數
/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
*/
$consumerTag = 'consumer';
$queue = 'example_direct_queue_1';
$channel->basic_consume($queue, "", false, false, false, false,function($msg){
$message = json_decode($msg->body, true);
file_put_contents("./mq.log", $message,FILE_APPEND);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
});
參數no_ack
true
消息只有在返回一個ack之后,才會被刪除
false
消息被取出之后 會被立即刪除
監聽數據
try {
while (count($channel->callbacks)) {
$channel->wait();
}
}
catch(\PhpAmqpLib\Exception\AMQPTimeoutException $e){
$channel->close();
$channel->close();
}