消息隊列RabbitMQ的安裝配置與PHP中的使用


一、RabbitMQ安裝

  1. windows安裝

下載地址:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-windows-3.8.3.zip
2.linux安裝

下載地址:

Debian, Ubuntu

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server_3.8.3-1_all.deb

Centos8:

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3-1.el8.noarch.rpm

Centos7:

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3-1.el7.noarch.rpm

Centos6:

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3-1.el6.noarch.rpm

OpenSUSE:

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3-1.suse.noarch.rpm

SLEX 11x:

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3-1.sles11.noarch.rpm

 

二、RabbitMQ配置

  1. 下載Erlang並安裝

Erlang下載地址:https://www.erlang.org/downloads

按照你的自己的環境,下載合適的版本並安裝即可。

windows相對來說安裝比較簡單,和普通軟件的安裝沒有區別,按照提示向下進行即可。

Linux安裝:需要下載源碼,並進行編譯安裝,這個會在另外的文章中進行說明

 

三、RabbitMQ原理與消息隊列

  1. 消息:是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。
  2. 消息隊列:消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到消息隊列中而不用管誰來取,消息使用者只管從消息隊列中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。
  3. 用途:在不同的進程或線程之間進行通訊。
  4. 主流的消息隊列:
    RabbitMQActiveMq、ZeroMq、kafkaRocketMQ(阿里開源)

本文主要介紹RabbitMQ

RabbitMQ是應用程序之間通訊的一種方法,是建立在AMQP協議基礎上的,使用Erlang語言開發的,完整且可復制的企業消息系統。

支持系統:windows,linux,macOX等。

開發語言支持:rubypythonjavaphp. netC/C++Node.js等。

AMQPAdvanced Message Queue Protocol(高級消息隊列協議),是一個提供統一消息服務的應用層標准高級消息隊列協議。

 

四、RabbitMQ特性

RabbitMQ起源於金融系統,用於在分布式系統中存儲並轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

