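RocketMQ (4.8.0): How Consumer Progress Is Persisted
When a consumer starts, it also starts its offset manager. So how exactly are offsets managed?
RocketMQ provides two ways to manage offsets: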
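- Remote offset management (in clustering consumption, the client hands the offset to the Broker for storage; code path: D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\consumer\store\RemoteBrokerOffsetStore.java)
- Local offset management (in broadcasting consumption, the offset is stored on the consumer's local disk; code path: D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\consumer\store\LocalFileOffsetStore.java)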
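The choice between the two stores can be pictured with a minimal sketch. Every type below (MessageModelSketch, OffsetStoreSketch, RemoteStoreSketch, LocalStoreSketch, OffsetStoreSelector) is a hypothetical stand-in written for this illustration, not a RocketMQ class; the sketch only assumes that the store is picked from the consumption mode, as the two bullets describe.

```java
// Hypothetical, simplified stand-ins; these are not the RocketMQ classes themselves.
enum MessageModelSketch { CLUSTERING, BROADCASTING }

interface OffsetStoreSketch {
    String describe();
}

final class RemoteStoreSketch implements OffsetStoreSketch {
    @Override
    public String describe() {
        return "remote: offsets are kept on the Broker";
    }
}

final class LocalStoreSketch implements OffsetStoreSketch {
    @Override
    public String describe() {
        return "local: offsets are kept in a file on the consumer";
    }
}

final class OffsetStoreSelector {
    // Clustering consumption -> remote store; broadcasting consumption -> local store.
    static OffsetStoreSketch select(MessageModelSketch model) {
        switch (model) {
            case BROADCASTING:
                return new LocalStoreSketch();
            case CLUSTERING:
            default:
                return new RemoteStoreSketch();
        }
    }

    public static void main(String[] args) {
        System.out.println(select(MessageModelSketch.CLUSTERING).describe());
        System.out.println(select(MessageModelSketch.BROADCASTING).describe());
    }
}
```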
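Next, let's go through the core methods of the OffsetStore interface.
- void load(): loads the offset information.
- void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly): updates the cached offset.
- long readOffset(final MessageQueue mq, final ReadOffsetType type): reads the offset; type selects where it is read from (for example, ReadOffsetType.READ_FROM_MEMORY).
- void persistAll(final Set<MessageQueue> mqs): persists the offsets of all the given queues.
- void persist(final MessageQueue mq): persists the offset of a single queue.
- void removeOffset(MessageQueue mq): removes the offset of a single queue.
- Map<MessageQueue, Long> cloneOffsetTable(String topic): returns a copy of the cached offsets.
- void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway): persists the local consume offset to the Broker.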
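To make the increaseOnly parameter concrete, here is a minimal in-memory sketch of the cache-side methods. The class InMemoryOffsetTable and its String queue key are assumptions made for this illustration; the real interface works with MessageQueue objects and also covers loading and persisting, which are left out here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory offset table, keyed by a queue name string.
final class InMemoryOffsetTable {
    private final ConcurrentMap<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    // Updates the cached offset. With increaseOnly = true the value never moves backwards.
    void updateOffset(String queue, long offset, boolean increaseOnly) {
        AtomicLong current = offsetTable.putIfAbsent(queue, new AtomicLong(offset));
        if (current == null) {
            return; // the first value for this queue was just stored
        }
        if (increaseOnly) {
            current.accumulateAndGet(offset, Math::max);
        } else {
            current.set(offset);
        }
    }

    // Returns the cached offset, or -1 when nothing is cached for the queue.
    long readOffset(String queue) {
        AtomicLong current = offsetTable.get(queue);
        return current == null ? -1L : current.get();
    }

    // Forgets the cached offset of one queue.
    void removeOffset(String queue) {
        offsetTable.remove(queue);
    }

    // Returns a point-in-time copy of the cached offsets.
    Map<String, Long> cloneOffsetTable() {
        Map<String, Long> copy = new HashMap<>();
        offsetTable.forEach((queue, value) -> copy.put(queue, value.get()));
        return copy;
    }
}
```

With increaseOnly set to true, a late update carrying a smaller offset is ignored; with false, the cached value is overwritten, which is what a correction after an illegal offset needs.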
Saving the client's consume progress is also called consume progress persistence. It is done in two ways:
- Scheduled persistence
- Unscheduled persistence
Scheduled persistence is implemented in the startScheduledTask() method of D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\factory\MQClientInstance.java. The code is as follows:

    private void startScheduledTask() {
        // If no name server address is configured, fetch it periodically.
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        // Refresh topic route information from the name server.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        // Clean up offline brokers and send heartbeats to all brokers.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        // Persist the consume offsets of all consumers: first run after 10 s,
        // then every getPersistConsumerOffsetInterval() milliseconds.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // Adjust the consumer thread pool once a minute.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }
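Scheduled persistence is driven by a scheduled task: 10 s after the client starts, it begins calling MQClientInstance.this.persistAllConsumerOffset() at the interval returned by clientConfig.getPersistConsumerOffsetInterval(), persisting the consume progress of every MessageQueue consumed by every consumer.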
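The same pattern can be sketched in isolation. ScheduledPersistSketch below is a hypothetical class for illustration only: two in-memory maps stand in for the cached offsets and for the persisted store, and persistAll() plays the role of persistAllConsumerOffset().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: copies cached offsets to a "store" on a fixed schedule.
final class ScheduledPersistSketch {
    private final Map<String, Long> memoryOffsets = new ConcurrentHashMap<>();
    private final Map<String, Long> persistedOffsets = new ConcurrentHashMap<>();

    // Records the latest consumed offset of a queue in memory only.
    void update(String queue, long offset) {
        memoryOffsets.put(queue, offset);
    }

    // Stand-in for persistAllConsumerOffset(): copies every cached offset to the store.
    void persistAll() {
        persistedOffsets.putAll(memoryOffsets);
    }

    // Returns the persisted offset of a queue, or null if none has been persisted.
    Long persisted(String queue) {
        return persistedOffsets.get(queue);
    }

    // Schedules persistAll() after an initial delay, then at a fixed rate.
    ScheduledExecutorService start(long initialDelayMillis, long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                persistAll();
            } catch (Exception e) {
                // An exception escaping the task would suppress all later runs, so it is caught here.
                System.err.println("persistAll failed: " + e);
            }
        }, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

A caller would invoke start(10_000, interval) once and call shutdown() on the returned scheduler when the client stops. Anything consumed between two runs lives only in memory, which is why a crash can lead to some messages being delivered again.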
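Unscheduled persistence is also called Pull-And-Commit: while executing a Pull, the consumer also sends the queue's latest consume offset to the Broker. It is implemented in the pullMessage(final PullRequest pullRequest) method of D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\consumer\DefaultMQPushConsumerImpl.java. The code is as follows: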

    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        }

        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }

        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }

        final long beginTimestamp = System.currentTimeMillis();

        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }

                            break;
                        case NO_NEW_MSG:
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                @Override
                                public void run() {
                                    try {
                                        // Overwrite the cached offset (increaseOnly = false) and persist it at once.
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }

                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
        };

        // In clustering mode, attach the cached consume offset to the pull request when it is greater than 0.
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }

        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }

            classFilter = sd.isClassFilterMode();
        }

        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }
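This method persists offset information in two places.
The first is after a pull completes: if the pull offset turns out to be illegal (OFFSET_ILLEGAL), the client proactively submits the corrected consume offset to the Broker so that the next pull can use a valid offset. The code that updates the offset is in D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\consumer\DefaultMQPushConsumerImpl.java and is as follows: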

    // Excerpt from pullMessage(): the OFFSET_ILLEGAL branch of PullCallback.onSuccess()
    case OFFSET_ILLEGAL:
        log.warn("the pull request offset illegal, {} {}",
            pullRequest.toString(), pullResult.toString());
        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

        pullRequest.getProcessQueue().setDropped(true);
        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

            @Override
            public void run() {
                try {
                    // increaseOnly = false: the corrected offset may be smaller than the cached one.
                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                        pullRequest.getNextOffset(), false);

                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                    log.warn("fix the pull request offset, {}", pullRequest);
                } catch (Throwable e) {
                    log.error("executeTaskLater Exception", e);
                }
            }
        }, 10000);
        break;
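The second is when the pull itself is issued: in clustering consumption, if the locally cached offset is greater than 0, the latest offset is uploaded to the Broker together with the pull request.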

    // Excerpt from pullMessage(): attaching the consume offset to the pull request
    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }

        classFilter = sd.isClassFilterMode();
    }

    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
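The code uses two fields, commitOffsetEnable and sysFlag, to indicate whether the consume offset may be reported to the Broker. When the Pull request is executed, sysFlag is passed to the Broker in the header of the network request.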
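How a boolean such as commitOffsetEnable ends up inside an integer sysFlag can be shown with a small bit-flag sketch. PullSysFlagSketch is a hypothetical class written for this illustration, and the bit positions it uses are an assumption rather than a statement about the real PullSysFlag; only the order of the four booleans follows the buildSysFlag call in pullMessage.

```java
// Hypothetical bit-flag helper; the bit layout is assumed for illustration.
final class PullSysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    // Packs the four booleans into one int, one bit each.
    static int buildSysFlag(boolean commitOffset, boolean suspend,
                            boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    // Receiver side: checks whether the commit-offset bit is set.
    static boolean hasCommitOffsetFlag(int sysFlag) {
        return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
    }

    // Client-side rule from pullMessage: commit only in clustering mode with a cached offset > 0.
    static int sysFlagFor(boolean clustering, long cachedOffset) {
        boolean commitOffsetEnable = clustering && cachedOffset > 0;
        return buildSysFlag(commitOffsetEnable, true, false, false);
    }
}
```

Packing the decision into one integer keeps the request header compact, and the receiver recovers each boolean with a single mask.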
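On the Broker side, the following flags take part in deciding whether the offset is persisted:
- hasCommitOffsetFlag: derived from the sysFlag parameter of the Pull request; it is one of the factors that decide whether the Broker persists the consume offset.
- brokerAllowSuspend: whether the Broker may suspend the Pull request. When the request is being handled in the suspended state, the offset is not persisted.
- storeOffsetEnable: true means the Broker needs to persist the consume offset; false means it does not.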
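The way these flags might combine can be sketched as a single boolean expression. StoreOffsetDecisionSketch is a hypothetical helper, not Broker source code; the third input, whether the Broker is a slave, is an extra assumption added here because the text calls hasCommitOffsetFlag only one factor among several.

```java
// Hypothetical model of the Broker-side decision; not the Broker's actual code.
final class StoreOffsetDecisionSketch {
    // The offset is stored only when every condition holds.
    static boolean storeOffsetEnable(boolean brokerAllowSuspend,
                                     boolean hasCommitOffsetFlag,
                                     boolean brokerIsSlave) {
        return brokerAllowSuspend      // the request is not being re-run after a suspension
            && hasCommitOffsetFlag     // the client asked for a commit via sysFlag
            && !brokerIsSlave;         // assumption: only a master stores offsets
    }

    public static void main(String[] args) {
        System.out.println(storeOffsetEnable(true, true, false));
        System.out.println(storeOffsetEnable(false, true, false));
    }
}
```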
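The above covers how the consume offset is reported to the Broker and how the Broker handles a reported offset. So how is the offset persisted when a consumer shuts down?
Take the shutdown of a Push consumer as an example. For the shutdown logic, see D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\consumer\DefaultMQPushConsumer.java. Note that the excerpt below references defaultMQPullConsumer, so it appears to come from the Pull consumer's implementation class rather than from that file; what matters here is the persistConsumerOffset() call made before the client shuts down. The code is as follows: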

    public synchronized void shutdown() {
        switch (this.serviceState) {
            case CREATE_JUST:
                break;
            case RUNNING:
                this.persistConsumerOffset();
                this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            case SHUTDOWN_ALREADY:
                break;
            default:
                break;
        }
    }
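The essential ordering in that method, persist first and then mark the service as shut down, can be reduced to a small state machine. ShutdownSketch and all of its members are hypothetical names for this illustration; two long fields stand in for the cached offset and the persisted offset.

```java
// Hypothetical sketch of "flush the offset, then change state" on shutdown.
final class ShutdownSketch {
    enum State { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private long memoryOffset = 0L;      // latest offset known in memory
    private long persistedOffset = -1L;  // -1 means nothing has been persisted yet

    synchronized void start() {
        if (state == State.CREATE_JUST) {
            state = State.RUNNING;
        }
    }

    // Records consume progress in memory while the consumer is running.
    synchronized void consumeUpTo(long offset) {
        if (state == State.RUNNING) {
            memoryOffset = offset;
        }
    }

    // Only a running consumer has progress to flush; other states are no-ops.
    synchronized void shutdown() {
        switch (state) {
            case RUNNING:
                persistedOffset = memoryOffset; // flush before the state changes
                state = State.SHUTDOWN_ALREADY;
                break;
            case CREATE_JUST:
            case SHUTDOWN_ALREADY:
            default:
                break;
        }
    }

    synchronized long persistedOffset() {
        return persistedOffset;
    }

    synchronized State state() {
        return state;
    }
}
```

Calling shutdown() twice is harmless in this sketch: the second call finds SHUTDOWN_ALREADY and does nothing, which mirrors the switch in the excerpt.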
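In theory, the more promptly the offset is reported to the Broker, the lower the chance of duplicate messages. RocketMQ was not designed to fully support Exactly-Once semantics, because implementing them is costly and the use cases are rare. Since achieving idempotency on the user side is cheaper, RocketMQ leaves idempotent handling to the user.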
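One common way for user code to stay idempotent is to remember which business keys have already been handled and skip repeats. The sketch below assumes each message carries a unique business key; IdempotentHandlerSketch is a hypothetical class, and the in-memory set is only for illustration, since a real system would need durable storage for the keys to survive a restart.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical idempotent handler: a redelivered message produces no second side effect.
final class IdempotentHandlerSketch {
    private final Set<String> processedKeys = new HashSet<>();
    private int sideEffects = 0;

    // Returns true when the message was processed, false when it was a duplicate.
    synchronized boolean handle(String businessKey) {
        if (!processedKeys.add(businessKey)) {
            return false; // already seen: skip the business logic
        }
        sideEffects++; // stands in for the real business operation
        return true;
    }

    synchronized int sideEffects() {
        return sideEffects;
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        System.out.println(handler.handle("order-1"));
        System.out.println(handler.handle("order-1"));
        System.out.println(handler.sideEffects());
    }
}
```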