目錄
RocketMQ的負載均衡
producer對MessageQueue
的負載均衡
通過調試代碼可以知道,所謂的MessageQueue
就是broker上的隊列信息,每個topic在創建的時候可以指定相應的queue的數量。也就是說,一個topic的消息存儲在多個主broker中
producer負載均衡
producer端的負載均衡主要是在選擇對應的broker。在producer發送消息的時候會對消息進行路由,看到底是路由到哪個broker。下面主要說下以下兩種發送消息的方法:系統計算路由MessageQueue
,自定義路由MessageQueue
。
系統計算路由MessageQueue
SendResult send = producer.send(message, 60 * 1000);
系統計算路由MessageQueue的其他路由算法
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
// 默認策略(路由到當前的broker主節點列表取模后的broker中)
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
自定義路由MessageQueue
SendResult send = producer.send(message, new MessageQueueSelector() {
/**
*
* @param mqs 通過name server返回的broker主節點列表
* @param msg 當前消息
* @param arg
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int size = mqs.size();
long timeMillis = System.currentTimeMillis();
return mqs.get((int)timeMillis % size);
}
}, 60 * 1000);
Consumer的負載均衡
消費端設置負責均衡策略
在consumer.statrt()
中,consumer會對所訂閱的topic上的messagequeue做負載均衡DefaultConsumerPushImpl.start()
下的 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
, 默認返回的是AllocateMessageQueueAveragely
負責均衡策略
- AllocateMessageQueueAveragely
負載均衡的時機
// RebalanceService
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
// 開始進行分配
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
具體實現
/**
consumerGroup : 消費組名稱
currentCID:當前消費者實例Id(隨機數)
mqAll: 該topic對應的queue的信息列表
cidAll: 消費組中所有的消費者列表
*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}