具體特性包括:

  1. 可靠性(Reliability:使用一些機制來保證可靠性,如:持久化,傳輸確認,發布確認等。
  2. 靈活的路由(Flexible Routing):在消息進入隊列之前,通過exchange來路由消息。RabbitMQ默認再系統中提供了一些路由以供使用,對於復雜的路由可以將多個exchange綁在一起,也通過插件機制來實現自己的路由。
  3. 高可用(Highly Available Queues):隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
  4. 多種協議(Multi-protocol):RabbitMQ 支持多種消息隊列協議,比如 STOMPMQTT 等等。
  5. 多語言客戶端(Many Clients):RabbitMQ 幾乎支持所有常用語言,比如 Java.NETRuby 等等。
  6. 管理界面(Management UI):RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。
  7. 跟蹤機制(Tracing):如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。
  8. 插件機制(Plugin System):RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。

五、RabbitMQ基本概念:

 

AMQPAdvanced Message Queue Protocol(高級消息隊列協議),是一個提供統一消息服務的應用層標准高級消息隊列協議。

Exchange:交換機,用來接收生成者消息,並路由給與交換機綁定的隊列。

Queue:消息隊列,用來存儲和消息,並對消息進行順序排列的隊列,是消息的容器,也是消息的終點,消費者將消息存儲到消息隊列后即可返回。消費者,從這里取用消息。

Broker:消息隊列服務器實體(這里可以理解為實體機)

Virtual Host:虛擬主機,一個虛擬機里可以存在多個交換機,和消息隊列,他可以共享服務器的加密信息和鑒權認證等,也就是說鑒權和加密是以虛擬機為單位的,不能單獨對隊列進行這些操作。

Connection:鏈接:一個網絡連接

Channel:通道或信道多路復用連接中的一條獨立的雙向數據流通道,是建立在真實的TCP連接內地虛擬連接,可以安排每隔進程分配一個信道,也可以安排每隔線程分配一個信道。

Binding:綁定,即exchangequeue之間的綁定關系

Consumer:消費者,這里是指對隊列內容進行消費(取出消息,並進行相應的處理)的應用程序,可以是一個進程,也可以是一個線程。

Publisher:生成者,這里是指產生消息的應用程序,可以是進程,也可以是線程

Message:消息,這里是指生產者產生並存儲於隊列的數據。

六、RabbitMQPHP中的應用

  1. AMQP擴展安裝

擴展下載地址:http://pecl.php.net/package/amqp

注意事項:

  1. windows系統請點擊 DLL
  2. linux下載后需要編譯安裝
  3. Windows下下載dll之前請檢查php的相應版本,並下載對應的版本,否則會出現無法使用的問題
  4. Window下需要在php.ini下加入:

需要先將php_amqp.dll復制到php/ext目錄下

[amqp]

extension=php_amqp.dll

然后在httpd.conf下加入:

需要先將rabbitmq.4.dll復制到配置目錄下

# rabbitmq

LoadFile  "d:/xampp/php/rabbitmq.4.dll"

這兩個dll文件存在於下載的擴展中

  1. 一個關於RabbitMQphp封裝,此封裝經過本人測試是可以使用的,但是有關信道和交換機這塊因為本人僅僅是為了設置,所以在分配邏輯上做太多考慮,如果需要使用該方法可以在這方面做出相應的修改。

<?php

 

/**

 * RabbitMQ消息隊列封裝

 */

 

namespace common\lib;

 

/**

 * RabbitMQ消息隊列生產者

 */

class RabbitMQ {

 

    const HOST = '127.0.0.1';

    const PORT = '5672';

    const USER = 'zhangxugang810';

    const PASSWORD = '132133';

    const VHOST = 'huaweiQueue';

 

    private $isTransaction = true;

    private $exchangeName = 'ex_huawei';

    private $queueName = 'queue_huawei';

    private $routeKey = 'route_huawei';

    private $cn;

    private $ch;

    private $ex;

    private $queue;

 

    public function __construct() {

        $this->connect();

        

    }

 

    /**

     * 組裝參數

     */

    private function connectParams() {

        return ['host' => static::HOST, 'port' => static::PORT, 'login' => static::USER, 'password' => static::PASSWORD, 'vhost' => static::VHOST];

    }

 

    /**

     * 連接隊列服務器並創建channel

     */

    private function connect() {

        $params = $this->connectParams();

        $this->cn = new \AMQPConnection($params);

        if (!$this->cn->connect()) {

            die("不能連接這個BROCKER\n");

        }

        $this->channel();

    }

    

    private function channel(){

        $this->ch = new \AMQPChannel($this->cn);

    }

 

    /**

     * 創建交換機

     */

    private function exchange() {

        $this->ex = new \AMQPExchange($this->ch);

        $this->ex->setName($this->exchangeName);

        date_default_timezone_set("Asia/Shanghai");

    }

    

    /**

     * 創建隊列並綁定交換機

     */

    private function queue() {

        //使用哪個信道

        $this->queue = new \AMQPQueue($this->ch);

        $this->queue->setName($this->queueName);

        $this->queue->setFlags(AMQP_DURABLE);

//        echo "Message Total:".$this->queue->declare()."\n";   

//        $this->queue->bind($this->exchangeName, $this->routeKey);

    }

    

    /**

     * 生產者方法 - 單條

     * @param type $msg

     * @return type

     */

    public function sendOne($msg){

        $this->exchange();

        return $this->ex->publish($msg, $this->routeKey);

    }

    

    /**

     * 生產者方法 - 多條

     * @param type $msgs

     * @return type

     */

    public function sendMultiple($msgs){

        $this->exchange();

        if ($this->isTransaction) {//如果啟用事務

            $this->ch->startTransaction(); //開始事務

        }

        $result = [];

        foreach($msgs as $k => $msg){

            $result[$k] = $this->ex->publish($msg, $this->routeKey);

        }

        if ($this->isTransaction) {//如果啟用事務

            $this->ch->commitTransaction(); //開始事務

        }

        return $result;

    }

    

    /**

     * 消費者方法

     * 這里需要根據相應的業務邏輯進行修改

     */

    public function run() {

        $this->queue();

        echo "Message:\n";

        while (True) {

            $this->queue->consume(function($envelope, $queue) {

                //此處調用業務邏輯處理方法

                static::processMessage($envelope, $queue);

            });

            //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答  

        }

        $this->disconnect();

    }

    

    /**

     * 消費者業務邏輯

     * 此處只是示例方法

     * @param type $envelope

     * @param type $queue

     */

    public static function processMessage($envelope, $queue) {

        //此處是業務邏輯處理

        $msg = $envelope->getBody();

        echo $msg . "\n"; //處理消息

        $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答

    }

 

}

 

七、RabbitMQ單點問題與高可用

這里為了高可用性可以考慮一個問題,如果我們只部署一個單點RabbitMQ服務器用於生產環境,那么一但這個RabbitMQ出現問題,則所有調用該消息隊列的應用都將出現問題,為了解決這個問題,就產生了RabbitMQ集群,具體的集群搭建,另做講解。

 

八、踩坑提示

    1. windows安裝時,請注意安裝路徑不要帶空格
    2. 配置完成后直接運行程序,結果是隊列消息無法加入,請檢查所對應的虛擬主機是否存在,exchange是否存在,隊列是否存在,exchange和隊列是否綁定好了,如果未綁定成


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM