rabbitmq 消息的狀態轉換


tutorial:http://www.rabbitmq.com/tutorials/tutorial-two-java.html

這里解釋接收消息端關於 acknowledge和prefetch的設置問題

這里有兩段代碼,sender,負責發送100條消息; recv,負責接收消息,每接收到一條消息sleep 1 秒。

 1 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 2         //channel.basicQos(1,false);
 3         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 4 
 5         Consumer consumer = new DefaultConsumer(channel) {
 6             @Override
 7             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
 8                     throws IOException {
 9                 String message = new String(body, "UTF-8");
10                 System.out.println(" [x] Received '" + message + "'");
11                 try{
12                     Thread.sleep(1000);
13                 }catch (Exception e){
14 
15                 }
16                 //channel.basicAck(envelope.getDeliveryTag(), false);
17 
18             }
19         };
20         channel.basicConsume(QUEUE_NAME, true, consumer);

先展示一個基礎版本的recv。

運行這段代碼遇到兩個費解的現象

1. 先啟動sender產生100條消息,后啟動兩個recv,發現所有消息都被第一個啟動的recv接收了,第二個recv沒有接收到任何消息。但如果先兩個recv,再啟動sender,那么兩個recv是平均分配消息的。

2. 先啟動sender,啟動第一個recv,處理10條消息,關掉recv,再啟動第二個recv,第二個recv沒有消息。11~100條消息丟失了。

為了說明上面的問題,我們先描述一下一個消息在rmq server端的狀態

在server端,消息進入queue后,首先是ready狀態。如果有recv 消費這條消息,那么消息進入 unack 狀態,當recv ack這條消息后,server端將刪除消息。

現象1的解釋:

先啟動sender,server端有100個ready 消息。啟動recv1,雖然recv1還沒有來得及處理這些消息,但recv1 接收了100個消息。在server端100個消息都進入unack狀態。因為我們設置的ack方式是autoack, line 20

因此所有消息立刻就從server端刪除了。當我再啟動recv2時,隊列已經沒有消息了,所以recv2沒有接收到任何消息。反過來,先啟動兩個recv,sender生產消息的時候,server會按round robbin方式分配消息,因此兩個recv各接收50條消息

現象2的解釋:

因為消息都已經發送給了recv1,server端收到了ack,刪除了消息,系統中只有recv1緩存了消息,如果關掉recv1,所有消息都會丟失。recv2無法拿到recv1未處理的消息。

下面我們看看改進版本。變化在於

  • line 20 autoack 改為false
  • line 16 每條消息處理后ack
 1 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 2         //channel.basicQos(1,false);
 3         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 4 
 5         Consumer consumer = new DefaultConsumer(channel) {
 6             @Override
 7             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
 8                     throws IOException {
 9                 String message = new String(body, "UTF-8");
10                 System.out.println(" [x] Received '" + message + "'");
11                 try{
12                     Thread.sleep(1000);
13                 }catch (Exception e){
14 
15                 }
16                 channel.basicAck(envelope.getDeliveryTag(), false);
17 
18             }
19         };
20         channel.basicConsume(QUEUE_NAME, false, consumer);

可以發現現象1 依舊,但現象2 不同了。我們可以通過命令查看server端的狀態

rabbitmqctl list_queues name messages_unacknowledged messages_ready

啟動sender后 unack=0,ready=100

啟動recv1 后 unack=100, ready=0,所有消息都給了recv1

等recv1 處理幾條消息后 unack=89, ready=0,recv1處理並ack了11條消息,

關閉recv1 后 unack=0, ready=85,未ack的消息都回到ready狀態

啟動recv2,unack=89, ready =0,所有消息都轉給了recv2.

 

如果消息量很大,那么緩存消息就可能吃掉recv的所有內存導致系統崩潰。因此我們打開 line2。這樣recv在ack一個消息后才會領取下一個消息。再來看看按照上面流程queue里消息的狀態

啟動sender后 unack=0,ready=100

啟動recv1 后,unack=1, ready=99

啟動recv2后,unack=2, ready=95

在消息處理完前,unack都是2,recv1和recv2 各持有一個消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM