DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理。
1、每个DefaultMQPushConsumer启动后,会马上触发一次doRebalance动作;
1.1、DefaultMQPushConsumerImpl.start()
// Excerpt of DefaultMQPushConsumerImpl.start(). The listing's own line numbers are kept in the
// left gutter. The excerpt stops at the end of the outer switch; the remainder of the method
// (including its closing brace) is not shown.
//
// The method is synchronized and dispatches on serviceState:
//   - CREATE_JUST: performs the start-up sequence below and ends in RUNNING.
//   - RUNNING / START_FAILED / SHUTDOWN_ALREADY: throws MQClientException.
 1 public synchronized void start() throws MQClientException {
 2     switch (this.serviceState) {
 3         case CREATE_JUST:
 4             log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
 5                 this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
               // Set START_FAILED before doing any work: the state is only moved to RUNNING at
               // line 68 (or back to CREATE_JUST at line 59), so an exception thrown in between
               // leaves the consumer in START_FAILED.
 6             this.serviceState = ServiceState.START_FAILED;
 7
 8             this.checkConfig();
 9
10             this.copySubscription();
11
               // Only for CLUSTERING consumers.
12             if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
13                 this.defaultMQPushConsumer.changeInstanceNameToPID();
14             }
15
16             this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
17
               // Hand the rebalance component the group, message model, allocation strategy and
               // client factory it will work with.
18             this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
19             this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
20             this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
21             this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
22
23             this.pullAPIWrapper = new PullAPIWrapper(
24                 mQClientFactory,
25                 this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
26             this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
27
               // Offset store: a store configured on defaultMQPushConsumer wins; otherwise one is
               // created per message model (LocalFileOffsetStore for BROADCASTING,
               // RemoteBrokerOffsetStore for CLUSTERING) and written back to the consumer.
28             if (this.defaultMQPushConsumer.getOffsetStore() != null) {
29                 this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
30             } else {
31                 switch (this.defaultMQPushConsumer.getMessageModel()) {
32                     case BROADCASTING:
33                         this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
34                         break;
35                     case CLUSTERING:
36                         this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
37                         break;
38                     default:
                           // NOTE(review): no store is assigned on this path, so line 43 relies
                           // on offsetStore having been set elsewhere - confirm.
39                         break;
40                 }
41                 this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
42             }
43             this.offsetStore.load();
44
               // The listener type selects the consume service. NOTE(review): if the listener is
               // neither type, consumeMessageService is not assigned here; presumably
               // checkConfig() rules that out - confirm.
45             if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
46                 this.consumeOrderly = true;
47                 this.consumeMessageService =
48                     new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
49             } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
50                 this.consumeOrderly = false;
51                 this.consumeMessageService =
52                     new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
53             }
54
55             this.consumeMessageService.start();
56
               // Register this consumer under its group name. On failure, undo what was started:
               // reset the state to CREATE_JUST, shut the consume service down, and throw.
57             boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
58             if (!registerOK) {
59                 this.serviceState = ServiceState.CREATE_JUST;
60                 this.consumeMessageService.shutdown();
61                 throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
62                     + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
63                     null);
64             }
65
               // Start the shared client factory; presumably this is the MQClientInstance.start()
               // that launches the rebalance service - confirm against that class.
66             mQClientFactory.start();
67             log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
68             this.serviceState = ServiceState.RUNNING;
69             break;
           // Any state other than CREATE_JUST means start() was already attempted: reject it.
70         case RUNNING:
71         case START_FAILED:
72         case SHUTDOWN_ALREADY:
73             throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
74                 + this.serviceState
75                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
76                 null);
77         default:
78             break;
79     }
1.2、MQClientInstance.start()
1 public void start() throws MQClientException { 2 3 synchronized (this) { 4 switch (this.serviceState) { 5 case CREATE_JUST: 6 this.serviceState = ServiceState.START_FAILED; 7 // If not specified,looking address from name server 8 if (null == this.clientConfig.getNamesrvAddr()) { 9 this.mQClientAPIImpl.fetchNameServerAddr(); 10 } 11 // Start request-response channel 12 this.mQClientAPIImpl.start(); 13 // Start various schedule tasks 14 this.startScheduledTask(); 15 // Start pull service 16 this.pullMessageService.start(); 17 // Start rebalance service 18 this.rebalanceService.start(); 19 // Start push service 20 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); 21 log.info("the client factory [{}] start OK", this.clientId); 22 this.serviceState = ServiceState.RUNNING; 23 break; 24 case RUNNING: 25 break; 26 case SHUTDOWN_ALREADY: 27 break; 28 case START_FAILED: 29 throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); 30 default: 31 break; 32 } 33 } 34 }
1.3、org.apache.rocketmq.common.ServiceThread.start()
RebalanceService.run()
1 @Override 2 public void run() { 3 log.info(this.getServiceName() + " service started"); 4 5 while (!this.isStopped()) { 6 this.waitForRunning(waitInterval); 7 this.mqClientFactory.doRebalance(); 8 } 9 10 log.info(this.getServiceName() + " service end"); 11 }
1.4、MQClientInstance.doRebalance()
1 public void doRebalance() { 2 for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { 3 MQConsumerInner impl = entry.getValue(); 4 if (impl != null) { 5 try { 6 impl.doRebalance(); 7 } catch (Throwable e) { 8 log.error("doRebalance exception", e); 9 } 10 } 11 } 12 }
2、而且在同一个ConsumerGroup里加入新的DefaultMQPushConsumer时,
各个Consumer都会被触发doRebalance动作
ClientRemotingProcessor.processRequest(ChannelHandlerContext, RemotingCommand)
1 @Override 2 public RemotingCommand processRequest(ChannelHandlerContext ctx, 3 RemotingCommand request) throws RemotingCommandException { 4 switch (request.getCode()) { 5 case RequestCode.CHECK_TRANSACTION_STATE: 6 return this.checkTransactionState(ctx, request); 7 case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: 8 return this.notifyConsumerIdsChanged(ctx, request); 9 case RequestCode.RESET_CONSUMER_CLIENT_OFFSET: 10 return this.resetOffset(ctx, request); 11 case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT: 12 return this.getConsumeStatus(ctx, request); 13 14 case RequestCode.GET_CONSUMER_RUNNING_INFO: 15 return this.getConsumerRunningInfo(ctx, request); 16 17 case RequestCode.CONSUME_MESSAGE_DIRECTLY: 18 return this.consumeMessageDirectly(ctx, request); 19 default: 20 break; 21 } 22 return null; 23 }
consumer负载均衡策略接口AllocateMessageQueueStrategy

1 /** 2 * Strategy Algorithm for message allocating between consumers 3 */ 4 public interface AllocateMessageQueueStrategy { 5 6 /** 7 * Allocating by consumer id 8 * 9 * @param consumerGroup current consumer group 10 * @param currentCID current consumer id 11 * @param mqAll message queue set in current topic 12 * @param cidAll consumer set in current consumer group 13 * @return The allocate result of given strategy 14 */ 15 List<MessageQueue> allocate( 16 final String consumerGroup, 17 final String currentCID, 18 final List<MessageQueue> mqAll, 19 final List<String> cidAll 20 ); 21 22 /** 23 * Algorithm name 24 * 25 * @return The strategy name 26 */ 27 String getName(); 28 }
具体的负载均衡策略有六种(见下文“分配算法”一节),
默认使用AllocateMessageQueueAveragely。负载均衡的结果与Topic的Message Queue数量,以及
ConsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只到Message Queue,即把Topic下的所有
Message Queue分配到不同Consumer中,所以Message Queue和Consumer的数量关系(尤其是能否整除)
会影响负载均衡结果。
3、以下以AllocateMessageQueueAveragely为例讲解
3.1 RebalanceImpl.doRebalance(boolean)
1 public void doRebalance(final boolean isOrder) { 2 Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); 3 if (subTable != null) { 4 for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { 5 final String topic = entry.getKey(); 6 try { 7 this.rebalanceByTopic(topic, isOrder); 8 } catch (Throwable e) { 9 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 10 log.warn("rebalanceByTopic Exception", e); 11 } 12 } 13 } 14 } 15 16 this.truncateMessageQueueNotMyTopic(); 17 }
3.2、RebalanceImpl.rebalanceByTopic(String, boolean)
1 private void rebalanceByTopic(final String topic, final boolean isOrder) { 2 switch (messageModel) { 3 case BROADCASTING: { 4 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 5 if (mqSet != null) { 6 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); 7 if (changed) { 8 this.messageQueueChanged(topic, mqSet, mqSet); 9 log.info("messageQueueChanged {} {} {} {}", 10 consumerGroup, 11 topic, 12 mqSet, 13 mqSet); 14 } 15 } else { 16 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 17 } 18 break; 19 } 20 case CLUSTERING: {
//获取该Topic下所有的MessageQueue,包括不同broker下的 21 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//查询该consumerGroup,topic下consumerIdList 22 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 23 if (null == mqSet) { 24 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 25 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 26 } 27 } 28 29 if (null == cidAll) { 30 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); 31 } 32 33 if (mqSet != null && cidAll != null) { 34 List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); 35 mqAll.addAll(mqSet); 36 37 Collections.sort(mqAll); 38 Collections.sort(cidAll); 39 40 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 41 42 List<MessageQueue> allocateResult = null; 43 try { 44 allocateResult = strategy.allocate( 45 this.consumerGroup, 46 this.mQClientFactory.getClientId(), 47 mqAll, 48 cidAll); 49 } catch (Throwable e) { 50 log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), 51 e); 52 return; 53 } 54 55 Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); 56 if (allocateResult != null) { 57 allocateResultSet.addAll(allocateResult); 58 } 59 60 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 61 if (changed) { 62 log.info( 63 "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", 64 strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), 65 allocateResultSet.size(), allocateResultSet); 66 this.messageQueueChanged(topic, mqSet, allocateResultSet); 67 } 68 } 69 break; 70 } 71 default: 72 break; 73 } 74 }
3.3、AllocateMessageQueueStrategy.allocate(String, String, List<MessageQueue>, List<String>)
1 /** 2 * Average Hashing queue algorithm 3 */ 4 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { 5 private final InternalLogger log = ClientLogger.getLog(); 6 7 @Override 8 public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 9 List<String> cidAll) { 10 if (currentCID == null || currentCID.length() < 1) { 11 throw new IllegalArgumentException("currentCID is empty"); 12 } 13 if (mqAll == null || mqAll.isEmpty()) { 14 throw new IllegalArgumentException("mqAll is null or mqAll empty"); 15 } 16 if (cidAll == null || cidAll.isEmpty()) { 17 throw new IllegalArgumentException("cidAll is null or cidAll empty"); 18 } 19 20 List<MessageQueue> result = new ArrayList<MessageQueue>(); 21 if (!cidAll.contains(currentCID)) { 22 log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", 23 consumerGroup, 24 currentCID, 25 cidAll); 26 return result; 27 } 28 29 int index = cidAll.indexOf(currentCID); 30 int mod = mqAll.size() % cidAll.size(); 31 int averageSize = 32 mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() 33 + 1 : mqAll.size() / cidAll.size()); 34 int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; 35 int range = Math.min(averageSize, mqAll.size() - startIndex); 36 for (int i = 0; i < range; i++) { 37 result.add(mqAll.get((startIndex + i) % mqAll.size())); 38 } 39 return result; 40 } 41 42 @Override 43 public String getName() { 44 return "AVG"; 45 } 46 }
分配算法
平均分配策略(默认)(AllocateMessageQueueAveragely)
环形分配策略(AllocateMessageQueueAveragelyByCircle)
手动配置分配策略(AllocateMessageQueueByConfig)
机房分配策略(AllocateMessageQueueByMachineRoom)
一致性哈希分配策略(AllocateMessageQueueConsistentHash)
靠近机房策略(AllocateMachineRoomNearby)
平均分配、环形分配如下
普通消费方式
Message Queue | ConsumerId |
消息队列[0] | Consumer[0] |
消息队列[1] | Consumer[0] |
消息队列[2] | Consumer[0] |
消息队列[3] | Consumer[1] |
消息队列[4] | Consumer[1] |
消息队列[5] | Consumer[1] |
消息队列[6] | Consumer[2] |
消息队列[7] | Consumer[2] |
- 环形消费方式
Message Queue | ConsumerId |
消息队列[0] | Consumer[0] |
消息队列[1] | Consumer[1] |
消息队列[2] | Consumer[2] |
消息队列[3] | Consumer[0] |
消息队列[4] | Consumer[1] |
消息队列[5] | Consumer[2] |
消息队列[6] | Consumer[0] |
消息队列[7] | Consumer[1] |
机房分配策略
1 @Override 2 public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 3 List<String> cidAll) { 4 List<MessageQueue> result = new ArrayList<MessageQueue>(); 5 int currentIndex = cidAll.indexOf(currentCID); 6 if (currentIndex < 0) { 7 return result; 8 } 9 List<MessageQueue> premqAll = new ArrayList<MessageQueue>(); 10 for (MessageQueue mq : mqAll) { 11 String[] temp = mq.getBrokerName().split("@"); 12 if (temp.length == 2 && consumeridcs.contains(temp[0])) { 13 premqAll.add(mq); 14 } 15 } 16 17 int mod = premqAll.size() / cidAll.size(); 18 int rem = premqAll.size() % cidAll.size(); 19 int startIndex = mod * currentIndex; 20 int endIndex = startIndex + mod; 21 for (int i = startIndex; i < endIndex; i++) { 22 result.add(mqAll.get(i)); 23 } 24 if (rem > currentIndex) { 25 result.add(premqAll.get(currentIndex + mod * cidAll.size())); 26 } 27 return result; 28 }
第4-8行, 计算当前消费者在消费者集合中的下标(currentIndex), 如果下标 < 0 , 则直接返回空结果
第9-15行, 根据brokerName解析出机房信息, 筛选出属于有效机房(consumeridcs)的消息队列, 结果存储在premqAll中
第17行, 计算每个消费者平均可分到的消息队列数mod(即整除结果)
第18行, 计算平均分配后剩余的消息队列数rem(remaining), 用于判断消息队列能否被平均分配
第19行, 计算当前消费者开始消费的下标(startIndex)
第20行, 计算当前消费者结束消费的下标(endIndex)
第21-26行, 将消息的消费分为两部分, 第一部分 – (cidAllSize * mod) , 第二部分 – (premqAll - cidAllSize * mod) ;
从第一部分中查询startIndex ~ endIndex之间所有的消息, 从第二部分中查询 currentIndex + mod * cidAll.size() , 最后返回查询的结果result
可以通过下面的例子进一步了解,假设有三个消费者, 八个消息队列
Message Queue | Consumer |
消息队列[0] | Consumer[0] |
消息队列[1] | Consumer[0] |
消息队列[2] | Consumer[1] |
消息队列[3] | Consumer[1] |
消息队列[4] | Consumer[2] |
消息队列[5] | Consumer[2] |
消息队列[6] | Consumer[0] |
消息队列[7] | Consumer[1] |
靠近机房算法
1
/**
* An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
* specified.
*
* If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
* should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
* no alive consumer to monopolize them.
*/
@Override 2 public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 3 List<String> cidAll) { 4 if (currentCID == null || currentCID.length() < 1) { 5 throw new IllegalArgumentException("currentCID is empty"); 6 } 7 if (mqAll == null || mqAll.isEmpty()) { 8 throw new IllegalArgumentException("mqAll is null or mqAll empty"); 9 } 10 if (cidAll == null || cidAll.isEmpty()) { 11 throw new IllegalArgumentException("cidAll is null or cidAll empty"); 12 } 13 14 List<MessageQueue> result = new ArrayList<MessageQueue>(); 15 if (!cidAll.contains(currentCID)) { 16 log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", 17 consumerGroup, 18 currentCID, 19 cidAll); 20 return result; 21 } 22 23 //group mq by machine room 24 Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>(); 25 for (MessageQueue mq : mqAll) { 26 String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq); 27 if (StringUtils.isNoneEmpty(brokerMachineRoom)) { 28 if (mr2Mq.get(brokerMachineRoom) == null) { 29 mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>()); 30 } 31 mr2Mq.get(brokerMachineRoom).add(mq); 32 } else { 33 throw new IllegalArgumentException("Machine room is null for mq " + mq); 34 } 35 } 36 37 //group consumer by machine room 38 Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>(); 39 for (String cid : cidAll) { 40 String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid); 41 if (StringUtils.isNoneEmpty(consumerMachineRoom)) { 42 if (mr2c.get(consumerMachineRoom) == null) { 43 mr2c.put(consumerMachineRoom, new ArrayList<String>()); 44 } 45 mr2c.get(consumerMachineRoom).add(cid); 46 } else { 47 throw new IllegalArgumentException("Machine room is null for consumer id " + cid); 48 } 49 } 50 51 List<MessageQueue> allocateResults = new ArrayList<MessageQueue>(); 52 53 //1.allocate the mq that deploy in the same machine room 
with the current consumer 54 String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID); 55 List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom); 56 List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom); 57 if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) { 58 allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom)); 59 } 60 61 //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room 62 for (String machineRoom : mr2Mq.keySet()) { 63 if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues 64 allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll)); 65 } 66 } 67 68 return allocateResults; 69 }