RocketMQ (4.8.0): The Producer
I. Producer Overview
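The party that sends messages is called the producer. Its role within RocketMQ's overall production and consumption system is described below.
Basic concepts:
Producer group: a logical concept. A group name must be specified when a producer instance is used. One producer group can produce messages for multiple Topics.
Producer instance: a producer group may be deployed as several processes, and each of these processes is called a producer instance.
Topic: the topic name. A Topic consists of several Queues.
The RocketMQ client provides two producer implementation classes:
- org.apache.rocketmq.client.producer.DefaultMQProducer, for normal, ordered, one-way, batch, and delayed messages.
- org.apache.rocketmq.client.producer.TransactionMQProducer, for transactional messages. It extends DefaultMQProducer.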
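The relationship between a Topic and its Queues can be pictured with a tiny model. In this model, a Topic is simply the flat list of its queues, spread over several Brokers.

`TopicRoute` and `QueueRef` are hypothetical illustration types, not RocketMQ classes. They only mirror the idea that a queue is identified by topic, Broker name, and queue id, as RocketMQ's `MessageQueue` is.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not RocketMQ code: a Topic modeled as the flat list of its queues.
final class TopicRoute {
    // A queue is identified by (topic, brokerName, queueId), like RocketMQ's MessageQueue.
    static final class QueueRef {
        final String topic;
        final String brokerName;
        final int queueId;

        QueueRef(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return topic + "@" + brokerName + "#" + queueId;
        }
    }

    // Each Broker hosts `queuesPerBroker` queues of the Topic.
    static List<QueueRef> build(String topic, List<String> brokers, int queuesPerBroker) {
        List<QueueRef> queues = new ArrayList<>();
        for (String broker : brokers) {
            for (int id = 0; id < queuesPerBroker; id++) {
                queues.add(new QueueRef(topic, broker, id));
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        // One Topic, two Brokers, four queues on each Broker: eight queues in total.
        List<QueueRef> queues = build("OrderTopic", List.of("broker-a", "broker-b"), 4);
        System.out.println(queues.size() + " queues: " + queues);
    }
}
```

A producer instance sending to this Topic chooses one entry of such a list for every message. The high-availability part of this article is about how that choice is made.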
II. Message Structure and Message Types
rocketmq-master/common/src/main/java/org/apache/rocketmq/common/message/Message.java (rocketmq-all-4.8.0)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.common.message;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

    public Message() {
    }

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }

    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }

    void putProperty(final String name, final String value) {
        if (null == this.properties) {
            this.properties = new HashMap<String, String>();
        }

        this.properties.put(name, value);
    }

    void clearProperty(final String name) {
        if (null != this.properties) {
            this.properties.remove(name);
        }
    }

    public void putUserProperty(final String name, final String value) {
        if (MessageConst.STRING_HASH_SET.contains(name)) {
            throw new RuntimeException(String.format(
                "The Property<%s> is used by system, input another please", name));
        }

        if (value == null || value.trim().isEmpty()
            || name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The name or value of property can not be null or blank string!"
            );
        }

        this.putProperty(name, value);
    }

    public String getUserProperty(final String name) {
        return this.getProperty(name);
    }

    public String getProperty(final String name) {
        if (null == this.properties) {
            this.properties = new HashMap<String, String>();
        }

        return this.properties.get(name);
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTags() {
        return this.getProperty(MessageConst.PROPERTY_TAGS);
    }

    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }

    public String getKeys() {
        return this.getProperty(MessageConst.PROPERTY_KEYS);
    }

    public void setKeys(Collection<String> keys) {
        StringBuffer sb = new StringBuffer();
        for (String k : keys) {
            sb.append(k);
            sb.append(MessageConst.KEY_SEPARATOR);
        }

        this.setKeys(sb.toString().trim());
    }

    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }

        return 0;
    }

    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

    public boolean isWaitStoreMsgOK() {
        String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
        if (null == result)
            return true;

        return Boolean.parseBoolean(result);
    }

    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
        this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
    }

    public void setInstanceId(String instanceId) {
        this.putProperty(MessageConst.PROPERTY_INSTANCE_ID, instanceId);
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    void setProperties(Map<String, String> properties) {
        this.properties = properties;
    }

    public String getBuyerId() {
        return getProperty(MessageConst.PROPERTY_BUYER_ID);
    }

    public void setBuyerId(String buyerId) {
        putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId);
    }

    public String getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(String transactionId) {
        this.transactionId = transactionId;
    }

    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}
```
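topic: the topic name. Topics can be created through the RocketMQ Console.
flag: currently unused.
properties: extended message information. Tags, keys, and the delay level are all stored here.
body: the message body, a byte array. The consumer must decode it with the same encoding the producer used, otherwise the text will be garbled.
setKeys(): sets the message key(s). Multiple keys can be joined with MessageConst.KEY_SEPARATOR (a space), or passed as a collection to the overloaded method. If messageIndexEnable=true on the Broker, the Broker builds a hash index on the keys so that users can look messages up quickly.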

```java
public void setKeys(Collection<String> keys) {
    StringBuffer sb = new StringBuffer();
    for (String k : keys) {
        sb.append(k);
        sb.append(MessageConst.KEY_SEPARATOR);
    }

    this.setKeys(sb.toString().trim());
}
```
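setTags(): a marker used for message filtering. Consumers can subscribe to specific tags of a Topic, so that the Broker delivers only the messages that match the subscribed topic and tags.
setDelayTimeLevel(): sets the delay level, which determines how long the message waits before consumers can consume it.
putUserProperty(): stores any additional extension information. Internally this is a Map, so calling it again with the same name overwrites the old value. Names reserved by the system are rejected with an exception.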
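To see how these setters share one `properties` map, here is a self-contained sketch. `PropsMessage` is a hypothetical class, not RocketMQ's `Message`.

The property names `KEYS`, `TAGS`, and `DELAY`, and the small reserved-name set, are assumptions standing in for the constants in `MessageConst`.

The delay table is the default value of the Broker setting `messageDelayLevel`. A Broker configured with a different table maps levels differently.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model, not RocketMQ's Message: tags, keys, delay level and user
// properties all live in one String -> String map.
final class PropsMessage {
    // Assumed stand-ins for MessageConst names; the real reserved set is MessageConst.STRING_HASH_SET.
    static final Set<String> RESERVED = Set.of("KEYS", "TAGS", "DELAY");
    // Default value of the Broker setting messageDelayLevel: level N uses the N-th entry.
    static final String DEFAULT_DELAY_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private final Map<String, String> properties = new HashMap<>();

    // Like Message.setKeys(Collection): join with a space, then trim the trailing separator.
    void setKeys(Collection<String> keys) {
        StringBuilder sb = new StringBuilder();
        for (String k : keys) {
            sb.append(k).append(' ');
        }
        properties.put("KEYS", sb.toString().trim());
    }

    // Split the stored value back into individual keys.
    List<String> getKeys() {
        String joined = properties.get("KEYS");
        if (joined == null || joined.isEmpty()) {
            return List.of();
        }
        return Arrays.asList(joined.split(" "));
    }

    void putUserProperty(String name, String value) {
        if (name == null || name.trim().isEmpty() || value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("The name or value of property can not be null or blank string!");
        }
        if (RESERVED.contains(name)) {
            throw new RuntimeException("The Property<" + name + "> is used by system, input another please");
        }
        properties.put(name, value); // same name again: the old value is overwritten
    }

    String getUserProperty(String name) {
        return properties.get(name);
    }

    void setDelayTimeLevel(int level) {
        properties.put("DELAY", String.valueOf(level)); // stored as a string, like the real class
    }

    int getDelayTimeLevel() {
        String t = properties.get("DELAY");
        return t == null ? 0 : Integer.parseInt(t);
    }

    // Level -> delay in seconds under the default table. Level 0 means no delay; levels above
    // the table size are clamped to the last entry.
    static long delaySeconds(int level) {
        if (level <= 0) {
            return 0;
        }
        String[] levels = DEFAULT_DELAY_LEVELS.split(" ");
        String spec = levels[Math.min(level, levels.length) - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's':
                return amount;
            case 'm':
                return amount * 60;
            case 'h':
                return amount * 3600;
            default:
                throw new IllegalArgumentException("unknown unit in " + spec);
        }
    }

    public static void main(String[] args) {
        PropsMessage m = new PropsMessage();
        m.setKeys(List.of("order-1", "user-9"));
        m.putUserProperty("region", "eu");
        m.putUserProperty("region", "us"); // overwrites "eu"
        m.setDelayTimeLevel(3);
        System.out.println(m.getKeys() + " " + m.getUserProperty("region") + " "
            + delaySeconds(m.getDelayTimeLevel()) + "s");
    }
}
```

Running `main` prints `[order-1, user-9] us 10s`. The second `putUserProperty` call replaced the first value, and level 3 maps to the third entry of the default table.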
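RocketMQ supports normal messages, partition-ordered messages, globally ordered messages, delayed messages, and transactional messages.
Normal messages: messages with no special features. Normal messages are also called concurrent messages. Unlike a traditional queue, concurrent messages have no ordering, but they can be produced fully in parallel. A single machine can reach on the order of a hundred thousand TPS.
Partition-ordered messages: similar to partitions in Kafka. A Topic's messages are split across multiple partitions for storage and consumption. Within one partition, messages behave like a traditional queue and follow FIFO (first in, first out) order.
Globally ordered messages: if a Topic's partition count is set to 1, the Topic has a single partition and all of its messages follow FIFO order.
Delayed messages: after a message is sent, consumers may consume it only after a certain delay, or at a specified point in time. Without delayed messages, the usual approach is a scheduled task that sends the message at the right time. In RocketMQ, it is enough to set a delay level when sending the message.
Transactional messages: used mainly for distributed transactions, where several operations must all succeed or all fail before consumers may consume the message. RocketMQ implements distributed transactions cleanly in three steps. It sends a Half message, executes the local transaction, and then commits or rolls back the message.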
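Partition ordering only holds if every message that must stay in order goes to the same queue. The sketch below shows that routing rule, assuming a hash of a business key (here an order id) picks the queue.

In the real client, this choice is made by a `MessageQueueSelector` passed to `send(msg, selector, arg)`. `OrderedRouting` and its `Msg` type are hypothetical, and each queue is simulated by an `ArrayDeque`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, self-contained model of partition-ordered sending (not RocketMQ code):
// every queue is FIFO, and all messages with the same sharding key land in the same queue.
final class OrderedRouting {
    static final class Msg {
        final String orderId; // sharding key, e.g. an order id
        final String event;

        Msg(String orderId, String event) {
            this.orderId = orderId;
            this.event = event;
        }
    }

    // Same idea as a hash-based MessageQueueSelector: same key -> same queue index.
    static int selectQueue(Object shardingKey, int queueCount) {
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    static List<Queue<String>> route(List<Msg> messages, int queueCount) {
        List<Queue<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayDeque<>());
        }
        for (Msg m : messages) {
            // Appending to the tail keeps the send order inside each queue.
            queues.get(selectQueue(m.orderId, queueCount)).add(m.orderId + ":" + m.event);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<Msg> events = List.of(
            new Msg("order-1", "created"), new Msg("order-2", "created"),
            new Msg("order-1", "paid"), new Msg("order-2", "paid"),
            new Msg("order-1", "shipped"));
        System.out.println(route(events, 4)); // partition-ordered: order kept per order id
        System.out.println(route(events, 1)); // one queue: globally ordered
    }
}
```

With four queues, only the events of the same order id are guaranteed to stay in sequence. With a queue count of 1, the same code degenerates into the globally ordered case, at the cost of all parallelism.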
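III. Producer High Availability
Ideally, whatever happens to the Broker or the Namesrv, a send should never end in an unknown state or lose the message. Yet during sending, the client, the Broker, and the Namesrv can all suffer failures such as hardware damage or power loss. How does RocketMQ deal with these failures?
1. Client-side guarantees
The first mechanism is retrying. RocketMQ supports both synchronous and asynchronous sending, and both can be configured to retry after a failure. For synchronous sends, if a single Broker fails, the retry picks another Broker so that the message can still be delivered.
Config item retryTimesWhenSendFailed: the number of retries after a failed synchronous send. The default is 2, so the producer makes at most retryTimesWhenSendFailed + 1 attempts.
- A retry avoids the Broker that failed last time and tries another Broker, to minimize message loss.
- Once the retries are exhausted, an exception is thrown. It is then up to the client application to make sure the message is not lost.
- Retries are triggered by RemotingException, MQClientException, and certain MQBrokerException response codes.
- When retryAnotherBrokerWhenNotStoreOK is enabled, a SendStatus other than SEND_OK also triggers a retry.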
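The retry rule can be sketched without the client library. `SyncRetrySketch` is a hypothetical simplification, not `sendDefaultImpl()` itself:
- Brokers are plain strings, and a "dead" Broker always fails.
- Queue selection is reduced to round-robin over Brokers.
- The round-robin skips the Broker that just failed whenever another one exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical simplification of the synchronous retry loop (not sendDefaultImpl itself):
// at most 1 + retryTimesWhenSendFailed attempts, and a retry prefers a Broker other than
// the one that just failed.
final class SyncRetrySketch {
    static final class SendFailedException extends RuntimeException {
        SendFailedException(String message) {
            super(message);
        }
    }

    /** Returns the Brokers tried, in order; the last one is the Broker that accepted the message. */
    static List<String> send(List<String> brokers, Set<String> deadBrokers, int retryTimesWhenSendFailed) {
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("no broker available");
        }
        int timesTotal = 1 + retryTimesWhenSendFailed;
        List<String> brokersSent = new ArrayList<>();
        String lastBroker = null;
        int cursor = 0; // round-robin position, playing the role of sendWhichQueue
        for (int times = 0; times < timesTotal; times++) {
            String chosen = null;
            // Prefer the next Broker in round-robin order that is not the last (failed) one.
            for (int i = 0; i < brokers.size(); i++) {
                String candidate = brokers.get((cursor + i) % brokers.size());
                if (!candidate.equals(lastBroker)) {
                    chosen = candidate;
                    cursor += i + 1;
                    break;
                }
            }
            if (chosen == null) {
                // Only the failed Broker exists: fall back to it anyway.
                chosen = brokers.get(cursor++ % brokers.size());
            }
            brokersSent.add(chosen);
            if (!deadBrokers.contains(chosen)) {
                return brokersSent; // treated as SEND_OK
            }
            lastBroker = chosen; // failed: remember it so the next attempt avoids it
        }
        throw new SendFailedException("Send [" + timesTotal + "] times, still failed, BrokersSent: " + brokersSent);
    }

    public static void main(String[] args) {
        // broker-a is down, so the first retry goes to broker-b.
        System.out.println(send(List.of("broker-a", "broker-b"), Set.of("broker-a"), 2));
    }
}
```

With a single Broker that is down, all three attempts hit the same Broker before the exception is thrown. This is why failover only helps when the Topic's queues are spread over more than one Broker.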
The synchronous retry logic can be found in D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\producer\DefaultMQProducerImpl.java (rocketmq-all-4.8.0):

```java
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Only SYNC sends loop here: 1 initial attempt + retryTimesWhenSendFailed retries.
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // Passing the last Broker lets the queue selection avoid it on a retry.
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    // Only these response codes are retried; any other code ends the loop.
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    // ... (the rest of the method, which handles a missing topic route, is omitted in this excerpt)
}
```
The asynchronous retry logic can be found in D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\MQClientAPIImpl.java (rocketmq-all-4.8.0):

```java
private void sendMessageAsync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final AtomicInteger times,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
    final long beginStartTime = System.currentTimeMillis();
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            long cost = System.currentTimeMillis() - beginStartTime;
            RemotingCommand response = responseFuture.getResponseCommand();
            if (null == sendCallback && response != null) {

                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }

                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }

            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }

                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                // No response: mark the Broker as faulty, then hand over to onExceptionImpl(),
                // which decides whether another attempt is made.
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                        responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
}
```
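The send itself is completed asynchronously by the communication layer. When the response seen in operationComplete() is null, the retry path through onExceptionImpl() is taken.

A null response means no usable reply was matched to the request. Typically either the client failed to decode the TCP response, or it could not match the response to a pending request. The code distinguishes three cases: "send request failed", "wait response timeout", and an unknown reason.

The producer setting retryTimesWhenSendAsyncFailed sets the number of asynchronous retries. Asynchronous retries do not switch to another Broker; they retry only on the same Broker, so they do not guarantee that the message is not lost. The default is 2, so together with the initial send there are 3 attempts in total.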
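The same-Broker asynchronous retry can be modeled with `CompletableFuture` standing in for the remoting layer. Each attempt completes on another thread. The completion handler then does one of three things:
- reports success;
- re-sends to the same Broker;
- reports failure, once `1 + retryTimesWhenSendAsyncFailed` attempts are used up.

`AsyncRetrySketch` and the `attemptSucceeds` predicate are illustration-only names, not part of the RocketMQ client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;

// Hypothetical model of the asynchronous retry described above (not MQClientAPIImpl code).
final class AsyncRetrySketch {
    /**
     * @param attemptSucceeds decides whether attempt n (1-based) gets a usable response
     * @return a future holding the number of attempts used, or failing once all attempts fail
     */
    static CompletableFuture<Integer> sendAsync(String broker, int retryTimesWhenSendAsyncFailed,
                                                IntPredicate attemptSucceeds) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        AtomicInteger times = new AtomicInteger(0); // shared attempt counter, like `times` above
        attempt(broker, 1 + retryTimesWhenSendAsyncFailed, attemptSucceeds, times, result);
        return result;
    }

    private static void attempt(String broker, int timesTotal, IntPredicate attemptSucceeds,
                                AtomicInteger times, CompletableFuture<Integer> result) {
        int n = times.incrementAndGet();
        // The simulated network call completes on a pool thread, like operationComplete().
        CompletableFuture.supplyAsync(() -> attemptSucceeds.test(n))
            .thenAccept(ok -> {
                if (ok) {
                    result.complete(n); // like sendCallback.onSuccess(...)
                } else if (n < timesTotal) {
                    // Retry on the SAME broker; no other Broker is considered.
                    attempt(broker, timesTotal, attemptSucceeds, times, result);
                } else {
                    // Like sendCallback.onException(...) after the last attempt.
                    result.completeExceptionally(new IllegalStateException(
                        "send to " + broker + " failed after " + n + " attempts"));
                }
            });
    }

    public static void main(String[] args) {
        // Fails twice, succeeds on the third (= last allowed) attempt.
        System.out.println(sendAsync("broker-a", 2, n -> n == 3).join());
    }
}
```

Because the Broker name never changes between attempts, a Broker that is down for longer than the three attempts makes the whole asynchronous send fail. This matches the "no loss guarantee" caveat above.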
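The second mechanism is client-side fault tolerance. The RocketMQ client maintains a "Broker → send latency" mapping and uses it to pick a Broker with a lower send-latency level. This makes the best use of Broker capacity and steers clear of Brokers that are down, unavailable, or slow to respond, so that sends succeed wherever possible.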
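Here is a minimal, hypothetical model of that "Broker → send latency" mapping. For each Broker it remembers the last measured latency and the moment from which the Broker may be used again.

The method names `updateFaultItem`, `isAvailable`, and `pickOneAtLeast` echo RocketMQ's `LatencyFaultTolerance` interface, but `BrokerLatencyRegistry` is an illustration only. In particular, its `pickOneAtLeast()` simply returns the least-bad Broker, which simplifies the real selection. The clock is injected so that the behavior can be tested.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical, simplified stand-in for a LatencyFaultTolerance implementation.
final class BrokerLatencyRegistry {
    private static final class FaultItem {
        final String broker;
        final long currentLatency;
        final long startTimestamp; // the broker counts as available again from this moment (ms)

        FaultItem(String broker, long currentLatency, long startTimestamp) {
            this.broker = broker;
            this.currentLatency = currentLatency;
            this.startTimestamp = startTimestamp;
        }
    }

    private final Map<String, FaultItem> items = new ConcurrentHashMap<>();
    private final LongSupplier clock; // injectable clock in ms, so behavior is testable

    BrokerLatencyRegistry(LongSupplier clock) {
        this.clock = clock;
    }

    // Record the latest latency and park the broker for notAvailableDuration ms.
    void updateFaultItem(String broker, long currentLatency, long notAvailableDuration) {
        items.put(broker, new FaultItem(broker, currentLatency, clock.getAsLong() + notAvailableDuration));
    }

    boolean isAvailable(String broker) {
        FaultItem item = items.get(broker);
        return item == null || clock.getAsLong() >= item.startTimestamp; // unknown brokers are usable
    }

    // Simplification: prefer available brokers, then lower latency, then the earliest recovery.
    Optional<String> pickOneAtLeast() {
        long now = clock.getAsLong();
        return items.values().stream()
            .min(Comparator.<FaultItem>comparingInt(i -> now >= i.startTimestamp ? 0 : 1)
                .thenComparingLong(i -> i.currentLatency)
                .thenComparingLong(i -> i.startTimestamp))
            .map(i -> i.broker);
    }

    public static void main(String[] args) {
        long[] now = {0L};
        BrokerLatencyRegistry registry = new BrokerLatencyRegistry(() -> now[0]);
        registry.updateFaultItem("broker-a", 600, 30_000); // slow send: parked for 30 s
        registry.updateFaultItem("broker-b", 20, 0);       // fast send: stays available
        System.out.println(registry.isAvailable("broker-a") + " " + registry.isAvailable("broker-b"));
        now[0] = 30_000;                                    // 30 s later broker-a is usable again
        System.out.println(registry.isAvailable("broker-a") + " " + registry.pickOneAtLeast().orElse("none"));
    }
}
```

`main` first prints `false true`: broker-a is parked while broker-b is not. After the clock moves to 30 000 ms, both Brokers are available and the faster one is preferred, so it prints `true broker-b`.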
Client-side fault tolerance is implemented mainly in how the Queue to send to is selected. The source code is in D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\latency\MQFaultStrategy.java (rocketmq-all-4.8.0):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.client.latency;

import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                /* Step 1: take an auto-incremented index and compute the queue position pos by modulo.
                   Starting from pos, return the first Queue whose Broker is currently available,
                   i.e. whose recent send latency is acceptable.
                */
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                // Step 2: if step 1 found no Broker, fall back to one with relatively low latency.
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // Step 3: if neither step 1 nor step 2 selected a Broker, fall back to plain round-robin selection.
            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}
```
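sendLatencyFaultEnable: the send-latency fault-tolerance switch, off by default. When it is on, the latency fault-tolerance mechanism is used to choose the Queue to send to. When it is off, MQFaultStrategy.selectOneMessageQueue() simply delegates to tpInfo.selectOneMessageQueue(lastBrokerName).
That method walks the topic's queue list in round-robin order, driven by the sendWhichQueue counter, and tries to avoid the Broker used last time. Its implementation is as follows: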
D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\producer\TopicPublishInfo.java (rocketmq-all-4.8.0)

```java
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // Step 1: with no previous Broker to refer to, pick the next Queue in round-robin order.
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        // Step 2: otherwise pick a Queue on a Broker DIFFERENT from the last one, which spreads the
        // load and avoids the Broker that was just used (and may just have failed).
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // Step 3: if every Queue is on the last Broker, fall back to plain round-robin selection.
        return selectOneMessageQueue();
    }
}
```
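After each send, the client calls updateFaultItem() to update the latency record of the Broker that handled the message. This logic lives mainly in the MQFaultStrategy class, and the latency bookkeeping is defined by the standard interface LatencyFaultTolerance.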
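To see what the two tables in MQFaultStrategy mean in practice, the sketch below copies `latencyMax`, `notAvailableDuration`, and `computeNotAvailableDuration()` from the listing earlier in this section into a standalone class, so that it runs without the client jar. `NotAvailableDurationDemo` and `durationFor()` are hypothetical names.

Two worked cases:
- A send that took 600 ms reaches the 550 ms threshold, so the Broker is skipped for 30 000 ms.
- A send that ended in a RemotingException, MQClientException, or MQBrokerException is recorded by sendDefaultImpl() with isolation = true. That means it is treated as a 30 000 ms latency, which reaches the 15 000 ms threshold and parks the Broker for 600 000 ms (10 minutes).

```java
// Standalone copy of the default tables and computeNotAvailableDuration() from MQFaultStrategy,
// wrapped in a hypothetical class for experimentation.
final class NotAvailableDurationDemo {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; the first threshold the latency reaches decides
    // how long (ms) the Broker is treated as unavailable.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // updateFaultItem() substitutes 30000 for the measured latency when isolation == true.
    static long durationFor(long measuredLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : measuredLatency);
    }

    public static void main(String[] args) {
        for (long latency : new long[]{30, 80, 600, 2500, 20000}) {
            System.out.println(latency + "ms -> " + durationFor(latency, false) + "ms");
        }
        System.out.println("isolated -> " + durationFor(10, true) + "ms");
    }
}
```

Latencies below 550 ms never make a Broker unavailable. Only clearly slow or failed sends cause a Broker to be skipped, which keeps the strategy from reacting to ordinary jitter.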
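2. Broker-side guarantees
Guarantees from the Broker's data replication mode:
- Synchronous replication: a send counts as successful only after the message has been written to the Master Broker and replicated to the Slave Broker;
- Asynchronous replication: a send counts as successful as soon as the message has been written to the Master Broker.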
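The difference between the two modes comes down to when the Master acknowledges a write. `ReplicationAckSketch` is a hypothetical model of that acknowledgement, not Broker code:
- Its `BrokerRole` names mirror the `brokerRole` values `ASYNC_MASTER` and `SYNC_MASTER`.
- Its `Status` names mirror three of RocketMQ's `SendStatus` values.
- Treating a failed or late Slave copy as `FLUSH_SLAVE_TIMEOUT` is a simplification.
- A `null` Slave stands for "no Slave attached".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical model of when a Master acknowledges a write under the two replication modes.
final class ReplicationAckSketch {
    enum BrokerRole { ASYNC_MASTER, SYNC_MASTER }

    enum Status { SEND_OK, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    /** @param slave task that "replicates" the message to the Slave, or null when no Slave exists */
    static Status store(BrokerRole role, Supplier<Boolean> slave, long slaveTimeoutMs) {
        if (role == BrokerRole.ASYNC_MASTER) {
            if (slave != null) {
                CompletableFuture.supplyAsync(slave); // replicate in the background, do not wait
            }
            return Status.SEND_OK; // written to the Master: success right away
        }
        // SYNC_MASTER: success only after the Slave has confirmed its copy.
        if (slave == null) {
            return Status.SLAVE_NOT_AVAILABLE;
        }
        try {
            boolean replicated = CompletableFuture.supplyAsync(slave).get(slaveTimeoutMs, TimeUnit.MILLISECONDS);
            return replicated ? Status.SEND_OK : Status.FLUSH_SLAVE_TIMEOUT;
        } catch (TimeoutException | ExecutionException e) {
            return Status.FLUSH_SLAVE_TIMEOUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Status.FLUSH_SLAVE_TIMEOUT;
        }
    }

    public static void main(String[] args) {
        Supplier<Boolean> healthySlave = () -> true;
        System.out.println(store(BrokerRole.ASYNC_MASTER, null, 100));         // SEND_OK
        System.out.println(store(BrokerRole.SYNC_MASTER, healthySlave, 1000)); // SEND_OK
        System.out.println(store(BrokerRole.SYNC_MASTER, null, 100));          // SLAVE_NOT_AVAILABLE
    }
}
```

The last call illustrates why a Master without Slaves should not be configured as SYNC_MASTER: it can never get the Slave confirmation it waits for.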
In production, deploying at least 2 Masters and 2 Slaves is recommended.
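| Deployment mode | Pros | Cons | Notes |
| --- | --- | --- | --- |
| Single Master | Simple. | High risk: once the Broker restarts or goes down, the whole service becomes unavailable. | Not recommended for production; can be used for local testing. |
| Multiple Masters | Simple to configure. A single Master going down or restarting for maintenance does not affect applications. With RAID10 disks, messages are not lost even if a machine goes down and cannot be recovered, because RAID10 is very reliable (asynchronous flushing loses a small number of messages, synchronous flushing loses none). Highest performance. | While a single machine is down, its unconsumed messages cannot be subscribed to until the machine recovers, so message timeliness suffers. | In a multi-Master cluster without Slaves, the Masters' brokerRole must be ASYNC_MASTER. If it is SYNC_MASTER, the SendStatus returned to the producer is always SLAVE_NOT_AVAILABLE. |
| Multiple Masters and Slaves, asynchronous replication | Even if a disk is damaged, very few messages are lost, and message timeliness is unaffected. After a Master goes down, consumers can still consume from the Slave; this is transparent to applications and needs no manual intervention. Performance is almost the same as the multi-Master mode. | If a Master goes down and its disk is damaged, a small number of messages are lost. | |
| Multiple Masters and Slaves, synchronous double write | Neither data nor service has a single point of failure. When a Master goes down, messages are not delayed, and both service and data availability are very high. | Performance is slightly lower than asynchronous replication (about 10%), and the RT of sending a single message is slightly higher. Currently, after the Master goes down, the Slave cannot automatically take over as Master; automatic switchover is planned. | |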