RabbitMQ基礎使用


RabbitMQ

MQ(message queue)

優勢:
應用解耦
異步提速
削峰填谷

RabbitMQ簡介

官網

https://www.rabbitmq.com

基礎概念

Broker: 接收和分發消息的應用

Virtual host: 虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離

Connection: publisher/consumer和broker之間的TCP連接

Channel:connectiion的邏輯連接;處理每次訪問rabbitmq都建立一個connection造成巨大開銷。

Exchange: 交換機; 根據分發規則,匹配路由分發消息到queue中。常用類型:direct(point-to-point), topic(publish-subscribe), fanout(multicast)

Queue:隊列

Binding: exchage和queue建立的虛擬連接。

Publisher: 消息生產者

Consumer: 消息消費者

安裝rabbitMQ以及擴展

安裝rabbitMQ

docker方式:

management版本的有可視化管理面板

docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

管理頁面:127.0.0.1:15672

初始用戶/密碼:guest/guest

其他方式:

brew install rabbitmq-c
or
yum install rabbitmq-c
or 
github自行源碼cmake安裝

安裝php的amqp擴展

wget http://pecl.php.net/get/amqp-1.10.2.tgz

解壓后進入目錄:

解壓:
tar -zxvf amqp-1.10.2.tgz

進入目錄后執行: 
phpize

安裝amqp需要librabbitmq依賴,需要指定rabbitmq的安裝目錄, 我這里的目錄是brew安裝后的目錄, 不是mac的可以通過下載rabbit-c后cmake安裝:

下載地址: https://github.com/alanxz/rabbitmq-c/archive/master.zip
安裝請參考: https://blog.csdn.net/weixin_33726313/article/details/91963653

./configure --with-php-config=/Applications/MAMP/bin/php/php7.4.2/bin/php-config -with-amqp --with-librabbitmq-dir=/usr/local/Cellar/rabbitmq-c/0.9.0

make && make install

php.ini中引入擴展:

extension=amqp.so

安裝php-amqplib/php-amqplib

composer require php-amqplib/php-amqplib

rabbitmqctl命令的使用

https://www.rabbitmq.com/rabbitmqctl.8.html

RabbitMQ使用

簡單的使用

說明: 最簡單的處理方式, 發送者發送消息到隊列, 消費者連接隊列消費(這里使用使用默認的exchange)

消息生產者: publisher.php

//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//創建管道
$channel = $connection->channel();

//聲明一個隊列
$queue_name = 'hello';
$channel->queue_declare($queue_name);

//設置消息並發送隊列
$msg = new AMQPMessage('hello world');
$channel->basic_publish($msg, '', $queue_name);

//關閉通道和連接
$channel->close();
$connection->close();

消息消費者: consumer.php

//加載autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//創建管道
$channel = $connection->channel();

//聲明一個隊列
$queue_name = 'hello';
$channel->queue_declare($queue_name);

//定義回調
$callback = function ($msg) {
    echo 'received:' . $msg->body;
};

//消費隊列
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//監聽回調並執行回調程序
while ($channel->is_consuming()) {
    $channel->wait();
}

執行:

啟動消費者(ctrl+C 斷開): 
php consumer.php

發送隊列:
php publisher.php

work queues(工作隊列)

說明: 發送者發送隊列后多個消費者來分配處理任務

循環調度: 運行多個consumer, publisher發送多個隊列后, RabbitMQ會按順序依次派發任務給consumer;

消息確認: 為了確保消息永不丟失,RabbitMQ支持消息確認。消費者發送回一個確認(acknowledgement)以告知RabbitMQ已經接收,處理了特定的消息,並且RabbitMQ可以自由刪除它;

注意: rabbitmq無法釋放任何未確認的消息,會導致rabbitmq消耗越來越多的內存,這時可以使用rabbitmqctl打印messages_unacknowledged

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

消息持久化: RabbitMQ退出或崩潰時,除非您告訴它,否則它將忘記隊列和消息。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標記為持久.(注: 對已經存在的隊列不生效)

公平派遣: rabbitmq它只是盲目地將每第n條消息發送給第n個consumer,並不會查看消費者的未確認消息數.這樣有時會導致有的consumer很忙碌,而有的又很閑; 我們可以通過basic_qos來控制;比如在處理並確認上一條消息之前,不要將新消息發送給consumer;

消息生產者: publisher.php

//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//接收參數組合成字符串
$data = implode('', array_splice($argv, 1)) ?? 'default!';
$data = !empty($data) ? $data : 'default!';

//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//創建管道
$channel = $connection->channel();

//聲明一個隊列
$queue_name = 'task2';
//durable參數(第三個): 設置為true時標記為持久,rabbitmq重啟后將依然存在
$channel->queue_declare($queue_name, false, true);

//設置消息並發送隊列(delivery_mode: 設置消息持久性)
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', $queue_name);

//關閉通道和連接
$channel->close();
$connection->close();

消息消費者: consumer.php

//加載autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//創建管道
$channel = $connection->channel();

//聲明一個隊列
$queue_name = 'task2';
$channel->queue_declare($queue_name, false, true);

//定義回調
$callback = function ($msg) {
    echo 'received:' . $msg->body, "\n";
    //模擬程序執行時間
    sleep(5);
    //告訴rabbitmq該消息已經處理
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

//上一個消息未處理完時,不再接收新的消息
$channel->basic_qos(null, 1, null);
//消費隊列(no_ack=false時(第四個),啟用消息確認)
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//監聽回調並執行回調程序
while ($channel->is_consuming()) {
    $channel->wait();
}

//關閉通道和連接
$channel->close();
$connection->close();

publish/subcribe

exchange types(交換類型)

direct: 默認; 與隊列中routing key進行精准匹配

topic: 主題模式, 下面有使用方法

headers: 與direct的模式不同,不是使用routingkey去做綁定。而是通過消息headers的鍵值對匹配

fanout: 將接收到的所有消息廣播到它知道的所有隊列中

消息生產者: publisher

//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//創建管道
$channel = $connection->channel();

//聲明交換機
$exchang_name = 'logs2';
$channel->exchange_declare($exchang_name, 'fanout', false, false, false);


//聲明一個隊列
$queue_name = 'task6';
$channel->queue_declare($queue_name);

//設置消息並發送隊列
$msg = new AMQPMessage('hello world');
$channel->basic_publish($msg, $exchang_name, $queue_name);

//關閉通道和連接
$channel->close();
$connection->close();

消息消費者: consumer

//加載autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//創建管道
$channel = $connection->channel();

//聲明交換機
$exchang_name = 'logs2';
$channel->exchange_declare($exchang_name, 'fanout', false, false, false);

//聲明一個隊列, 隊列名稱隨機
list($queue_name) = $channel->queue_declare("");

//綁定隊列和交換機
$channel->queue_bind($queue_name, $exchang_name);

//定義回調
$callback = function ($msg) {
    echo 'received:' . $msg->body, "\n";
};

//消費隊列
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//監聽回調並執行回調程序
while ($channel->is_consuming()) {
    $channel->wait();
}

//關閉通道和連接
$channel->close();
$connection->close();

記錄日志

php sub_consumer.php>logs_from_rabbit.log

routing

說明: 隊列和routing綁定后,不在綁定范圍內的將不再處理

消息產生者:

//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//接收參數組合成字符串
$routing_key = $argv[1];    //獲取提示信息
//將第三個參數組合成字符串
$data = implode('', array_splice($argv, 2));
$data = !empty($data) ? $data : 'default!';

//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//創建管道
$channel = $connection->channel();

//聲明交換機
$exchange_name = 'logs_record';
$channel->exchange_declare($exchange_name, 'direct');

//設置消息並發送隊列
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange_name, $routing_key);

//關閉通道和連接
$channel->close();
$connection->close();

消息消費者:

//加載autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;


//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//創建管道
$channel = $connection->channel();

//聲明交換機
$exchange_name = 'logs_record';
$channel->exchange_declare($exchange_name, 'direct');

//聲明一個隊列
list($queue_name) = $channel->queue_declare("");

//綁定隊列,交換機,routing
$params = array_slice($argv, 1);
foreach ($params as $param) {
    $channel->queue_bind($queue_name, $exchange_name, $param);
}

//定義回調
$callback = function ($msg) {
    echo 'received:' . $msg->delivery_info['routing_key'] . ':' . $msg->body, "\n";
};

//消費隊列
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//監聽回調並執行回調程序
while ($channel->is_consuming()) {
    $channel->wait();
}

//關閉通道和連接
$channel->close();
$connection->close();

topics

說明:代碼和上面的類似.交換機類型換成topic,案例如下

接收所有日志:
php receive_logs_topic.php “#”

要從設施“ kern ”接收所有日志:
php receive_logs_topic.php “ kern.*”

或者,如果您只想聽聽“關鍵”日志:
php receive_logs_topic.php “ *.critical”

您可以創建多個綁定:
php receive_logs_topic.php “ kern.*”  “*.critical”

並發出帶有路由鍵“ kern.critical ”類型的日志:
php emit_log_topic.php "kern.critical" "A critical kernel error"

RPC

略...

laravel結合RabbitMQ

參考: https://blog.csdn.net/weixin_44600422/article/details/106317870

FAQ

報錯: 已經安裝了amqp還是報Class 'PhpAmqplib\Connection\AMQPLazyConnection' not found

1: 檢查php是否安裝了amqp擴展

2: 在使用時一定要先引入自動加載的文件: 如下
require __DIR__.'/../vendor/autoload.php';

報錯: The connection timed out after 3 sec while awaiting incoming data

原因: 由於我使用的是默認的用戶guest登錄, 而該用戶不允許遠程登錄;

解決:

# 進入docker(非docker運行的忽略此步驟)
docker exec -ti rabbitmq /bin/bash

# 增加用戶
rabbitmqctl add_user admin admin

# 設置tag(類似用戶組)
rabbitmqctl set_user_tags admin administrator

# 設置權限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

報錯: make的時候報/usr/bin/ld: cannot find -lrabbitmq錯誤

原因: 找不到庫文件librabbitmq.so

解決: 參考https://www.cnblogs.com/chenhaoyu/p/13925905.html

1: 找到擴展的位置
find / -name librabbitmq.so

2: 將庫文件所在的路徑加入到 /etc/ld.so.conf 尾部,並使之生效
echo '/disk2/temp/rabbitmq-c-master/build/librabbitmq/' >> /etc/ld.so.conf

3: 刷新配置文件使之生效
ldconfig

4: 修改環境變量,加入庫的文件路徑
export LIBRARY_PATH=/disk2/temp/rabbitmq-c-master/build/librabbitmq/:$LIBRARY_PATH

查看是否已經添加: export -p

5: 將上述 export 命令加入到配置文件 ~/.bashrc,使之永久生效。
echo 'LIBRARY_PATH=/disk2/temp/rabbitmq-c-master/build/librabbitmq/:$LIBRARY_PATH' >> ~/.bashrc

6: 刷新配置
source ~/.bashrc

參考資料

https://www.rabbitmq.com/tutorials

https://my.oschina.net/peaksoho/blog/2872689

https://blog.csdn.net/weixin_44600422/article/details/106317870

https://blog.csdn.net/weixin_33726313/article/details/91963653


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM