PHP使用RabbitMQ消息隊列


1、安裝amqp拓展 安裝流程

2、下載工具包 php-amqplib 

composer require php-amqplib/php-amqplib
 
3、代碼操作如下
【消費消息】
 1 <?php 
 2 //配置信息 
 3 $conn_args = array( 
 4     'host' => '127.0.0.1',
 5     'port' => '5672',  
 6     'login' => 'zcw',  
 7     'password' => '123456', 
 8     'vhost'=>'/' 
 9 );   
10 $e_name = 'exchange1'; //交換機名 
11 $q_name = 'queue1'; //隊列名 
12 $k_route = 'route1'; //路由key 
13  
14 //創建連接和channel 
15 $conn = new AMQPConnection($conn_args);   
16 if (!$conn->connect()) {   
17     die("Cannot connect to the broker!\n");   
18 }   
19 $channel = new AMQPChannel($conn);   
20  
21 //創建交換機    
22 $ex = new AMQPExchange($channel);   
23 $ex->setName($e_name); 
24 $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型(常用的有fanout、direct、topic、headers)  
25 $ex->setFlags(AMQP_DURABLE); //持久化 
26 
27    
28 //創建隊列    
29 $q = new AMQPQueue($channel); 
30 $q->setName($q_name);   
31 $q->setFlags(AMQP_DURABLE); //持久化  
32 
33 $total = $q->declareQueue();//獲取所有的消息數量
34  
35 //綁定交換機與隊列,並指定路由鍵 
36 $q->bind($e_name, $k_route); 
37  
38 //1、阻塞模式接收消息 
39 while(True){ 
40     $q->consume('processMessage');   
41     //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答  
42 } 
43 
44 //2 非阻塞模式接收消息 可定時調用
45 //if($total){
46 //    for($i=0;$i<$total;$i++){
47 //        $envelope = $q->get();
48 //        if($envelope){
49 //             $msg = $envelope->getBody(); 
50 //             echo $msg."\n"; //處理消息 
51 //             $q->ack($envelope->getDeliveryTag()); //手動發送ACK應答
52 //        }
53 //    }
54 //}
55 
56 $conn->disconnect();   
57  
58 /**
59  * 消費回調函數
60  * 處理消息
61  */ 
62 function processMessage($envelope, $queue) { 
63     $msg = $envelope->getBody(); 
64     echo $msg."\n"; //處理消息 
65     $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答
66 }
67 ?>

 

【生產消息】

 1 <?php
 2 //配置信息 
 3 $conn_args = array( 
 4     'host' => '127.0.0.1',  
 5     'port' => '5672',  
 6     'login' => 'zcw',
 7     'password' => '123456',
 8     'vhost'=>'/' 
 9 );   
10 $e_name = 'exchange1'; //交換機名 
11 //$q_name = 'queue1'; //無需隊列名 
12 $k_route = 'route1'; //路由key
13  
14 //創建連接和channel 
15 $conn = new AMQPConnection($conn_args);   
16 if (!$conn->connect()) {   
17     die("Cannot connect to the broker!\n");   
18 }   
19 $channel = new AMQPChannel($conn);   
20 //創建交換機對象    
21 $ex = new AMQPExchange($channel);   
22 $ex->setName($e_name);   
23 //發送消息 
24 //$channel->startTransaction(); //開始事務  
25 for($i=0; $i<5; ++$i){
26    $ex->publish($message, $k_route)."\n";
27 }
28  
29 //$channel->commitTransaction(); //提交事務 
30  
31 $conn->disconnect();
32 
33 ?>

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM