RocketMQ(4.8.0)——生產者


RocketMQ(4.8.0)——生產者

一、生產者概述

  發送消息的一方被稱為生產者,它在整個RocketMQ的生產者和消費體中扮演角色如下:

基本概念:

生產者組: 一個邏輯概念,在使用生產者實例的時候需要制定一個組名。一個生產者組可以生產多個Topic的消息。

生產者實例:一個生產者組部署了多個進程,每個進程都可以稱為一個生產者實例。

Topic:主題名字,一個Topic由若干Queue組成。

  RocketMQ 客戶端中的生產者有兩個獨立實現類:org.apache.rocketmq.client.producer.defaultmqproducer(普通消息、順序消息、單向消息、批量消息、延遲消息)org.apache.rocketmq.client.producer.transactionmqproducer(事務消息)

二、消息結構和消息類型

rocketmq-master/common/src/main/java/org/apache/rocketmq/common/message/Message.java (rocketmq-all-4.8.0)

 1 /*
 2  * Licensed to the Apache Software Foundation (ASF) under one or more  3  * contributor license agreements. See the NOTICE file distributed with  4  * this work for additional information regarding copyright ownership.  5  * The ASF licenses this file to You under the Apache License, Version 2.0  6  * (the "License"); you may not use this file except in compliance with  7  * the License. You may obtain a copy of the License at  8  *  9  * http://www.apache.org/licenses/LICENSE-2.0
 10  *  11  * Unless required by applicable law or agreed to in writing, software  12  * distributed under the License is distributed on an "AS IS" BASIS,  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  14  * See the License for the specific language governing permissions and  15  * limitations under the License.  16  */
 17 package org.apache.rocketmq.common.message;  18 
 19 import java.io.Serializable;  20 import java.util.Arrays;  21 import java.util.Collection;  22 import java.util.HashMap;  23 import java.util.Map;  24 
 25 public class Message implements Serializable {  26     private static final long serialVersionUID = 8445773977080406428L;  27 
 28     private String topic;  29     private int flag;  30     private Map<String, String> properties;  31     private byte[] body;  32     private String transactionId;  33 
 34     public Message() {  35  }  36 
 37     public Message(String topic, byte[] body) {  38         this(topic, "", "", 0, body, true);  39  }  40 
 41     public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {  42         this.topic = topic;  43         this.flag = flag;  44         this.body = body;  45 
 46         if (tags != null && tags.length() > 0)  47             this.setTags(tags);  48 
 49         if (keys != null && keys.length() > 0)  50             this.setKeys(keys);  51 
 52         this.setWaitStoreMsgOK(waitStoreMsgOK);  53  }  54 
 55     public Message(String topic, String tags, byte[] body) {  56         this(topic, tags, "", 0, body, true);  57  }  58 
 59     public Message(String topic, String tags, String keys, byte[] body) {  60         this(topic, tags, keys, 0, body, true);  61  }  62 
 63     public void setKeys(String keys) {  64         this.putProperty(MessageConst.PROPERTY_KEYS, keys);  65  }  66 
 67     void putProperty(final String name, final String value) {  68         if (null == this.properties) {  69             this.properties = new HashMap<String, String>();  70  }  71 
 72         this.properties.put(name, value);  73  }  74 
 75     void clearProperty(final String name) {  76         if (null != this.properties) {  77             this.properties.remove(name);  78  }  79  }  80 
 81     public void putUserProperty(final String name, final String value) {  82         if (MessageConst.STRING_HASH_SET.contains(name)) {  83             throw new RuntimeException(String.format(  84                 "The Property<%s> is used by system, input another please", name));  85  }  86 
 87         if (value == null || value.trim().isEmpty()  88             || name == null || name.trim().isEmpty()) {  89             throw new IllegalArgumentException(  90                 "The name or value of property can not be null or blank string!"
 91  );  92  }  93 
 94         this.putProperty(name, value);  95  }  96 
 97     public String getUserProperty(final String name) {  98         return this.getProperty(name);  99  } 100 
101     public String getProperty(final String name) { 102         if (null == this.properties) { 103             this.properties = new HashMap<String, String>(); 104  } 105 
106         return this.properties.get(name); 107  } 108 
109     public String getTopic() { 110         return topic; 111  } 112 
113     public void setTopic(String topic) { 114         this.topic = topic; 115  } 116 
117     public String getTags() { 118         return this.getProperty(MessageConst.PROPERTY_TAGS); 119  } 120 
121     public void setTags(String tags) { 122         this.putProperty(MessageConst.PROPERTY_TAGS, tags); 123  } 124 
125     public String getKeys() { 126         return this.getProperty(MessageConst.PROPERTY_KEYS); 127  } 128 
129     public void setKeys(Collection<String> keys) { 130         StringBuffer sb = new StringBuffer(); 131         for (String k : keys) { 132  sb.append(k); 133  sb.append(MessageConst.KEY_SEPARATOR); 134  } 135 
136         this.setKeys(sb.toString().trim()); 137  } 138 
139     public int getDelayTimeLevel() { 140         String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL); 141         if (t != null) { 142             return Integer.parseInt(t); 143  } 144 
145         return 0; 146  } 147 
148     public void setDelayTimeLevel(int level) { 149         this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); 150  } 151 
152     public boolean isWaitStoreMsgOK() { 153         String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); 154         if (null == result) 155             return true; 156 
157         return Boolean.parseBoolean(result); 158  } 159 
160     public void setWaitStoreMsgOK(boolean waitStoreMsgOK) { 161         this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK)); 162  } 163 
164     public void setInstanceId(String instanceId) { 165         this.putProperty(MessageConst.PROPERTY_INSTANCE_ID, instanceId); 166  } 167 
168     public int getFlag() { 169         return flag; 170  } 171 
172     public void setFlag(int flag) { 173         this.flag = flag; 174  } 175 
176     public byte[] getBody() { 177         return body; 178  } 179 
180     public void setBody(byte[] body) { 181         this.body = body; 182  } 183 
184     public Map<String, String> getProperties() { 185         return properties; 186  } 187 
188     void setProperties(Map<String, String> properties) { 189         this.properties = properties; 190  } 191 
192     public String getBuyerId() { 193         return getProperty(MessageConst.PROPERTY_BUYER_ID); 194  } 195 
196     public void setBuyerId(String buyerId) { 197  putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId); 198  } 199 
200     public String getTransactionId() { 201         return transactionId; 202  } 203 
204     public void setTransactionId(String transactionId) { 205         this.transactionId = transactionId; 206  } 207 
208  @Override 209     public String toString() { 210         return "Message{" +
211             "topic='" + topic + '\'' +
212             ", flag=" + flag +
213             ", properties=" + properties +
214             ", body=" + Arrays.toString(body) +
215             ", transactionId='" + transactionId + '\'' +
216             '}'; 217  } 218 }
View Code

topic:主題名字,可以通過 RocketMQ Console 創建。

flag:目前沒用。

properties:消息擴展信息,Tag、keys、延遲級別都保存在這里。

body:消息體,字節數組。需要主席生產者使用什么編碼,消費者也必須使用相同的編碼解碼,否則會產生亂碼。

setKeys():設置消息的key,多個key可以用MessageConst.KEY_SEPARATOR (空格)分隔或者直接用另一個重載方法。如果Broker中messageIndexEnable=true則會根據key創建消息的Hash索引,幫助用戶進行快速查詢。

1     public void setKeys(Collection<String> keys) { 2         StringBuffer sb = new StringBuffer(); 3         for (String k : keys) { 4  sb.append(k); 5  sb.append(MessageConst.KEY_SEPARATOR); 6  } 7 
8         this.setKeys(sb.toString().trim()); 9     }
View Code

settags():消息過濾的標記,用戶可以訂閱某個Topic的某些tag,這樣broker只會把訂閱了topic-tag的消息發送給消費者。

setDelayTimeLevel():設置延遲級別,延遲多久消費者可以消費。

putUserProperty():如果還有其他擴展信息,可以存放在這里。內部是一個Map,重復調用會覆蓋舊值。

RocketMQ支持普通消息、分區有序消息、全局有序消息、延遲消息和事務消息。

普通消息:沒有特殊功能的消息。普通消息也稱為並發消息,和傳統的隊列相比,並發消息沒有順序,但是生產消息都是並發進行的,單機性能可達到十萬級別的TPS。

分區有序消息:與kafka中的分區類似,把一個Topic消息分為多個分區 "保存" 和消費,在一個分區內的消息就是傳統的隊列,遵循FIFO(先進先出)原則。

全局有序消息:如果把一個Topic的分區數設置為1,那么該Topic中的消息就是單分區,所有的消息都遵循FIFO(先進先出)的原則。

延遲消息:消息發送后,消費者要在一定時間后,或者指定某個時間點才可以消費。在沒有延遲消息時,基本的做法是基於定時計划任務調度,定時發送消息。在RocketMQ中只需要在發送消息時設置延遲級別即可實現。

事務消息:主要涉及分布式事務,即需要保證在多個操作同時成功或者同時失敗時,消費者才能消費消息。RocketMQ通過發送Half消息、處理本地事務、提交(Commit)消費或者回滾(Rollback)消息優雅地實現分布式事務。

三、生產者高可用

  通常,我們希望不管Broker、Namesrv出現什么情況,發送消息都不要出現未知狀態或者消息丟失。在消息發送的過程中,客戶端、Broker、Namesrv都有可能發生服務器損壞、掉電等各種故障。當這些故障發生時,RocketMQ是怎么處理的呢?

  1、客戶端保證

  第一種保證機制:重試機制。RocketMQ支持同步、異步發送,不管哪種方式都可以在配置失敗后重試,如果單個Broker發生故障,重試會選擇其他Broker保證消息正常發送。

  配置項retryTimesWhenSendFailed: 同步發送失敗重投次數,默認為2,因此生產者會最多嘗試發送retryTimesWhenSendFailed + 1次。不會選擇上次失敗的broker,嘗試向其他broker發送,最大程度保證消息不丟。超過重投次數,拋出異常,由客戶端保證消息不丟。當出現RemotingException、MQClientException和部分MQBrokerException時會重投。

同步發送的重試代碼可以參考,D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\producer\DefaultMQProducerImpl.java (rocketmq-all-4.8.0

  1     @org.jetbrains.annotations.Nullable
  2     private SendResult sendDefaultImpl(
  3         Message msg,
  4         final CommunicationMode communicationMode,
  5         final SendCallback sendCallback,
  6         final long timeout
  7     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  8         this.makeSureStateOK();
  9         Validators.checkMessage(msg, this.defaultMQProducer);
 10         final long invokeID = random.nextLong();
 11         long beginTimestampFirst = System.currentTimeMillis();
 12         long beginTimestampPrev = beginTimestampFirst;
 13         long endTimestamp = beginTimestampFirst;
 14         TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 15         if (topicPublishInfo != null && topicPublishInfo.ok()) {
 16             boolean callTimeout = false;
 17             MessageQueue mq = null;
 18             Exception exception = null;
 19             SendResult sendResult = null;
 20             int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
 21             int times = 0;
 22             String[] brokersSent = new String[timesTotal];
 23             for (; times < timesTotal; times++) {
 24                 String lastBrokerName = null == mq ? null : mq.getBrokerName();
 25                 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
 26                 if (mqSelected != null) {
 27                     mq = mqSelected;
 28                     brokersSent[times] = mq.getBrokerName();
 29                     try {
 30                         beginTimestampPrev = System.currentTimeMillis();
 31                         if (times > 0) {
 32                             //Reset topic with namespace during resend.
 33                             msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
 34                         }
 35                         long costTime = beginTimestampPrev - beginTimestampFirst;
 36                         if (timeout < costTime) {
 37                             callTimeout = true;
 38                             break;
 39                         }
 40 
 41                         sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
 42                         endTimestamp = System.currentTimeMillis();
 43                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
 44                         switch (communicationMode) {
 45                             case ASYNC:
 46                                 return null;
 47                             case ONEWAY:
 48                                 return
 49                                         ..........null;
 50                             case SYNC:
 51                                 if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
 52                                     if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
 53                                         continue;
 54                                     }
 55                                 }
 56 
 57                                 return sendResult;
 58                             default:
 59                                 break;
 60                         }
 61                     } catch (RemotingException e) {
 62                         endTimestamp = System.currentTimeMillis();
 63                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
 64                         log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
 65                         log.warn(msg.toString());
 66                         exception = e;
 67                         continue;
 68                     } catch (MQClientException e) {
 69                         endTimestamp = System.currentTimeMillis();
 70                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
 71                         log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
 72                         log.warn(msg.toString());
 73                         exception = e;
 74                         continue;
 75                     } catch (MQBrokerException e) {
 76                         endTimestamp = System.currentTimeMillis();
 77                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
 78                         log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
 79                         log.warn(msg.toString());
 80                         exception = e;
 81                         switch (e.getResponseCode()) {
 82                             case ResponseCode.TOPIC_NOT_EXIST:
 83                             case ResponseCode.SERVICE_NOT_AVAILABLE:
 84                             case ResponseCode.SYSTEM_ERROR:
 85                             case ResponseCode.NO_PERMISSION:
 86                             case ResponseCode.NO_BUYER_ID:
 87                             case ResponseCode.NOT_IN_CURRENT_UNIT:
 88                                 continue;
 89                             default:
 90                                 if (sendResult != null) {
 91                                     return sendResult;
 92                                 }
 93 
 94                                 throw e;
 95                         }
 96                     } catch (InterruptedException e) {
 97                         endTimestamp = System.currentTimeMillis();
 98                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
 99                         log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
100                         log.warn(msg.toString());
101 
102                         log.warn("sendKernelImpl exception", e);
103                         log.warn(msg.toString());
104                         throw e;
105                     }
106                 } else {
107                     break;
108                 }
109             }
110 
111             if (sendResult != null) {
112                 return sendResult;
113             }
114 
115             String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
116                 times,
117                 System.currentTimeMillis() - beginTimestampFirst,
118                 msg.getTopic(),
119                 Arrays.toString(brokersSent));
120 
121             info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
122 
123             MQClientException mqClientException = new MQClientException(info, exception);
124             if (callTimeout) {
125                 throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
126             }
127 
128             if (exception instanceof MQBrokerException) {
129                 mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
130             } else if (exception instanceof RemotingConnectException) {
131                 mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
132             } else if (exception instanceof RemotingTimeoutException) {
133                 mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
134             } else if (exception instanceof MQClientException) {
135                 mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
136             }
137 
138             throw mqClientException;
139         }
View Code

異步發送重試代碼可以參考,D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\MQClientAPIImpl.java (rocketmq-all-4.8.0)

 1     private void sendMessageAsync(
 2         final String addr,
 3         final String brokerName,
 4         final Message msg,
 5         final long timeoutMillis,
 6         final RemotingCommand request,
 7         final SendCallback sendCallback,
 8         final TopicPublishInfo topicPublishInfo,
 9         final MQClientInstance instance,
10         final int retryTimesWhenSendFailed,
11         final AtomicInteger times,
12         final SendMessageContext context,
13         final DefaultMQProducerImpl producer
14     ) throws InterruptedException, RemotingException {
15         final long beginStartTime = System.currentTimeMillis();
16         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
17             @Override
18             public void operationComplete(ResponseFuture responseFuture) {
19                 long cost = System.currentTimeMillis() - beginStartTime;
20                 RemotingCommand response = responseFuture.getResponseCommand();
21                 if (null == sendCallback && response != null) {
22 
23                     try {
24                         SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
25                         if (context != null && sendResult != null) {
26                             context.setSendResult(sendResult);
27                             context.getProducer().executeSendMessageHookAfter(context);
28                         }
29                     } catch (Throwable e) {
30                     }
31 
32                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
33                     return;
34                 }
35 
36                 if (response != null) {
37                     try {
38                         SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
39                         assert sendResult != null;
40                         if (context != null) {
41                             context.setSendResult(sendResult);
42                             context.getProducer().executeSendMessageHookAfter(context);
43                         }
44 
45                         try {
46                             sendCallback.onSuccess(sendResult);
47                         } catch (Throwable e) {
48                         }
49 
50                         producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
51                     } catch (Exception e) {
52                         producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
53                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
54                             retryTimesWhenSendFailed, times, e, context, false, producer);
55                     }
56                 } else {
57                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
58                     if (!responseFuture.isSendRequestOK()) {
59                         MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
60                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
61                             retryTimesWhenSendFailed, times, ex, context, true, producer);
62                     } else if (responseFuture.isTimeout()) {
63                         MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
64                             responseFuture.getCause());
65                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
66                             retryTimesWhenSendFailed, times, ex, context, true, producer);
67                     } else {
68                         MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
69                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
70                             retryTimesWhenSendFailed, times, ex, context, true, producer);
71                     }
72                 }
73             }
74         });
75     }
View Code

  通信是在通信層異步發送發完成的,當operationComplete()方法返回的response值為null時,會重新執行重試代碼。返回值response為null通常是因為客戶端收到TCP請求解包失敗,或者沒有找到匹配的request。

  生產者配置項RetryTimesWhenSendAsyncFailed表示異步重試的次數,異步重試不會選擇其他broker,僅在同一個broker上做重試,不保證消息不丟,默認為2次,加上正常發送的1次,總共有3次發送機會。

  第二種保證機制:客戶端容錯。RocketMQ Client會維護一個 "Broker-發送延遲" 關系,根據這個關系選擇一個發送延遲級別較低的 Broker 來發送消息,這樣能最大限度地利用 Broker 的能力,剔除已經宕機、不可用或者發送延遲級別較高的 Broker,盡量保證消息的正常發送。

  這種機制主要體現在發送消息時如何選擇Queue,源代碼在 D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\latency\MQFaultStrategy.java  (rocketmq-all-4.8.0

  1 /*
  2  * Licensed to the Apache Software Foundation (ASF) under one or more
  3  * contributor license agreements.  See the NOTICE file distributed with
  4  * this work for additional information regarding copyright ownership.
  5  * The ASF licenses this file to You under the Apache License, Version 2.0
  6  * (the "License"); you may not use this file except in compliance with
  7  * the License.  You may obtain a copy of the License at
  8  *
  9  *     http://www.apache.org/licenses/LICENSE-2.0
 10  *
 11  * Unless required by applicable law or agreed to in writing, software
 12  * distributed under the License is distributed on an "AS IS" BASIS,
 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  * See the License for the specific language governing permissions and
 15  * limitations under the License.
 16  */
 17 
 18 package org.apache.rocketmq.client.latency;
 19 
 20 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 21 import org.apache.rocketmq.client.log.ClientLogger;
 22 import org.apache.rocketmq.logging.InternalLogger;
 23 import org.apache.rocketmq.common.message.MessageQueue;
 24 
 25 public class MQFaultStrategy {
 26     private final static InternalLogger log = ClientLogger.getLog();
 27     private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
 28 
 29     private boolean sendLatencyFaultEnable = false;
 30 
 31     private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
 32     private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
 33 
 34     public long[] getNotAvailableDuration() {
 35         return notAvailableDuration;
 36     }
 37 
 38     public void setNotAvailableDuration(final long[] notAvailableDuration) {
 39         this.notAvailableDuration = notAvailableDuration;
 40     }
 41 
 42     public long[] getLatencyMax() {
 43         return latencyMax;
 44     }
 45 
 46     public void setLatencyMax(final long[] latencyMax) {
 47         this.latencyMax = latencyMax;
 48     }
 49 
 50     public boolean isSendLatencyFaultEnable() {
 51         return sendLatencyFaultEnable;
 52     }
 53 
 54     public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
 55         this.sendLatencyFaultEnable = sendLatencyFaultEnable;
 56     }
 57 
 58     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
 59         if (this.sendLatencyFaultEnable) {
 60             try {
 61                 /* 第1步,獲取一個在延遲上可接受,並且和上次發送相同的Broker。
 62                 首先獲取一個自增序號index,通過取模獲取Queue的位置下標Pos。
 63                 如果Pos對應的Broker的延遲時間是可以接受的,並且是第一次發送,或者和上次發送的Broker相同,則將Queue返回。
 64                 */
 65                 int index = tpInfo.getSendWhichQueue().getAndIncrement();
 66                 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
 67                     int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
 68                     if (pos < 0)
 69                         pos = 0;
 70                     MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
 71                     if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
 72                         return mq;
 73                 }
 74                 // 第2步,如果第1步沒有選擇一個Broker,則選擇一個延遲較低的Broker
 75                 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
 76                 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
 77                 if (writeQueueNums > 0) {
 78                     final MessageQueue mq = tpInfo.selectOneMessageQueue();
 79                     if (notBestBroker != null) {
 80                         mq.setBrokerName(notBestBroker);
 81                         mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
 82                     }
 83                     return mq;
 84                 } else {
 85                     latencyFaultTolerance.remove(notBestBroker);
 86                 }
 87             } catch (Exception e) {
 88                 log.error("Error occurred when selecting message queue", e);
 89             }
 90             // 第3步,如果第1、第2步都沒有選中一個Broker,則隨機選擇一個Broker。
 91             return tpInfo.selectOneMessageQueue();
 92         }
 93 
 94         return tpInfo.selectOneMessageQueue(lastBrokerName);
 95     }
 96 
 97     public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
 98         if (this.sendLatencyFaultEnable) {
 99             long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
100             this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
101         }
102     }
103 
104     private long computeNotAvailableDuration(final long currentLatency) {
105         for (int i = latencyMax.length - 1; i >= 0; i--) {
106             if (currentLatency >= latencyMax[i])
107                 return this.notAvailableDuration[i];
108         }
109 
110         return 0;
111     }
112 }
View Code

 sendLatencyFaultEnable: 發送延遲容錯開關,默認為關閉,如果開關打開了,會觸發發送延遲容錯機制來選擇發送Queue。

