RabbitMQ之Consumer消費模式(Push & Pull)


概述

消息中間件有很多種,進程也會拿幾個來對比對比,其中一種對比項就是消費模式。消息的消費模式分Push、Pull兩種,或者兩者兼具。RabbitMQ的消費模式就是兼具Push和Pull。push 模式主要是通過channel.basicConsume實現

本文通過demo代碼以及借助wireshark抓包工具來觀察RabbitMQ的消費模式。

push模式

Push模式,屬於一種推送模型。注冊一個消費者后,RabbitMQ會在消息可用時,自動將消息進行推送給消費者。

發送端向broker端發送數據,數據內容為:RabbitMQ Demo Test, Send Messages 0;RabbitMQ Demo Test, Send Messages 1;RabbitMQ Demo Test, Send Messages 2,一次類推……

下面是消費端的示例代碼:

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer);

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [X] Received '" + message + "'");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    break;
}
channel.close();
connection.close();

運行輸出:RabbitMQ Demo Test, Send Messages 0

通過wirkshark工具來查看上面示例代碼的整個AMQP的過程(圖1):

上圖可以對照實例代碼來看,比如圖中:Basic.Qos和Basic.Qos-Ok就是示例代碼中的:channel.basicQos(1);

再比如圖中的Basic.Ack就是示例代碼中的channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

對於上圖中的帶藍色背影的那行(即No.545,展開如下圖,broker to client)

可以看到Delivery-Tag = 1, 消息內容:“RabbitMQ Demo Test, Send Messages 0”。

展開No.546,即Basic.Ack這行:

可以看到client to broker的Ack:delievery-tag=1,和上面的符合。

仔細的朋友可以看到No.548同樣是broker向client發送了一條數據,通過展開數據可知:

其內包含的數據正好是下一條的消息——“RabbitMQ Demo Test, Send Messages 1”。但是在運行實例的時候是沒有打印出來的,可以看圖1,是broker端主動向client端發送的數據,client端沒有請求。在broker端發送第一條數據,即”RabbitMQ Demo Test, Send Messages 0”之后發送Ack,之后關閉Channel,到真正關閉完channel之間,broker端還是會發送(push)數據給Client, 此時Client不會在Ack此條數據了。那么這樣這條消息會不會丟失呢?答案是否定的,你可以再運行下consumer程序,就能消費到這條消息,rabbitmq對設置autoAck=false之后沒有被Ack的消息是不會清除掉的。

實際上如果不設置channel.basicQos(1),那么broker端會一次推送多條數據
RabbitMQ的每一數據幀(Frame)都是以0xCE結尾。


pull模式

 

首先是client端發送Get請求,然后broker響應請求回傳消息,最后client端發送Ack.
可以看到有別於push模式,broker端不會在client端沒有請求的情況下來回傳消息。
Pull屬於一種輪詢模型,發送一次Get請求,獲得一個消息。如果此時RabbitMQ中沒有消息,會獲得一個表示空的回復。
對我們實現者來說,要在一個循環里,不斷去服務器get消息。總的來說,這種方式性能比較差!

同樣采用wirkshark工具來觀察pull模式的AMQP過程,pull模式主要是通過channel.basicGet方法來獲取消息,示例代碼如下:

GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

wireshark抓包結果:

可以觀察No.122, No.123, No.124,這些對於上面的示例代碼。

首先是client端發送Get請求,然后broker響應請求回傳消息,最后client端發送Ack. 可以看到有別於push模式,broker端不會在client端沒有請求的情況下來回傳消息。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM