tutorial:http://www.rabbitmq.com/tutorials/tutorial-two-java.html
這里解釋接收消息端關於 acknowledge和prefetch的設置問題
這里有兩段代碼,sender,負責發送100條消息; recv,負責接收消息,每接收到一條消息sleep 1 秒。
1 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 2 //channel.basicQos(1,false); 3 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 4 5 Consumer consumer = new DefaultConsumer(channel) { 6 @Override 7 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 8 throws IOException { 9 String message = new String(body, "UTF-8"); 10 System.out.println(" [x] Received '" + message + "'"); 11 try{ 12 Thread.sleep(1000); 13 }catch (Exception e){ 14 15 } 16 //channel.basicAck(envelope.getDeliveryTag(), false); 17 18 } 19 }; 20 channel.basicConsume(QUEUE_NAME, true, consumer);
先展示一個基礎版本的recv。
運行這段代碼遇到兩個費解的現象
1. 先啟動sender產生100條消息,后啟動兩個recv,發現所有消息都被第一個啟動的recv接收了,第二個recv沒有接收到任何消息。但如果先兩個recv,再啟動sender,那么兩個recv是平均分配消息的。
2. 先啟動sender,啟動第一個recv,處理10條消息,關掉recv,再啟動第二個recv,第二個recv沒有消息。11~100條消息丟失了。
為了說明上面的問題,我們先描述一下一個消息在rmq server端的狀態
在server端,消息進入queue后,首先是ready狀態。如果有recv 消費這條消息,那么消息進入 unack 狀態,當recv ack這條消息后,server端將刪除消息。
現象1的解釋:
先啟動sender,server端有100個ready 消息。啟動recv1,雖然recv1還沒有來得及處理這些消息,但recv1 接收了100個消息。在server端100個消息都進入unack狀態。因為我們設置的ack方式是autoack, line 20
因此所有消息立刻就從server端刪除了。當我再啟動recv2時,隊列已經沒有消息了,所以recv2沒有接收到任何消息。反過來,先啟動兩個recv,sender生產消息的時候,server會按round robbin方式分配消息,因此兩個recv各接收50條消息
現象2的解釋:
因為消息都已經發送給了recv1,server端收到了ack,刪除了消息,系統中只有recv1緩存了消息,如果關掉recv1,所有消息都會丟失。recv2無法拿到recv1未處理的消息。
下面我們看看改進版本。變化在於
- line 20 autoack 改為false
- line 16 每條消息處理后ack
1 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 2 //channel.basicQos(1,false); 3 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 4 5 Consumer consumer = new DefaultConsumer(channel) { 6 @Override 7 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 8 throws IOException { 9 String message = new String(body, "UTF-8"); 10 System.out.println(" [x] Received '" + message + "'"); 11 try{ 12 Thread.sleep(1000); 13 }catch (Exception e){ 14 15 } 16 channel.basicAck(envelope.getDeliveryTag(), false); 17 18 } 19 }; 20 channel.basicConsume(QUEUE_NAME, false, consumer);
可以發現現象1 依舊,但現象2 不同了。我們可以通過命令查看server端的狀態
rabbitmqctl list_queues name messages_unacknowledged messages_ready
啟動sender后 unack=0,ready=100
啟動recv1 后 unack=100, ready=0,所有消息都給了recv1
等recv1 處理幾條消息后 unack=89, ready=0,recv1處理並ack了11條消息,
關閉recv1 后 unack=0, ready=85,未ack的消息都回到ready狀態
啟動recv2,unack=89, ready =0,所有消息都轉給了recv2.
如果消息量很大,那么緩存消息就可能吃掉recv的所有內存導致系統崩潰。因此我們打開 line2。這樣recv在ack一個消息后才會領取下一個消息。再來看看按照上面流程queue里消息的狀態
啟動sender后 unack=0,ready=100
啟動recv1 后,unack=1, ready=99
啟動recv2后,unack=2, ready=95
在消息處理完前,unack都是2,recv1和recv2 各持有一個消息