Send [1] times, still failed


com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [696094]ms, Topic: TopicTest, BrokersSent: [broker-b, null]
See https://github.com/alibaba/RocketMQ/issues/50 for further details.
    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:578)
    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1031)
    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1025)
    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:95)

 

關鍵代碼片段

        final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000;
        final long beginTimestamp = System.currentTimeMillis();
        long endTimestamp = beginTimestamp;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed();
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
                        endTimestamp = System.currentTimeMillis();
                        switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                        }
                    }
                    catch  ... //省略部分代碼
                }
                else {
                    break;
                }
            } // end of for

            if (sendResult != null) {
                return sendResult;
            }

            String info =
                    String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", //
                        times, //
                        (System.currentTimeMillis() - beginTimestamp), //
                        msg.getTopic(),//
                        Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            throw new MQClientException(info, exception);
        }

 

1. 循環幾次發送幾次

2. selectOneMessageQueue 返回與上一個broker不同名的broker

3. timesTotal  是brokersSent 數組

4. 某broker發送失敗時,如果想要重試其他broker,需要把retryAnotherBrokerWhenNotStoreOK設置為true(默認為false)

5. 最大超時時間是在超時時間基礎上增加1s(坑?)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM