RabbitMQ 入門教程(PHP版) 簡單Demo


RabbitMQ的關鍵字說明

(1)Broker:經紀人。提供一種傳輸服務,維護一條從生產者到消費者的傳輸線路,保證消息數據能按照指定的方式傳輸。粗略的可以將圖中的RabbitMQ Server當作Broker。
(2)Exchange:消息交換機。指定消息按照什么規則路由到哪個隊列Queue。
(3)Queue:消息隊列。消息的載體,每條消息都會被投送到一個或多個隊列中。
(4)Binding:綁定。作用就是將Exchange和Queue按照某種路由規則綁定起來。
(5)RoutingKey:路由關鍵字。Exchange根據RoutingKey進行消息投遞。
(6)Vhost:虛擬主機。一個Broker可以有多個虛擬主機,用作不同用戶的權限分離。一個虛擬主機持有一組Exchange、Queue和Binding。
(7)Producer:消息生產者。主要將消息投遞到對應的Exchange上面。一般是獨立的程序。
(8)Consumer:消息消費者。消息的接收者,一般是獨立的程序。
(9)Channel:消息通道,也稱信道。在客戶端的每個連接里可以建立多個Channel,每個Channel代表一個會話任務。

RabbitMQ的使用流程

(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好Binding關系。
(5)生產者客戶端投遞消息到exchange。
(6)exchange接收到消息后,就根據消息的RoutingKey和已經設置的binding,進行消息路由(投遞),將消息投遞到一個或多個隊列里。
(7)消費者客戶端從對應的隊列中獲取並處理消息。

 

 

原理流程

生產者主要做的是:創建連接-->創建channel-->創建交換機對象-->發送消息

消費者主要做的是:創建連接-->創建channel-->創建交換機-->創建隊列-->綁定交換機/隊列/路由鍵-->接收消息


demo1:

創建mysql表

CREATE TABLE `rabbitmq_table` (
`id` int(11) NOT NULL AUTO_INCREMENT,
  `sJson` varchar(255) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;

consumer1.php

<?php


/*
 * 創建表數據
CREATE TABLE `rabbitmq_table` (
`id` int(11) NOT NULL AUTO_INCREMENT,
  `sJson` varchar(255) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;

*/

$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);

$e_name = 'e_linvo'; //交換機名
$q_name = 'q_linvo'; //隊列名
$k_route = 'key_2'; //路由key

//創建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//創建交換機
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$ex->setFlags(AMQP_DURABLE); //持久化
//$ex->declare();
$ex->declareExchange();

//創建隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
//$q->declare();     //最好隊列object在這里declare()下,否則如果是新的queue會報錯
$q->declareQueue();     //最好隊列object在這里declare()下,否則如果是新的queue會報錯

//綁定交換機與隊列,並指定路由鍵,可以多個路由鍵
$q->bind($e_name, 'key_1');

echo "Message:".PHP_EOL;
//阻塞模式接收消息
$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答

//非阻塞模式接收消息
/*while(True){
    //消息獲取
    $arr = $q->get();
    $res = $q->ack($arr->getDeliveryTag());     //手動發送ACK應答
    $msg = $arr->getBody();
    mysql_insert($msg);
    log_insert($msg);
}*/

$conn->disconnect();

/**
 * 消費回調函數
 * 處理消息
 */
function processMessage($envelope, $queue) {
    var_dump($envelope->getRoutingKey());
    $msg = $envelope->getBody();
    mysql_insert($msg);
    log_insert($msg);
}

//插入日志
function log_insert($json){
    file_put_contents('./consumer1.txt',$json.PHP_EOL,FILE_APPEND );
}
//消息入庫
function mysql_insert($json){
    //連接MySQL數據庫
    $pdo = new PDO("mysql:host=localhost;dbname=test","root","168168" );
    $pdo->query('SET NAMES UTF8MB4');//設置UTF8字符編碼
//    $pdo->query('SET NAMES UTF8');
    $sql = "insert into `rabbitmq_table` (sJson) values ('{$json}')";
    echo $sql.PHP_EOL;
    if ($pdo->exec($sql)){
        echo "mysql insert success".PHP_EOL;
    } else {
        echo "mysql insert fail".PHP_EOL;
    }
}

publisher1.php

<?php
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);

//創建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//創建交換機
$e_name = 'e_linvo'; //交換機名
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declareExchange().PHP_EOL;


echo "Send Message:".$ex->publish("rabbitmq消息測試,key_1" . date('H:i:s', time()), 'key_1').PHP_EOL;
//echo "Send Message:".$ex->publish("rabbitmq消息測試,key_2 by xust" . date('H:i:s', time()), 'key_2').PHP_EOL;

使用方法:先運行consumer1.php,再運行publisher1.php

運行效果:

 

 demo2:

consumer2.php

<?php
//配置信息
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機名
$q_name = 'q_linvo'; //隊列名
$k_route = 'key_1'; //路由key

//創建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//創建交換機
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declareExchange()."\n";

//創建隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declareQueue()."\n";

//綁定交換機與隊列,並指定路由鍵
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";

//阻塞模式接收消息
echo "Message:".PHP_EOL;
while(True){
    $q->consume('processMessage');
    //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
}
$conn->disconnect();

/**
 * 消費回調函數
 * 處理消息
 */
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg.PHP_EOL; //處理消息
    $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答
}

publisher2.php

<?php
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機名
//$q_name = 'q_linvo'; //無需隊列名
$k_route = 'key_1'; //路由key

//創建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//消息內容
$message = json_encode(['msg'=>"RabbitMQ消息發送成功~~",'order_id'=>time()],JSON_UNESCAPED_UNICODE);

//創建交換機對象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);

//發送消息
//$channel->startTransaction(); //開始事務
for($i=1; $i<=5; ++$i){
    echo "Send Message:".$ex->publish($message.$i, $k_route).PHP_EOL;
    sleep(1);
}
//$channel->commitTransaction(); //提交事務

$conn->disconnect();

使用方法:先運行consumer2.php,再運行publisher2.php

運行效果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM