消息確認機制
前言
消息隊列的下游,業務邏輯可能復雜,處理任務可能花費很長時間。若在一條消息到達它的下游,任務剛處理了一半,由於不確定因素,下游的任務處理進程
被kill掉啦,導致任務無法執行完成。而沿用我們前面幾章的消息刪除【消息一旦拋給下游,就立馬從隊列刪除】,這可能會引發問題——消息沒有處理完,但是隊列
里的消息已經被刪除了。
因此,rabbitmq內含 消息確認機制【Message acknowledgment】,簡稱ack。rabbitmq將消息發送給consumer,此刻消息不會從隊列刪除,consumer消費完畢消息后,
給rabbitmq發送一條ack,在rabbitmq收到ack后,才從隊列里刪除。
代碼
在consumer層的代碼,需要修改兩步。
一、添加ack代碼
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
二、開啟消息確認機制
$channel->basic_consume('task_queue', '', false, false, false, false, $callback); //basic_consume第四個參數置成false,no_ack=false
整體代碼見下
生產者sender.php
<?php /** * sender.php * Created by PhpStorm. * User: wangdaxi * Date: 2017/10/18 * Time: 14:26 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); $data = implode(" ", array_slice($argv, 1)); empty($data) && $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent '$data'\n"; //close the channel and connection; $channel->close(); $connection->close();
消費者receive.php
<?php /** * receive.php * Created by PhpStorm. * User: wangdaxi * Date: 2017/10/18 * Time: 14:34 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) { echo "[x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo "[x] Done\n"; //消息確認 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_consume('hello', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
驗證
模擬消息下游進程被kill掉。
1. 開啟兩個終端分別作為消費者,記作C1,C2。分別等待接收消息。

2. 開啟一個終端作為生產者。記作P。發送消息,用圓點來模擬耗時任務。

3. 發現C1接收到了消息,這里我們用 Ctrl+C關閉掉該進程。

4. 由於C1沒有回傳ack,且已中斷,C2接收到了該消息,處理並且ack回傳。

5. 通過 rabbitmq命令 查看hello隊列messages_ready 和 messages_unacknowledged,ready狀態的消息和未確認消息都是0。證明該消息已經被消費。

疑問
如果消費者執行完畢消息但是忘記了ack,會發生什么?
1. 修改消費者代碼,注釋掉ack代碼。

2. 開啟兩個終端,分別作為消費者C1、生產者P。
3. 生產者P生產消息

4. 消費者接收到消息並處理

5. 命令查看消息數,發現未ack消息有兩條。

6. 修改代碼,解除ack通知的注釋,另開終端作為消費者C2。發現C2不會接收到任何消息。
7. 關閉C1,C2接收C1未ack消息並處理。

8. 命令行查看發現消息都被消費掉。

結論
當消費者開啟ack確認機制卻忘記在處理完消息后回傳rabbitmq ack時,會產生嚴重后果。
1. 未ack的消費者可以繼續接收消息,但是不回傳ack。導致rabbitmq內存被unacknowledged messages占用過多。
2. 從代碼層面要解決掉,需要放棄掉這個進程,另開進程添加ack代碼,進行消息回傳。從而清除掉占用內存的已經被處理卻未被刪除的消息。
以上。
