作者:凱易&耘田
審核校對:白玙
編輯&排版:雯燕
前言:隨着 RocketMQ 5.0 preview 的發布,5.0 的重大特性逐步與大家見面。POP Consumer 作為 5.0 的一大特性,POP 消費模式展現了一種全新的消費模式。其具備的輕量級,無狀態,無隊列獨占等特點,對於消息積壓場景,Streaming 消費場景等都非常友好。在介紹 POP Consumer 之前,我們先回顧一下目前使用較多的 Push Consumer。
Push Consumer
熟悉 RocketMQ 的同學對 Push Consumer 肯定不會陌生,客戶端消費一般都會使用這種消費模式,使用這種消費模式也比較簡單。我們只需簡單設置,並在回調方法 ConsumeMessage 中寫好業務邏輯即可,啟動客戶端應用就可以正常消費消息了。
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("test_topic", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
那么 Push Consumer 是如何消費消息的呢?

當然,Consumer 收到消息的前提是 Producer 先發消息發到 Topic 當中。Producer 使用輪詢的方式分別向每個 Queue 中發送消息,一般消費端都不止一個,客戶端啟動的時候會在 Topic,Consumer group 維度發生負載均衡,為每個客戶端分配需要處理的 Queue。負載均衡過程中每個客戶端都獲取到全部的的 ConsumerID 和所有 Queue 並進行排序,每個客戶端使用相同負責均衡算法,例如平均分配的算法,這樣每個客戶端都會計算出自己需要消費那些 Queue,每當 Consumer 增加或減少就會觸發負載均衡,所以我們可以通過 RocketMQ 負載均衡機制實現動態擴容,提升客戶端收發消息能力。
這里有個小問題:可以一直增加客戶端的數量提升消費能力嗎?當然不可以,因為 Queue 數量有限,客戶端數量一旦達到 Queue 數量,再擴容新節點無法提升消費能力,因為會有節點分配不到 Queue 而無法消費。
客戶端負責均衡為客戶端分配好 Queue 后,客戶端會不斷向 Broker 拉取消息,在客戶端進行消費。不是 Push 客戶端嗎?怎么會是客戶端向 Broker 拉消息,不應該是 Broker 推消息到客戶端嗎?這是一個很有意思的點,因為 RocketMQ 無論是 Push Consumer,還是 Pull Consumer,還是后面要介紹的 POP Consumer,都是客戶端拉的方式消費消息。Push Consumer 只是通過客戶端 API 層面的封裝讓我們感覺是 Broker 推送的。
經過客戶端負載均衡以及拉消息,客戶端就可以正常消費消息了。

完整的的Push Consumer處理邏輯可以看下上面這張圖,我們可以看到Push Consumer完整處理流程。
首先客戶端 Rebalance 確定哪些 Consumer 客戶端處理哪些 Queue,然后通過 PullMessageService 服務拉取消息,拉取到消息以后 ConsumeMessageConcurrentlyService 提交消費請求到消息消費線程池,然后調用回調方法 ConsumeMessage,到這里就可以拿到消息處理業務了,最后消費成功更新本地 offset 並上報 offset 到 Broker。如果消費失敗(拋異常,超時等),客戶端會發送 sendBack 告訴 Broker 哪些消息消費失敗了,Broker會將消費失敗的消息發送到延時隊列,延時后再放到retry Topic,客戶端消費retry Topic完成消息重投。這樣做的好處是不會因為部分消費失敗的消息而影響正常消息的消費。想了解細節的同學可以到 github 下載源碼對照這張圖看一下實際的代碼處理流程。

通過前面 Push Consumer 的介紹,我們對 Push Consumer 原理有了一定的認識。我們可以發現,RocketMQ 的客戶端做了很多事情,負載均衡,拉消息,消費位點管理,消費失敗后的 sendBack 等等。這對多語言支持無疑是不友好的。參與過多語言開發的同學應該會感同身受,將這么多的邏輯移植到不同的語言,肯定不是一件簡單的事情。同時客戶端的升級運維也會增加難度。
所以我們思考可不可為客戶端瘦身,把一部分邏輯從客戶端移到 Broker?當然是可以的,前面介紹 Push Consumer 客戶端負責均衡的時候,我們可以發現,負載均衡需要的信息,所有ConsumerId,原本就是客戶端從 Broker 獲取的,所有 Queue 信息,Broker 也可以通過 nameServer 拿到,負責均衡算法在客戶端還是 Broker 端調用也沒有什么大的差異,所以把 Rebalance 移植到 Broker 是一個不錯選擇,Broker 負載均衡可以跟客戶端負責均衡達到基本相同的效果,客戶端邏輯會減少,多語言實現更加簡單,后續升級運維也會更加可控。除此以外因為 Broker 相對客戶端具有全局信息,還可以做一些更有意思的事情。例如在負責均衡的時候根據 Queue 的積壓情況做負載均衡,將一些壓力比較大的客戶端上的 Queue 分配給其它客戶端處理等等。
POP Consumer
通過前面 Push Consumer 的介紹,我們了解到 Push Consumer 的一些特點。
-
隊列獨占:Broker 上的每個隊列只能分配到相同 Consumer group 的一台 Push Consumer 機器上。
-
消費后更新 offset:每次 Pull 請求拉取批量消息到本地隊列緩存,本地消費成功才會 commit offset。
以上特點可能會帶來一些問題,比如客戶端異常機器 hang,導致分配隊列消息堆積,無法消費。
RocketMQ 的 Push Consumer 消費對於機器異常 hang 時並不十分友好。如果遇到客戶端機器 hang 住,處於半死不活的狀態,與 Broker 的心跳沒有斷掉的時候,客戶端 Rebalance 依然會分配消費隊列到 hang 機器上,並且 hang 機器消費速度很慢甚至無法消費的時候,會導致消費堆積。另外類似還有服務端 Broker 發布時,也會由於客戶端多次 Rebalance 導致消費延遲影響等無法避免的問題。如下圖所示:

當 Push Consumer 2 機器發生 hang 的時候,它所分配到的 Broker 上的 Q2 出現嚴重的堆積。我們目前處理這種問題,一般可能是找到這台機器重啟,或者下線。保證業務不受異常機器影響,但是如果隊列擠壓到一定程度可能機器恢復了也沒辦法快速追趕消費進度,這也是受 Push Consumer 的能力限制。
我們總結下 Push Consumer 存在的一些痛點問題:
-
富客戶端,客戶端邏輯比較重,多語言支持不友好;
-
客戶端或者 Broker 升級發布,重啟等 Rebalance 可能導致消費擠壓;
-
隊列占位,單隊列與單 Consumer 綁定,單個 Queue 消費能力無法橫向擴展;
-
機器 hang,會導致擠壓。
基於上述問題,RocketMQ 5.0 實現了全新的消費模型-POP Consumer。
POP Consumer 能夠解決上述穩定性和解除隊列占位的擴展能力。
我們下面來簡單看一下 POP Consumer 是如何消費消息的:

POP Client 從 Broker 的隊列中發出 POP 請求消息,Broker 返回消息 message。在消息的系統屬性里面有一個比較重要的屬性叫做 POP_CK,POP_CK 為一條消息的 handler,通過一個 handler 就可以定位到一條消息。當消息消費成功之后,POP client 發送 ackMessage 並傳遞 handler 向 broker 確認消息消費成功。

對於消息的重試,當 POP 出一條消息之后,這條消息就會進入一個不可見的時間,在這段時間就不會再被 POP 出來。如果沒有在這段不可見時間通過 ackMessage 確認消息消費成功,那么過了不可見時間之后,這條消息就會再一次的可見。
另外,對於消息的重試,我們的重試策略是一個梯度的延遲時間,重試的間隔時間是一個逐步遞增的。所以,還有一個 changeInvisibleTime 可以修改消息的不可見時間。

從圖上可以看見,本來消息會在中間這個時間點再一次的可見的,但是我們在可見之前提前使用 changeInvisibleTime延長了不可見時間,讓這條消息的可見時間推遲了。當用戶業務代碼返回 reconsumeLater 或者拋異常的時候,我們就可以通過 changeInvisibleTime 按照重試次數來修改下一次的可見時間了。另外如果消費 RT 超過了 30 秒(默認值,可以修改),則 Broker 也會把消息放到重試隊列。
除此以外,POP 消費的位點是由 Broker 保存和控制,而且 POP 消費是可以多個 Client 消費同一個隊列,如下圖所示:

三個客戶端並不需要 Rebalance 去分配 Queue,取而代之的是,它們都會使用 POP 請求所有的 Broker 獲取消息進行消費。即使 POP Consumer 2 出現 hang,其內部消息也會讓 POP Consumer1 和 POP Consumer3 進行消費。這樣就解決了 hang 機器可能造成的消費堆積問題。
從整體流程可見,POP 消費可以避免 Rebalance 帶來的消費延時,同時客戶端可以消費 Broker 的所有隊列,這樣就可以避免機器 hang 而導致堆積的問題。
同時擴展能力提升,POP Consumer 可以消費同一 Topic 下所有 Queue,相比 Push Consumer 解除了每個 Queue 必須 Rebalance 到一台客戶端消費的限制,Push Consuner 客戶端數量最多只能等於 Queue 的數量。POP Consumer 可以突破這個限制,多個 POP Consumer 可以消費同一個 Queue。
Broker 實現
POP Consumer 在 Broker 端是如何實現的呢?

POP Consumer 拉取消息后,會在 Queue 維度上加鎖,保證同一時刻只有一個客戶端可以拉去到同一個 Queue 的消息。獲取到消息后,會保存 checkPoint 信息在 Broker,checkPoint 信息主要包括消息的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId 等信息。checkPoint 信息會優先保存到 buffer 當中,等待 ack 消息,在一段時間內收到客戶端回復的 ack 消息,對應的 checkPoint 信息從 buffer 中移除,並且更新消費進度,標識消息消費成功。

當 checkPoint 消息在 buffer 中等待一段時間,一直未等到 ack 消息時,checkPoint 信息會清理出 buffer 並發送 ck msg 到 store,ck msg 首先被發送到延時隊列 SCHEDULE_Topic_XXXX 中,延時完成以后會進入 REVIVE_LOG Topic,REVIVE_LOG Topic 是保存在 store 當中待處理的 ck msg 和 ack msg 的 Topic,POPReceiveService 拉取 REVIVE_LOG Topic 的消息放到一個 map 當中,如果 ck 有對應的 ack 則會更新 REVIVE_LOG 的消費位點,標識消息消費完成,超時未被確認的 ck msg,會查詢到 ck msg 對應的真實的消息,並把這個消息放到 retry Topic 當中,等待客戶端消費,POP Consumer 正常消費的時候會概率性的消費到 retry Topic 中的消息。我們從這塊設計中可以看到 RocketMQ 的常用設計,通過一些內部的 Topic 實現業務邏輯,事務消息,定時消息都用了這種設計方式。
我們簡單終結一下 POP Consumer 的優勢:
-
無狀態,offset 信息 Broker 維護,客戶端與 Queue 無綁定。
-
輕量級,客戶端只需要收發消息,確認消息。
-
無隊列占位,Queue 不再與客戶端綁定。
-
多語言友好,方便多語言移植。
-
升級更可控,邏輯都收斂到 Broker,升級更加方便可控。
POP&Push 融合
既然 POP 有這么多優勢,我們能否使用 POP 解決 Push 的一些問題呢?前面我們提到 Push Consumer 當一個隊列因為 Consumer 問題已經堆積很多的時候,受限於單個 Consumer 的消費能力,也無法快速的追趕消費進度,延遲會很高。核心問題是單隊列單 Consumer 的限制,導致消費能力無法橫向擴展。
我們希望通過 POPAPI 的形式,當一個隊列堆積太多的情況下,可以切換到 POP 模式,有機會讓多個 Consumer 來一起消費該隊列,追趕進度,我們在 5.0 的實現中也實現了這一點。
POP/Push 模式切換方式
可以通過兩種方式進行切換。
1、命令行
mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
2、代碼切換
public static final String CONSUMER_GROUP = "CID_JODIE_1";
public static final String TOPIC = "TopicTest";
// Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
private static void switchPop() throws Exception {
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.start();
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());
for (String brokerAddr : brokerAddrs) {
mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
}
}
通過下面 POP Consumer Demo,我們看到 POP Consumer 跟 Push API 基本是統一,使用也比較簡單,相比 Push API 只是多了一步消費模式切換。

Push & POP Retry 隊列差異
在使用 POP 消費模式時我們只需要在 Push API 的基礎上切換模式即可,對於 Broker 來說還是需要做一些處理的。主要需要處理的地方是 retry 隊列。
Push 和 POP 模式對 retry 隊列處理不一樣
-
Push 的 retry 處理
1)服務端有一個 %RETRY%ConsumerGroup 隊列
2)客戶端會有拉取任務拉取這個隊列的消息。 -
POP 的 retry 處理
1)服務端針對每個Topic,都有一個名為 %RETRY%ConsumerGroup_Topic 的 retry 隊列
2)客戶端沒有專門針對 retry 隊列的拉任務,每次普通 POP 請求都有一定概率消費相應的 retry 隊列
模式切換之后,老模式的 retry 里的消息還需要繼續處理,否則就丟消息了。
Push & POP 切換
Push 切換到 POP
- 正常隊列切換到 POP 模式
- 正常隊列的 POP 請求會處理對應的 POP retry 隊列
- 針對 Push retry 隊列,我們保留原來 Push retry 隊列的拉取任務,並且是工作在 Push 模式。
POP 切換到 Push
- 正常隊列切換到 Push 模式
- Push retry 隊列自然有相應的拉取任務
- 之前 POP 的 retry 隊列,我們在客戶端自動創建拉取任務,以Push 模式去拉取。注意這
總結下來就是,對於 retry 隊列,我們會特殊處理不參與模式切換。
總結
最后我們總結下 POP Consumer。POP 作為一種全新的消費模式,解決了 Push 模式的一些痛點,使客戶端無狀態,更加輕量,消費邏輯也基本都收斂到了 Broker,對多語言的支持十分的友好。在 API 層面也與 Push 完成了融合,繼承了 Push API 的簡單易用,同時實現了 Push,POP 之間的自由切換。
