rocketMQ 長輪詢


consumer 拉取消息,對應的 queue 如果沒有數據,broker 不會立即返回,而是以一種長輪詢的方式處理,把 PullRequest 保存起來,等待 queue 有了消息後,或者長輪詢阻塞時間到了,再重新處理該 queue 上的所有 PullRequest。

1. queue 此時沒有數據

// PullMessageProcessor#processRequest 片段
case ResponseCode.PULL_NOT_FOUND:
    // Suspend only when both the broker and the consumer allow it (enabled by default)
    if (brokerAllowSuspend && hasSuspendFlag) {
        // Determine pollingTimeMills
        // Taking DefaultMQPushConsumerImpl as the example, BROKER_SUSPEND_MAX_TIME_MILLIS = 15000
        long pollingTimeMills = suspendTimeoutMillisLong;
        // Long polling disabled on the broker: use the configured short-polling interval instead
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        // Wrap the original pull-message request in a PullRequest
        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        // Hand the request to PullRequestHoldService, filed under this topic and queue
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        // NOTE(review): presumably a null response means nothing is written back to the
        // consumer yet; the held request is answered later on wake-up — confirm with the caller
        response = null;
        break;
    }

2. 保存 PullRequest

// org.apache.rocketmq.broker.longpolling.PullRequestHoldService
// Held pull requests, one ManyPullRequest per "topic@queueId" key; initial capacity 1024.
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
    new ConcurrentHashMap<String, ManyPullRequest>(1024);

// org.apache.rocketmq.broker.longpolling.ManyPullRequest
// The PullRequest entries stored in this ManyPullRequest.
private final ArrayList<PullRequest> pullRequestList = new ArrayList<>();

3. 當 queue 有新的消息產生,寫到 commitLog 後,並且 reput 到 consumeQueue 和 indexFile 後,則觸發處理 PullRequest

// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput 片段
// Write to consumeQueue and indexFile
DefaultMessageStore.this.doDispatch(dispatchRequest);

// Notify only when this broker is not a SLAVE and long polling is enabled
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    // Trigger processing of the held PullRequests
    // NOTE(review): the "+ 1" presumably turns this message's queue offset into the
    // queue's new max offset — confirm against the listener implementation
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
// 處理 queue 上對應的所有請求,有數據或者超時都返回
// org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving
/**
 * Re-examines every pull request currently held for the given topic and queue.
 *
 * <p>The held list is taken via {@code cloneListAndClear()}. Each request is then either
 * re-executed through {@code executeRequestWhenWakeup} or put back into the holder:
 * <ul>
 *   <li>re-executed when the newest queue offset is beyond the offset the request pulls
 *       from and the request's message filter matches;</li>
 *   <li>otherwise re-executed when its suspend timestamp plus timeout has been reached;</li>
 *   <li>otherwise collected and added back to the holder.</li>
 * </ul>
 * Failures while re-executing a request are logged and do not stop the loop.
 *
 * @param topic        topic of the queue
 * @param queueId      id of the queue
 * @param maxOffset    max offset reported by the caller; when it is not beyond a request's
 *                     pull offset, the message store is asked for the current max offset
 * @param tagsCode     tags code handed to the consume-queue filter check
 * @param msgStoreTime store timestamp handed to the consume-queue filter check
 * @param filterBitMap filter bit map handed to the consume-queue filter check
 * @param properties   message properties; when non-null and the consume-queue check
 *                     matched, the commit-log filter check is evaluated as well
 */
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    final ManyPullRequest holder = this.pullRequestTable.get(this.buildKey(topic, queueId));
    if (holder == null) {
        return;
    }

    final List<PullRequest> heldRequests = holder.cloneListAndClear();
    if (heldRequests == null) {
        return;
    }

    final List<PullRequest> stillWaiting = new ArrayList<PullRequest>();

    for (PullRequest held : heldRequests) {
        // The caller's offset may be stale for this request; re-read it from the store.
        long latestOffset = maxOffset;
        if (latestOffset <= held.getPullFromThisOffset()) {
            latestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
        }

        boolean wakeUp = false;
        if (latestOffset > held.getPullFromThisOffset()) {
            wakeUp = held.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
            // match by bit map, need eval again when properties is not null.
            if (wakeUp && properties != null) {
                wakeUp = held.getMessageFilter().isMatchedByCommitLog(null, properties);
            }
        }

        // No matching data: the request is still released once its hold time has elapsed.
        if (!wakeUp) {
            wakeUp = System.currentTimeMillis() >= (held.getSuspendTimestamp() + held.getTimeoutMillis());
        }

        if (!wakeUp) {
            stillWaiting.add(held);
            continue;
        }

        try {
            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(held.getClientChannel(),
                held.getRequestCommand());
        } catch (Throwable e) {
            log.error("execute request when wakeup failed.", e);
        }
    }

    if (!stillWaiting.isEmpty()) {
        holder.addPullRequest(stillWaiting);
    }
}

4. PullRequestHoldService 是一個線程類,也會在 run 方法裡不停地檢查 PullRequest(長輪詢開啟時每 5 秒檢查一次)

// org.apache.rocketmq.broker.longpolling.PullRequestHoldService#run
/**
 * Service loop. Until the service is stopped, each iteration waits (5 seconds when long
 * polling is enabled on the broker, otherwise the configured short-polling interval),
 * then calls {@code checkHoldRequest()} and logs when that check took more than 5 seconds.
 * Any {@link Throwable} raised inside an iteration is logged and the loop continues.
 */
public void run() {
    log.info("{} service started", this.getServiceName());

    while (!this.isStopped()) {
        try {
            final boolean longPolling = this.brokerController.getBrokerConfig().isLongPollingEnable();
            if (!longPolling) {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            } else {
                this.waitForRunning(5 * 1000);
            }

            final long startedAt = this.systemClock.now();
            this.checkHoldRequest();
            final long elapsed = this.systemClock.now() - startedAt;

            if (elapsed > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", elapsed);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info("{} service end", this.getServiceName());
}

5. PullMessageProcessor 處理長輪詢醒來的請求,這次不能再阻塞它了(調用 processRequest 時第三個參數傳 false)

// org.apache.rocketmq.broker.processor.PullMessageProcessor#executeRequestWhenWakeup
/**
 * Re-processes a previously held pull request on the pull-message executor.
 *
 * <p>The submitted task calls {@code processRequest(channel, request, false)}. When that
 * returns a non-null response, the response takes the request's opaque value, is marked
 * as a response and is written to the channel; write failures are logged together with
 * the request and the response. A null response results in nothing being written.
 *
 * @param channel channel the response is written to
 * @param request the original pull request command
 * @throws RemotingCommandException declared by the signature
 */
public void executeRequestWhenWakeup(final Channel channel,
        final RemotingCommand request) throws RemotingCommandException {
    final Runnable wakeupTask = new Runnable() {
        @Override
        public void run() {
            try {
                final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
                if (response == null) {
                    return;
                }

                response.setOpaque(request.getOpaque());
                response.markResponseType();

                try {
                    final ChannelFutureListener reportFailure = new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess()) {
                                return;
                            }
                            log.error("processRequestWrapper response to {} failed",
                                future.channel().remoteAddress(), future.cause());
                            log.error(request.toString());
                            log.error(response.toString());
                        }
                    };
                    channel.writeAndFlush(response).addListener(reportFailure);
                } catch (Throwable t) {
                    log.error("processRequestWrapper process request over, but response failed", t);
                    log.error(request.toString());
                    log.error(response.toString());
                }
            } catch (RemotingCommandException e) {
                log.error("excuteRequestWhenWakeup run", e);
            }
        }
    };

    this.brokerController.getPullMessageExecutor().submit(new RequestTask(wakeupTask, channel, request));
}

6. 注意客戶端的參數

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
// How long the broker may hold (suspend) a pull request: 15 seconds
private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
// Client-side timeout while the pull request is suspended: 30 seconds
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM