RabbitMQ 中針對消息的分發提供了 Push API (訂閱模式) 和 Pull API (主動獲取) 兩種模式. 在 PHP 中, 這兩種模式分別通過 AMQPQueue 類中的 consume 和 get 方法實現.
Push API -- consume 方法
consume 簡單示例
$conConfig = [ 'host' => '127.0.0.1', 'port' => 5672, 'login' => 'root', 'password' => 'root', 'vhost' => '/' ]; try { $con = new AMQPConnection($conConfig); $con->connect(); if(!$con->isConnected()) { echo '連接失敗';die; } $channel = new AMQPChannel($con); $exchange = new AMQPExchange($channel); $exchange->setType('fanout'); $exchange->setName('test.ex1'); $exchange->declareExchange(); $queue = new AMQPQueue($channel); $queue->setName('test.consume'); $queue->declareQueue(); $queue->bind($exchange->getName()); $queue->consume(function($envelope, $queue) { echo date('Y-m-d H:i:s') . ': ' . $envelope->getBody() . PHP_EOL; }, AMQP_AUTOACK); $con->disconnect(); }catch(Exception $e) { echo $e->getMessage(); }
通過 consume 方法通過輪詢方式持續從隊列獲取消息. 當有新消息從交換機分發到隊列時, 客戶端會自動處理新消息, 不用再主動地從隊列請求獲取消息.
Pull API -- get 方法
get 示例
$conConfig = [ 'host' => '127.0.0.1', 'port' => 5672, 'login' => 'root', 'password' => 'root', 'vhost' => '/' ]; try { $con = new AMQPConnection($conConfig); $con->connect(); if(!$con->isConnected()) { echo '連接失敗';die; } $channel = new AMQPChannel($con); $exchange = new AMQPExchange($channel); $exchange->setType('fanout'); $exchange->setName('test.ex1'); $exchange->declareExchange(); $queue = new AMQPQueue($channel); $queue->setName('test.get'); $queue->declareQueue(); $msgEnvelope = $queue->get(AMQP_AUTOACK); if($msgEnvelope) { $msg = $msgEnvelope->getBody(); echo $msg . PHP_EOL; } $con->disconnect(); }catch(Exception $e) { echo $e->getMessage(); }
get 方式是主動從隊列獲取消息. 除非主動發起請求, 否則隊列中的新消息不會推送到客戶端.
producer 示例

header('Content-Type: text/html; charset=utf-8'); // 連接設置 $conConfig = [ 'host' => '127.0.0.1', 'port' => 5672, 'login' => 'root', 'password' => 'root', 'vhost' => '/' ]; try { // RabbitMQ 連接實例 $con = new AMQPConnection($conConfig); // 發起連接 $con->connect(); // 判斷連接結果,true成功,false失敗 if(!$con->isConnected()) { echo '連接失敗';die; } // 新建通道 $channel = new AMQPChannel($con); // 使用RabbitMQ的默認Exchange $exchange = new AMQPExchange($channel); $exchange->setType('fanout'); $exchange->setName('test.ex1'); $exchange->declareExchange(); $i = 1; while(1) { $state = $exchange->publish('消息' . $i++); if($state) { echo 'Success' . PHP_EOL; }else { echo 'Fail' . PHP_EOL; } sleep(1); } // 關閉連接 $con->disconnect(); }catch(Exception $e) { echo $e->getMessage(); }