php-rabbitmq筆記(一)


本文是學習使用php-rabbitmq的一些筆記,結合官方文檔,使用TP5的command運行服務端測試,由於還在學習使用中,可能會有疏漏錯誤之處,完善中。。。。

下面從最簡單的開始

Sending(發送消息  msg->queue)

 

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

public function send(){
     //連接rabbitmq服務 主機 端口號 用戶名 密碼
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //創建一個隊列 $channel->queue_declare('new_tasks',false,false,false,false);      //消息內容 $data = time(); $msg = new AMQPMessage($data);
    //消息進入隊列
    $info = $channel->basic_publish($msg,'','new_tasks'); 
    //關閉
    $channel->close();
    $connection->close();
}

 Receiving(接收消費  queue->msg)

     $connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
        $channel = $connection->channel();
        //聲明隊列
        $channel->queue_declare('new_tasks',false,false,false,false);
     //回調 其中邏輯自定義
$callback = function($msg){ echo '[x] Received',$msg->body,"\n"; sleep(3); //統計消息中有多少個點,就睡幾秒 echo " [x] Done\n"; Db::name('user')->update(['id'=>10236,'delete_time'=>$msg->body]); };
     //當有多個消費者進行循環調度時,下面一行確保公平分發隊列中的信息,每個消費者每次取一條
$channel->basic_qos(null, 1, null); //從隊列獲取信息,並執行回調 $channel->basic_consume('new_tasks','',false,true,false,false,$callback); while(count($channel->callbacks)){ $channel->wait(); }

之后可運行生產腳本和消費腳本測試,若要進行循環調度,可運行多個消費腳本,這里用的TH5的命令行,php think Sending,php think Receiving運行。

上面代碼實際上可用性不高,當消費者進行一個復雜任務時,一旦RabbitMQ向客戶發送消息,它立即將其標記為刪除,而消費者在執行過程中出現問題,任務並未完成,但此時消息已經丟失。我們並不想丟失這些消息,希望當一個消費者掛掉之后,任務自動由另一個消費者接管,這里需要添加---消息確認。

重寫Sending(消息確認)

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
        $channel = $connection->channel();
        //第三個參數為true時,表示隊列持久化,需要和服務端一致
        $channel->queue_declare('new_tasks',false,true,false,false);

        $data = time();
        //設置delivery_mode表示設置消息持久化
        $msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage :: DELIVERY_MODE_PERSISTENT));
        $info = $channel->basic_publish($msg,'','new_task');

        $channel->close();
        $connection->close();

 

重寫Receiving(消息確認)

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
        $channel = $connection->channel();
        //將第三個參數改為true,第三個參數為true時,表示隊列持久化,需要和客戶端一致
        $channel->queue_declare('new_tasks',false,true,false,false);
        $callback = function($msg){
            echo  '[x] Received',$msg->body,"\n";
            sleep(3);  //統計消息中有多少個點,就睡幾秒
            echo " [x] Done\n";
            Db::name('user')->update(['id'=>10236,'delete_time'=>$msg->body]);
            //確認消息  消費者發回ack
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        $channel->basic_qos(null, 1, null);
        //第四個參數默認false 表示將進行消息確認,為true時不進行消息確認
        $channel->basic_consume('new_tasks','',false,false,false,false,$callback);
        while(count($channel->callbacks)){

            $channel->wait();
        }

到這里,執行重寫之后的腳本,到運行兩個消費腳本,如果往隊列中加入10條信息,理論上兩個消費者將各自執行其中5條(基於$channel->basic_qos(null, 1, null);)。若其中一個消費者的確認消息代碼被注釋掉,無法返回ack,即下面一條被注釋

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

那么這個消費者只能獲得一條信息,另一個消費者獲得9條,且這9條在執行完任務后,從隊列中刪除,而未返回確認消息的那一條仍留在隊列中,這時Ctrl+C停止運行這個不返回確認消息的消費者,這條消息將重新分給正常的消費者。

持久性

上面重寫后的代碼還有其他地方相較於重寫之前的代碼有區別,

重寫之后,第三個參數改為true,保證隊列持久

$channel->queue_declare('new_tasks',false,true,false,false);

設置delivery_mode屬性,使消息持久

$msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage :: DELIVERY_MODE_PERSISTENT));

這樣可以保證隊列持久性,及時重啟,隊列仍然存在,而保證消息持久是將其寫入磁盤,但當rabbitmq接收了消息,並未完成保存時,此時寫入了緩存,未寫入磁盤,根據官方說明,該種方式可用,但只能用於簡單隊列,否則,需要發布確認。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM