以 Direct 類型的 交換機和 Queue 的 get 方法為例.
producer.php

// 連接設置 $conConfig = [ 'host' => '127.0.0.1', 'port' => 5672, 'login' => 'root', 'password' => 'root', 'vhost' => '/' ]; try { // RabbitMQ 連接實例 $con = new AMQPConnection($conConfig); // 發起連接 $con->connect(); // 新建通道 $channel = new AMQPChannel($con); // 在指定通道上新建交換機 $exchange = new AMQPExchange($channel); // 交換機名稱 $exchange->setName('test.exchange.ack'); // 交換機類型 $exchange->setType('direct'); // 聲明交換機 $exchange->declareExchange(); for($i = 1; $i <= 3; $i++) { $msg = '消息' . $i; // 發送消息,同時為消息指定routing key,成功返回true,失敗false $state = $exchange->publish($msg, 'test.rt.ack'); if($state) { echo 'Success' . PHP_EOL; }else { echo 'Fail' . PHP_EOL; } } // 關閉連接 //$con->disconnect(); }catch(\Exception $e) { echo $e->getMessage(); }
自動 ACK
consumer.php
$conConfig = [ 'host' => '127.0.0.1', 'port' => 5672, 'login' => 'root', 'password' => 'root', 'vhost' => '/' ]; try { $con = new AMQPConnection($conConfig); $con->connect(); $channel = new AMQPChannel($con); $exchange =new AMQPExchange($channel); $exchange->setName('test.exchange.ack'); $exchange->setType('direct'); $exchange->declareExchange(); $queue = new AMQPQueue($channel); $queue->setName('test.ack.queue'); // 聲明隊列同時返回隊列中的消息數量 $messageCount = $queue->declareQueue(); echo '消息數量: ' . $messageCount . PHP_EOL; $queue->bind('test.exchange.ack', 'test.rt.ack'); // 獲取消息后進行自動應答時, get方法的參數設置為AMQP_AUTOACK即可 while($msgEnvelope = $queue->get(AMQP_AUTOACK)) { $msg = $msgEnvelope->getBody(); echo $msg . PHP_EOL; } $con->disconnect(); }catch(Exception $e) { echo $e->getMessage(); }
將 Queue 的 get 方法參數設置為 AMQP_AUTOACK 即可在獲取到消息后自動發送消息已收到響應.
手動 ack
如果不需要自動 ack, 而是根據實際的業務處理結果進行處理. Queue 的 get 方法參數修改為 AMQP_NOPARM 即可.
修改后推送三條消息:
連續兩次從隊列中獲取消息:
如果不進行 ack, 隊列中的消息將一直存在, 可以反復獲取.
繼續修改 while 循環為:
while($msgEnvelope = $queue->get(AMQP_NOPARAM)) { $msg = $msgEnvelope->getBody(); if(preg_match("/.*?消息2.*?/", $msg)) { // 對消息2執行確定響應 $queue->ack($msgEnvelope->getDeliveryTag()); } echo date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL; }
連續執行兩次 comsumer.php:
第一次獲取到 3 條消息, 但第一次執行中對消息 2 執行了確認響應, 剩余消息不進行確認響應. 第二次執行中只獲取到剩余消息.
NACK (否定響應)
如果既不想對消息執行確定響應, 也不需要消息繼續出現在隊列中, 可以使用 Queue 的 nack 方法. 繼續修改 while 循環:
while($msgEnvelope = $queue->get(AMQP_NOPARAM)) { $msg = $msgEnvelope->getBody(); if(preg_match("/.*?消息2.*?/", $msg)) { // 對消息2執行確定響應 $queue->ack($msgEnvelope->getDeliveryTag()); }else { $queue->nack($msgEnvelope->getDeliveryTag()); } echo date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL; }
推送 3 條消息后, 連續執行兩次 consumper.php:
第一次執行, 獲取到 3 條數據; 第二次執行未獲取到任何數據. nack 方法除了可以從隊列中過濾掉不需要的方法, 也可以將暫時不需要的方法重新放回隊列, 將該方法的調用修改為:
$queue->nack($msgEnvelope->getDeliveryTag(), AMQP_REQUEUE);
注意: nack 方法將消息放回隊列后, 隊列會將消息再次推送給消費者. 如果此時隊列只有一個消費者, 將會造成死循環.