相關博文:
CentOS6.9安裝RabbitMQ和源碼編譯安裝php的RabbitMQ擴展
RabbitMQ入門基礎
CentOS7源碼編譯安裝nginx+php7.2+mysql5.7並使用systemctl管理
RabbitMQ的安裝過程,工作流程,和一些基礎概念已經在前面的筆記中提到了,今天在本地實現了php連接RabbitMQ,以及消息的生產和消費的過程,首先看下沒有生產者和消費者的默認RabbitMQ管理界面截圖:
Connections:
還沒有任何連接(Connections)
Channels:
還沒有任何通道(Channels)
Exchanges:
交換機只有系統默認的
Queues:
還沒有任何隊列
先上消費者代碼consumer.php
<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/15 * Time: 13:16 */ //聲明連接參數 $config = array( 'host' => '192.168.75.132', 'vhost' => '/', 'port' => 5672, 'login' => 'test', 'password' => 'test' ); //連接broker $cnn = new AMQPConnection($config); if (!$cnn->connect()) { echo "Cannot connect to the broker"; exit(); } //在連接內創建一個通道 $ch = new AMQPChannel($cnn); //創建一個交換機 $ex = new AMQPExchange($ch); //聲明路由鍵 $routingKey = 'key_1'; //聲明交換機名稱 $exchangeName = 'exchange_1'; //設置交換機名稱 $ex->setName($exchangeName); //設置交換機類型 //AMQP_EX_TYPE_DIRECT:直連交換機 //AMQP_EX_TYPE_FANOUT:扇形交換機 //AMQP_EX_TYPE_HEADERS:頭交換機 //AMQP_EX_TYPE_TOPIC:主題交換機 $ex->setType(AMQP_EX_TYPE_DIRECT); //設置交換機持久 $ex->setFlags(AMQP_DURABLE); //聲明交換機 $ex->declareExchange(); //創建一個消息隊列 $q = new AMQPQueue($ch); //設置隊列名稱 $q->setName('queue_1'); //設置隊列持久 $q->setFlags(AMQP_DURABLE); //聲明消息隊列 $q->declareQueue(); //交換機和隊列通過$routingKey進行綁定 $q->bind($ex->getName(), $routingKey); //接收消息並進行處理的回調方法 function receive($envelope, $queue) { //休眠兩秒, sleep(2); //echo消息內容 echo $envelope->getBody()."\n"; //顯式確認,隊列收到消費者顯式確認后,會刪除該消息 $queue->ack($envelope->getDeliveryTag()); } //設置消息隊列消費者回調方法,並進行阻塞 $q->consume("receive"); //$q->consume("receive", AMQP_AUTOACK);//隱式確認,不推薦
以上是消費者代碼,打開兩個命令行/終端
輸入php consumer.php
,消費者開始阻塞獲取消息,如下圖
此時再看RabbitMQ管理界面:
Connections
出現兩個連接,這兩個就是消費者,因為他們在阻塞着等待消息
Channels
消費者在各自的連接里都打開了一個通道
Exchanges
其中一個消費者創建了一個持久的直連交換機
Queues
消息隊列已經創建,但消息數是0,因為此時還沒有生產者
生產者代碼publisher.php
<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/15 * Time: 13:15 */ $config = array( 'host' => '192.168.75.132', 'vhost' => '/', 'port' => 5672, 'login' => 'test', 'password' => 'test' ); $cnn = new AMQPConnection($config); if (!$cnn->connect()) { echo "Cannot connect to the broker"; exit(); } $ch = new AMQPChannel($cnn); $ex = new AMQPExchange($ch); //消息的路由鍵,一定要和消費者端一致 $routingKey = 'key_1'; //交換機名稱,一定要和消費者端一致, $exchangeName = 'exchange_1'; $ex->setName($exchangeName); $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE); $ex->declareExchange(); //創建10個消息 for ($i=1;$i<=10;$i++){ //消息內容 $msg = array( 'data' => 'message_'.$i, 'hello' => 'world', ); //發送消息到交換機,並返回發送結果 //delivery_mode:2聲明消息持久,持久的隊列+持久的消息在RabbitMQ重啟后才不會丟失 echo "Send Message:".$ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n"; //代碼執行完畢后進程會自動退出 }
以上是生產者代碼
在執行之前,先關掉前面的兩個消費者,打開一個命令行/終端,輸入php publisher.php
,由於生產者不需要阻塞,執行完進程便退出,所以現在RabbitMQ管理界面中既沒有Connections也沒有Channels,但是Queues已經被Exchanges投遞過去了10條消息,如下圖:
因為我們執行生產者之前已經關掉了全部消費者,所以此時消息在隊列中等待獲取;
因為在發送消息時設置了delivery_mode:2
來聲明消息持久化,此時如果重啟RabbitMQ,消息還會恢復;此時重新執行消費者,假設還是兩個,打開兩個命令行/終端,輸入php consumer.php
,我們可以看到消息被消費,如下圖:
提醒:生產者在生產消息時,如果不存在指定隊列,並且沒有創建隊列,或者隊列存在但消息路由鍵和交換機與隊列綁定的鍵(路由規則)不一致(直連交換機必須一致),則消息會被交換機丟棄。
原文地址:PHP使用RabbitMQ實例