PHP使用RabbitMQ實例


相關博文:
CentOS6.9安裝RabbitMQ和源碼編譯安裝php的RabbitMQ擴展
RabbitMQ入門基礎
CentOS7源碼編譯安裝nginx+php7.2+mysql5.7並使用systemctl管理
RabbitMQ的安裝過程,工作流程,和一些基礎概念已經在前面的筆記中提到了,今天在本地實現了php連接RabbitMQ,以及消息的生產和消費的過程,首先看下沒有生產者和消費者的默認RabbitMQ管理界面截圖:
Connections:

還沒有任何連接(Connections)
Channels:

還沒有任何通道(Channels)
Exchanges:

交換機只有系統默認的
Queues:

還沒有任何隊列
先上消費者代碼consumer.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/15
 * Time: 13:16
 */
//聲明連接參數
$config = array(
    'host' => '192.168.75.132',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'test',
    'password' => 'test'
);
//連接broker
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}
//在連接內創建一個通道
$ch = new AMQPChannel($cnn);
//創建一個交換機
$ex = new AMQPExchange($ch);
//聲明路由鍵
$routingKey = 'key_1';
//聲明交換機名稱
$exchangeName = 'exchange_1';
//設置交換機名稱
$ex->setName($exchangeName);
//設置交換機類型
//AMQP_EX_TYPE_DIRECT:直連交換機
//AMQP_EX_TYPE_FANOUT:扇形交換機
//AMQP_EX_TYPE_HEADERS:頭交換機
//AMQP_EX_TYPE_TOPIC:主題交換機
$ex->setType(AMQP_EX_TYPE_DIRECT);
//設置交換機持久
$ex->setFlags(AMQP_DURABLE);
//聲明交換機
$ex->declareExchange();
//創建一個消息隊列
$q = new AMQPQueue($ch);
//設置隊列名稱
$q->setName('queue_1');
//設置隊列持久
$q->setFlags(AMQP_DURABLE);
//聲明消息隊列
$q->declareQueue();
//交換機和隊列通過$routingKey進行綁定
$q->bind($ex->getName(), $routingKey);
//接收消息並進行處理的回調方法
function receive($envelope, $queue) {
    //休眠兩秒,
    sleep(2);
    //echo消息內容
    echo $envelope->getBody()."\n";
    //顯式確認,隊列收到消費者顯式確認后,會刪除該消息
    $queue->ack($envelope->getDeliveryTag());
}
//設置消息隊列消費者回調方法,並進行阻塞
$q->consume("receive");
//$q->consume("receive", AMQP_AUTOACK);//隱式確認,不推薦

 

以上是消費者代碼,打開兩個命令行/終端
輸入php consumer.php,消費者開始阻塞獲取消息,如下圖

此時再看RabbitMQ管理界面:
Connections

出現兩個連接,這兩個就是消費者,因為他們在阻塞着等待消息
Channels

消費者在各自的連接里都打開了一個通道
Exchanges

其中一個消費者創建了一個持久的直連交換機
Queues

消息隊列已經創建,但消息數是0,因為此時還沒有生產者
生產者代碼publisher.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/15
 * Time: 13:15
 */

$config = array(
    'host' => '192.168.75.132',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'test',
    'password' => 'test'
);
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}
$ch = new AMQPChannel($cnn);
$ex = new AMQPExchange($ch);
//消息的路由鍵,一定要和消費者端一致
$routingKey = 'key_1';
//交換機名稱,一定要和消費者端一致,
$exchangeName = 'exchange_1';
$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
//創建10個消息
for ($i=1;$i<=10;$i++){
    //消息內容
    $msg = array(
        'data'  => 'message_'.$i,
        'hello' => 'world',
    );
    //發送消息到交換機,並返回發送結果
    //delivery_mode:2聲明消息持久,持久的隊列+持久的消息在RabbitMQ重啟后才不會丟失
    echo "Send Message:".$ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";
    //代碼執行完畢后進程會自動退出
}

 

 

以上是生產者代碼
在執行之前,先關掉前面的兩個消費者,打開一個命令行/終端,輸入php publisher.php,由於生產者不需要阻塞,執行完進程便退出,所以現在RabbitMQ管理界面中既沒有Connections也沒有Channels,但是Queues已經被Exchanges投遞過去了10條消息,如下圖:

因為我們執行生產者之前已經關掉了全部消費者,所以此時消息在隊列中等待獲取;
因為在發送消息時設置了delivery_mode:2來聲明消息持久化,此時如果重啟RabbitMQ,消息還會恢復;此時重新執行消費者,假設還是兩個,打開兩個命令行/終端,輸入php consumer.php,我們可以看到消息被消費,如下圖:

提醒:生產者在生產消息時,如果不存在指定隊列,並且沒有創建隊列,或者隊列存在但消息路由鍵和交換機與隊列綁定的鍵(路由規則)不一致(直連交換機必須一致),則消息會被交換機丟棄。
原文地址:PHP使用RabbitMQ實例


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM