一、說明
分為兩種,一種是直接發消息,client內部有選擇queue的算法,不允許外界改變。還有一種是可以自定義queue的選擇算法(內置了三種算法,不喜歡的話可以自定義算法實現)。
public class org.apache.rocketmq.client.producer.DefaultMQProducer { // 只發送消息,queue的選擇由默認的算法來實現 @Override public SendResult send(Collection<Message> msgs) {} // 自定義選擇queue的算法進行發消息 @Override public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {} }
二、源碼
1、send(msg, mq)
1.1、使用場景
有時候我們不希望默認的queue選擇算法,而是需要自定義,一般最常用的場景在順序消息,順序消息的發送一般都會指定某組特征的消息都發當同一個queue里,這樣才能保證順序,因為單queue是有序的。
1.2、原理剖析
內置了三種算法,三種算法都實現了一個共同的接口:
org.apache.rocketmq.client.producer.MessageQueueSelector
-
SelectMessageQueueByRandom
-
SelectMessageQueueByHash
-
SelectMessageQueueByMachineRoom
-
要想自定義邏輯的話,直接實現接口重寫select方法即可。
很典型的策略模式,不同算法不同實現類,有個頂層接口。
1.2.1、SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector { private Random random = new Random(System.currentTimeMillis()); @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // mqs.size():隊列的個數。假設隊列個數是4,那么這個value就是0-3之間隨機。 int value = random.nextInt(mqs.size()); return mqs.get(value); } }
so easy,就是純隨機。
mqs.size():隊列的個數。假設隊列個數是4,那么這個value就是0-3之間隨機。
1.2.2、SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); // 防止出現負數,取個絕對值,這也是我們平時開發中需要注意到的點 if (value < 0) { value = Math.abs(value); } // 直接取余隊列個數。 value = value % mqs.size(); return mqs.get(value); } }
so easy,就是純取余。
mqs.size():隊列的個數。假設隊列個數是4,且value的hashcode是3,那么3 % 4 = 3,那么就是最后一個隊列,也就是四號隊列,因為下標從0開始。
1.2.3、SelectMessageQueueByMachineRoom
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { private Set<String> consumeridcs; @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } public Set<String> getConsumeridcs() { return consumeridcs; } public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; } }
沒看懂有啥鳥用,直接return null; 所以如果有自定義需求的話直接自定義就好了,這玩意沒看出有啥卵用。
1.2.4、自定義算法
public class MySelectMessageQueue implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return mqs.get(0); } }
永遠都選擇0號隊列,也就是第一個隊列。只是舉個例子,實際看你業務需求。
1.3、調用鏈
org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg) -> org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg) -> org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg, long timeout) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl(xxx) -> mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); -> selector.select(messageQueueList, userMessage, arg) -> org.apache.rocketmq.client.producer.MessageQueueSelector#select(final List<MessageQueue> mqs, final Message msg, final Object arg)
2、send(msg)
2.1、使用場景
一般沒特殊需求的場景都用這個。因為他默認的queue選擇算法很不錯,各種優化場景都替我們想到了。
2.2、原理剖析
// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl} // 這是發送消息核心原理,不清楚的看我之前發消息源碼分析的文章 // 選擇消息要發送的隊列 MessageQueue mq = null; for (int times = 0; times < 3; times++) { // 首次肯定是null String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 調用下面的方法進行選擇queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { // 給mq賦值,如果首次失敗了,那么下次重試的時候(也就是下次for的時候),mq就有值了。 mq = mqSelected; ...... // 很關鍵,能解答下面會提到的兩個問題: // 1.faultItemTable是什么時候放進去的? // 2.isAvailable() 為什么只是判斷一個時間就可以知道Broker是否可用? this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); } }
選擇queue的主入口
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 默認為false,代表不啟用broker故障延遲 if (this.sendLatencyFaultEnable) { try { // 隨機數且+1 int index = tpInfo.getSendWhichQueue().getAndIncrement(); // 遍歷 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { // 先(隨機數 +1) % queue.size() int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) { pos = 0; } MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 看找到的這個queue所屬的broker是不是可用的 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 非失敗重試,直接返回到的隊列 // 失敗重試的情況,如果和選擇的隊列是上次重試是一樣的,則返回 // 也就是說如果你這個queue所在的broker可用, // 且不是重試進來的或失敗重試的情況,如果和選擇的隊列是上次重試是一樣的,那你就是天選之子了。 if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) { return mq; } } } // 如果所有隊列都不可用,那么選擇一個相對好的broker,不考慮可用性的消息隊列 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } // 隨機選擇一個queue return tpInfo.selectOneMessageQueue(); } // 當sendLatencyFaultEnable=false的時候選擇queue的方法,默認就是false。 return tpInfo.selectOneMessageQueue(lastBrokerName); }
2.2.1、不啟用broker故障延遲
既然sendLatencyFaultEnable默認是false,那就先看當sendLatencyFaultEnable=false時候的邏輯
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 第一次就是null,第二次(也就是重試的時候)就不是null了。 if (lastBrokerName == null) { // 第一次選擇隊列的邏輯 return selectOneMessageQueue(); } else { // 第一次選擇隊列發送消息失敗了,第二次重試的時候選擇隊列的邏輯 int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 過濾掉上次發送消息失敗的隊列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
那就繼續看第一次選擇隊列的邏輯:
public MessageQueue selectOneMessageQueue() { // 當前線程有個ThreadLocal變量,存放了一個隨機數 {@link org.apache.rocketmq.client.common.ThreadLocalIndex#getAndIncrement} // 然后取出隨機數根據隊列長度取模且將隨機數+1 int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) { pos = 0; } return this.messageQueueList.get(pos); }
好吧,其實也有點隨機一個的意思。但是亮點在於取出隨機數根據隊列長度取模且將隨機數+1,這個+1亮了(getAndIncrement cas +1)。
當消息第一次發送失敗時,lastBrokerName會存放當前選擇失敗的broker(mq = mqSelected),通過重試,此時lastBrokerName有值,代表上次選擇的boker發送失敗,則重新對sendWhichQueue本地線程變量+1,遍歷選擇消息隊列,直到不是上次的broker,也就是為了規避上次發送失敗的broker的邏輯所在。
舉個例子:你這次隨機數是1,隊列長度是4,1%4=1,這時候失敗了,進入重試,那么重試之前,也就是在上一步1%4之后,他把1進行了++操作,變成了2,那么你這次重試的時候就是2%4=2,直接過濾掉了剛才失敗的broker。
那就繼續看第二次重試選擇隊列的邏輯:
// +1 int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { // 取模 int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 過濾掉上次發送消息失敗的隊列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // 沒找到能用的queue的話繼續走默認的那個 return selectOneMessageQueue();
so easy,你上次不是失敗了,進入我這里重試來了嗎?我也很簡單,我就還是取出隨機數+1然后取模隊列長度,我看這個broker是不是上次失敗的那個,是他小子的話就過濾掉,繼續遍歷queue找下一個能用的。
2.2.2、啟用broker故障延遲
也就是下面if里的邏輯
if (this.sendLatencyFaultEnable) { .... }
看上面的注釋就行了,很清晰了,就是我先(隨機數 +1) % queue.size()
,然后看你這個queue所屬的broker是否可用,可用的話且不是重試進來的或失敗重試的情況,如果和選擇的隊列是上次重試是一樣的,那直接return你就完事了。那么怎么看broker是否可用的呢?
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#isAvailable(String)} public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; } // {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable()} public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
疑問:
- faultItemTable是什么時候放進去的?
- isAvailable() 為什么只是判斷一個時間就可以知道Broker是否可用?
這就需要上面發送消息完成后所調用的這個方法了:
// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem} // 發送開始時間 beginTimestampPrev = System.currentTimeMillis(); // 進行發送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); // 發送結束時間 endTimestamp = System.currentTimeMillis(); // 更新broker的延遲情況 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
細節邏輯如下:
// {@link org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem} public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { // 首次isolation傳入的是false,currentLatency是發送消息所耗費的時間,如下 // this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; // 根據延遲時間對比MQFaultStrategy中的延遲級別數組latencyMax 不可用時長數組notAvailableDuration 來將該broker加進faultItemTable中。 private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { // 假設currentLatency花費了10ms,那么latencyMax里的數據顯然不符合下面的所有判斷,所以直接return 0; if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; } // {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem()} @Override // 其實主要就是給startTimestamp賦值為當前時間+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的結果,給isAvailable()所用 // 也就是說只有notAvailableDuration == 0的時候,isAvailable()才會返回true。 public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); // 給startTimestamp賦值為當前時間+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的結果,給isAvailable()所用 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); // 給startTimestamp賦值為當前時間+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的結果,給isAvailable()所用 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); // 給startTimestamp賦值為當前時間+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的結果,給isAvailable()所用 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
下面這兩句代碼詳細解釋下:
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
latencyMax | notAvailableDuration |
---|---|
50L | 0L |
100L | 0L |
550L | 30000L |
1000L | 60000L |
2000L | 120000L |
3000L | 180000L |
15000L | 600000L |
即
- currentLatency大於等於50小於100,則notAvailableDuration為0
- currentLatency大於等於100小於550,則notAvailableDuration為0
- currentLatency大於等於550小於1000,則notAvailableDuration為300000
- …等等
再來舉個例子:
假設isolation傳入true,
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
那么notAvailableDuration將傳入600000L。結合isAvailable方法,大概流程如下:
RocketMQ為每個Broker預測了個可用時間(當前時間+notAvailableDuration),當當前時間大於該時間,才代表Broker可用,而notAvailableDuration有6個級別和latencyMax的區間一一對應,根據傳入的currentLatency去預測該Broker在什么時候可用。
所以再來看這個
public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
根據執行時間來看落入哪個區間,在0~100的時間內notAvailableDuration都是0,都是可用的,大於該值后,可用的時間就會開始變大了,就認為不是最優解,直接舍棄。
2.3、調用鏈
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl(xxx) -> MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue(xxx) org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName)
2.4、總結
- 在不開啟容錯的情況下,輪詢隊列進行發送,如果失敗了,重試的時候過濾失敗的Broker
- 如果開啟了容錯策略,會通過RocketMQ的預測機制來預測一個Broker是否可用
- 如果上次失敗的Broker可用那么還是會選擇該Broker的隊列
- 如果上述情況失敗,則隨機選擇一個進行發送
- 在發送消息的時候會記錄一下調用的時間與是否報錯,根據該時間去預測broker的可用時間
三、總結
1、疑問
他搞了兩個重載send()方法,一個支持算法選擇器,一個不支持算法選擇,queue的算法選擇是個典型的策略模式。為什么send(message)
方法內置的queue選擇算法不抽出到單獨的類中,然后此類實現org.apache.rocketmq.client.producer.MessageQueueSelector
接口呢?比如叫:SelectMessageQueueByBest
,比如如下:
public class org.apache.rocketmq.client.producer.DefaultMQProducer { // 只發送消息,queue的選擇由默認的算法來實現 @Override public SendResult send(Collection<Message> msgs) { this.send(msgs, new SelectMessageQueueByBest().select(xxx)); } // 自定義選擇queue的算法進行發消息 @Override public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {} }
我猜測可能是這個算法過於復雜,與其它類的交互也過於多,參數也可能和內置的其他三個不同,所以沒搞到一起,但是還是搞到一起規范呀,干的同一件事,只是算法不同,很典型的策略模式。
2、面試
問:發消息的時候選擇queue的算法有哪些?
答:分為兩種,一種是直接發消息,不能選擇queue,這種的queue選擇算法如下:
- 在不開啟容錯的情況下,輪詢隊列進行發送,如果失敗了,重試的時候過濾失敗的Broker
- 如果開啟了容錯策略,會通過RocketMQ的預測機制來預測一個Broker是否可用
- 如果上次失敗的Broker可用那么還是會選擇該Broker的隊列
- 如果上述情況失敗,則隨機選擇一個進行發送
- 在發送消息的時候會記錄一下調用的時間與是否報錯,根據該時間去預測broker的可用時間
另外一種是發消息的時候可以選擇算法甚至還可以實現接口自定義算法:
SelectMessageQueueByRandom
:隨機SelectMessageQueueByHash
:hashSelectMessageQueueByMachineRoom
- 實現接口自定義
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl(xxx)
->
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue(xxx)
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName)