<?php
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$exchangeName = 'exchange_1';
$queueName = 'queue_1';
$routeKey = 'routeingkey_1';
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'test', 'test123', '/');
$channel = $connection->channel();
//推送成功
$channel->set_ack_handler(
function (AMQPMessage $message) {
echo "發送成功: " . $message->body . PHP_EOL;
}
);
//推送失敗
$channel->set_nack_handler(
function (AMQPMessage $message) {
echo "發送失敗: " . $message->body . PHP_EOL;
}
);
/*
* bring the channel into publish confirm mode.
* if you would call $ch->tx_select() before or after you brought the channel into this mode
* the next call to $ch->wait() would result in an exception as the publish confirm mode and transactions
* are mutually exclusive
*/
/*
* 進入發布確認模式。
* 如果在將通道引入此模式之前或之后調用$ch->tx_select()
* 下一個調用$ch->wait()將導致發布確認模式和事務異常
* 是互斥的
*/
$channel->confirm_select(); // 發布確認模式
// 通道
$channel->exchange_declare($exchangeName, 'direct', false, false, false);
// 隊列
$channel->queue_declare($queueName, false, false, false, false);
// 使用routeKey綁定交換機和隊列
$channel->queue_bind($queueName, $exchangeName, $routeKey);
//$channel->wait_for_pending_acks();
for ($i = 0; $i < 10; $i++) {
$msg = new AMQPMessage('Hello World!'.$i);
$channel->basic_publish($msg, $exchangeName, $routeKey);
}
/*
* you do not have to wait for pending acks after each message sent. in fact it will be much more efficient
* to wait for as many messages to be acked as possible.
*/
/*
*您不必在每條消息發送后等待掛起的acks。事實上,這樣會更有效率
*等待盡可能多的郵件被屏蔽。
*/
$channel->wait_for_pending_acks();
// 監聽成功或失敗返回結束 成功/失敗 => set_ack_handler/set_nack_handler
$channel->close();
$connection->close();



