RocketMQ (4.8.0): How Consumption Progress Is Persisted



  When a consumer starts, it also starts an offset manager. So how exactly are offsets managed?

  RocketMQ provides two offset management modes:

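    • Remote offset management: in clustering consumption mode, the client hands offsets to the Broker for storage. Code path: D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\consumer\store\RemoteBrokerOffsetStore.java
    • Local offset management: in broadcasting consumption mode, offsets are stored on the consumer's local disk. Code path: D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\consumer\store\LocalFileOffsetStore.java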
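
  The choice between the two modes can be pictured as a simple switch on the consumption model. The following is only a minimal sketch: every type in it (MessageModelSketch, OffsetStoreSketch, LocalFileStoreSketch, RemoteBrokerStoreSketch, OffsetStoreSelector) is a hypothetical stand-in written for this illustration and is not one of the RocketMQ classes named above.

```java
// Illustrative sketch only: simplified stand-ins, not the RocketMQ classes.
enum MessageModelSketch { BROADCASTING, CLUSTERING }

interface OffsetStoreSketch {
    String describe();
}

final class LocalFileStoreSketch implements OffsetStoreSketch {
    @Override
    public String describe() { return "local file on the consumer's disk"; }
}

final class RemoteBrokerStoreSketch implements OffsetStoreSketch {
    @Override
    public String describe() { return "remote Broker"; }
}

final class OffsetStoreSelector {
    // Broadcasting: every consumer tracks its own progress, so it stays local.
    // Clustering: progress belongs to the whole group, so the Broker keeps it.
    static OffsetStoreSketch select(MessageModelSketch model) {
        switch (model) {
            case BROADCASTING:
                return new LocalFileStoreSketch();
            case CLUSTERING:
                return new RemoteBrokerStoreSketch();
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }

    public static void main(String[] args) {
        for (MessageModelSketch m : MessageModelSketch.values()) {
            System.out.println(m + " -> " + select(m).describe());
        }
    }
}
```

  The reasoning behind the split: in clustering mode a queue can move to another consumer in the group, so its progress has to live somewhere every member can reach, whereas in broadcasting mode each consumer's progress is private to it.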

  Next, we walk through the core methods of the OffsetStore interface.

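  void load(): loads the offset information.

  void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly): updates the offset cached in memory; when increaseOnly is true, the cached value is only ever moved forward.

  long readOffset(final MessageQueue mq, final ReadOffsetType type): reads the offset the client holds for a queue; type selects where it is read from, for example ReadOffsetType.READ_FROM_MEMORY.

  void persistAll(final Set<MessageQueue> mqs): persists the offsets of all the given queues.

  void persist(final MessageQueue mq): persists the offset of a single queue.

  void removeOffset(MessageQueue mq): removes the offset of a single queue.

  Map<MessageQueue, Long> cloneOffsetTable(String topic): returns a copy of the cached offsets.

  void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway): persists the local consumption offset to the Broker.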
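
  To make the cache-related methods concrete, here is a minimal sketch of an in-memory offset table. It is an assumption-laden simplification: the class InMemoryOffsetTable is hypothetical, queues are identified by a plain String key rather than a MessageQueue, and returning -1 for an unknown queue is a convention chosen for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an offset cache; not the RocketMQ implementation.
final class InMemoryOffsetTable {
    private final ConcurrentMap<String, AtomicLong> table = new ConcurrentHashMap<>();

    // increaseOnly = true never moves an offset backwards;
    // increaseOnly = false overwrites the cached value unconditionally.
    void updateOffset(String queueKey, long offset, boolean increaseOnly) {
        AtomicLong previous = table.putIfAbsent(queueKey, new AtomicLong(offset));
        if (previous == null) {
            return; // first value recorded for this queue
        }
        if (increaseOnly) {
            previous.accumulateAndGet(offset, Math::max);
        } else {
            previous.set(offset);
        }
    }

    // Returns -1 when nothing is cached for the queue (sketch convention).
    long readOffset(String queueKey) {
        AtomicLong value = table.get(queueKey);
        return value == null ? -1L : value.get();
    }

    void removeOffset(String queueKey) {
        table.remove(queueKey);
    }

    // Snapshot copy: later updates to the table do not change the returned map.
    Map<String, Long> cloneOffsetTable() {
        Map<String, Long> copy = new HashMap<>();
        for (Map.Entry<String, AtomicLong> e : table.entrySet()) {
            copy.put(e.getKey(), e.getValue().get());
        }
        return copy;
    }

    public static void main(String[] args) {
        InMemoryOffsetTable t = new InMemoryOffsetTable();
        t.updateOffset("TopicTest-0", 10, true);
        t.updateOffset("TopicTest-0", 5, true);   // ignored: would move backwards
        System.out.println(t.readOffset("TopicTest-0"));
        t.updateOffset("TopicTest-0", 5, false);  // forced overwrite
        System.out.println(t.readOffset("TopicTest-0"));
    }
}
```

  The increaseOnly flag matters because acknowledgements can arrive out of order under concurrent consumption; a forced overwrite (false) is reserved for cases where the offset must be corrected, such as an illegal pull offset.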

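  Saving consumption progress on the client, also called consumption progress persistence, supports two approaches:

    • Scheduled persistence
    • Unscheduled (on-demand) persistence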

  Scheduled offset persistence is implemented in the startScheduledTask() method of D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\factory\MQClientInstance.java. The code is as follows:

    private void startScheduledTask() {
        // When no NameServer address is configured, fetch the address list: first after 10 s, then every 2 minutes.
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        // Periodically refresh topic route information from the NameServer.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        // Periodically clean up offline Brokers and send heartbeats to all Brokers.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        // Persist the offsets of all consumers: first run after 10 s, then at the configured interval.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // Adjust the thread pools once per minute.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

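  Scheduled offset persistence is implemented with a scheduled task: 10 s after the client starts, MQClientInstance.this.persistAllConsumerOffset() is invoked periodically, at the interval returned by getPersistConsumerOffsetInterval(), and persists the consumption progress of every MessageQueue consumed by every consumer.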
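
  The pattern can be reduced to a small standalone sketch. Everything here is an assumption made for illustration: the class ScheduledOffsetPersister, its String queue keys, and the persistedOffsets map that stands in for the Broker or local disk are hypothetical and are not part of RocketMQ. The delays passed in main are shortened so the demo finishes quickly.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "update in memory, flush on a timer".
final class ScheduledOffsetPersister {
    private final Map<String, Long> memoryOffsets = new ConcurrentHashMap<>();
    // Stand-in for the durable target (Broker or local file).
    private final Map<String, Long> persistedOffsets = new ConcurrentHashMap<>();

    void updateOffset(String queueKey, long offset) {
        memoryOffsets.put(queueKey, offset);
    }

    // Copies every cached offset to the durable target.
    void persistAll() {
        persistedOffsets.putAll(memoryOffsets);
    }

    Long persisted(String queueKey) {
        return persistedOffsets.get(queueKey);
    }

    ScheduledExecutorService start(long initialDelayMillis, long periodMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(() -> {
            try {
                persistAll();
            } catch (Exception e) {
                // Catch and log: an exception escaping the task would cancel all later runs.
                System.err.println("persistAll failed: " + e);
            }
        }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
        return ses;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledOffsetPersister p = new ScheduledOffsetPersister();
        p.updateOffset("TopicTest-0", 42L);
        ScheduledExecutorService ses = p.start(100, 200);
        Thread.sleep(500);
        ses.shutdownNow();
        System.out.println("persisted: " + p.persisted("TopicTest-0"));
    }
}
```

  One consequence of this design is that between two runs the newest offsets exist only in memory, so a consumer that crashes may replay up to one interval's worth of messages after restart. This is one reason consumers are usually expected to handle duplicate messages.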

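  Unscheduled persistence is also called Pull-And-Commit: while executing a pull, the consumer also sends the queue's latest consumed offset to the Broker. It is implemented in the pullMessage(final PullRequest pullRequest) method of D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\consumer\DefaultMQPushConsumerImpl.java. The code is as follows: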

    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        }

        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }

        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

        // Flow control: too many messages cached for this queue.
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        // Flow control: cached messages for this queue take too much memory.
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }

        final long beginTimestamp = System.currentTimeMillis();

        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }

                            break;
                        case NO_NEW_MSG:
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            // Persistence spot 1: the pull offset was illegal, so correct and persist it.
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }

                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
        };

        // Persistence spot 2: in clustering mode, piggyback the in-memory offset on the pull request.
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }

        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }

            classFilter = sd.isClassFilterMode();
        }

        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }

  This method persists offset information in two places.

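  First, after a pull completes, if the Broker reports the pull offset as illegal (OFFSET_ILLEGAL), the client proactively submits the corrected offset to the Broker so that the next pull starts from a valid position. In the code this runs in a task delayed by 10 s: the cached offset is overwritten with pullResult.getNextBeginOffset() (increaseOnly is false) and then persisted. The code lives in D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\consumer\DefaultMQPushConsumerImpl.java and is as follows: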

    // Excerpt from the onSuccess(PullResult) callback inside pullMessage;
    // the rest of the method is identical to the full listing above.
    case OFFSET_ILLEGAL:
        log.warn("the pull request offset illegal, {} {}",
            pullRequest.toString(), pullResult.toString());
        // Adopt the next offset returned by the Broker.
        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

        // Mark the process queue as dropped so no further pulls are made for it.
        pullRequest.getProcessQueue().setDropped(true);
        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

            @Override
            public void run() {
                try {
                    // Overwrite the cached offset (increaseOnly = false), then persist it.
                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                        pullRequest.getNextOffset(), false);

                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                    log.warn("fix the pull request offset, {}", pullRequest);
                } catch (Throwable e) {
                    log.error("executeTaskLater Exception", e);
                }
            }
        }, 10000);
        break;

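  Second, when a pull is issued: if the consumer is in clustering mode and the offset cached in memory is greater than 0, the latest offset is sent to the Broker together with the pull request (commitOffsetEnable is encoded into sysFlag and the value travels as commitOffsetValue).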
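
  The piggyback decision and the flag encoding can be sketched in isolation. This is a hedged illustration rather than the library's code: the class PullFlagSketch, the helper shouldCommit, and the concrete bit positions are assumptions chosen for the example, and only the commit and suspend bits are modeled.

```java
// Hypothetical sketch of encoding "commit offset with this pull" into a bit flag.
final class PullFlagSketch {
    // Bit positions are assumptions for illustration, not confirmed library constants.
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;

    // Mirrors the condition described above: clustering mode and a cached offset > 0.
    static boolean shouldCommit(boolean clustering, long cachedOffset) {
        return clustering && cachedOffset > 0;
    }

    static int buildSysFlag(boolean commitOffset, boolean suspend) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        return flag;
    }

    // What a receiver would check before storing the offset carried by the request.
    static boolean hasCommitOffsetFlag(int sysFlag) {
        return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
    }

    public static void main(String[] args) {
        long cachedOffset = 128L;
        boolean commit = shouldCommit(true, cachedOffset);
        int sysFlag = buildSysFlag(commit, true);
        System.out.println("commit=" + commit + ", sysFlag=" + sysFlag
            + ", receiverStoresOffset=" + hasCommitOffsetFlag(sysFlag));
    }
}
```

  Carrying the offset on a request that is being sent anyway costs no extra round trip, which is why this path can run far more often than the scheduled task without adding load.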

    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        }

        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }

        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }

        final long beginTimestamp = System.currentTimeMillis();

        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }

                            break;
                        case NO_NEW_MSG:
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }

                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
        };

        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }

        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }

            classFilter = sd.isClassFilterMode();
        }

        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }

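  The code uses two fields, commitOffsetEnable and sysFlag, to indicate whether the consume offset may be reported to the Broker. When the pull request is sent, sysFlag travels to the Broker in the request header, together with commitOffsetValue.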
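
  To make the role of sysFlag more concrete, here is a minimal sketch of the general bit-flag technique: four booleans are packed into one integer on the client and one of them is tested again on the receiving side. The class name SysFlagSketch, the helper shouldCommit and the specific bit positions are assumptions made for illustration; check PullSysFlag in the RocketMQ source for the real constants.

```java
// Illustrative sketch only: names and bit positions are assumptions, not RocketMQ code.
final class SysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 1;       // bit 0 (assumed position)
    static final int FLAG_SUSPEND = 1 << 1;        // bit 1 (assumed position)
    static final int FLAG_SUBSCRIPTION = 1 << 2;   // bit 2 (assumed position)
    static final int FLAG_CLASS_FILTER = 1 << 3;   // bit 3 (assumed position)

    // Packs the four booleans into a single int, one bit each.
    static int buildSysFlag(boolean commitOffset, boolean suspend,
                            boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend) flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        if (classFilter) flag |= FLAG_CLASS_FILTER;
        return flag;
    }

    // Receiver-side check: was the commit-offset bit set?
    static boolean hasCommitOffsetFlag(int sysFlag) {
        return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
    }

    // Mirrors the client-side rule in pullMessage(): commit only in
    // clustering mode and only when the cached offset is greater than 0.
    static boolean shouldCommit(boolean clustering, long offsetInMemory) {
        return clustering && offsetInMemory > 0;
    }

    public static void main(String[] args) {
        int flag = buildSysFlag(shouldCommit(true, 42L), true, false, false);
        System.out.println("commit bit set: " + hasCommitOffsetFlag(flag));
    }
}
```

  Packing the switches into one integer keeps the request header compact and lets new switches be added without changing the header layout.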

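  On the Broker side, three values determine whether the offset is persisted:

  hasCommitOffsetFlag: derived from the sysFlag parameter of the pull request; it is one of the factors that decide whether the Broker persists the consume offset.

  brokerAllowSuspend: whether the Broker may suspend the pull request. If the request is already in the suspended state, the offset is not persisted.

  storeOffsetEnable: true means the Broker should persist the consume offset; false means it should not.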
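
  The way these three values combine can be sketched as follows. This is a toy stand-in for the Broker-side decision, not RocketMQ code: the class BrokerOffsetSketch and its methods are hypothetical, and the real Broker may apply further conditions that this article does not cover.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a hypothetical, simplified model of the Broker-side decision.
final class BrokerOffsetSketch {
    // Stands in for the Broker's persisted offset table (queue name -> offset).
    private final Map<String, Long> committed = new HashMap<>();

    // The offset is stored only when both factors described in the text are true.
    static boolean storeOffsetEnable(boolean brokerAllowSuspend, boolean hasCommitOffsetFlag) {
        return brokerAllowSuspend && hasCommitOffsetFlag;
    }

    // Handles one pull request and records the piggybacked offset if allowed.
    void onPull(String queue, long commitOffset,
                boolean brokerAllowSuspend, boolean hasCommitOffsetFlag) {
        if (storeOffsetEnable(brokerAllowSuspend, hasCommitOffsetFlag)) {
            committed.put(queue, commitOffset);
        }
    }

    // Returns the stored offset, or null if nothing was stored for this queue.
    Long committedOffset(String queue) {
        return committed.get(queue);
    }

    public static void main(String[] args) {
        BrokerOffsetSketch broker = new BrokerOffsetSketch();
        broker.onPull("q0", 100L, true, true);   // stored
        broker.onPull("q1", 200L, false, true);  // skipped: request was already suspended
        System.out.println(broker.committedOffset("q0") + " " + broker.committedOffset("q1"));
    }
}
```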

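  The above covers how the consume offset is reported to the Broker on a schedule and how the Broker handles the reported offset. So how is offset information persisted when the consumer shuts down?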

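  Take the shutdown of a Push consumer as an example. For the shutdown logic, see the code path D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\consumer\DefaultMQPushConsumer.java. Note that the snippet below refers to defaultMQPullConsumer, which suggests it was taken from the pull consumer's implementation class; the step that matters here is the same in both cases: persistConsumerOffset() is called before the consumer is unregistered. The code is as follows: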

    public synchronized void shutdown() {
        switch (this.serviceState) {
            case CREATE_JUST:
                break;
            case RUNNING:
                this.persistConsumerOffset();
                this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            case SHUTDOWN_ALREADY:
                break;
            default:
                break;
        }
    }
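
  Why persist on shutdown at all? Offsets updated after the last scheduled flush exist only in memory and would otherwise be lost. The following minimal sketch illustrates the idea under simplified assumptions: ToyOffsetStore is a hypothetical class, queues are identified by plain strings, and a second map stands in for the Broker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: a hypothetical toy store, not the RocketMQ OffsetStore.
final class ToyOffsetStore {
    private final Map<String, Long> memory = new ConcurrentHashMap<>();
    private final Map<String, Long> broker = new ConcurrentHashMap<>(); // stands in for the Broker

    // Updates the in-memory offset; with increaseOnly the offset never moves backwards.
    void updateOffset(String queue, long offset, boolean increaseOnly) {
        memory.merge(queue, offset,
            (oldValue, newValue) -> increaseOnly ? Math.max(oldValue, newValue) : newValue);
    }

    // Flushes every cached offset to the "Broker".
    void persistAll() {
        broker.putAll(memory);
    }

    // On shutdown, flush once more so that recent updates are not lost.
    void shutdown() {
        persistAll();
    }

    // Returns the offset known to the "Broker", or null if none was flushed.
    Long brokerOffset(String queue) {
        return broker.get(queue);
    }

    public static void main(String[] args) {
        ToyOffsetStore store = new ToyOffsetStore();
        store.updateOffset("q0", 10L, true);
        store.persistAll();                   // scheduled flush
        store.updateOffset("q0", 25L, true);  // progress made after the flush
        store.shutdown();                     // final flush picks it up
        System.out.println(store.brokerOffset("q0"));
    }
}
```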

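  In theory, the more promptly offsets are reported to the Broker, the lower the chance of duplicate messages. RocketMQ was not designed to fully support Exactly-Once semantics: implementing them is costly and the scenarios that need them are rare. Since implementing idempotency on the user side is cheaper, RocketMQ leaves idempotent handling to the user.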
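
  One common way to get idempotency on the user side is to deduplicate by a business key before running the business logic. The sketch below is an assumption-laden illustration, not part of RocketMQ: IdempotentHandler is a hypothetical class, and the in-memory set would normally be replaced by something durable, such as a unique constraint in a database.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: hypothetical user-side deduplication by business key.
final class IdempotentHandler {
    // Keys that have already been handled; in-memory only, so not durable.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger applied = new AtomicInteger();

    // Returns true if the business logic ran, false if the key was a duplicate.
    boolean handle(String businessKey) {
        if (!processedKeys.add(businessKey)) {
            return false; // already handled, skip the redelivered message
        }
        applied.incrementAndGet(); // placeholder for the real business logic
        return true;
    }

    // Number of times the business logic actually ran.
    int appliedCount() {
        return applied.get();
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        handler.handle("order-1");
        handler.handle("order-1"); // redelivery after an offset was not persisted in time
        handler.handle("order-2");
        System.out.println(handler.appliedCount());
    }
}
```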

 

