我用的是DefaultMQPushConsumer,啟動一個consumer的時候,根據之前的博文,push其實還是一次次的pullrequest。這里就有個問題:如果需要實時性很高,broker新收到一條消息之后,馬上就要傳遞給訂閱的consumer,那么consumer這邊就需要不停的輪詢,一次pullrequest收不到消息,馬上進行下一次請求,這樣就非常的耗費資源。
這其實和線程競爭鎖很像,rocketMQ的解決辦法也和鎖競爭的道理很像,看具體實現:
1、broker這邊,請求過來,如果有新消息返回,在consumer這邊,異步請求的回調函數pullCallback中,判斷pullResult不為null,那么把消息存到processQueue中之后,馬上發起下一個請求。
2、如果broker沒有獲取到新消息,並不會馬上返回pullRequest(consumer那邊的發送pullRequest的請求本來就是異步的,不用擔心等待的問題),而是會在suspendPullRequest方法中,把當前的請求信息(主要是offset,group,topic,requestId這幾個值)放到PullRequestHoldService.pullRequestTable中。而在ReputMessageService的doReput方法會每隔一毫秒掃描commitLog,如果有新消息,會建立索引,並同時判斷之前有沒有pulRequest在等待這個消息,如果有--->messageArrivingListener.arriving--->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把剛才存進去的所有pullRequest取出來,返回請求,這樣就避免了不停的輪詢。
這里面會出現的異常情況:
先看消費者這邊,如果長時間沒有訂閱的消息到達broker---這是絕大多數的情況,那么消費者這邊的responseTable中存的responseFuture就一直得不到響應。實際上會有個定時任務掃描responseTable,代碼邏輯:nettyRemotingClient.start--->NettyRemotingClient.this.scanResponseTable(),定期(默認間隔30秒)取出過期(30秒 + 1秒)的responseFuture,執行callback的operationComplete方法,而pullRequest的operationComplete會判斷responseFuture的responseCommand屬性為不為null,沒有得到響應的話是為null的,那么會進入else中pullCallback.onException,點進去看,是把pullRequest取出再放入隊列中一次(其實這里也是重復消費的一個因素)。
還有的情況就是某次pullRequest的請求已經發出,但是broker並沒有收到而是在網絡中丟掉了,或者說broker的響應消息沒有成功到達consumer,這兩種情況和上面說的一樣,會導致過段時間再掃描,再拉取,只不過就是broker有消息到達,不能及時響應consumer,而是只能響應接下來的掃描提交的第二次消息,這樣會影響時效(可以把上面說的掃描的間隔由30秒降低為3秒),不過好在這個訂閱的事件不會中斷。
還有個問題,broker這里suspendPullRequest暫時扣下來的pullRequest如果一直沒有消息到來去喚醒,那么consumer那邊到期了就會再發一次請求,這樣broker這邊的pullRequest就會越積越多。對於這個問題broker這邊也有定時任務檢測,過期了就模擬消息到來喚醒,這次如果不成功獲取消息,不再suspend,而是返回noMessage。具體代碼邏輯:PullRequestHoldService是一個ServiceThread的子類,brokerController那里會start,run方法里面是上次提到的重寫的countDownLatch循環wait5秒或者1秒(具體看配置文件中longPollingEnable的值),其實也就是個定時的周期任務,checkHoldRequest--->notifyMessageArriving--->executeRequestWhenWakeup也就是發現過期了(suspendTimestamp + timeoutMillis:CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND的值,默認30秒),模擬消息到來喚醒的過程,注意,喚醒之后的PullMessageProcessor.this.processRequest方法中的參數brokerAllowSuspend傳入的是false,所以即使再獲取不到,也會直接給出nomesage的響應而不是suspend了