RabbitMQ的安裝與基本使用


  運行環境:https://oneinstack.com/install/

    在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。如發送短信、郵件、過濾非法關鍵字等等。它還可以用於RPC。

    先看一張官方圖:

 

 
    一、概念:
     Broker:簡單來說就是消息隊列服務器實體。
   Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
   Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
   Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
   Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
   vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
   producer:消息生產者,就是投遞消息的程序。
   consumer:消息消費者,就是接受消息的程序。
   channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
二、安裝RabbitMQ
     Ubuntu:
          sudo apt-get install erlang
          sudo apt-get install rabbitmq-server
     CentOS:
          1.先安裝erlang
               yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel 
               yum -y install ncurses-devel 
               yum install ncurses-devel 
    cd /usr/local
               wget http://www.erlang.org/download/otp_src_17.5.tar.gz
               tar -xzvf otp_src_17.5.tar.gz 
               cd otp_src_17.5
               ./configure --without-javac
               make && make install
               測試一下是否安裝成功,在控制台輸入命令erl
    安裝完后輸入“erl”以下提示即為安裝成功
[root@cloud bin]# erl
Erlang R16B02 (erts-5.10.3) [source] [64-bit] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]
 
Eshell V5.10.3  (abort with ^G)
1>
          2.安裝rabbitmq
               cd /usr/local
               tar -zxvf rabbitmq-server-3.5.1.tar.gz 
               cd rabbitmq-server-3.5.1
               make
               make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc install
               報錯處理:
                    /bin/sh: xmlto: command not found
                    /bin/sh: line 2: xmlto: command not found
                   解決:yum install xmlto
三、管理命令
  切換到安裝目錄,sbin文件下才能執行命令,如:cd /usr/local/rabbitmq/sbin/
       啟動:./rabbitmq-server start
       后台啟動:./rabbitmq-server -detached
       關閉:./rabbitmqctl stop
       狀態:./rabbitmqctl status
四、插件
    啟動web管理插件,切換到安裝目錄,sbin文件下才能執行命令,如:cd /usr/local/rabbitmq/sbin/
        ./rabbitmq-plugins enable rabbitmq_management
        錯誤解決:
            Error: {cannot_write_enabled_plugins_file,"/etc/rabbitmq/enabled_plugins",            enoent}
            mkdir /etc/rabbitmq
            重新啟動輸入地址:localhost:15672,帳號默認為guest,密碼guest,此帳號默認只能在本機訪問。不建議打開遠程訪問。你可以創建一個帳戶,並設置可以遠程訪問的角色進行訪問。
            如:./rabbitmqctl add_user luo 123456
            ./rabbitmqctl  set_user_tags  luo administrator
五、用戶管理
     默認的guest帳戶相當於root帳戶
     rabbitmqctl add_user username password 添加帳戶
     rabbitmqctl change_password username newpassword 修改密碼
     rabbitmqctl delete_user username 刪除帳戶
     rabbitmqctl list_users 列出所有帳戶
     rabbitmqctl  set_user_tags  User  Tag 設置角色(administrator、monitoring、policymaker、management、其它)
     立即生效,不需重啟
 六、安裝PHP擴展(rabbitmq-c,amqp)
    
 1.安裝rabbitmq-c
 cd /usr/local
    wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz 
  tar zxvf rabbitmq-c-0.8.0.tar.gz
    cd  rabbitmq-c-0.8.0
    autoreconf -i
    ./configure --prefix=/usr/local/rabbitmq-c
最后顯示一下內容表示正常
rabbitmq-c build options:
Host: x86_64-unknown-linux-gnu
Version: 0.4.1
SSL/TLS: openssl
Tools: yes
Documentation: no
Examples: yes
  然后進行make和安裝了.
  make && make install
   2.安裝amqp
    cd /usr/local
    wget https://pecl.php.net/get/amqp-1.9.1.tgz
    tar -zxvf amqp-1.9.1.tgz 
    cd amqp-1.9.1
     /usr/local/php/bin/phpize( 可使用find / -name phpize查找phpize路徑 )
   ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c(可使用find / -name php-config查找php-config路徑)
make && make install
   在php.ini中extenstion部分寫入extension=amqp.so
   重啟php,/etc/init.d/php-fpm restart
    最后phpinfo檢查搜索amqp是否成功
七、使用PHP與之交互
     Produce端:
//設置連接屬性 
$connArgs = array( 'host'           => 'localhost',  
     'port'           => '5672',  
     'login'           => 'guest',  
     'password' => 'guest',  
     'vhost'          => '/' ); //創建RabbitMQ連接 
$conn = new AMQPConnection($connArgs); if (!$conn->connect()) { echo "Cannot connect to the broker <br/>\n "; } //創建channel 
$channel = new AMQPChannel($conn); //創建exchange 
$exchangeName = 'exchange'; $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName);//創建名字 
$exchange->setType(AMQP_EX_TYPE_DIRECT);//設置為direct類型 
$exchange->setFlags(AMQP_DURABLE);//持久化交換機,當代理重啟動后依然存在,並包括它們中的完整數據 
$exchange->setFlags(AMQP_AUTODELETE);//對交換機而言,自動刪除標志表示交換機將在沒有隊列綁定的情況下被自動刪除,如果從沒有隊列和其綁定過,這個交換機將不會被刪除. //發送消息 
$routingKey = 'key_test'; for($i=0;$i<10;$i++){ sleep(3); $message = 'Hello World '.$i; $exchange->publish($message,$routingKey); } //斷開連接 
$conn->disconnect();  

 

 

Consumer端:
//設置連接屬性 
$connArgs = array( 'host'           => 'localhost',  
                    'port'           => '5672',  
                    'login'           => 'guest',  
                    'password' => 'guest',  
                    'vhost'          => '/' ); //創建RabbitMQ連接 
$conn = new AMQPConnection($connArgs); if (!$conn->connect()) { echo 'cannot connect to the broker'; } //創建channel 
$channel = new AMQPChannel($conn); //設置隊列名稱 
$exchangeName = 'exchange'; $routingKey = 'key_test'; $queueName = 'queue_test_1'; $queue = new AMQPQueue($channel); $queue->setName($queueName);//設置名稱 
$queue->setFlags(AMQP_DURABLE);//持久化隊列,當代理重啟動后依然存在,並包括它們中的完整數據 
$queue->declare();//聲明此隊列 
$queue->bind('exchange',$routingKey);//使用某交換機,並綁定某路由關鍵字 //消費數據方式一:非阻塞方式 // while(true) // { // sleep(1); // $envelope = $queue->get(AMQP_AUTOACK) ;//第一個參數表示自動ACK應答 // if ($envelope){ // $messages = $envelope->getBody(); // echo $messages; // } // } //消費數據方式二:以阻塞模式消費數據(推薦) 
while(true) { $queue->consume('processMessage');//第一個參數表示要回調的方法名,第二個參數設置為AMQP_AUTOACK,表示自動ACK應答 
} /** * 定義回調方法 */  
function processMessage($envelope, $queue) { $messages = $envelope->getBody();#獲取消息數據 
     echo $messages; $queue->ack($envelope->getDeliveryTag()); //處理成功后,手動發送ACK應答 //$queue->nack($envelope->getDeliveryTag()); //處理不成功,手動發送NO-ACK應答,放回隊列中 
}  

 


     八:特別說明:
     1.如果某一次消費數據沒有ACK,則此條消息會記錄為Unacknowledged,如果對於某一節點有連續三條則RabbitMQ認為此節點有故障,則不會再對它進行分發.當它斷開后或一定時間后Unacknowledged狀態消息會重新放回交換機中。手動no-ack后此Message將會放回隊列中且不會再轉發給此節點。
     2.對於計算密集型的工作,我們需要建立多個Consumer,對於多個Consumer,默認的分發機制是“公平分發”,將第n個發給第n個Consumer,超過個數取n的模。
     3.Exchange中的類型有:direct, topic 和fanout。
          direct:通過routingKey和exchange決定的那個唯一的queue可以接收消息。
              topic :所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息,direct的區別是它的routingkey可 以模糊匹配,#代表一個或多個字符,*代表任何字符。一般為確保程序嚴謹性而使用direct。注意當使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout
          fanout:是廣播模式,所有bind到此exchange的queue都可以接收消息,即該一個Message可以對應多個Consumer,應用場景舉例:生產了了添加到購物車的Message,一個Consumer寫推薦商品日志,一個Consumer寫購物車表。
     4.兩個不相同的queue名的隊列綁定到同一exchange和rotingky上,此時相當於同一queue名的fanout廣播模式,兩個queue都會得都到
所有Message。如圖所示:
5.RPC的實現流程:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM