Heartbeat mechanism
After the Consumer starts, a scheduled task keeps sending heartbeat packets to every Broker instance in the RocketMQ cluster.
The heartbeat packet contains:
- the consumer group name
- the set of subscriptions
- the message model
- the client id
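When the Broker receives a heartbeat from a Consumer, it keeps the consumer's information in ConsumerManager's local cache, consumerTable, and also stores the wrapped client network channel information in the local cache channelInfoTable. This gives Consumer-side load balancing the metadata it later relies on.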
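To make the Broker-side bookkeeping concrete, here is a minimal sketch of a heartbeat registry keyed by consumer group and client id. It is loosely modelled on the roles that consumerTable and channelInfoTable play, but the class and method names (ConsumerRegistrySketch, onHeartbeat, clientIds, topicsOf, scanExpired) are hypothetical, plain strings stand in for Netty channels, and the expiry scan with its timeout is an assumption added to show why heartbeats must be sent periodically.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of broker-side heartbeat bookkeeping; not RocketMQ's ConsumerManager.
class ConsumerRegistrySketch {
    // group -> (clientId -> timestamp of the last heartbeat, in ms)
    private final Map<String, Map<String, Long>> clientsByGroup = new ConcurrentHashMap<>();
    // group -> topics reported in the most recent heartbeat of that group
    private final Map<String, Set<String>> topicsByGroup = new ConcurrentHashMap<>();

    // Called for every heartbeat: refresh the client's timestamp and the group's subscriptions.
    void onHeartbeat(String group, String clientId, Set<String> topics, long nowMillis) {
        clientsByGroup.computeIfAbsent(group, g -> new ConcurrentHashMap<>()).put(clientId, nowMillis);
        topicsByGroup.put(group, Set.copyOf(topics));
    }

    // Client ids of a group in sorted order, i.e. the metadata a rebalance would work from.
    List<String> clientIds(String group) {
        List<String> ids = new ArrayList<>(clientsByGroup.getOrDefault(group, Map.of()).keySet());
        Collections.sort(ids);
        return ids;
    }

    Set<String> topicsOf(String group) {
        return topicsByGroup.getOrDefault(group, Set.of());
    }

    // Forget clients whose last heartbeat is older than timeoutMillis; returns how many were removed.
    int scanExpired(long nowMillis, long timeoutMillis) {
        int removed = 0;
        for (Map<String, Long> clients : clientsByGroup.values()) {
            int before = clients.size();
            clients.values().removeIf(last -> nowMillis - last > timeoutMillis);
            removed += before - clients.size();
        }
        return removed;
    }
}
```

Sorting the ids is a deliberate choice in this sketch: for an allocation strategy to split queues consistently, every consumer in the group needs to see the clients in the same order.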
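Message pulling - PullMessageService
PUSH mode
RocketMQ's push mode does not implement true push. Instead, PullMessageService runs a dedicated thread that keeps pulling data from the broker in a loop, so new messages reach the client promptly; wrapped up for the user, it feels as if the broker pushed the messages to the consumer.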
PullMessageService
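Message pulling is handled by the PullMessageService service thread. PullMessageService maintains a queue of PullRequest objects; once the thread starts, it takes PullRequest objects from this queue and calls pullMessage() to pull messages.
PullMessageService also extends the ServiceThread class; once the thread starts, it executes its run() method, shown below.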
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}

Note that pullRequestQueue is a blocking queue: when it is empty, take() blocks the thread until a PullRequest object is available, and only then is pullMessage called.
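The same pattern can be reduced to a small, self-contained loop. This is a sketch, not PullMessageService itself: requests are plain strings, adding to a list replaces the real pullMessage() call, and the loop uses poll() with a timeout rather than take() so that it notices stop() without being interrupted and drains the queue before exiting. Both choices are assumptions of the sketch, and PullLoopSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the PullMessageService pattern: a single thread draining a blocking queue of requests.
class PullLoopSketch implements Runnable {
    private final LinkedBlockingQueue<String> requests = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>(); // only written by the thread running run()
    private volatile boolean stopped = false;

    // Counterpart of executePullRequestImmediately(): enqueue a request for the loop.
    void executeImmediately(String request) {
        requests.add(request);
    }

    void stop() {
        stopped = true;
    }

    // Read this after the loop thread has finished (e.g. after Thread.join()).
    List<String> handled() {
        return handled;
    }

    @Override
    public void run() {
        // Keep going until stop() is called and every queued request has been handled.
        while (!stopped || !requests.isEmpty()) {
            try {
                // poll() with a timeout instead of take(), so the loop re-checks 'stopped' even when idle
                String request = requests.poll(50, TimeUnit.MILLISECONDS);
                if (request != null) {
                    handled.add(request); // stands in for pullMessage(pullRequest)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag and exit
                return;
            }
        }
    }
}
```

In real use run() would be started on its own thread; calling it directly after stop(), as in a quick test, simply drains whatever is queued and returns.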
PullRequest
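Messages are pulled from the broker using the subscription information wrapped in a PullRequest. PullRequest objects are created in RebalanceImpl: after load balancing completes, a PullRequest is built for each assigned message queue and put into the pullRequestQueue of PullMessageService.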
public class PullRequest {
private String consumerGroup; // consumer group
private MessageQueue messageQueue; // message queue
private ProcessQueue processQueue; // processing queue for this message queue
private long nextOffset; // offset to start the next pull from
private boolean lockedFirst = false;
...
}

pullMessage()##PullMessageService
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}

pullMessage()##DefaultMQPushConsumerImpl
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// Check whether the processQueue has been dropped
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
// Update the processQueue's lastPullTimestamp, i.e. the time of the last pull
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
// If the consumer is paused, put the pullRequest back into PullMessageService's pullRequestQueue after a 1-second delay
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
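// Flow control on the consumer side, limited along two dimensions: message count and message size
// Get the total number of messages cached in the processQueue and their total memory size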
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// If the cached messages exceed the flow-control threshold, abandon this pull for the queue and re-add it to PullMessageService's pullRequestQueue after a 50 ms delay
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// Check whether consumption is orderly
if (!this.consumeOrderly) {
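// The difference between the max and min offsets of messages in processQueue must not exceed consumeConcurrentlyMaxSpan (default 2000)
// This mainly prevents one stuck message from keeping consumption progress from moving forward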
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
// Get the subscription for this topic; if it is missing, stop this task and put the request back into PullMessageService's pullRequestQueue after 3 seconds
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
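// Callback that handles the pulled messages. I think this object could be returned from a separate method; defining it here makes pullMessage too long and hard to read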
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
// ...omitted here because it is long; covered later...
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
// Execute the pull; pullCallback is invoked once the pull completes
// pullAPIWrapper wraps the client API and hides the concrete steps of pulling messages
this.pullAPIWrapper.pullKernelImpl(
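pullRequest.getMessageQueue(), // queue to pull from
subExpression, // message filter expression
subscriptionData.getExpressionType(), // filter expression type: TAG filtering or SQL expression filtering
subscriptionData.getSubVersion(), // subscription version
pullRequest.getNextOffset(), // offset to start pulling from
this.defaultMQPushConsumer.getPullBatchSize(), // max number of messages per pull, default 32
sysFlag, // pull system flags
commitOffsetValue, // current consumption progress of this MessageQueue
BROKER_SUSPEND_MAX_TIME_MILLIS, // how long the Broker may suspend (hold) the pull request, default 15 s
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // pull request timeout
CommunicationMode.ASYNC, // pull mode: asynchronous
pullCallback // callback invoked when the pull completes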
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}

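Flow control
From the source above, in push mode RocketMQ applies flow control to the consumer client along the following three dimensions. Pull mode needs no such control, because in pull mode the consumer itself decides when to pull, based on how fast it consumes.
- The number of messages cached locally for the queue exceeds pullThresholdForQueue (default 1000).
- The size of messages cached locally for the queue exceeds pullThresholdSizeForQueue (default 100 MiB).
- The offset span of locally cached messages exceeds consumeConcurrentlyMaxSpan (default 2000); this check applies only to concurrent consumption.
The result of consumer flow control is a lower pull frequency: the pull request for that queue is put back with a 50 ms delay instead of being executed right away.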
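The three checks can be condensed into one pure function. This is a hypothetical sketch: FlowControlSketch and its parameters are illustrative names, the thresholds are passed in rather than read from DefaultMQPushConsumer, and the 50 ms delay is the flow-control delay described in the source comments above.

```java
import java.util.TreeMap;

// Hypothetical sketch of the three push-mode flow-control checks from pullMessage().
class FlowControlSketch {
    static final long FLOW_CONTROL_DELAY_MS = 50; // delay used when any threshold is exceeded

    // Offset span of the cached messages: last offset minus first offset, 0 when nothing is cached.
    static long maxSpan(TreeMap<Long, ?> cached) {
        return cached.isEmpty() ? 0 : cached.lastKey() - cached.firstKey();
    }

    // Returns 0 if the queue may be pulled now, otherwise the delay before the request is retried.
    static long nextPullDelay(long cachedCount, long cachedSizeMiB, long span, boolean orderly,
                              long thresholdCount, long thresholdSizeMiB, long maxSpanLimit) {
        if (cachedCount > thresholdCount) {
            return FLOW_CONTROL_DELAY_MS;
        }
        if (cachedSizeMiB > thresholdSizeMiB) {
            return FLOW_CONTROL_DELAY_MS;
        }
        // the span check is only applied to concurrent consumption
        if (!orderly && span > maxSpanLimit) {
            return FLOW_CONTROL_DELAY_MS;
        }
        return 0;
    }
}
```

Like the source, every check uses a strict "greater than", so a queue sitting exactly at a threshold is still pulled.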
pullKernelImpl()##PullAPIWrapper
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
{
// check version
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
}
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
// Execute the pull request through NettyRemotingClient
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
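pullKernelImpl() clears only one bit of sysFlag when the chosen broker is a slave. The sketch below shows that bit manipulation in isolation. The flag names follow the arguments passed to PullSysFlag.buildSysFlag() earlier, but the bit positions are my assumption, so treat SysFlagSketch as an illustration rather than a copy of PullSysFlag.

```java
// Hypothetical sketch of packing pull options into one int, in the spirit of PullSysFlag.
// The bit positions are assumptions made for illustration.
class SysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 1;      // bit 0: the request carries a commit offset
    static final int FLAG_SUSPEND = 1 << 1;       // bit 1: the broker may hold the request (long polling)
    static final int FLAG_SUBSCRIPTION = 1 << 2;  // bit 2: the request carries a subscription expression
    static final int FLAG_CLASS_FILTER = 1 << 3;  // bit 3: class filter mode

    static int build(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend) flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        if (classFilter) flag |= FLAG_CLASS_FILTER;
        return flag;
    }

    // What pullKernelImpl() does for a slave: drop only the commit-offset bit, keep the rest.
    static int clearCommitOffset(int flag) {
        return flag & ~FLAG_COMMIT_OFFSET;
    }

    static boolean has(int flag, int bit) {
        return (flag & bit) != 0;
    }
}
```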

pullMessage()##MQClientAPIImpl
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
// Send an asynchronous request through the remoting module, which is built on Netty
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
// Callback invoked when the operation completes
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
// Wrap the data returned by the broker into a PullResult object
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
// Invoke the callback: the anonymous PullCallback implemented earlier in DefaultMQPushConsumerImpl
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
}
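The control flow of pullMessageAsync() (response present, send failed, timed out) maps naturally onto a future. Below is a minimal sketch using java.util.concurrent.CompletableFuture (Java 9 or later, for orTimeout and failedFuture). PullCallbackSketch and AsyncPullSketch are hypothetical names, a String stands in for PullResult, and RocketMQ itself builds this on its own ResponseFuture and InvokeCallback, not on CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical callback interface with the same two methods as PullCallback.
interface PullCallbackSketch {
    void onSuccess(String pullResult);
    void onException(Throwable e);
}

class AsyncPullSketch {
    // Routes a pending response to the callback: success -> onSuccess, failure or timeout -> onException.
    static void pullAsync(CompletableFuture<String> response, long timeoutMillis, PullCallbackSketch callback) {
        response.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).whenComplete((result, error) -> {
            if (error != null) {
                callback.onException(error); // e.g. a TimeoutException from orTimeout()
                return;
            }
            try {
                callback.onSuccess(result);
            } catch (Exception e) {
                callback.onException(e); // mirrors the try/catch around processPullResponse() and onSuccess()
            }
        });
    }
}
```

If the future is already complete, whenComplete runs the callback on the calling thread; otherwise it runs on whichever thread completes the future, much as operationComplete() runs on a remoting thread.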

Callback handling - PullCallback
Now let's look at the onSuccess() method of the callback interface mentioned earlier.
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
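Stripped of logging and statistics, onSuccess() is a decision table over PullStatus. The sketch below captures only that decision. PullStatusSketch and the Action values are hypothetical names, and side effects such as updating nextOffset or submitting to consumeMessageService are reduced to comments.

```java
// Hypothetical sketch of the decision made in PullCallback.onSuccess() for each pull status.
enum PullStatusSketch { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

class PullDispatchSketch {
    enum Action { PULL_NOW, SUBMIT_THEN_PULL_NOW, SUBMIT_THEN_PULL_LATER, DROP_QUEUE_AND_FIX_OFFSET }

    // hasMessages: whether the found list is non-empty after processPullResult();
    // pullIntervalMillis: the consumer's pullInterval setting.
    static Action decide(PullStatusSketch status, boolean hasMessages, long pullIntervalMillis) {
        switch (status) {
            case FOUND:
                if (!hasMessages) {
                    return Action.PULL_NOW; // nothing to consume, pull again right away
                }
                return pullIntervalMillis > 0 ? Action.SUBMIT_THEN_PULL_LATER : Action.SUBMIT_THEN_PULL_NOW;
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
                return Action.PULL_NOW; // after moving nextOffset to nextBeginOffset
            case OFFSET_ILLEGAL:
                return Action.DROP_QUEUE_AND_FIX_OFFSET; // the fix runs in a task delayed by 10 s
            default:
                throw new IllegalArgumentException("unknown status " + status);
        }
    }
}
```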

Message consumption - ConsumeMessageService
As shown above, once messages have been pulled, they are submitted to the message consumption service, ConsumeMessageService.
public interface ConsumeMessageService {
void start();
void shutdown();
void updateCorePoolSize(int corePoolSize);
void incCorePoolSize();
void decCorePoolSize();
int getCorePoolSize();
// Consume a message directly
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
// Submit a consume task to the thread pool
void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume);
}

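RocketMQ consumers have two consumption modes: concurrent consumption and orderly consumption.
Concurrent consumption - ConsumeMessageConcurrentlyService
In concurrent consumption, the pulled messages are split into multiple consume tasks according to the maximum batch size, and the tasks are submitted to a thread pool and consumed concurrently.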
submitConsumeRequest()##ConsumeMessageConcurrentlyService
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
// Consume batch size: the number of messages in one ConsumeRequest task, default 1
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
// Submit the consume task to the thread pool
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
// If the thread pool rejects the task (i.e. it is full), resubmit it after a 5-second delay
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// If there are more messages than consumeBatchSize, split them so that each task gets at most consumeBatchSize messages
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
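The splitting loop above is easier to follow as a chunking function. This sketch (BatchSplitSketch is a hypothetical name) returns the batches instead of submitting them, and leaves out the source's handling of RejectedExecutionException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the splitting step in submitConsumeRequest(): one batch per consumeBatchSize messages.
class BatchSplitSketch {
    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize) {
        if (consumeBatchSize <= 0) {
            throw new IllegalArgumentException("consumeBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += consumeBatchSize) {
            int to = Math.min(from + consumeBatchSize, msgs.size());
            // copy the slice so each task owns its own list, like msgThis in the source
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }
}
```

With the default batch size of 1, a pull of 32 messages therefore becomes 32 separate tasks in the thread pool.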

ConsumeMessageConcurrentlyService$ConsumeRequest
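Messages submitted to the consumption service are ultimately wrapped into ConsumeRequest tasks and submitted to the thread pool. ConsumeRequest implements the Runnable interface, so each task runs on a pool thread, and the consumption itself happens in its run() method.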
public void run() {
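// Check whether this processQueue has been dropped; if so, stop consuming its messages.
// During rebalancing, if this message queue is assigned to another consumer in the group, the current consumer sets dropped to true for it.
// This keeps the consumer from consuming messages of a queue it no longer owns, and avoids duplicate consumption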
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// Get the message listener: the custom listener registered via registerMessageListener() when we built the consumer,
// i.e. the handler that runs our actual business logic
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
// Run the pre-consumption hooks
// Hooks are registered via consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook()
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// Call the actual consumption logic and get its result
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
// Classify the consumption result
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// Run the post-consumption hooks
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
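// If the queue was assigned to another consumer while these messages were being consumed, the result is not processed.
// This can lead to duplicate consumption, so the business side has to guard against repeated consumption itself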
if (!processQueue.isDropped()) {
// Process the consumption result
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

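From the source, message consumption consists of the following steps:
- Check whether the message queue has been dropped, to avoid duplicate consumption
- Run the pre-consumption hooks
- Call the MessageListener to run the consumption logic the user registered
- Classify the consumption result and check whether consumption timed out
- Run the post-consumption hooks
- Process the consumption result, including message retries and offset commits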
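The result classification from the fourth step can be isolated as follows. StatusSketch and ReturnTypeSketch are hypothetical stand-ins for ConsumeConcurrentlyStatus and ConsumeReturnType. In the code above the return type only feeds the hook context (CONSUME_CONTEXT_TYPE); whether messages are retried is decided by status alone, with null treated as RECONSUME_LATER.

```java
// Hypothetical sketch of how ConsumeRequest.run() classifies one consumption attempt.
enum StatusSketch { CONSUME_SUCCESS, RECONSUME_LATER }

enum ReturnTypeSketch { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

class ConsumeResultSketch {
    // status is null when the listener threw or returned null; consumeTimeoutMinutes mirrors getConsumeTimeout().
    static ReturnTypeSketch classify(StatusSketch status, boolean threw, long consumeRtMillis, long consumeTimeoutMinutes) {
        if (status == null) {
            return threw ? ReturnTypeSketch.EXCEPTION : ReturnTypeSketch.RETURNNULL;
        }
        if (consumeRtMillis >= consumeTimeoutMinutes * 60 * 1000) {
            return ReturnTypeSketch.TIME_OUT;
        }
        return status == StatusSketch.CONSUME_SUCCESS ? ReturnTypeSketch.SUCCESS : ReturnTypeSketch.FAILED;
    }

    // A null status is afterwards treated as RECONSUME_LATER, so the batch will be retried.
    static StatusSketch effectiveStatus(StatusSketch status) {
        return status == null ? StatusSketch.RECONSUME_LATER : status;
    }
}
```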
Processing and committing consumption results
processConsumeResult()
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// In broadcasting mode, failed messages are not re-consumed; only a warn-level log is printed
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// In clustering mode, the failed messages of this batch are sent back to the retry queue
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// Send the message back to the retry queue
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// If sending some messages to the retry queue fails, remove them from this consumeRequest and resubmit them to the consumption service, to be re-consumed after 5 s
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// Remove this batch from processQueue and return the smallest message offset left in processQueue, then use that offset to update consumption progress
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
// Update the consumption offset
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
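The ackIndex arithmetic decides which messages of a batch count as failed. A listener can set ackIndex on the context so that only messages 0..ackIndex are acknowledged; I believe its default is Integer.MAX_VALUE, which the clamp turns into "acknowledge the whole batch". The sketch below (AckIndexSketch is a hypothetical name) reproduces only that index calculation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the ackIndex arithmetic in processConsumeResult().
class AckIndexSketch {
    // Indices of the messages in a batch that are treated as failed.
    // In clustering mode these are sent back for retry; in broadcasting mode they are only logged and dropped.
    static List<Integer> failedIndices(boolean consumeSuccess, int ackIndex, int batchSize) {
        // RECONSUME_LATER acknowledges nothing; CONSUME_SUCCESS clamps ackIndex to the last message
        int lastAcked = consumeSuccess ? Math.min(ackIndex, batchSize - 1) : -1;
        List<Integer> failed = new ArrayList<>();
        for (int i = lastAcked + 1; i < batchSize; i++) {
            failed.add(i);
        }
        return failed;
    }
}
```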

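Retry queue
For each consumer group, RocketMQ sets up a retry queue whose topic is named "%RETRY%+consumerGroup" (note that this retry topic exists per consumer group, not per Topic). It temporarily holds messages that the Consumer failed to consume because of various exceptions.
Because recovering from an exception takes time, the retry queue has multiple retry levels, each with its own redelivery delay: the more retries, the longer the delay. There are the following 18 delay levels, in increasing order. Level 1 corresponds to 1 s, level 2 to 5 s, level 3 to 10 s, and so on. Each failed consumption raises the level by one; by default, consumer retries start at level 3 (10 s).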
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

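When the retry level is greater than 0, i.e. the message needs delayed processing, RocketMQ first stores the retry message in the delay topic "SCHEDULE_TOPIC_XXXX", in the queue whose ID is the delay level minus 1. A background scheduled task waits for the corresponding delay and then stores the message again in the "%RETRY%+consumerGroup" retry queue. Retry and scheduled queues will be analyzed in detail in a separate article, so I won't go further here.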
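The delay string and the level numbering can be checked with a small parser. DelayLevelSketch is hypothetical: it assumes the units s, m, h and d, treats level N as the N-th entry of the string, caps levels above the last entry, and encodes the assumption that consumer retries default to level 3 + reconsumeTimes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: turn a messageDelayLevel-style string into delays in milliseconds.
class DelayLevelSketch {
    static List<Long> parse(String levels) {
        List<Long> delays = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            char unit = token.charAt(token.length() - 1);
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            long factor;
            switch (unit) {
                case 's': factor = 1_000L; break;
                case 'm': factor = 60_000L; break;
                case 'h': factor = 3_600_000L; break;
                case 'd': factor = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown time unit in " + token);
            }
            delays.add(value * factor);
        }
        return delays;
    }

    // Level N is the N-th entry (1-based); levels above the last one are capped at the maximum.
    static long delayMillis(List<Long> delays, int level) {
        if (level < 1) {
            throw new IllegalArgumentException("delay levels start at 1");
        }
        return delays.get(Math.min(level, delays.size()) - 1);
    }

    // Assumption: a failed consumer message is retried at level 3 + reconsumeTimes by default.
    static int retryLevel(int reconsumeTimes) {
        return 3 + reconsumeTimes;
    }
}
```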
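Committing the offset
Offsets are committed through OffsetStore's updateOffset() method. OffsetStore has two implementations by default: LocalFileOffsetStore for broadcasting consumption and RemoteBrokerOffsetStore for clustering consumption. In broadcasting mode the offset is stored locally on the consumer, while in clustering mode consumption progress is managed centrally by the broker.
In both modes the offset is first stored in a local Map, and a scheduled task periodically calls the persist method: LocalFileOffsetStore writes it directly to a local file, while RemoteBrokerOffsetStore sends it to the Broker. By default persistence runs every 5 seconds (persistConsumerOffsetInterval), after an initial delay of 10 seconds.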
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
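updateOffset(mq, offset, true) delegates to MixAll.compareAndIncreaseOnly(), which only ever moves the stored offset forward. A compare-and-set loop with that behaviour looks like the sketch below; it is written from the described behaviour rather than copied from RocketMQ, and IncreaseOnlySketch is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of an "increase only" update of a shared offset.
class IncreaseOnlySketch {
    // Raises target to value if value is larger; returns true if this call changed target.
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long prev = target.get();
        while (value > prev) {
            if (target.compareAndSet(prev, value)) {
                return true;
            }
            prev = target.get(); // another thread updated it first: compare against the new value
        }
        return false;
    }
}
```

On Java 8 and later, target.accumulateAndGet(value, Math::max) reaches the same final value; the explicit loop additionally reports whether this call changed anything.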

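About the offset value
As covered in the analysis of result processing above, when a batch of messages has been consumed, the batch is removed from processQueue and the smallest message offset left in processQueue is returned. Suppose processQueue holds messages with offsets 10, 20, 21, 23, ..., and taskA holds the message at offset 20. When taskA finishes, the message at offset 20 is removed, but the returned offset is 10, not 21. Only after the message at offset 10 has been consumed will the next smallest offset (e.g. 21) be used for the update.
So if consuming the message at offset 10 blocks for a long time, because of a deadlock or any other reason, consumption progress cannot move forward. That is why, when pulling, the pull for the queue is delayed if the offset span of messages in processQueue exceeds the limit:
- The offset span of locally cached messages exceeds consumeConcurrentlyMaxSpan (default 2000)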
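The offset bookkeeping described above can be reproduced with a TreeMap keyed by queue offset. This ProcessQueueSketch is hypothetical and single-threaded (the real ProcessQueue guards its map with a lock); returning queueOffsetMax + 1 when the map becomes empty is how I understand ProcessQueue.removeMessage() behaves, so treat that branch as an assumption.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical, single-threaded sketch of how ProcessQueue tracks cached messages by queue offset.
class ProcessQueueSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = -1;

    void put(long offset, String body) {
        msgTreeMap.put(offset, body);
        queueOffsetMax = Math.max(queueOffsetMax, offset);
    }

    // Removes consumed offsets and returns the offset that may be committed:
    // the smallest offset still cached, or queueOffsetMax + 1 if nothing is left; -1 if nothing was cached.
    long removeMessage(List<Long> consumedOffsets) {
        if (msgTreeMap.isEmpty()) {
            return -1;
        }
        for (Long offset : consumedOffsets) {
            msgTreeMap.remove(offset);
        }
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }

    // Span between the highest and lowest cached offsets, the value compared with consumeConcurrentlyMaxSpan.
    long maxSpan() {
        return msgTreeMap.isEmpty() ? 0 : msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }
}
```

With the offsets from the example, removing 20 returns 10, and only after 10 is removed does the commit point jump to 21.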
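Orderly consumption - ConsumeMessageOrderlyService
Like concurrent consumption, orderly consumption uses submitConsumeRequest() to wrap the messages into a ConsumeRequest task and submit it to the thread pool.
However, the messages are not split into several tasks; a single ConsumeRequest task consumes them in order.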
ConsumeMessageOrderlyService$ConsumeRequest
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// The message queue is locked during consumption, so only one thread accesses a given queue at a time, which preserves the order within the queue.
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
// Check whether the message queue has been dropped
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// Check whether we hold the lock on this message queue; if not, try to acquire it and re-consume, and if acquiring it fails, resubmit after 3 s
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// If the lock has expired, also try to acquire it again
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
long interval = System.currentTimeMillis() - beginTime;
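// In orderly consumption, the loop keeps taking consumeBatchSize messages from the processing queue until none are left,
// and each iteration checks whether this consume task has been running too long; if so, the task is resubmitted to consume the rest later.
// Each orderly ConsumeRequest task is limited by time: once it has been consuming for the threshold (60 s),
// it stops and releases its thread back to the pool.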
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// Take up to consumeBatchSize messages from the processing queue; an empty result means everything has been consumed, so the task ends
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
// Acquire the consume lock: each processing queue holds a Lock object, so one processing queue is never consumed concurrently
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
// Call the actual consumption logic
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
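Here is a minimal sketch of the consume-lock pattern in the excerpt above. `MiniProcessQueue`, `OrderlyConsumeSketch` and `consumeBatch` are hypothetical names, not RocketMQ classes. The sketch borrows only two things from the source: each process queue owns one lock, and each has a dropped flag. Tasks that target the same queue serialize on that lock, and a dropped queue is not consumed at all.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical stand-in for ProcessQueue: one consume lock and one "dropped" flag per queue.
class MiniProcessQueue {
    private final ReentrantLock consumeLock = new ReentrantLock();
    private volatile boolean dropped = false;

    ReentrantLock getLockConsume() { return consumeLock; }
    boolean isDropped() { return dropped; }
    void setDropped(boolean dropped) { this.dropped = dropped; }
}

class OrderlyConsumeSketch {
    // Runs the listener on one batch while holding the queue's consume lock.
    // Returns null if the queue was dropped, mirroring the early break in the excerpt.
    static Boolean consumeBatch(MiniProcessQueue pq, List<String> msgs, Predicate<List<String>> listener) {
        pq.getLockConsume().lock();
        try {
            if (pq.isDropped()) {
                return null; // queue no longer owned by this consumer: do not consume
            }
            return listener.test(msgs);
        } finally {
            pq.getLockConsume().unlock(); // always released, as in the excerpt's finally block
        }
    }
}
```

Several threads can call `consumeBatch` on the same `MiniProcessQueue`, but at most one listener call runs at a time. Queues with different locks can still be consumed in parallel. This is how per-queue order is kept without serializing the whole consumer.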

Key points of this section
- In orderly consumption, consume tasks are locked per process queue, so concurrent consumption cannot break message order.
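- Before consuming, the consumer checks whether it holds the lock on the message queue (obtained from the Broker) and whether that lock has expired. If not, it tries to acquire the lock and then resubmits the task.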
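- In orderly consumption, each pull triggers a ConsumeRequest task that is submitted to the thread pool. When the task runs, it repeatedly takes a batch of messages from the process queue. It does not consume only the messages returned by that particular pull.
This matters because, as analyzed earlier, even in orderly mode a new ConsumeRequest task is started after every pull.
If each task consumed only the messages of its own pull, a new task could start while the previous pull's messages were still being consumed, and the order would break.
For example, suppose the previous pull returned offsets 20-40 and the current pull returns 41-50. If the previous task has only reached offset 31 when the new task starts, the new task would begin at 41, and messages would be consumed out of order.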
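- Each consume task is also time-limited, with a threshold of 60 s (MAX_TIME_CONSUME_CONTINUOUSLY). Once a task exceeds this limit, it ends even if it has not finished the messages in the process queue, and a new consume request is submitted.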
long interval = System.currentTimeMillis() - beginTime;
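// In orderly mode the task loops, taking consumeBatchSize messages from the process queue each time, until none are left.
// Each iteration checks whether this task has exceeded its time budget; if so, a new consume request is submitted for the remaining messages.
// The budget applies per ConsumeRequest: once a task has been consuming for the threshold (60 s),
// it stops and gives its thread back to the pool.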
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
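The last two key points can be sketched with a `TreeMap` keyed by queue offset:

- a task consumes from the process queue rather than from its own pull;
- a task stops once its time budget is used up.

This is an illustration under assumptions. `SortedProcessQueue`, `TimeBudgetedTask` and their methods are hypothetical. `takeMessages` imitates taking up to `batchSize` of the smallest offsets into a "consuming" map, and `maxTimeMillis` stands in for MAX_TIME_CONSUME_CONTINUOUSLY. Every task takes from the same sorted map. So a task started by the 41-50 pull cannot jump ahead of offsets 32-40 left over from the 20-40 pull.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of an orderly process queue (not the RocketMQ class).
class SortedProcessQueue {
    final TreeMap<Long, String> msgTreeMap = new TreeMap<>();                 // pulled, not yet taken
    final TreeMap<Long, String> consumingMsgOrderlyTreeMap = new TreeMap<>(); // taken, not yet committed

    // Called after every pull: messages are merged by offset, whichever pull they came from.
    synchronized void putMessages(Map<Long, String> pulled) { msgTreeMap.putAll(pulled); }

    // Moves up to batchSize messages with the smallest offsets into the consuming map.
    synchronized List<Long> takeMessages(int batchSize) {
        List<Long> taken = new ArrayList<>();
        while (taken.size() < batchSize && !msgTreeMap.isEmpty()) {
            Map.Entry<Long, String> e = msgTreeMap.pollFirstEntry();
            consumingMsgOrderlyTreeMap.put(e.getKey(), e.getValue());
            taken.add(e.getKey());
        }
        return taken;
    }
}

class TimeBudgetedTask {
    // Consumes batches until the queue is empty or the time budget is used up.
    // Returns true if the task stopped because of the budget (the caller should resubmit it).
    static boolean run(SortedProcessQueue pq, int batchSize, long maxTimeMillis, List<Long> consumedOrder) {
        long beginTime = System.currentTimeMillis();
        while (true) {
            if (System.currentTimeMillis() - beginTime > maxTimeMillis) {
                return true; // analogous to submitConsumeRequestLater(...) followed by break
            }
            List<Long> batch = pq.takeMessages(batchSize);
            if (batch.isEmpty()) {
                return false; // nothing left to consume
            }
            consumedOrder.addAll(batch); // stands in for the listener call
            // For brevity this sketch "commits" immediately by clearing the consuming map.
            synchronized (pq) { pq.consumingMsgOrderlyTreeMap.clear(); }
        }
    }
}
```

`takeMessages` always returns the smallest remaining offsets, so it does not matter which pull triggered a task. A return value of `true` corresponds to the resubmit-and-break branch in the excerpt.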

Offset commit
With auto-commit (the default), the offset is committed after every successfully consumed batch.
if (context.isAutoCommit()) {
switch (status) {
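// COMMIT and ROLLBACK are not valid results under auto-commit: a warning is logged and control falls through to SUCCESS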
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
// Offset to commit = largest offset among the consumed messages + 1, i.e. the offset of the next message to consume
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
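The rule in the comment above is: committed offset = largest consumed offset + 1. It can be shown with a tiny sketch over a `TreeMap`. `CommitSketch` is a hypothetical class whose field name mirrors the source's `consumingMsgOrderlyTreeMap`. Returning -1 for an empty batch is a choice made in this sketch; it matches the `commitOffset >= 0` guard used before updating the offset.

```java
import java.util.TreeMap;

// Hypothetical sketch of the orderly commit step (not the RocketMQ ProcessQueue class).
class CommitSketch {
    final TreeMap<Long, String> consumingMsgOrderlyTreeMap = new TreeMap<>();

    // Returns the next offset to consume (largest consumed offset + 1) and clears the batch,
    // or -1 if nothing is being consumed.
    synchronized long commit() {
        if (consumingMsgOrderlyTreeMap.isEmpty()) {
            return -1L;
        }
        long offset = consumingMsgOrderlyTreeMap.lastKey();
        consumingMsgOrderlyTreeMap.clear();
        return offset + 1;
    }
}
```

For a batch holding offsets 20-40, `commit()` yields 41, the first offset not yet consumed.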

With manual commit, the offset is committed only when the listener returns COMMIT; SUCCESS merely records statistics.
} else {
switch (status) {
case SUCCESS:
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
commitOffset = consumeRequest.getProcessQueue().commit();
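The two switch excerpts above can be condensed into a small decision function. `Status`, `CommitPolicy` and `commitsOffset` are hypothetical names. The sketch covers only SUCCESS, COMMIT and ROLLBACK, whose commit behavior does not depend on retry counts. SUSPEND_CURRENT_QUEUE_A_MOMENT is left to the retry path. Manual-mode ROLLBACK is assumed not to commit, since it rolls the batch back rather than acknowledging it.

```java
// Hypothetical mirror of the ConsumeOrderlyStatus values whose commit behavior is retry-independent.
enum Status { SUCCESS, COMMIT, ROLLBACK }

class CommitPolicy {
    // Whether a consumed batch's offset is committed, per the auto/manual switch excerpts.
    static boolean commitsOffset(boolean autoCommit, Status status) {
        if (autoCommit) {
            // COMMIT and ROLLBACK only log a warning, then fall through to SUCCESS
            return true;
        }
        // Manual mode: SUCCESS just records statistics; only COMMIT advances the offset
        return status == Status.COMMIT;
    }
}
```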

Updating the offset
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
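The call above passes `increaseOnly = false`, so the new value overwrites whatever is cached for the queue. With `true` it would only ever move forward. A minimal in-memory sketch of that contract follows. `LocalOffsetTable` is hypothetical and keys by a queue-id string instead of `MessageQueue`. In the RocketMQ client's remote (cluster-mode) offset store, this cached value is understood to be persisted to the Broker by a periodic task rather than on every call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory offset table illustrating updateOffset(mq, offset, increaseOnly).
class LocalOffsetTable {
    private final ConcurrentMap<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    void updateOffset(String queueId, long offset, boolean increaseOnly) {
        AtomicLong old = offsetTable.putIfAbsent(queueId, new AtomicLong(offset));
        if (old == null) {
            return; // first offset recorded for this queue
        }
        if (increaseOnly) {
            old.accumulateAndGet(offset, Math::max); // never move backwards
        } else {
            old.set(offset); // overwrite, as the orderly service does with increaseOnly = false
        }
    }

    // Returns the cached offset, or -1 if none has been recorded.
    long readOffset(String queueId) {
        AtomicLong v = offsetTable.get(queueId);
        return v == null ? -1L : v.get();
    }
}
```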

Consume retry
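When the listener returns SUSPEND_CURRENT_QUEUE_A_MOMENT and the messages have not yet reached the retry limit, the batch is put back into the process queue's message list and removed from consumingMsgOrderlyTreeMap. The consume request is then resubmitted after a delay (1 s by default), and the current consume task stops.
If the retry count has reached or exceeded the limit, the messages are sent to the dead-letter queue. In auto-commit mode the offset is then committed, and consumption continues with the following messages.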
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
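// Put the current messages back into the process queue's message list
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
// Resubmit the consume request after a delay
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
// Stop the current consume task
continueConsume = false;
} else {
// Under auto-commit, once the retry limit has been reached the consume offset is committed as well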
commitOffset = consumeRequest.getProcessQueue().commit();
}
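When a batch is put back, its messages leave the "consuming" map and return to the sorted message map. The next, delayed task therefore takes exactly the same offsets again, in the same order, before any newer message. Below is a sketch under the same two-`TreeMap` assumption. `RetryableQueue` is hypothetical, and its method name follows the corrected spelling rather than the source's `makeMessageToCosumeAgain`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the retry bookkeeping in an orderly process queue.
class RetryableQueue {
    final TreeMap<Long, String> msgTreeMap = new TreeMap<>();                 // waiting to be consumed
    final TreeMap<Long, String> consumingMsgOrderlyTreeMap = new TreeMap<>(); // currently being consumed

    // Moves up to batchSize messages with the smallest offsets into the consuming map.
    synchronized List<Long> takeMessages(int batchSize) {
        List<Long> taken = new ArrayList<>();
        while (taken.size() < batchSize && !msgTreeMap.isEmpty()) {
            Map.Entry<Long, String> e = msgTreeMap.pollFirstEntry();
            consumingMsgOrderlyTreeMap.put(e.getKey(), e.getValue());
            taken.add(e.getKey());
        }
        return taken;
    }

    // Moves a failed batch back so it is consumed again before any later offset.
    synchronized void makeMessageToConsumeAgain(List<Long> offsets) {
        for (Long offset : offsets) {
            String body = consumingMsgOrderlyTreeMap.remove(offset);
            if (body != null) {
                msgTreeMap.put(offset, body);
            }
        }
    }
}
```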

private boolean checkReconsumeTimes(List<MessageExt> msgs) {
boolean suspend = false;
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
// Send the message to the dead-letter queue; if that fails, keep retrying it locally and increment its retry count
if (!sendMessageBack(msg)) {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
} else {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
}
}
return suspend;
}
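The return value of checkReconsumeTimes chooses between two outcomes: suspend and retry (`true`), or commit and move on (`false`). The decision can be isolated as a pure function. `Msg`, `ReconsumeCheck`, the `maxReconsumeTimes` parameter and the `sendBack` predicate (standing in for sendMessageBack) are hypothetical. Note one case in particular: a message over the limit whose send-back fails still forces a suspend. A failed hand-off therefore never causes the message to be silently skipped.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical message holder carrying only the retry counter.
class Msg {
    int reconsumeTimes;
    Msg(int reconsumeTimes) { this.reconsumeTimes = reconsumeTimes; }
}

class ReconsumeCheck {
    // Returns true if the batch must be suspended and retried locally,
    // false if every message was handed off, so the offset can be committed.
    static boolean checkReconsumeTimes(List<Msg> msgs, int maxReconsumeTimes, Predicate<Msg> sendBack) {
        boolean suspend = false;
        if (msgs != null && !msgs.isEmpty()) {
            for (Msg msg : msgs) {
                if (msg.reconsumeTimes >= maxReconsumeTimes) {
                    // Over the limit: try to hand it off (dead-letter path); on failure keep retrying
                    if (!sendBack.test(msg)) {
                        suspend = true;
                        msg.reconsumeTimes++;
                    }
                } else {
                    // Still under the limit: retry locally
                    suspend = true;
                    msg.reconsumeTimes++;
                }
            }
        }
        return suspend;
    }
}
```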
