1、安裝amqp拓展 安裝流程
2、下載工具包 php-amqplib
composer require php-amqplib/php-amqplib
3、代碼操作如下
【消費消息】
1 <?php 2 //配置信息 3 $conn_args = array( 4 'host' => '127.0.0.1', 5 'port' => '5672', 6 'login' => 'zcw', 7 'password' => '123456', 8 'vhost'=>'/' 9 ); 10 $e_name = 'exchange1'; //交換機名 11 $q_name = 'queue1'; //隊列名 12 $k_route = 'route1'; //路由key 13 14 //創建連接和channel 15 $conn = new AMQPConnection($conn_args); 16 if (!$conn->connect()) { 17 die("Cannot connect to the broker!\n"); 18 } 19 $channel = new AMQPChannel($conn); 20 21 //創建交換機 22 $ex = new AMQPExchange($channel); 23 $ex->setName($e_name); 24 $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型(常用的有fanout、direct、topic、headers) 25 $ex->setFlags(AMQP_DURABLE); //持久化 26 27 28 //創建隊列 29 $q = new AMQPQueue($channel); 30 $q->setName($q_name); 31 $q->setFlags(AMQP_DURABLE); //持久化 32 33 $total = $q->declareQueue();//獲取所有的消息數量 34 35 //綁定交換機與隊列,並指定路由鍵 36 $q->bind($e_name, $k_route); 37 38 //1、阻塞模式接收消息 39 while(True){ 40 $q->consume('processMessage'); 41 //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 42 } 43 44 //2 非阻塞模式接收消息 可定時調用 45 //if($total){ 46 // for($i=0;$i<$total;$i++){ 47 // $envelope = $q->get(); 48 // if($envelope){ 49 // $msg = $envelope->getBody(); 50 // echo $msg."\n"; //處理消息 51 // $q->ack($envelope->getDeliveryTag()); //手動發送ACK應答 52 // } 53 // } 54 //} 55 56 $conn->disconnect(); 57 58 /** 59 * 消費回調函數 60 * 處理消息 61 */ 62 function processMessage($envelope, $queue) { 63 $msg = $envelope->getBody(); 64 echo $msg."\n"; //處理消息 65 $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答 66 } 67 ?>
【生產消息】
1 <?php 2 //配置信息 3 $conn_args = array( 4 'host' => '127.0.0.1', 5 'port' => '5672', 6 'login' => 'zcw', 7 'password' => '123456', 8 'vhost'=>'/' 9 ); 10 $e_name = 'exchange1'; //交換機名 11 //$q_name = 'queue1'; //無需隊列名 12 $k_route = 'route1'; //路由key 13 14 //創建連接和channel 15 $conn = new AMQPConnection($conn_args); 16 if (!$conn->connect()) { 17 die("Cannot connect to the broker!\n"); 18 } 19 $channel = new AMQPChannel($conn); 20 //創建交換機對象 21 $ex = new AMQPExchange($channel); 22 $ex->setName($e_name); 23 //發送消息 24 //$channel->startTransaction(); //開始事務 25 for($i=0; $i<5; ++$i){ 26 $ex->publish($message, $k_route)."\n"; 27 } 28 29 //$channel->commitTransaction(); //提交事務 30 31 $conn->disconnect(); 32 33 ?>