rocketmq--push消費過程


Rocketmq消費分為push和pull兩種方式,push為被動消費類型,pull為主動消費類型,push方式最終還是會從broker中pull消息。不同於pull的是,push首先要注冊消費監聽器,當監聽器處觸發后才開始消費消息,所以被稱為“被動”消費。
 
 具體地,以pushConsumer的測試例子展開介紹,通常使用push消費的過程如下:
public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("Jodie_topic_1023", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20170422221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
 
上述過程背后設計到的點如下: 
 
I. checkConfig 檢查內容:
1.消費組 -- (不能與默認DEFAULT_CONSUMER同名)
2.消費模型 -- (默認CLUSTERING)
3.從何處開始消費 -- (默認CONSUME_FROM_LAST_OFFSET)
4.消費時間戳 -- (消息回溯,默認Default backtracking consumption time Half an hour ago)
5.消費負載均衡策略 -- (默認AllocateMessageQueueAveragely)
6.訂閱關系 --(map類型,即可訂閱多個topic;key=Topic, value=訂閱描述)
7.消費監聽 --(必須為orderly or concurrently類型之一)
8.消費消息的線程數量控制 -- (消費線程池最大、最小數量)
9.檢查單隊列並行消費允許的最大跨度 --(consumeConcurrentlyMaxSpan)
10.檢查拉消息本地隊列緩存消息最大數 --(pullThresholdForQueue)(processQueue.getMsgCount()記數)
11.檢查拉取時間間隔 --(拉消息間隔,由於是長輪詢,所以默認為0)
12.檢查批量消費的個數 --(一次消費多少條消息)
13.檢查批量拉取消息的個數 --(一次最多拉多少條)
 
II. copySubscription:
將訂閱信息設置到rebalanceImpl的map中用於負載。另外,如果該消費者的消費模式為集群消費,則會將retry的topic一並放到rebalanceImpl的map中用於負載。
 
III. 設置rebanlance信息
IV. 實例化pull消息的包裝類型
 
V. 如果不存在offsetStore對象,實例化offsetStore
廣播模式:
public class LocalFileOffsetStore implements OffsetStore {...}
注:load()函數體不為空
集群模式:
public class RemoteBrokerOffsetStore implements OffsetStore {...}
注:load()函數體為空
 
VI. 獲取監聽器,實例化consumeMessageService服務並啟動
ConsumeMessageOrderlyService啟動后會對拉取下來的消息進行處理。ConsumeMessageOrderlyService有兩種類型:ConsumeMessageOrderlyService和ConsumeMessageConcurrentlyService。
1). 如果消息監聽器是orderly類型,則創建ConsumeMessageOrderlyService實例
ConsumeMessageOrderlyService.start()只處理消息模式為CLUSTERING的消息消費。
public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
            .messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
        }
    }
線程啟動后會每隔20s執行lockMQPeriodicallys(),lockMQPeriodicallys()會將消費的隊列上鎖,然后處理,具體過程,有機會單獨成文分析。
 
2). 如果消息監聽器是concurrently類型,則創建ConsumeMessageConcurrentlyService實例
ConsumeMessageConcurrentlyService.start()會定時清除過期消息 --> cleanExpireMsg()。
 
VII. 注冊消費組
將group和consumer注冊到MQClientInstance實例。
與生產者注冊生產者組類似,一個客戶端進程中一個consumerGroup只能有一個實例。
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
if (prev != null) {
    log.warn("the consumer group[" + group + "] exist already.");
    return false;
}
如果沒有注冊成功,則關閉消費服務,consumeMessageService.shutdown()。
 
VIII. 啟動mQClientFactory及MQClientInstance
1). 獲取client實例對象MQClientInstance -- getAndCreateMQClientInstance。 一個進程只能產生一個MQClientInstance實例對象, 某個客戶端的生產者與消費者共用這個實例對象。
2). 啟動客戶端實例的個各種服務:
public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 1.判斷NamesrvAddr是否為空,為空去遠程http服務拉去地址
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                    }
                    // 2.開啟通信服務
                    this.mQClientAPIImpl.start();
                    // 3.啟動各種定時任務
                    this.startScheduledTask();
                    // 4.啟動消息拉取服務,循環拉取阻塞隊列pullRequestQueue
                    this.pullMessageService.start();
                    // 5. 啟動負載均衡服務
                    this.rebalanceService.start();
                    // 6.啟動消息生產服務
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId()
                            + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
分析push消費的過程,需對上述過程第3點、第4點、第5點依次介紹。
第3點、啟動各種定時任務過程
 編號  任務 周期 啟動時延 
 獲取namesrv地址 每隔2分鍾  0.01s
2  更新路由信息 每隔3分鍾  001s
3  向所有broker發送心跳包,並清除無效broker   每隔30s  1s
4  持久化消費位置offset 每隔5s  10s
5  調整消費線程池大小 每隔1分鍾  1min
注:編號3中,客戶端會通過心跳消息,向broker注冊消費信息。Broker收到該心跳消息,把它維護在一個叫做ConsumerManager的對象里面,為之后做消費的負載均衡提供數據,負載均衡在消費端做,消費端在負載均衡時首先要從broker那獲取這份全局信息。
 
第4點 啟動pullMessageService服務
初始化客戶端實例時,創建PullMessageService服務對象。
this.pullMessageService = new PullMessageService(this),其中PullMessageService繼承於ServiceThread,是一個線程對象。啟動消息拉取服務線程后,在線程沒有阻塞的情況下會不斷地從循環阻塞隊列pullRequestQueue拉取PullRequest對象,然后執行this.pullMessage(pullRequest)。
 
那么pullRequestQueue的數據如何put進去的?核心是doRebalance ,負載均衡具體細節可以參考: http://www.cnblogs.com/chenjunjie12321/p/7913323.html

例如當前有N個客戶端同時消費一個topic下的消息隊列(如上圖),當前客戶端( clientId = currentCId),經過負載均衡處理后得到分配給當前消費者的消息隊列(如上圖的qM、qN),之后將這些隊列與processQueueTable中的隊列進行比對分析,見下面第五點。

 
第5點 RebalancePushImpl 負載均衡,分發pullRequest到pullRequestQueue。
負載均衡處理后得到分配給當前消費者的消息隊列,然后將這些隊列進行updateProcessQueueTableInRebalance 處理。updateProcessQueueTableInRebalance 的大致邏輯為如下 I、II 兩步:
 
 
I. 首先檢查當前RebalancePushImpl實例processQueueTable中與mqSet的包含關系
(1)如圖中processQueueTable的灰色部分,表示與mqSet集合不互不包含的隊列,對這些隊列首先設置Dropped為true,然后看這些隊列是否可以移除出processQueueTable--removeUnnecessaryMessageQueue,即每隔1s 看是否可以拿到當前隊列的消費鎖(tryLock()),拿到后返回true, 如果等待1s后仍然拿不到當前隊列的消費鎖則返回false,如果返回true則從processQueueTable移除對應的Entry<MessageQueue, ProcessQueue>;
 
(2) 如圖中processQueueTable的白色部分,表示與mqSet集合的交集隊列,對於這些隊列,如果是消費類型是pull型,則不用管,如果是push型,看這些隊列是否isPullExpired,如果是這些隊列首先設置Dropped為true,則可以移除出processQueueTable--removeUnnecessaryMessageQueue。
 
II. 經過 I 處理,processQueueTable更新之后, 將processQueueTable集合與mqSet的的相對補集: processQueueTable(mq) - mqSet 里的消息隊列依次封裝成pullRequest,然后dispatchPullRequest到pullRequestQueue中。
 
經過上述處理后,待消費的隊列放在了pullRequestList中,之后遍歷pullRequestList,對遍歷的每個隊列進行消費,代碼如下:
 @Override
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
        }
    }

executePullRequestImmediately的邏輯功能:

 public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

總之,最終會將負載均衡得到的隊列存放到pullRequestQueue。

 
回過來繼續分析第4點, pullMessageService線程涉及到消費的核心過程DefaultMQPushConsumerImpl.pullMessage, pullMessageService線程線程體源碼如下:
 @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

調用DefaultMQPushConsumerImpl.pullMessage方法:

 private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        } 

pullMessage具體體拉流程如下圖所示:

 
 


下面對並發消費模型(concurrently)的消費代碼進行展示:


class ConsumeRequest implements Runnable ,其線程體方法如下:
@Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }

            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;

            ConsumeMessageContext consumeMessageContext = null;
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap<String, String>());
                consumeMessageContext.setMq(messageQueue);
                consumeMessageContext.setMsgList(msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }

            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                hasException = true;
            }
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                if (hasException) {
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }

            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
            }

            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }

            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }
View Code

consumerRequest邏輯:

 

 

processConsumeResult -- 對消費結果進行處理:

 

 重試隊列發消息邏輯:

生成一個重試隊列,重試隊列topic =  %RETRY% + consumerGroup的形式。
 
 
 附:
值得注意的是每次消費pullRequest上的一條數據后上更新消費到達的 offset,然后將pullRequest.setNextOffset(offset);
//這里的 this 為一個 DefaultMQPushConsumerImpl 實例對象
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
...
pullRequest.setNextOffset(offset);

其中 computePullFromWhere采用的策略有如下三種(另外還有幾個已經被棄用的(@Deprecated)):

CONSUME_FROM_LAST_OFFSET(默認): 一個新的消費集群第一次啟動從隊列的最后位置開始消費。后續再啟動接着上次消費的進度開始消費。
CONSUME_FROM_FIRST_OFFSET: 一個新的消費集群第一次啟動從隊列的最前位置開始消費。后續再啟動接着上次消費的進度開始消費。
CONSUME_FROM_TIMESTAMP: 一個新的消費集群第一次啟動從指定時間點開始消費。后續再啟動接着上次消費的進度開始消費。

DefaultMQPushConsumer 中默認采用 CONSUME_FROM_LAST_OFFSET 這種方式,當然可以根據自己需要修改computePullFromWhere的策略

private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

 

IX. updateTopicSubscribeInfoWhenSubscriptionChanged
X. sendHeartbeatToAllBrokerWithLock
XI. rebalanceImmediately
 
(完)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM