RabbitMQ
MQ(message queue)
優勢:
應用解耦
異步提速
削峰填谷
RabbitMQ簡介
官網
基礎概念
Broker: 接收和分發消息的應用
Virtual host: 虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離
Connection: publisher/consumer和broker之間的TCP連接
Channel:connectiion的邏輯連接;處理每次訪問rabbitmq都建立一個connection造成巨大開銷。
Exchange: 交換機; 根據分發規則,匹配路由分發消息到queue中。常用類型:direct(point-to-point), topic(publish-subscribe), fanout(multicast)
Queue:隊列
Binding: exchage和queue建立的虛擬連接。
Publisher: 消息生產者
Consumer: 消息消費者
安裝rabbitMQ以及擴展
安裝rabbitMQ
docker方式:
management版本的有可視化管理面板
docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
管理頁面:127.0.0.1:15672
初始用戶/密碼:guest/guest
其他方式:
brew install rabbitmq-c
or
yum install rabbitmq-c
or
github自行源碼cmake安裝
安裝php的amqp擴展
wget http://pecl.php.net/get/amqp-1.10.2.tgz
解壓后進入目錄:
解壓:
tar -zxvf amqp-1.10.2.tgz
進入目錄后執行:
phpize
安裝amqp需要librabbitmq依賴,需要指定rabbitmq的安裝目錄, 我這里的目錄是brew安裝后的目錄, 不是mac的可以通過下載rabbit-c后cmake安裝:
下載地址: https://github.com/alanxz/rabbitmq-c/archive/master.zip
安裝請參考: https://blog.csdn.net/weixin_33726313/article/details/91963653
./configure --with-php-config=/Applications/MAMP/bin/php/php7.4.2/bin/php-config -with-amqp --with-librabbitmq-dir=/usr/local/Cellar/rabbitmq-c/0.9.0
make && make install
php.ini中引入擴展:
extension=amqp.so
安裝php-amqplib/php-amqplib
composer require php-amqplib/php-amqplib
rabbitmqctl命令的使用
RabbitMQ使用
簡單的使用
說明: 最簡單的處理方式, 發送者發送消息到隊列, 消費者連接隊列消費(這里使用使用默認的exchange)
消息生產者: publisher.php
//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//創建管道
$channel = $connection->channel();
//聲明一個隊列
$queue_name = 'hello';
$channel->queue_declare($queue_name);
//設置消息並發送隊列
$msg = new AMQPMessage('hello world');
$channel->basic_publish($msg, '', $queue_name);
//關閉通道和連接
$channel->close();
$connection->close();
消息消費者: consumer.php
//加載autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//創建管道
$channel = $connection->channel();
//聲明一個隊列
$queue_name = 'hello';
$channel->queue_declare($queue_name);
//定義回調
$callback = function ($msg) {
echo 'received:' . $msg->body;
};
//消費隊列
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//監聽回調並執行回調程序
while ($channel->is_consuming()) {
$channel->wait();
}
執行:
啟動消費者(ctrl+C 斷開):
php consumer.php
發送隊列:
php publisher.php
work queues(工作隊列)
說明: 發送者發送隊列后多個消費者來分配處理任務
循環調度: 運行多個consumer, publisher發送多個隊列后, RabbitMQ會按順序依次派發任務給consumer;
消息確認: 為了確保消息永不丟失,RabbitMQ支持消息確認。消費者發送回一個確認(acknowledgement)以告知RabbitMQ已經接收,處理了特定的消息,並且RabbitMQ可以自由刪除它;
注意: rabbitmq無法釋放任何未確認的消息,會導致rabbitmq消耗越來越多的內存,這時可以使用rabbitmqctl打印messages_unacknowledged
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化: RabbitMQ退出或崩潰時,除非您告訴它,否則它將忘記隊列和消息。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標記為持久.(注: 對已經存在的隊列不生效)
公平派遣: rabbitmq它只是盲目地將每第n條消息發送給第n個consumer,並不會查看消費者的未確認消息數.這樣有時會導致有的consumer很忙碌,而有的又很閑; 我們可以通過basic_qos來控制;比如在處理並確認上一條消息之前,不要將新消息發送給consumer;
消息生產者: publisher.php
//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//接收參數組合成字符串
$data = implode('', array_splice($argv, 1)) ?? 'default!';
$data = !empty($data) ? $data : 'default!';
//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//創建管道
$channel = $connection->channel();
//聲明一個隊列
$queue_name = 'task2';
//durable參數(第三個): 設置為true時標記為持久,rabbitmq重啟后將依然存在
$channel->queue_declare($queue_name, false, true);
//設置消息並發送隊列(delivery_mode: 設置消息持久性)
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', $queue_name);
//關閉通道和連接
$channel->close();
$connection->close();
消息消費者: consumer.php
//加載autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//創建管道
$channel = $connection->channel();
//聲明一個隊列
$queue_name = 'task2';
$channel->queue_declare($queue_name, false, true);
//定義回調
$callback = function ($msg) {
echo 'received:' . $msg->body, "\n";
//模擬程序執行時間
sleep(5);
//告訴rabbitmq該消息已經處理
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//上一個消息未處理完時,不再接收新的消息
$channel->basic_qos(null, 1, null);
//消費隊列(no_ack=false時(第四個),啟用消息確認)
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//監聽回調並執行回調程序
while ($channel->is_consuming()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
publish/subcribe
exchange types(交換類型)
direct: 默認; 與隊列中routing key進行精准匹配
topic: 主題模式, 下面有使用方法
headers: 與direct的模式不同,不是使用routingkey去做綁定。而是通過消息headers的鍵值對匹配
fanout: 將接收到的所有消息廣播到它知道的所有隊列中
消息生產者: publisher
//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//創建管道
$channel = $connection->channel();
//聲明交換機
$exchang_name = 'logs2';
$channel->exchange_declare($exchang_name, 'fanout', false, false, false);
//聲明一個隊列
$queue_name = 'task6';
$channel->queue_declare($queue_name);
//設置消息並發送隊列
$msg = new AMQPMessage('hello world');
$channel->basic_publish($msg, $exchang_name, $queue_name);
//關閉通道和連接
$channel->close();
$connection->close();
消息消費者: consumer
//加載autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//創建管道
$channel = $connection->channel();
//聲明交換機
$exchang_name = 'logs2';
$channel->exchange_declare($exchang_name, 'fanout', false, false, false);
//聲明一個隊列, 隊列名稱隨機
list($queue_name) = $channel->queue_declare("");
//綁定隊列和交換機
$channel->queue_bind($queue_name, $exchang_name);
//定義回調
$callback = function ($msg) {
echo 'received:' . $msg->body, "\n";
};
//消費隊列
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//監聽回調並執行回調程序
while ($channel->is_consuming()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
記錄日志
php sub_consumer.php>logs_from_rabbit.log
routing
說明: 隊列和routing綁定后,不在綁定范圍內的將不再處理
消息產生者:
//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//接收參數組合成字符串
$routing_key = $argv[1]; //獲取提示信息
//將第三個參數組合成字符串
$data = implode('', array_splice($argv, 2));
$data = !empty($data) ? $data : 'default!';
//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//創建管道
$channel = $connection->channel();
//聲明交換機
$exchange_name = 'logs_record';
$channel->exchange_declare($exchange_name, 'direct');
//設置消息並發送隊列
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange_name, $routing_key);
//關閉通道和連接
$channel->close();
$connection->close();
消息消費者:
//加載autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//連接
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//創建管道
$channel = $connection->channel();
//聲明交換機
$exchange_name = 'logs_record';
$channel->exchange_declare($exchange_name, 'direct');
//聲明一個隊列
list($queue_name) = $channel->queue_declare("");
//綁定隊列,交換機,routing
$params = array_slice($argv, 1);
foreach ($params as $param) {
$channel->queue_bind($queue_name, $exchange_name, $param);
}
//定義回調
$callback = function ($msg) {
echo 'received:' . $msg->delivery_info['routing_key'] . ':' . $msg->body, "\n";
};
//消費隊列
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//監聽回調並執行回調程序
while ($channel->is_consuming()) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
topics
說明:代碼和上面的類似.交換機類型換成topic
,案例如下
接收所有日志:
php receive_logs_topic.php “#”
要從設施“ kern ”接收所有日志:
php receive_logs_topic.php “ kern.*”
或者,如果您只想聽聽“關鍵”日志:
php receive_logs_topic.php “ *.critical”
您可以創建多個綁定:
php receive_logs_topic.php “ kern.*” “*.critical”
並發出帶有路由鍵“ kern.critical ”類型的日志:
php emit_log_topic.php "kern.critical" "A critical kernel error"
RPC
略...
laravel結合RabbitMQ
參考: https://blog.csdn.net/weixin_44600422/article/details/106317870
FAQ
報錯: 已經安裝了amqp還是報Class 'PhpAmqplib\Connection\AMQPLazyConnection' not found
1: 檢查php是否安裝了amqp擴展
2: 在使用時一定要先引入自動加載的文件: 如下
require __DIR__.'/../vendor/autoload.php';
報錯: The connection timed out after 3 sec while awaiting incoming data
原因: 由於我使用的是默認的用戶guest登錄, 而該用戶不允許遠程登錄;
解決:
# 進入docker(非docker運行的忽略此步驟)
docker exec -ti rabbitmq /bin/bash
# 增加用戶
rabbitmqctl add_user admin admin
# 設置tag(類似用戶組)
rabbitmqctl set_user_tags admin administrator
# 設置權限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
報錯: make的時候報/usr/bin/ld: cannot find -lrabbitmq錯誤
原因: 找不到庫文件librabbitmq.so
解決: 參考https://www.cnblogs.com/chenhaoyu/p/13925905.html
1: 找到擴展的位置
find / -name librabbitmq.so
2: 將庫文件所在的路徑加入到 /etc/ld.so.conf 尾部,並使之生效
echo '/disk2/temp/rabbitmq-c-master/build/librabbitmq/' >> /etc/ld.so.conf
3: 刷新配置文件使之生效
ldconfig
4: 修改環境變量,加入庫的文件路徑
export LIBRARY_PATH=/disk2/temp/rabbitmq-c-master/build/librabbitmq/:$LIBRARY_PATH
查看是否已經添加: export -p
5: 將上述 export 命令加入到配置文件 ~/.bashrc,使之永久生效。
echo 'LIBRARY_PATH=/disk2/temp/rabbitmq-c-master/build/librabbitmq/:$LIBRARY_PATH' >> ~/.bashrc
6: 刷新配置
source ~/.bashrc
參考資料
https://blog.csdn.net/weixin_44600422/article/details/106317870
https://blog.csdn.net/weixin_33726313/article/details/91963653