consumer 拉取消息,對應的 queue 如果沒有數據,broker 不會立即返回,而是以一種長輪詢的方式處理,把 PullRequest 保存起來,等待 queue 有了消息後,或者長輪詢阻塞時間到了,再重新處理該 queue 上的所有 PullRequest。
1. queue 此時沒有數據
// PullMessageProcessor#processRequest 片段 case ResponseCode.PULL_NOT_FOUND: // broker 和 consumer 都允許 suspend,默認開啟 if (brokerAllowSuspend && hasSuspendFlag) { // 確定 pollingTimeMills 值 // 以 DefaultMQPushConsumerImpl 為例,BROKER_SUSPEND_MAX_TIME_MILLIS = 15000 long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } // 使用 PullRequest 把原始的 PullMessageRequest 封裝起來 String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }
2. 保存 PullRequest
// org.apache.rocketmq.broker.longpolling.PullRequestHoldService private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024); // org.apache.rocketmq.broker.longpolling.ManyPullRequest private final ArrayList<PullRequest> pullRequestList = new ArrayList<>();
3. 當 queue 有新的消息產生,寫到 commitLog 後,並且 reput 到 consumeQueue 和 indexFile 後,則觸發處理 PullRequest
// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput 片段 // 寫入 consumeQueue 和 indexFile DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { // 觸發處理 PullRequest DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); }
// 處理 queue 上對應的所有請求,有數據或者超時都返回 // org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); if (requestList != null) { List<PullRequest> replayList = new ArrayList<PullRequest>(); for (PullRequest request : requestList) { long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); // match by bit map, need eval again when properties is not null. if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } } if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } replayList.add(request); } if (!replayList.isEmpty()) { mpr.addPullRequest(replayList); } } } }
4. PullRequestHoldService 是一個線程類,也會在 run 方法裡不停地檢查 PullRequest
// org.apache.rocketmq.broker.longpolling.PullRequestHoldService#run public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
5. PullMessageProcessor 處理長輪詢醒來的請求,這次不能再阻塞它了
// org.apache.rocketmq.broker.processor.PullMessageProcessor#executeRequestWhenWakeup public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { Runnable run = new Runnable() { @Override public void run() { try { final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false); if (response != null) { response.setOpaque(request.getOpaque()); response.markResponseType(); try { channel.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { log.error("processRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause()); log.error(request.toString()); log.error(response.toString()); } } }); } catch (Throwable e) { log.error("processRequestWrapper process request over, but response failed", e); log.error(request.toString()); log.error(response.toString()); } } } catch (RemotingCommandException e1) { log.error("excuteRequestWhenWakeup run", e1); } } }; this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request)); }
6. 注意客戶端的參數
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl // broker 阻塞時間 private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15; // 客戶端超時時間 private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;