rocketmq源碼分析3-consumer消息獲取


使用rocketmq的大體消息發送過程如下:

Snip20170222_17

在前面已經分析過MQ的broker接收生產者客戶端發過來的消息的過程,此文主要講述訂閱者獲取消息的過程,或者說broker是怎樣將消息傳遞給消費者客戶端的,即上面時序圖中拉取消息(pull message)動作。。

 

1. 如何找到入口(MQ-broker端)

分析一個機制或者功能時,我們首先希望的是找到入口,前一篇我們是通過端口號方式順藤摸瓜的方式找到了入口。但是此篇略微不同,涉及到consumer客戶端與broker的兩邊分析,最終發現邏輯還是比較繞的,主要有很多異步動作,還有循環調用(當然不是一個線程上,而且中間有阻塞隊列緩沖),這對調試式分析代碼造成了一些不方便。
回到正題,怎么找到這里入口?在具備上篇分析的基礎上,我直接分析broker的代碼,broker接收消息的時候是靠SendMessageProcessor,那么在消息傳遞給消費端的時候是不是也是靠某個processor完成的?據這些processor的命名觀察,猜測PullMessageProcessor比較像。
為了驗證這一想法,注釋掉BrokerController中使用這個processor的地方,再重新測試,發現consumer就收不到producer發過來的消息了。想法初步正確。

2. 調試PullMessageProcessor(MQ-broker端)

RemotingServer在注冊processor的時候,是根據RequestCode進行注冊的。
PullMessageProcessor 對應的RequestCode的PULL_MESSAGE,即11。猜測:consumer客戶端不斷(或定時輪詢,或循環調用,或其他方式)發起pull message請求給broker,broker會處理這些請求,后面會驗證這個猜測。
PullMessageProcessor在注冊的時候對應的線程池是pullMessageExecutor,線程池的corePoolSize以及maxPoolSize都可以在broker中進行config,字段名是pullMessageThreadPoolNums。默認值16+處理器個數*2。

3. 哪里發送了RequestCode為PULL_MESSAGE的請求(consumer客戶端)

通過全局搜索,很容易發現是MQClientAPIImpl.pullMessage422行,發送了PULL_MESSAGE類型的請求。
加斷點(consumer客戶端需要debug方式啟動),看調用堆棧。

Snip20170222_12

很容易發現 是 PullMessageService.run()發出了PULL_MESSAGE的request。 run的代碼如下:

while (!this.isStoped()) {
    try {
        PullRequest pullRequest = this.pullRequestQueue.take();
        if (pullRequest != null) {
            this.pullMessage(pullRequest);
        }
    }
    catch (InterruptedException e) {
    }
    catch (Exception e) {
        log.error("Pull Message Service Run Method exception", e);
    }
}

在線程沒有停止的情況下,一直循環發拉取消息的請求,過程中被pullRequestQueue阻塞隊列阻塞。
分析誰向pullRequestQueue put了元素?是PullMessageService.executePullRequestImmediately(PullRequest)方法。
誰調了上面的方法,同樣斷點分析,調用堆棧如下圖:

Snip20170222_15

划藍色線的ResponseFuture地方,是阿里對這種通過發送網絡請求調用后還能回調回來的一個特性封裝,值得學習。 划紅色線的 MQClientAPIImpl$2地方是在處理業務邏輯,位於方法pullMessageAsync(String, RemotingCommand, long, PullCallback)內。此處又是一個異步。

private void pullMessageAsync(//
        final String addr,// 1
        final RemotingCommand request,//
        final long timeoutMillis,//
        final PullCallback pullCallback//
) throws RemotingException, InterruptedException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (response != null) {
                try {
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                    assert pullResult != null;
                    pullCallback.onSuccess(pullResult);// 457行對應此處
                }
                catch (Exception e) {
                    pullCallback.onException(e);
                }
            }
            else {
                //... 省略
            }
        }
    });
}

4. 調試MQClientAPIImpl.pullMessageAsync(consumer客戶端)

誰調用了MQClientAPIImpl.pullMessageAsync?
449行打斷點,堆棧如下:

Snip20170222_16

發現又回到上面的PullMessageService的run中。:-(
回頭去看看3段落中pullCallback.onSuccess(pullResult);// 457行對應此處
這一行代碼,跟進去就會發現玄機,在這里面又調用了PullMessageService.executePullRequestImmediately(PullRequest)方法。 是一個匿名內部類,位於DefaultMQPushConsumerImpl.pullMessage(PullRequest)中。這個方法太長,貼一些簡略的,

final long beginTimestamp = System.currentTimeMillis();

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult =
                    DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                        pullRequest.getMessageQueue(), pullResult, subscriptionData);

            switch (pullResult.getPullStatus()) {
            case FOUND:
                //...省略

                long firstMsgOffset = Long.MAX_VALUE;
                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                }
                else {
                    //...省略

                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                    }
                    else {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    }
                }

                //...省略
                break;
            case NO_NEW_MSG:
                //...省略

                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                break;
            case NO_MATCHED_MSG:
                //...省略

                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                break;
            case OFFSET_ILLEGAL:
                //...省略

上述代碼基本上很清楚地看出來拉取代碼的發起邏輯了。在case FOUND分支打個斷點,當你從producer的客戶端發一條消息過來的時候,能看到斷點被命中。當沒有producer沒有發消息的時候,一直走的就是case NO_NEW_MSG分支。

5. 上面分析的對嗎?

不全對。為什么?回去看段落3的[1]標注出,關於是誰調用了PullMessageService.executePullRequestImmediately(PullRequest)方法。

其實不止是段落3分析的那樣,還有RebalanceImpl.updateProcessQueueTableInRebalance(String, Set )417行發起的調用。
為什么會想到這個問題?因為按段落3 的分析,調用executePullRequestImmediately方法的入參是 PullRequest,但是上面分析是"循環"調用,那么最初始的這個PullRequest是哪里構造的?

帶着這個疑問就不難發現肯定還有其他調用了executePullRequestImmediately方法。於是搜索,加斷點,發現RebalanceImpl.updateProcessQueueTableInRebalance會調用。
其實通過搜索 new PullRequest 關鍵字也是很容易找到上述調用的地方。

誰對RebalanceImpl.updateProcessQueueTableInRebalance發起了調用?
觀察調用棧:
consumer 構造PULL_MESSAGE的消息堆棧
Thread [RebalanceService] (Suspended (breakpoint at line 417 in RebalanceImpl))
RebalancePushImpl(RebalanceImpl).updateProcessQueueTableInRebalance(String, Set ) line: 417
RebalancePushImpl(RebalanceImpl).rebalanceByTopic(String) line: 321 RebalancePushImpl(RebalanceImpl).doRebalance() line: 248
DefaultMQPushConsumerImpl.doRebalance() line: 250
MQClientInstance.doRebalance() line: 925
RebalanceService.run() line: 49 Thread.run() line: 695

RebalancePushImpl(RebalanceImpl).updateProcessQueueTableInRebalance(String, Set ) line: 417

this.dispatchPullRequest(pullRequestList); 該方法對push形式會調用PullMessageService.executePullRequestImmediately。至此,疑問基本解決。

這個堆棧要在consumer啟動時會發現,整個堆棧發生在線程 Thread [RebalanceService]

// --RebalanceService  run --
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStoped()) {
        this.waitForRunning(WaitInterval);// WaitInterval  = 10s
        this.mqClientFactory.doRebalance();// MQClientInstance.doRebalance()
    }

    log.info(this.getServiceName() + " service end");
}

//-- MQClientInstance.doRebalance() --
public void doRebalance() {
        for (String group : this.consumerTable.keySet()) {
            MQConsumerInner impl = this.consumerTable.get(group);
            if (impl != null) {
                try {
                    impl.doRebalance();// DefaultMQPushConsumerImpl.doRebalance()
                } catch (Exception e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }
    
// -- DefaultMQPushConsumerImpl.doRebalance --
@Override
public void doRebalance() {
    if (this.rebalanceImpl != null) {
        this.rebalanceImpl.doRebalance();
    }
}
// ......

此處RebalanceImpl的實現是RebalancePushImpl
根據上述線程名可以發現是RebalanceService這個類拉起了上面的RebalanceImpl.updateProcessQueueTableInRebalance

誰拉起了RebalanceService
看下面調用堆棧一目了然
consumer 構造PULL<em>MESSAGE的消息堆棧
Thread [main] (Suspended (breakpoint at line 185 in com.alibaba.rocketmq.client.impl.factory.MQClientInstance))
owns: com.alibaba.rocketmq.client.impl.factory.MQClientInstance (id=40)
com.alibaba.rocketmq.client.impl.factory.MQClientInstance.start() line: 185
com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start() line: 720
com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer.start() line: 365
org.simonme.rocketmq.demo.ConsumerDemo.main(java.lang.String[]) line: 55 // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testgroup");

因為我們針對該group設置的consumer就是Push的,即DefaultMQPushConsumer。官方提供的example工程中com.alibaba.rocketmq.example.quickstart.Consumer也是用的push。
引發問題:
1. 應該是可以針對不同的group設置不同形式的(pull or push)的consumer? 2. 不同的client實例訂閱同一個group的同一個tag/不同tag會出現怎樣的情況?是否還可以設置不同的獲取消息方式(push or pull)

那么疑問又來了

如果此處我們設置的不是DefaultMQPushConsumer,而是DefaultMQPullConsumer,那么剛本節(第5節)開頭那個問題怎么解決

嘗試一下
先看Pull形式的Consumer的例子

在PullMessageService.executePullRequestImmediately(PullRequest)方法處加上斷點並調試該demo,就會發現該方法根本不會被調用。因為dispatchPullRequest方法中針對pull模式的實現為空方法體,根本沒有發起PullMessageService.executePullRequestImmediately調用。那么上面的疑問就解決了。

6. 分析PullMessageProcessor(MQ-broker端)

入口方法是processRequest
先做一系列的檢查,然后獲取消息,檢查涉及的要點如下圖:
seq
獲取消息分析
1. 根據offset查詢到SelectMapedBufferResult實例。

final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),
            requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(),
            requestHeader.getMaxMsgNums(), subscriptionData);

這個GetMessageResult中有SelectMapedBufferResult的List實例。

這個地方體現了零拷貝。消息落地磁盤的時候,是從file map出來的內存buffer,此時消費消息的時候無需再讀取文件,而是直接讀取map出來的buffer!當然如果堆積消息過多,內存中已經放不下的時候,就需要從磁盤上讀取了。 這是rocketmq性能比較好的原因之一。 通常說的零拷貝是指系統態無需拷貝到用戶態,即針對大文件拷貝常用的sendfile操作。此處不同於這個概念。

根據GetMessageResult取消息,沒太多復雜的,如果發現有消息則走如下分支

switch (getMessageResult.getStatus()) {
case FOUND:
    response.setCode(ResponseCode.SUCCESS);

    // 消息軌跡:記錄客戶端拉取的消息記錄(不表示消費成功)
    if (this.hasConsumeMessageHook()) {
        // 執行hook
        ConsumeMessageContext context = new ConsumeMessageContext();
        context.setConsumerGroup(requestHeader.getConsumerGroup());
        context.setTopic(requestHeader.getTopic());


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM