PHP 下基於 php-amqp 擴展的 RabbitMQ 簡單用例 (四) -- Push API 和 Pull API


RabbitMQ 中針對消息的分發提供了 Push API (訂閱模式) Pull API (主動獲取) 兩種模式. 在 PHP 中, 這兩種模式分別通過 AMQPQueue 類中的 consume get 方法實現.

Push API -- consume 方法

consume 簡單示例

$conConfig = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'login' => 'root',
    'password' => 'root',
    'vhost' => '/'
];

try
{
    $con = new AMQPConnection($conConfig);
    $con->connect();
    if(!$con->isConnected())
    {
        echo '連接失敗';die;
    }
    
    $channel = new AMQPChannel($con);
    $exchange = new AMQPExchange($channel);
    $exchange->setType('fanout');
    $exchange->setName('test.ex1');
    $exchange->declareExchange();
    
    $queue = new AMQPQueue($channel);
    $queue->setName('test.consume');
    $queue->declareQueue();
    $queue->bind($exchange->getName());
    $queue->consume(function($envelope, $queue)
    {
        echo date('Y-m-d H:i:s') . ': ' . $envelope->getBody() . PHP_EOL;
    }, AMQP_AUTOACK);
    
    $con->disconnect();
}catch(Exception $e)
{
    echo $e->getMessage();
}

通過 consume 方法通過輪詢方式持續從隊列獲取消息. 當有新消息從交換機分發到隊列時, 客戶端會自動處理新消息, 不用再主動地從隊列請求獲取消息.

Pull API -- get 方法

get 示例

$conConfig = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'login' => 'root',
    'password' => 'root',
    'vhost' => '/'
];

try
{
    $con = new AMQPConnection($conConfig);
    $con->connect();
    if(!$con->isConnected())
    {
        echo '連接失敗';die;
    }
    
    $channel = new AMQPChannel($con);
    $exchange = new AMQPExchange($channel);
    $exchange->setType('fanout');
    $exchange->setName('test.ex1');
    $exchange->declareExchange();
    
    $queue = new AMQPQueue($channel);
    $queue->setName('test.get');
    $queue->declareQueue();
    $msgEnvelope = $queue->get(AMQP_AUTOACK);
    if($msgEnvelope)
    {
        $msg = $msgEnvelope->getBody();
        echo $msg . PHP_EOL;
    }
    
    $con->disconnect();
}catch(Exception $e)
{
    echo $e->getMessage();
}

get 方式是主動從隊列獲取消息. 除非主動發起請求, 否則隊列中的新消息不會推送到客戶端.

producer 示例

header('Content-Type: text/html; charset=utf-8');
// 連接設置
$conConfig = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'login' => 'root',
    'password' => 'root',
    'vhost' => '/'
];
try
{
    // RabbitMQ 連接實例
    $con = new AMQPConnection($conConfig);
    // 發起連接
    $con->connect();
    // 判斷連接結果,true成功,false失敗
    if(!$con->isConnected())
    {
        echo '連接失敗';die;
    }
    // 新建通道
    $channel = new AMQPChannel($con);
    // 使用RabbitMQ的默認Exchange
    $exchange = new AMQPExchange($channel);
    $exchange->setType('fanout');
    $exchange->setName('test.ex1');
    $exchange->declareExchange();
    $i = 1;
    while(1)
    {
        $state = $exchange->publish('消息' . $i++);
        if($state)
        {
            echo 'Success' . PHP_EOL;
        }else
        {
            echo 'Fail' . PHP_EOL;
        }
        sleep(1);
    }
    
    // 關閉連接
    $con->disconnect();
}catch(Exception $e)
{
    echo $e->getMessage();
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM