RocketMq消費者拉取消息服務PullMessageService


RocketMq消費者拉取消息服務PullMessageService

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            //從請求隊列里獲取一個請求
            PullRequest pullRequest = this.pullRequestQueue.take();
            if (pullRequest != null) {
                //拉取數據
                this.pullMessage(pullRequest);
            }
        } catch (InterruptedException e) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

PullMessageService#pullMessage

private void pullMessage(final PullRequest pullRequest) {
    //根據組名獲取對應的消費者 一個mqClientInstance里一個consumerGroup只有一個消費者對應
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        //實際是根據組名獲取對應的消費者來發起消息拉取
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

DefaultMQPushConsumerImpl#pullMessage


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM