Broker selection when RocketMQ sends messages


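DefaultMQProducerImpl has a method, sendDefaultImpl, and every message send goes through it. How does it obtain routing information?
Right after startup the producer has no information about the topic, so it has to fetch it from the registry (the name server). Once fetched, the data is cached in MQClientInstance's topicRouteTable and brokerAddrTable.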
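The fetch-then-cache behaviour can be sketched as below. This is a minimal model under stated assumptions: RouteInfo, the fetcher function standing in for the name-server call, and fetchCount are hypothetical names, not RocketMQ's API. The real client also refreshes cached routes periodically, which this sketch leaves out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of "fetch the route once from the name server, then serve it
// from a local cache". RouteInfo and the fetcher function are illustrative
// stand-ins, not RocketMQ's TopicRouteData or its name-server client.
final class RouteCacheSketch {
    static final class RouteInfo {
        final Map<String, String> brokerAddrs; // brokerName -> "host:port"

        RouteInfo(Map<String, String> brokerAddrs) {
            this.brokerAddrs = brokerAddrs;
        }
    }

    private final Map<String, RouteInfo> topicRouteTable = new ConcurrentHashMap<>();
    private final Map<String, String> brokerAddrTable = new ConcurrentHashMap<>();
    private final Function<String, RouteInfo> nameServerFetcher;
    final AtomicInteger fetchCount = new AtomicInteger(); // number of name-server lookups

    RouteCacheSketch(Function<String, RouteInfo> nameServerFetcher) {
        this.nameServerFetcher = nameServerFetcher;
    }

    // The first call for a topic queries the "name server"; later calls hit the cache.
    RouteInfo routeFor(String topic) {
        return topicRouteTable.computeIfAbsent(topic, t -> {
            fetchCount.incrementAndGet();
            RouteInfo info = nameServerFetcher.apply(t);
            brokerAddrTable.putAll(info.brokerAddrs); // also remember broker addresses
            return info;
        });
    }

    String brokerAddr(String brokerName) {
        return brokerAddrTable.get(brokerName);
    }
}
```

Calling routeFor twice for the same topic triggers only one lookup; the second call is served from topicRouteTable.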


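Within this method, in synchronous mode, if a send attempt fails the producer keeps retrying up to the configured retry count, choosing a queue for each attempt through selectOneMessageQueue: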
    for (; times < timesTotal; times++) {
        // Remember the broker of the previous attempt so the selector can avoid it
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                long costTime = beginTimestampPrev - beginTimestampFirst;
                // Stop retrying once the overall timeout budget is used up
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }

                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                endTimestamp = System.currentTimeMillis();
                // Success: record the latency with isolation = false
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                switch (communicationMode) {
                    case ASYNC:
                        return null;
                    case ONEWAY:
                        return null;
                    case SYNC:
                        // Stored but not SEND_OK: optionally retry on another broker
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                continue;
                            }
                        }

                        return sendResult;
                    default:
                        break;
                }
            } catch (RemotingException e) {
                endTimestamp = System.currentTimeMillis();
                // Failure: record the broker with isolation = true, then retry at once
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                log.warn(msg.toString());
                exception = e;
                continue;
            }
            // ... remaining catch blocks omitted
        } else {
            break;
        }
    }
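Stripped of timing and logging, the retry loop above boils down to the shape below. It is a simplified sketch, not the RocketMQ implementation: brokers are plain strings, the selector and sendSucceeds parameters are hypothetical stand-ins for selectOneMessageQueue and sendKernelImpl, and the timeout budget check is omitted. The attempt count 1 + retryTimesWhenSendFailed mirrors the producer's retry setting for synchronous sends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;

// Simplified model of the SYNC retry loop in sendDefaultImpl. Brokers are plain
// strings; the selector and the sendSucceeds predicate are hypothetical stand-ins
// for selectOneMessageQueue and sendKernelImpl. The timeout check is omitted.
final class RetryLoopSketch {
    static final class Outcome {
        final String succeededOn;       // null if every attempt failed
        final List<String> brokersSent; // broker used by each attempt, like brokersSent[]

        Outcome(String succeededOn, List<String> brokersSent) {
            this.succeededOn = succeededOn;
            this.brokersSent = brokersSent;
        }
    }

    static Outcome send(int retryTimesWhenSendFailed,
                        List<String> brokers,
                        BiFunction<List<String>, String, String> selector, // (brokers, lastBroker) -> broker
                        Predicate<String> sendSucceeds) {
        int timesTotal = 1 + retryTimesWhenSendFailed; // first attempt plus retries
        List<String> brokersSent = new ArrayList<>();
        String lastBrokerName = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = selector.apply(brokers, lastBrokerName);
            brokersSent.add(broker);
            lastBrokerName = broker;
            if (sendSucceeds.test(broker)) {
                return new Outcome(broker, brokersSent); // success: return at once
            }
            // Failure: the real code records the fault (isolation = true) and continues
        }
        return new Outcome(null, brokersSent);
    }
}
```

With a selector that avoids the last broker, a failure on broker-a is retried on broker-b; if every attempt fails, the loop stops after 1 + retryTimesWhenSendFailed attempts.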

 

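The selectOneMessageQueue called here simply delegates to the selectOneMessageQueue method of the producer's internal MQFaultStrategy object.

In my view, the latency-estimation feature in that strategy is not especially important, and RocketMQ indeed leaves it disabled by default, but it is still worth studying. Here is MQFaultStrategy.selectOneMessageQueue: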

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                // Round-robin over all queues, returning the first one whose broker is currently available
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                // No available broker found: fall back to the "least bad" broker in the fault table
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        // Re-point the selected queue at the fallback broker
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    // That broker has no write queues for this topic any more: drop it from the fault table
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        // Default path (sendLatencyFaultEnable == false): round-robin, avoiding lastBrokerName
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

  

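If sendLatencyFaultEnable is false (which is the default), each call increments the topic's sendWhichQueue counter and takes the next queue from the message-queue list in round-robin order, skipping queues that live on the broker where the previous attempt failed (lastBrokerName). A message queue here is simply a queue hosted by a broker: each broker hosts several queues of the topic, and how many it exposes for writing (writeQueueNums) is set by that broker's configuration.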
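The default round-robin path described above can be sketched as follows. Queue and RoundRobinSketch are hypothetical simplifications of MessageQueue and TopicPublishInfo. The sketch advances the shared counter, walks the list once, returns the first queue whose broker differs from lastBrokerName, and (as a choice of this sketch) falls back to plain round-robin if every queue is on that broker.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Plain round-robin queue selection that avoids the broker of the previous
// failed attempt. Queue is a hypothetical stand-in for MessageQueue.
final class RoundRobinSketch {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    RoundRobinSketch(List<Queue> queues) {
        this.queues = queues;
    }

    // No previous failure: just take the next queue.
    Queue select() {
        int pos = Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    // Previous attempt used lastBrokerName: try each queue once in round-robin
    // order and return the first one on a different broker.
    Queue select(String lastBrokerName) {
        if (lastBrokerName == null) {
            return select();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return select(); // every queue is on lastBrokerName: plain round-robin
    }
}
```

Math.floorMod replaces the quoted Math.abs(index) % size plus the pos < 0 guard; both keep the position non-negative when the counter overflows.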

 

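One problem remains. With only two brokers, skipping the last failed broker handles most cases, but with many brokers we would like RocketMQ to reason along the time dimension as well and estimate when a broker will become usable again. This matters especially for RocketMQ, because when the set of brokers changes the producer is not notified right away; it only learns about the change through asynchronous polling. Likewise, liveness between the name server and the brokers is checked asynchronously and periodically.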

 

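With sendLatencyFaultEnable turned on, the producer estimates before sending whether a broker is available, and if it is, returns that queue directly. Look at this line in the code above: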

                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))

 

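I think this is a mistake: the condition should be !mq.getBrokerName().equals(lastBrokerName). As written, on a retry the loop only accepts a queue on the very broker used in the previous attempt, which is the opposite of what the default path does.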
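To make the difference concrete, here is a small sketch comparing the condition as quoted with the proposed fix. Queues are reduced to broker names in round-robin order and isAvailable is modelled as "not in an unavailable set"; both are simplifications for illustration, and the class and method names are hypothetical. One situation where the two differ: the previous attempt returned a status other than SEND_OK without throwing (with retryAnotherBrokerWhenNotStoreOK enabled), so updateFaultItem was called with isolation = false and, if the send was fast, the broker is still considered available.

```java
import java.util.List;
import java.util.Set;

// Contrast between the quoted condition and the proposed fix. Queues are reduced
// to their broker names; availability is "not in the unavailable set".
final class LastBrokerConditionSketch {
    // As quoted: on a retry, only a queue on the SAME broker as last time is accepted.
    static String asQuoted(List<String> queueBrokers, Set<String> unavailable, String lastBrokerName) {
        for (String broker : queueBrokers) {
            if (!unavailable.contains(broker)) {
                if (lastBrokerName == null || broker.equals(lastBrokerName)) {
                    return broker;
                }
            }
        }
        return null; // the real method would fall back to pickOneAtLeast()
    }

    // Proposed fix: on a retry, accept an available queue on a DIFFERENT broker.
    static String proposed(List<String> queueBrokers, Set<String> unavailable, String lastBrokerName) {
        for (String broker : queueBrokers) {
            if (!unavailable.contains(broker)) {
                if (lastBrokerName == null || !broker.equals(lastBrokerName)) {
                    return broker;
                }
            }
        }
        return null;
    }
}
```

With brokers [broker-a, broker-b] all available and lastBrokerName = broker-a, the quoted condition returns broker-a again while the fix returns broker-b. If broker-a has been isolated, the quoted condition finds nothing in this loop.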

 

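This relies on latencyFaultTolerance.isAvailable to decide whether a broker is available. Where does that information come from?

In fact, during sendDefaultImpl, the producer calls updateFaultItem on its internal MQFaultStrategy after each send attempt, whether or not the send succeeded, and that call updates latencyFaultTolerance.

Here are some important fields and methods of MQFaultStrategy: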

    // latencyMax[i] (ms) is paired with notAvailableDuration[i] (ms)
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};


    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            // A failed (isolated) send is treated as if it had taken 30000 ms
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        // Scan from the largest threshold down and use the first one the latency reaches
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }

  

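During a send in sendDefaultImpl, isolation is false when the send succeeds (the RemotingException handler passes true). In that case the duration returned by computeNotAvailableDuration is 0 as long as the send took less than 550 ms; the longer the send took, the higher the index it reaches in latencyMax, and the longer the duration taken from notAvailableDuration.

If there is a failure, isolation is true and the latency is treated as 30000 ms. Since 30000 ≥ 15000, the last entry of latencyMax, the broker is considered unavailable for notAvailableDuration[6] = 600000 ms, that is, 10 minutes.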
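Working through a few latencies makes the lookup concrete. The class below copies the two arrays and the scan from computeNotAvailableDuration; NotAvailableDurationSketch and durationFor are hypothetical names. With these tables, a 30 ms or 120 ms send maps to 0 ms, 600 ms to 30000 ms, 2500 ms to 120000 ms, and any isolated failure to 600000 ms.

```java
// Copy of the two lookup tables and the scan from computeNotAvailableDuration,
// used only to work through sample latencies.
final class NotAvailableDurationSketch {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Largest threshold the latency reaches decides the penalty (ms).
    static long compute(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // Mirrors updateFaultItem: an isolated broker is treated as if the send took 30000 ms.
    static long durationFor(long currentLatency, boolean isolation) {
        return compute(isolation ? 30000 : currentLatency);
    }
}
```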

 

Next, LatencyFaultToleranceImpl.updateFaultItem:

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

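This builds a FaultItem, which, as the name suggests, records a faulty (problematic) entry: name is the broker name, currentLatency is how long the last send took from start to finish, and startTimestamp is the estimated timestamp from which the broker becomes usable again (the current time plus notAvailableDuration).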
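A compact way to express the same create-or-update is ConcurrentHashMap.compute, which does it in one atomic step instead of get followed by putIfAbsent. The sketch below is a simplification under assumptions: FaultTableSketch is a hypothetical name, the current time is passed in explicitly as now so the behaviour is deterministic, and a broker with no record is treated as available.

```java
import java.util.concurrent.ConcurrentHashMap;

// Simplified fault table keyed by broker name, with an explicit "now" parameter
// (a hypothetical change) instead of System.currentTimeMillis().
final class FaultTableSketch {
    static final class FaultItem {
        final String name;
        volatile long currentLatency;
        volatile long startTimestamp; // estimated time from which the broker is usable again

        FaultItem(String name) {
            this.name = name;
        }
    }

    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>();

    // Create the item if missing, then overwrite latency and start timestamp.
    void updateFaultItem(String name, long currentLatency, long notAvailableDuration, long now) {
        faultItemTable.compute(name, (k, item) -> {
            FaultItem fi = (item == null) ? new FaultItem(k) : item;
            fi.currentLatency = currentLatency;
            fi.startTimestamp = now + notAvailableDuration;
            return fi;
        });
    }

    // A broker is available once now has reached its startTimestamp; unknown brokers count as available.
    boolean isAvailable(String name, long now) {
        FaultItem fi = faultItemTable.get(name);
        return fi == null || now - fi.startTimestamp >= 0;
    }
}
```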

 

Next, the important methods of FaultItem:

    @Override
    public int compareTo(final FaultItem other) {
        // Available items sort before unavailable ones
        if (this.isAvailable() != other.isAvailable()) {
            if (this.isAvailable())
                return -1;

            if (other.isAvailable())
                return 1;
        }

        // Then lower latency first
        if (this.currentLatency < other.currentLatency)
            return -1;
        else if (this.currentLatency > other.currentLatency) {
            return 1;
        }

        // Then the item whose startTimestamp comes earlier
        if (this.startTimestamp < other.startTimestamp)
            return -1;
        else if (this.startTimestamp > other.startTimestamp) {
            return 1;
        }

        return 0;
    }

    // Available once the current time has reached startTimestamp
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }
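The ordering implemented by compareTo can also be written as a chained Comparator, which makes the three keys and their priority explicit: availability first, then currentLatency, then startTimestamp. Item and the explicit now parameter are hypothetical simplifications of FaultItem.

```java
import java.util.Comparator;

// FaultItem's ordering expressed as a Comparator over a simplified item type.
final class FaultOrderSketch {
    static final class Item {
        final String name;
        final long currentLatency;
        final long startTimestamp;

        Item(String name, long currentLatency, long startTimestamp) {
            this.name = name;
            this.currentLatency = currentLatency;
            this.startTimestamp = startTimestamp;
        }

        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    // Same three keys, in the same priority, as FaultItem.compareTo.
    static Comparator<Item> order(long now) {
        return Comparator.comparing((Item i) -> !i.isAvailable(now)) // false (available) sorts first
                .thenComparingLong(i -> i.currentLatency)
                .thenComparingLong(i -> i.startTimestamp);
    }
}
```

For example, at now = 10000, items a (20 ms, start 0), c (300 ms, start 0), d (5 ms, start 20000) and b (5 ms, start 50000) sort as a, c, d, b: the two available items come first despite d and b having lower latency.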

  

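Back in MQFaultStrategy.selectOneMessageQueue, putting the code above together: if an available broker is found, its queue is returned directly. If none is found, pickOneAtLeast is called to pick a reasonably good broker instead: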

    public String pickOneAtLeast() {
        // Copy every FaultItem into a temporary list
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            // Shuffle first so that items comparing equal end up in random order after the stable sort
            Collections.shuffle(tmpList);

            Collections.sort(tmpList);

            // Rotate through the better half of the sorted list
            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }

        return null;
    }

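Since FaultItem already defines an ordering from best to worst, after sorting the method picks a broker name from the better half of the list. The index within that half rotates via the whichItemWorst counter rather than being random; the shuffle only randomizes the relative order of items that compare as equal.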
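The selection step can be sketched in isolation as below. This is a simplification: items are ranked by latency only instead of by the full FaultItem ordering, and Item and PickOneAtLeastSketch are hypothetical names. Math.floorMod is used instead of % so the index stays non-negative if the counter ever overflows, a small deviation from the quoted code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the "pick from the better half" idea behind pickOneAtLeast.
// Items are ranked by latency only, a simplification of FaultItem.compareTo.
final class PickOneAtLeastSketch {
    static final class Item {
        final String name;
        final long latency;

        Item(String name, long latency) {
            this.name = name;
            this.latency = latency;
        }
    }

    private final AtomicInteger whichItemWorst = new AtomicInteger(0);

    String pick(List<Item> items) {
        if (items.isEmpty()) {
            return null;
        }
        List<Item> tmp = new ArrayList<>(items);
        Collections.shuffle(tmp); // randomize order among ties...
        Collections.sort(tmp, Comparator.comparingLong((Item i) -> i.latency)); // ...which the stable sort keeps
        int half = tmp.size() / 2;
        if (half <= 0) {
            return tmp.get(0).name; // a single item: return it
        }
        // Rotate through indexes 0..half-1, i.e. the better half.
        return tmp.get(Math.floorMod(whichItemWorst.getAndIncrement(), half)).name;
    }
}
```

With four items at 10, 20, 500 and 900 ms, successive calls return the 10 ms and 20 ms items alternately and never the slower two.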

 