上面代碼里包括一個隨機選擇方法tpInfo.selectOneMessageQueue(lastBrokerName),該方法的功能就是隨機一個Broker,具體實現如下:
D:\rocketmq-master\client\src\main\java\org\apache\rocketmq\client\impl\producer\TopicPublishInfo.java (rocketmq-all-4.8.0)
 
        
 1     public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
 2         // 第1步,如果沒有上次使用Broker作為參考,那么隨機選擇一個Broker
 3         if (lastBrokerName == null) {
 4             return selectOneMessageQueue();
 5         } else {
 6             // 第2步,如果存在上次使用的Broker,就選擇上次使用的Broker,目的是均勻分散Broker的壓力。
 7             for (int i = 0; i < this.messageQueueList.size(); i++) {
 8                 int index = this.sendWhichQueue.getAndIncrement();
 9                 int pos = Math.abs(index) % this.messageQueueList.size();
10                 if (pos < 0)
11                     pos = 0;
12                 MessageQueue mq = this.messageQueueList.get(pos);
13                 if (!mq.getBrokerName().equals(lastBrokerName)) {
14                     return mq;
15                 }
16             }
17             // 第3步,如果第1、第2步都沒有選中一個Broker,則采用兜底方案——隨機選擇一個Broker
18             return selectOneMessageQueue();
19         }
20     }
View Code
 
        

  客戶端在發送消息后,會調用 updateFaultItem()方法來更新當前接受消息的Broker的延遲情況,這些主要邏輯都在 MQFaultStrategy類中實現,延遲策略有一個標准接口 LatencyFaultTolerance。

  2、Broker端保證
  Broker 數據同步方式保證:

  1. 同步復制:指消息發送到Master Broker后,同步到Slave Broker才算發送成功;
  2. 異步復制:指消息發送到Master Broker后,即為發送成功。

在生產環境中,建議至少部署2個Master和2個Slave。

部署方式 優點 缺點 備注
單個Master模式 簡單
這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用
不建議線上環境使用,可用於本地測試
多個Master模式 配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由於RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高。 單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會收到影響。 當使用多master無slave的集群搭建方式時,master的brokerRole配置必須為ASYNC_MASTER。如果配置為SYNC_MASTER,則producer發送消息時,返回值的SendStatus會一直是SLAVE_NOT_AVAILABLE。
多Master多Slave模式——異步復制 即使磁盤損壞,消息丟失的非常少,但消息實時性不會受影響,因為Master宕機后,消費者仍然可以從Slave消費,此過程對應用透明,不需要人工干預,性能同多Master模式幾乎一樣。 Master宕機,磁盤損壞情況,會丟失少量信息。  
多Master多Slave模式——同步雙寫 數據與服務都無單點,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高; 性能比異步復制模式稍低,大約低10%左右,發送單個消息的RT會稍高,目前主宕機后,備機不能自動切換為主機,后續會支持自動切換功能。  

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM