com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [696094]ms, Topic: TopicTest, BrokersSent: [broker-b, null] See https://github.com/alibaba/RocketMQ/issues/50 for further details. at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:578) at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1031) at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1025) at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:95)
關鍵代碼片段
final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000; final long beginTimestamp = System.currentTimeMillis(); long endTimestamp = beginTimestamp; TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed(); int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName); if (tmpmq != null) { mq = tmpmq; brokersSent[times] = mq.getBrokerName(); try { sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); endTimestamp = System.currentTimeMillis(); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch ... //省略部分代碼 } else { break; } } // end of for if (sendResult != null) { return sendResult; } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", // times, // (System.currentTimeMillis() - beginTimestamp), // msg.getTopic(),// Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); throw new MQClientException(info, exception); }
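1. for 循環每執行一次就嘗試發送一次：最多嘗試 timesTotal（1 + retryTimesWhenSendFailed）次，且僅在已耗時（endTimestamp - beginTimestamp）小於 maxTimeout 時才繼續下一次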
2. selectOneMessageQueue(lastBrokerName) 返回一個所屬 broker 與上一次發送的 broker 不同名的 MessageQueue；若返回 null，則直接跳出循環
3. timesTotal 是 brokersSent 數組的長度；brokersSent[times] 記錄第 times 次嘗試所發往的 broker 名稱，未實際發送的位置保持為 null（如上面異常信息中的 [broker-b, null]）
4. SYNC 模式下，某 broker 返回的發送狀態不是 SEND_OK 時，如果想要重試其他 broker，需要把 retryAnotherBrokerWhenNotStoreOK 設置為 true（默認為 false）；否則會直接返回該次的 sendResult
5. 最大超時時間 maxTimeout 是在發送超時時間 sendMsgTimeout 的基礎上增加 1000 ms（即 1s）（坑？）