RabbitMq初探——消息確認


 

消息確認機制

前言


消息隊列的下游,業務邏輯可能復雜,處理任務可能花費很長時間。若在一條消息到達它的下游,任務剛處理了一半,由於不確定因素,下游的任務處理進程

被kill掉啦,導致任務無法執行完成。而沿用我們前面幾章的消息刪除【消息一旦拋給下游,就立馬從隊列刪除】,這可能會引發問題——消息沒有處理完,但是隊列

里的消息已經被刪除了。

因此,rabbitmq內含 消息確認機制【Message acknowledgment】,簡稱ack。rabbitmq將消息發送給consumer,此刻消息不會從隊列刪除,consumer消費完畢消息后,

給rabbitmq發送一條ack,在rabbitmq收到ack后,才從隊列里刪除。

 

代碼


 

在consumer層的代碼,需要修改兩步。

一、添加ack代碼

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

二、開啟消息確認機制

$channel->basic_consume('task_queue', '', false, false, false, false, $callback); //basic_consume第四個參數置成false,no_ack=false

整體代碼見下

生產者sender.php

<?php
/**
 * sender.php
 * Created by PhpStorm.
 * User: wangdaxi
 * Date: 2017/10/18
 * Time: 14:26
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();


$channel->queue_declare('hello', false, false, false, false);

$data = implode(" ", array_slice($argv, 1));
empty($data) && $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent '$data'\n";

//close the channel and connection;
$channel->close();
$connection->close();

 

消費者receive.php

<?php
/**
 * receive.php
 * Created by PhpStorm.
 * User: wangdaxi
 * Date: 2017/10/18
 * Time: 14:34
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
    echo "[x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo "[x] Done\n";
    //消息確認
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('hello', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

 

驗證


 

模擬消息下游進程被kill掉。

1. 開啟兩個終端分別作為消費者,記作C1,C2。分別等待接收消息。

2. 開啟一個終端作為生產者。記作P。發送消息,用圓點來模擬耗時任務。

 

3. 發現C1接收到了消息,這里我們用 Ctrl+C關閉掉該進程。

4. 由於C1沒有回傳ack,且已中斷,C2接收到了該消息,處理並且ack回傳。

5. 通過 rabbitmq命令 查看hello隊列messages_ready 和 messages_unacknowledged,ready狀態的消息和未確認消息都是0。證明該消息已經被消費。

 

 疑問


如果消費者執行完畢消息但是忘記了ack,會發生什么?

1. 修改消費者代碼,注釋掉ack代碼。

2. 開啟兩個終端,分別作為消費者C1、生產者P。

3. 生產者P生產消息

 

4. 消費者接收到消息並處理

 

5. 命令查看消息數,發現未ack消息有兩條。

6. 修改代碼,解除ack通知的注釋,另開終端作為消費者C2。發現C2不會接收到任何消息。

7. 關閉C1,C2接收C1未ack消息並處理。

8. 命令行查看發現消息都被消費掉。

 

結論


當消費者開啟ack確認機制卻忘記在處理完消息后回傳rabbitmq ack時,會產生嚴重后果。

1. 未ack的消費者可以繼續接收消息,但是不回傳ack。導致rabbitmq內存被unacknowledged messages占用過多。

2. 從代碼層面要解決掉,需要放棄掉這個進程,另開進程添加ack代碼,進行消息回傳。從而清除掉占用內存的已經被處理卻未被刪除的消息。

 

以上。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM