RocketMQ 最新版PushConsumer配置參數詳解
- 1、Push消費模式下的配置
- 2、consumerGroup
- 3、messageModel
- 4、consumeFromWhere
- 5、allocateMessageQueueStrategy
- 6 、consumeMessageBatchMaxSize
- 7 、pullBatchSize
- 8 、pullInterval
- 9、 offsetStore
- 10、consumeThreadMin 和 consumeThreadMax
- 11、consumeConcurrentlyMaxSpan
- 12 、pullThresholdForTopic和pullThresholdSizeForTopic
- 13、 pullThresholdForQueue 和pullThresholdSizeForQueue
- 14、 messageListener
基於RocketMQ 4.3,PushConsumer配置參數詳解
1、Push消費模式下的配置
Push 默認使用的是DefaultMQPushConsumer。
2、consumerGroup
Consumer 組名,參數默認值是:DEFAULT_CONSUMER,多個 Consumer如果屬於一個應用,訂閱同樣的消息,且消費邏輯一致,則應該將它們歸為同一組
3、messageModel
CLUSTERING 消息模型,支持以下兩種 1、集群消費 2、廣播消費。
兩種模式有哪些不同:
(1)分配 Topic 對應消息隊列的算法不同
RebalanceImpl類rebalanceByTopic 函數中 分配 Topic 對應消息隊列的算法不同。廣播模式( BROADCASTING ) 下,分配 Topic 對應的所有消息隊列。集群模式( CLUSTERING ) 下,根據 隊列分配策略( AllocateMessageQueueStrategy ) 分配消息隊列
(2)ACK消費機制不同
RemoteBrokerOffsetStore :Consumer 集群模式 下,使用遠程 Broker 消費進度 offset。集群模式,消費失敗的消息發回到 Broker,如果發回Broker失敗,就會放到Retry隊列。
LocalFileOffsetStore :Consumer 廣播模式下,使用本地 文件 消費進度offset。廣播模式,無論是否消費失敗,不發回消息到 Broker,只打印日志。
4、consumeFromWhere
定義消費Client從那個位置消費消息,分別為:
CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息
CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前
這里重點要說的就是默認配置CONSUME_FROM_LAST_OFFSET ,CONSUME_FROM_LAST_OFFSET官方的解釋是一個新的訂閱組第一次啟動從隊列的最后位置開始消費,后續再啟動接着上次消費的進度開始消費,但某些情況下卻並不是這樣。先看下RemoteBrokerOffsetStore類中給出的消費client 跟Brocker 之間同步Offset的策略。
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
case READ_FROM_STORE: {
try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
// No offset in broker
catch (MQBrokerException e) {
return -1;
}
//Other exceptions
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
default:
break;
}
}
return -1;
}
當Consumer客戶端啟動的時候無論如何是從內存中獲取不到offset的,只能從遠程Broker里面讀了。如果消息數據從未清理過,或新添加了broker,或topic新擴容了隊列,那么這幾種情況可能會存在RocketMQ認為topic的隊列新上線不久,數據不算太多的情形。另外,參考RocketMQ3.2.6源碼的注釋可以理解其深意:
(1)訂閱組不存在情況下,如果這個隊列的消息最小Offset是0,則表示這個Topic上線時間不長,
(2)服務器堆積的數據也不多,那么這個訂閱組就從0開始消費。
(3)尤其對於Topic隊列數動態擴容時,必須要從0開始消費。
5、allocateMessageQueueStrategy
AllocateMessageQueueByConfig根據配置分配消息隊列。AllocateMessageQueueAveragelyByCircle環狀分配消息隊列AllocateMessageQueueByMachineRoom平均分配消息隊列。該平均分配方式和 AllocateMessageQueueAveragely 略有不同,其是將多余的結尾部分分配給前 rem 個 Consumer。
AllocateMessageQueueAveragely 平均分配消息隊列,也是默認分配算法。
先看下分配隊列需要數據結構,這些參數都不能為空
List<MessageQueue> allocate(
final String consumerGroup, //當前消費client處理的消息組
final String currentCID,//當前消費client的客戶端ID
final List<MessageQueue> mqAll,//消息組包含的所有消息隊列列表
final List<String> cidAll //消息組對應的所有消費Client的客戶端ID列表
);
AllocateMessageQueueAveragely 類源碼解析:
1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
2: private final Logger log = ClientLogger.getLog();
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
6: List<String> cidAll) {
7: // 校驗參數是否正確
8: if (currentCID == null || currentCID.length() < 1) {
9: throw new IllegalArgumentException("currentCID is empty");
10: }
11: if (mqAll == null || mqAll.isEmpty()) {
12: throw new IllegalArgumentException("mqAll is null or mqAll empty");
13: }
14: if (cidAll == null || cidAll.isEmpty()) {
15: throw new IllegalArgumentException("cidAll is null or cidAll empty");
16: }
17:
18: List<MessageQueue> result = new ArrayList<>();
19: if (!cidAll.contains(currentCID)) {
20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
21: consumerGroup,
22: currentCID,
23: cidAll);
24: return result;
25: }
26: // 平均分配
27: int index = cidAll.indexOf(currentCID); // 第幾個consumer。
28: int mod = mqAll.size() % cidAll.size(); // 余數,即多少消息隊列無法平均分配。
29: int averageSize =
30: mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
31: + 1 : mqAll.size() / cidAll.size());
32: int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 有余數的情況下,[0, mod) 平分余數,即每consumer多分配一個節點;第index開始,跳過前mod余數。
33: int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配隊列數量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息隊列。
34: for (int i = 0; i < range; i++) {
35: result.add(mqAll.get((startIndex + i) % mqAll.size()));
36: }
37: return result;
38: }
39:
40: @Override
41: public String getName() {
42: return "AVG";
43: }
44: }
• 說明 :平均分配隊列策略。
• 第 7 至 25 行 :參數校驗。
• 第 26 至 36 行 :平均分配消息隊列。
o 第 27 行 :index :當前 Consumer 在消費集群里是第幾個。這里就是為什 么需要對傳入的 cidAll 參數必須進行排序的原因。如果不排序,Consumer 在本地計算出來的 index 無法一致,影響計算結果。
o 第 28 行 :mod :余數,即多少消息隊列無法平均分配。
o 第 29 至 31 行 :averageSize :代碼可以簡化成 (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())。
[ 0, mod ) :mqAll.size() / cidAll.size() + 1。前面 mod 個 Consumer 平分余數,多獲得 1 個消息隊列。
[ mod, cidAll.size() ) :mqAll.size() / cidAll.size()。
o 第 32 行 :startIndex :Consumer 分配消息隊列開始位置。
o 第 33 行 :range :分配隊列數量。之所以要 Math#min(...) 的原因:當 mqAll.size() <= cidAll.size() 時,最后幾個 Consumer 分配不到消息隊列。
o 第 34 至 36 行 :生成分配消息隊列結果。
6 、consumeMessageBatchMaxSize
批量消費最大消息條數,取值范圍: [1, 1024]。默認是1
7 、pullBatchSize
消費者去broker拉取消息時,一次拉取多少條。取值范圍: [1, 1024]。默認是32 。可選配置
8 、pullInterval
檢查拉取消息的間隔時間,由於是長輪詢,所以為 0,但是如果應用為了流控,也可以設置大於 0 的值,單位毫秒,取值范圍: [0, 65535]
9、 offsetStore
集群消費:從遠程Broker獲取。
廣播消費:從本地文件獲取。
看下面源代碼:
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
10、consumeThreadMin 和 consumeThreadMax
設置消費線程數大小取值范圍都是 [1, 1000]。
4.2版本中的默認配置為:
consumeThreadMin 默認是20
consumeThreadMax 默認是64
11、consumeConcurrentlyMaxSpan
單隊列並行消費允許的最大跨度取值范圍都是 [1, 65535],默認是2000。
這個參數只在並行消費的時候才起作用。參考 DefaultMQPushConsumerImpl 類pullMessage函數源碼片段:
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}
12 、pullThresholdForTopic和pullThresholdSizeForTopic
pullThresholdForTopic為每個topic在本地緩存最多的消息條數,取值范圍[1, 6553500],默認的-1。
pullThresholdSizeForTopic 是在topic級別限制了消息緩存的大小,單位為Mib,取值范圍[1, 102400],默認為-1
13、 pullThresholdForQueue 和pullThresholdSizeForQueue
pullThresholdForQueue是拉消息本地隊列緩存消息最大數,用於topic級別的流量控制,控制單位為消息個數,取值范圍都是 [1, 65535],默認是1000。如果設置了pullThresholdForTopic,就是限制了topic級別的消息緩存數(通常沒有),那么會將本地每個queue的緩存數更新為pullThresholdForTopic / currentQueueCount 限制總數 / 隊列數。
pullThresholdSizeForQueue 是 topic級別緩存大小限制,取值范圍 [1, 1024],默認是100Mib,如果設置了這個參數,queue的緩存大小更新為pullThresholdSizeForTopic / currentQueueCount 限制總大小 / 隊列數
RebalancePushImpl類中源碼片段:
int currentQueueCount = this.processQueueTable.size();
if (currentQueueCount != 0) {
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
if (pullThresholdForTopic != -1) {
int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
log.info("The pullThresholdForQueue is changed from {} to {}",
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
}
int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
if (pullThresholdSizeForTopic != -1) {
int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
log.info("The pullThresholdSizeForQueue is changed from {} to {}",
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
}
}
14、 messageListener
消息監聽器 ,處理消息的業務就在監聽里面。目前支持的監聽模式包括:
MessageListenerConcurrently,對應的處理邏輯類是ConsumeMessageConcurrentlyService
MessageListenerOrderly 對應的處理邏輯類是ConsumeMessageOrderlyService
兩者使用不同的ACK機制。RocketMQ提供了ack機制,以保證消息能夠被正常消費。發送者為了保證消息肯定消費成功,只有使用方明確表示消費成功,RocketMQ才會認為消息消費成功。中途斷電,拋出異常等都不會認為成功——即都會重新投遞。上面兩個不同的監聽模式使用的ACK機制是不一樣的。
MessageListenerConcurrently 對應的消費狀態類:
public enum ConsumeConcurrentlyStatus {
/** * Success consumption */
CONSUME_SUCCESS,
/** * Failure consumption,later try to consume */
RECONSUME_LATER;
}
只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為這批消息消費失敗了。為了保證消息是肯定被至少消費成功一次,RocketMQ會把這批消息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續失敗到一定次數(默認16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工干預。
MessageListenerOrderly對應的消費狀態類:
public enum ConsumeOrderlyStatus {
/** * Success consumption */
SUCCESS,
/** * Rollback consumption(only for binlog consumption) */
@Deprecated
ROLLBACK,
/** * Commit offset(only for binlog consumption) */
@Deprecated
COMMIT,
/** * Suspend current queue a moment */
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
當使用順序消費的回調MessageListenerOrderly時,由於順序消費是要前者消費成功才能繼續消費,所以沒有ConsumeConcurrentlyStatus. RECONSUME_LATER的這個狀態,只有ConsumeOrderlyStatus. SUSPEND_CURRENT_ QUEUE_A_ MOMENT來暫停隊列的其余消費,直到原消息不斷重試成功為止才能繼續消費。