一、RabbitMQ安裝
- windows安裝
下載地址:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-windows-3.8.3.zip
2.linux安裝
下載地址:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server_3.8.3-1_all.deb
Centos8:
Centos7:
Centos6:
OpenSUSE:
SLEX 11x:
二、RabbitMQ配置
- 下載Erlang並安裝
Erlang下載地址:https://www.erlang.org/downloads
按照你的自己的環境,下載合適的版本並安裝即可。
windows相對來說安裝比較簡單,和普通軟件的安裝沒有區別,按照提示向下進行即可。
Linux安裝:需要下載源碼,並進行編譯安裝,這個會在另外的文章中進行說明
三、RabbitMQ原理與消息隊列
- 消息:是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。
- 消息隊列:消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到消息隊列中而不用管誰來取,消息使用者只管從消息隊列中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。
- 用途:在不同的進程或線程之間進行通訊。
- 主流的消息隊列:
RabbitMQ、ActiveMq、ZeroMq、kafka、RocketMQ(阿里開源)
本文主要介紹RabbitMQ。
RabbitMQ是應用程序之間通訊的一種方法,是建立在AMQP協議基礎上的,使用Erlang語言開發的,完整且可復制的企業消息系統。
支持系統:windows,linux,macOX等。
開發語言支持:ruby,python,java,php,. net,C/C++,Node.js等。
AMQP:Advanced Message Queue Protocol(高級消息隊列協議),是一個提供統一消息服務的應用層標准高級消息隊列協議。
四、RabbitMQ特性
RabbitMQ起源於金融系統,用於在分布式系統中存儲並轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
具體特性包括:
- 可靠性(Reliability):使用一些機制來保證可靠性,如:持久化,傳輸確認,發布確認等。
- 靈活的路由(Flexible Routing):在消息進入隊列之前,通過exchange來路由消息。RabbitMQ默認再系統中提供了一些路由以供使用,對於復雜的路由可以將多個exchange綁在一起,也通過插件機制來實現自己的路由。
- 高可用(Highly Available Queues):隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
- 多種協議(Multi-protocol):RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。
- 多語言客戶端(Many Clients):RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。
- 管理界面(Management UI):RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。
- 跟蹤機制(Tracing):如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。
- 插件機制(Plugin System):RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。
五、RabbitMQ基本概念:
AMQP:Advanced Message Queue Protocol(高級消息隊列協議),是一個提供統一消息服務的應用層標准高級消息隊列協議。
Exchange:交換機,用來接收生成者消息,並路由給與交換機綁定的隊列。
Queue:消息隊列,用來存儲和消息,並對消息進行順序排列的隊列,是消息的容器,也是消息的終點,消費者將消息存儲到消息隊列后即可返回。消費者,從這里取用消息。
Broker:消息隊列服務器實體(這里可以理解為實體機)
Virtual Host:虛擬主機,一個虛擬機里可以存在多個交換機,和消息隊列,他可以共享服務器的加密信息和鑒權認證等,也就是說鑒權和加密是以虛擬機為單位的,不能單獨對隊列進行這些操作。
Connection:鏈接:一個網絡連接
Channel:通道或信道多路復用連接中的一條獨立的雙向數據流通道,是建立在真實的TCP連接內地虛擬連接,可以安排每隔進程分配一個信道,也可以安排每隔線程分配一個信道。
Binding:綁定,即exchange與queue之間的綁定關系
Consumer:消費者,這里是指對隊列內容進行消費(取出消息,並進行相應的處理)的應用程序,可以是一個進程,也可以是一個線程。
Publisher:生成者,這里是指產生消息的應用程序,可以是進程,也可以是線程
Message:消息,這里是指生產者產生並存儲於隊列的數據。
六、RabbitMQ在PHP中的應用
- AMQP擴展安裝
擴展下載地址:http://pecl.php.net/package/amqp
注意事項:
- windows系統請點擊 DLL
- linux下載后需要編譯安裝
- Windows下下載dll之前請檢查php的相應版本,並下載對應的版本,否則會出現無法使用的問題
- Window下需要在php.ini下加入:
需要先將php_amqp.dll復制到php/ext目錄下
[amqp]
extension=php_amqp.dll
然后在httpd.conf下加入:
需要先將rabbitmq.4.dll復制到配置目錄下
# rabbitmq
LoadFile "d:/xampp/php/rabbitmq.4.dll"
這兩個dll文件存在於下載的擴展中
- 一個關於RabbitMQ的php封裝,此封裝經過本人測試是可以使用的,但是有關信道和交換機這塊因為本人僅僅是為了設置,所以在分配邏輯上做太多考慮,如果需要使用該方法可以在這方面做出相應的修改。
<?php
/**
* RabbitMQ消息隊列封裝
*/
namespace common\lib;
/**
* RabbitMQ消息隊列生產者
*/
class RabbitMQ {
const HOST = '127.0.0.1';
const PORT = '5672';
const USER = 'zhangxugang810';
const PASSWORD = '132133';
const VHOST = 'huaweiQueue';
private $isTransaction = true;
private $exchangeName = 'ex_huawei';
private $queueName = 'queue_huawei';
private $routeKey = 'route_huawei';
private $cn;
private $ch;
private $ex;
private $queue;
public function __construct() {
$this->connect();
}
/**
* 組裝參數
*/
private function connectParams() {
return ['host' => static::HOST, 'port' => static::PORT, 'login' => static::USER, 'password' => static::PASSWORD, 'vhost' => static::VHOST];
}
/**
* 連接隊列服務器並創建channel
*/
private function connect() {
$params = $this->connectParams();
$this->cn = new \AMQPConnection($params);
if (!$this->cn->connect()) {
die("不能連接這個BROCKER\n");
}
$this->channel();
}
private function channel(){
$this->ch = new \AMQPChannel($this->cn);
}
/**
* 創建交換機
*/
private function exchange() {
$this->ex = new \AMQPExchange($this->ch);
$this->ex->setName($this->exchangeName);
date_default_timezone_set("Asia/Shanghai");
}
/**
* 創建隊列並綁定交換機
*/
private function queue() {
//使用哪個信道
$this->queue = new \AMQPQueue($this->ch);
$this->queue->setName($this->queueName);
$this->queue->setFlags(AMQP_DURABLE);
// echo "Message Total:".$this->queue->declare()."\n";
// $this->queue->bind($this->exchangeName, $this->routeKey);
}
/**
* 生產者方法 - 單條
* @param type $msg
* @return type
*/
public function sendOne($msg){
$this->exchange();
return $this->ex->publish($msg, $this->routeKey);
}
/**
* 生產者方法 - 多條
* @param type $msgs
* @return type
*/
public function sendMultiple($msgs){
$this->exchange();
if ($this->isTransaction) {//如果啟用事務
$this->ch->startTransaction(); //開始事務
}
$result = [];
foreach($msgs as $k => $msg){
$result[$k] = $this->ex->publish($msg, $this->routeKey);
}
if ($this->isTransaction) {//如果啟用事務
$this->ch->commitTransaction(); //開始事務
}
return $result;
}
/**
* 消費者方法
* 這里需要根據相應的業務邏輯進行修改
*/
public function run() {
$this->queue();
echo "Message:\n";
while (True) {
$this->queue->consume(function($envelope, $queue) {
//此處調用業務邏輯處理方法
static::processMessage($envelope, $queue);
});
//$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
}
$this->disconnect();
}
/**
* 消費者業務邏輯
* 此處只是示例方法
* @param type $envelope
* @param type $queue
*/
public static function processMessage($envelope, $queue) {
//此處是業務邏輯處理
$msg = $envelope->getBody();
echo $msg . "\n"; //處理消息
$queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答
}
}
七、RabbitMQ單點問題與高可用
這里為了高可用性可以考慮一個問題,如果我們只部署一個單點RabbitMQ服務器用於生產環境,那么一但這個RabbitMQ出現問題,則所有調用該消息隊列的應用都將出現問題,為了解決這個問題,就產生了RabbitMQ集群,具體的集群搭建,另做講解。
八、踩坑提示
- windows安裝時,請注意安裝路徑不要帶空格
- 配置完成后直接運行程序,結果是隊列消息無法加入,請檢查所對應的虛擬主機是否存在,exchange是否存在,隊列是否存在,exchange和隊列是否綁定好了,如果未綁定成