PHP 下基於 php-amqp 擴展的 RabbitMQ 簡單用例 (五) -- 自動 ACK、手動 ACK、NACK


以 Direct 類型的 交換機和 Queue 的 get 方法為例.

producer.php

// 連接設置
$conConfig = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'login' => 'root',
    'password' => 'root',
    'vhost' => '/'
];
try
{
    // RabbitMQ 連接實例
    $con = new AMQPConnection($conConfig);
    // 發起連接
    $con->connect();
    // 新建通道
    $channel = new AMQPChannel($con);
    // 在指定通道上新建交換機
    $exchange = new AMQPExchange($channel);
    // 交換機名稱
    $exchange->setName('test.exchange.ack');
    // 交換機類型
    $exchange->setType('direct');
    // 聲明交換機
    $exchange->declareExchange();
    
    for($i = 1; $i <= 3; $i++)
    {
        $msg = '消息' . $i;
        // 發送消息,同時為消息指定routing key,成功返回true,失敗false
        $state = $exchange->publish($msg, 'test.rt.ack');
        if($state)
        {
            echo 'Success' . PHP_EOL;
        }else
        {
            echo 'Fail' . PHP_EOL;
        }
    }
    
    // 關閉連接
    //$con->disconnect();
}catch(\Exception $e)
{
    echo $e->getMessage();
}
View Code

自動 ACK

consumer.php

 

$conConfig = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'login' => 'root',
    'password' => 'root',
    'vhost' => '/'
];

try
{
    $con = new AMQPConnection($conConfig);
    $con->connect();
  
    $channel = new AMQPChannel($con);
    $exchange =new AMQPExchange($channel);
    $exchange->setName('test.exchange.ack');
    $exchange->setType('direct');
    $exchange->declareExchange();
    
    $queue = new AMQPQueue($channel);
    $queue->setName('test.ack.queue');
    // 聲明隊列同時返回隊列中的消息數量
    $messageCount = $queue->declareQueue();
    echo '消息數量: ' . $messageCount . PHP_EOL;
    $queue->bind('test.exchange.ack', 'test.rt.ack');

    // 獲取消息后進行自動應答時, get方法的參數設置為AMQP_AUTOACK即可
    while($msgEnvelope = $queue->get(AMQP_AUTOACK))
    {
        $msg = $msgEnvelope->getBody();
        echo $msg . PHP_EOL;
    }
    $con->disconnect();
}catch(Exception $e)
{
    echo $e->getMessage();
}

將 Queue 的 get 方法參數設置為 AMQP_AUTOACK 即可在獲取到消息后自動發送消息已收到響應.

手動 ack

如果不需要自動 ack, 而是根據實際的業務處理結果進行處理. Queue 的 get 方法參數修改為 AMQP_NOPARM 即可.

修改后推送三條消息:

 

連續兩次從隊列中獲取消息:

 

如果不進行 ack, 隊列中的消息將一直存在, 可以反復獲取.

繼續修改 while 循環為:

while($msgEnvelope = $queue->get(AMQP_NOPARAM))
{
    $msg = $msgEnvelope->getBody();
    if(preg_match("/.*?消息2.*?/", $msg))
    {
        // 對消息2執行確定響應
        $queue->ack($msgEnvelope->getDeliveryTag());
    }
    echo date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL;
}

連續執行兩次 comsumer.php:

第一次獲取到 3 條消息, 但第一次執行中對消息 2 執行了確認響應, 剩余消息不進行確認響應. 第二次執行中只獲取到剩余消息.

NACK (否定響應)

如果既不想對消息執行確定響應, 也不需要消息繼續出現在隊列中, 可以使用 Queue 的 nack 方法. 繼續修改 while 循環:

while($msgEnvelope = $queue->get(AMQP_NOPARAM))
{
    $msg = $msgEnvelope->getBody();
    if(preg_match("/.*?消息2.*?/", $msg))
    {
        // 對消息2執行確定響應
        $queue->ack($msgEnvelope->getDeliveryTag());
    }else
    {
        $queue->nack($msgEnvelope->getDeliveryTag());
    }
    echo date('Y-m-d H:i:s') . ' ' . $msg . PHP_EOL;
}

推送 3 條消息后, 連續執行兩次 consumper.php:

第一次執行, 獲取到 3 條數據; 第二次執行未獲取到任何數據. nack 方法除了可以從隊列中過濾掉不需要的方法, 也可以將暫時不需要的方法重新放回隊列, 將該方法的調用修改為:

 

$queue->nack($msgEnvelope->getDeliveryTag(), AMQP_REQUEUE);

注意: nack 方法將消息放回隊列后, 隊列會將消息再次推送給消費者. 如果此時隊列只有一個消費者, 將會造成死循環.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM