本文是學習使用php-rabbitmq的一些筆記,結合官方文檔,使用TP5的command運行服務端測試,由於還在學習使用中,可能會有疏漏錯誤之處,完善中。。。。
下面從最簡單的開始
Sending(發送消息 msg->queue)
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; public function send(){
//連接rabbitmq服務 主機 端口號 用戶名 密碼 $connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //創建一個隊列 $channel->queue_declare('new_tasks',false,false,false,false); //消息內容 $data = time(); $msg = new AMQPMessage($data);
//消息進入隊列
$info = $channel->basic_publish($msg,'','new_tasks');
//關閉
$channel->close();
$connection->close();
}
Receiving(接收消費 queue->msg)
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //聲明隊列 $channel->queue_declare('new_tasks',false,false,false,false);
//回調 其中邏輯自定義 $callback = function($msg){ echo '[x] Received',$msg->body,"\n"; sleep(3); //統計消息中有多少個點,就睡幾秒 echo " [x] Done\n"; Db::name('user')->update(['id'=>10236,'delete_time'=>$msg->body]); };
//當有多個消費者進行循環調度時,下面一行確保公平分發隊列中的信息,每個消費者每次取一條 $channel->basic_qos(null, 1, null); //從隊列獲取信息,並執行回調 $channel->basic_consume('new_tasks','',false,true,false,false,$callback); while(count($channel->callbacks)){ $channel->wait(); }
之后可運行生產腳本和消費腳本測試,若要進行循環調度,可運行多個消費腳本,這里用的TH5的命令行,php think Sending,php think Receiving運行。
上面代碼實際上可用性不高,當消費者進行一個復雜任務時,一旦RabbitMQ向客戶發送消息,它立即將其標記為刪除,而消費者在執行過程中出現問題,任務並未完成,但此時消息已經丟失。我們並不想丟失這些消息,希望當一個消費者掛掉之后,任務自動由另一個消費者接管,這里需要添加---消息確認。
重寫Sending(消息確認)
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //第三個參數為true時,表示隊列持久化,需要和服務端一致 $channel->queue_declare('new_tasks',false,true,false,false); $data = time(); //設置delivery_mode表示設置消息持久化 $msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage :: DELIVERY_MODE_PERSISTENT)); $info = $channel->basic_publish($msg,'','new_task'); $channel->close(); $connection->close();
重寫Receiving(消息確認)
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //將第三個參數改為true,第三個參數為true時,表示隊列持久化,需要和客戶端一致 $channel->queue_declare('new_tasks',false,true,false,false); $callback = function($msg){ echo '[x] Received',$msg->body,"\n"; sleep(3); //統計消息中有多少個點,就睡幾秒 echo " [x] Done\n"; Db::name('user')->update(['id'=>10236,'delete_time'=>$msg->body]); //確認消息 消費者發回ack $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); //第四個參數默認false 表示將進行消息確認,為true時不進行消息確認 $channel->basic_consume('new_tasks','',false,false,false,false,$callback); while(count($channel->callbacks)){ $channel->wait(); }
到這里,執行重寫之后的腳本,到運行兩個消費腳本,如果往隊列中加入10條信息,理論上兩個消費者將各自執行其中5條(基於$channel->basic_qos(null, 1, null);)。若其中一個消費者的確認消息代碼被注釋掉,無法返回ack,即下面一條被注釋
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
那么這個消費者只能獲得一條信息,另一個消費者獲得9條,且這9條在執行完任務后,從隊列中刪除,而未返回確認消息的那一條仍留在隊列中,這時Ctrl+C停止運行這個不返回確認消息的消費者,這條消息將重新分給正常的消費者。
持久性
上面重寫后的代碼還有其他地方相較於重寫之前的代碼有區別,
重寫之后,第三個參數改為true,保證隊列持久
$channel->queue_declare('new_tasks',false,true,false,false);
設置delivery_mode屬性,使消息持久
$msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage :: DELIVERY_MODE_PERSISTENT));
這樣可以保證隊列持久性,及時重啟,隊列仍然存在,而保證消息持久是將其寫入磁盤,但當rabbitmq接收了消息,並未完成保存時,此時寫入了緩存,未寫入磁盤,根據官方說明,該種方式可用,但只能用於簡單隊列,否則,需要發布確認。
