Pull消費者客戶端(主動拉取消息的消費者)即構造了DefaultMQPullConsumer對象,DefaultMQPullConsumer繼承了ClientConfig類。我們先看其構造方法
[java] view plain copy public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) { this.consumerGroup = consumerGroup; defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook); }
這里只是簡單設置了consumerGroup消費者組名,表示消費者屬於哪個組。構造了DefaultMQPullConsumerImpl的實例,DefaultMQPullConsumerImpl的構造方法很簡單,只是綁定了DefaultMQPullConsumer、配置了傳入的rpcHook。
DefaultMQPullConsumer內部封裝了DefaultMQPullConsumerImpl,其中還維護這一些配置信息。這里維護着消費者訂閱的topic集合。
[java] view plain copy private Set<String> registerTopics = new HashSet<String>();
整個消費者客戶端的啟動,調用了DefaultMQPullConsumer的start()方法,內部直接調用DefaultMQPullConsumerImpl的start()方法,這個start方法加了synchronized修飾。
[java] view plain copy public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPullConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer , this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPullConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPullConsumer.getOffsetStore(); } else { switch (this.defaultMQPullConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer .getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer .getConsumerGroup()); break; default: break; } this.defaultMQPullConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl .GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } }
一開始的serverState的狀態自然為CREAT_JUST,調用checkConfig(),其中先是對ConsumerGroup進行驗證,非空,合法(符合正則規則,且長度不超過配置最大值),且不為默認值(防止消費者集群名沖突),然后對消費者消息模式、消息隊列分配算法進行非空、合法校驗。
關於消費者消息模式有BroadCasting(廣播)跟Clustering(集群)兩種、默認是Clustering(集群)配置在DefaultMQPullConsumer中。關於消費者的消息分配算法,在DefaultMQPullConsumer中實現有默認的消息分配算法,allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();(平均分配算法)。其實現了AllocateMessageQueueStrategy接口,重點看其實現的allocate()方法。
[java] view plain copy @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; }
傳入的參數有當前消費者id,所有消息隊列數組,以及當前所有消費者數組。先簡單驗證非空,再通過消費者數組大小跟消息隊列大小根據平均算法算出當前消費者該分配哪些消息隊列集合。邏輯不難。RocketMQ還提供了循環平均、一致性哈希、配置分配等算法,這里默認采用平均分配。
我們再回到DefaultMQPullConsumerImpl的start()方法,checkConfig后,調用copySubscription()方法,將配置在DefaultMQPullConsumer中的topic信息構造成並構造成subscriptionData數據結構,以topic為key以subscriptionData為value以鍵值對形式存到rebalanceImpl的subscriptionInner中。
[java] view plain copy private void copySubscription() throws MQClientException { try { Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics(); if (registerTopics != null) { for (final String topic : registerTopics) { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
接下來從MQCLientManager中得到MQClient的實例,這個步驟跟生產者客戶端相同。
再往后是對rebalanceImpl的配置,我們重點看下rebalanceImpl,它是在DefaultMQPullConsumerImpl成員中直接構造private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);即在DefaultMQPullConsumerImpl初始化的時候構造。接下來對其消費者組名、消息模式(默認集群)、隊列分配算法(默認平均分配)、消費者客戶端實例進行配置,配置信息都是從DefaultMQPullConsumer中取得。
[java] view plain copy public abstract class RebalanceImpl { protected static final Logger log = ClientLogger.getLog(); protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>(); protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>(); protected String consumerGroup; protected MessageModel messageModel; protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; protected MQClientInstance mQClientFactory;
接下來構造了PullAPIWrapper,僅僅調用其構造方法,簡單的配置下
[java] view plain copy public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) { this.mQClientFactory = mQClientFactory; this.consumerGroup = consumerGroup; this.unitMode = unitMode; }
然后初始化消費者的offsetStore,offset即偏移量,可以理解為消費進度,這里根據不同的消息模式來選擇不同的策略。如果是廣播模式,那么所有消費者都應該收到訂閱的消息,那么每個消費者只應該自己消費的消費隊列的進度,那么需要把消費進度即offsetStore存於本地采用LocalFileOffsetStroe,相反的如果是集群模式,那么集群中的消費者來平均消費消息隊列,那么應該把消費進度存於遠程采用RemoteBrokerOffsetStore。然后調用相應的load方法加載。
之后將當前消費者注冊在MQ客戶端實例上之后,調用MQClientInstance的start()方法,啟動消費者客戶端。
[java] view plain copy public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed." , null); default: break; } } }
看到這里應該很熟悉,跟生產者客戶端這里是同一段代碼,無非解析路由消息並完成路由消息的配置,啟動netty客戶端,啟動定時任務(定時更新從名稱服務器獲取路由信息更新本地路由信息,心跳,調整線程數量),后面啟動pull server、rebalance service、push service最后把serviceState狀態設為Running表示客戶端啟動。
我們在這里重點看下RebalanceService的啟動。下面貼出的是RebalanceService的run()方法。
[java] view plain copy @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }
可以看到,只要這個線程沒有被停止(客戶端沒關閉),會一直循環調用客戶端的doRebalance()方法。
[java] view plain copy public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }
MQClientInstance遍歷consumerTable(之前注冊的時候以consumerGroup為key,以消費者客戶端DefaultMQPullConsumerImpl為value存入consumerTable中)中的每個元素,循環調用其元素的doRebalance()方法。那我們看DefaultMQPullConsumerImpl的doRebalance方法。
1 [java] view plain copy 2 @Override 3 public void doRebalance() { 4 if (this.rebalanceImpl != null) { 5 this.rebalanceImpl.doRebalance(false); 6 } 7 }
直接調用了rebalanceImpl的doRebalance方法
[java] view plain copy public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
可以看到先得到subTable即subscriptionInner,之前根據配置的每個topic生成的SubscriptionData數據結構的map。先遍歷該map,得到每個topic,針對每個topic調用rebalanceByTopic()
1 [java] view plain copy 2 private void rebalanceByTopic(final String topic, final boolean isOrder) { 3 switch (messageModel) { 4 case BROADCASTING: { 5 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 6 if (mqSet != null) { 7 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); 8 if (changed) { 9 this.messageQueueChanged(topic, mqSet, mqSet); 10 log.info("messageQueueChanged {} {} {} {}", 11 consumerGroup, 12 topic, 13 mqSet, 14 mqSet); 15 } 16 } else { 17 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 18 } 19 break; 20 } 21 case CLUSTERING: { 22 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 23 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 24 if (null == mqSet) { 25 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 26 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 27 } 28 } 29 30 if (null == cidAll) { 31 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); 32 } 33 34 if (mqSet != null && cidAll != null) { 35 List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); 36 mqAll.addAll(mqSet); 37 38 Collections.sort(mqAll); 39 Collections.sort(cidAll); 40 41 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 42 43 List<MessageQueue> allocateResult = null; 44 try { 45 allocateResult = strategy.allocate( 46 this.consumerGroup, 47 this.mQClientFactory.getClientId(), 48 mqAll, 49 cidAll); 50 } catch (Throwable e) { 51 log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", 52 strategy.getName(), e); 53 return; 54 } 55 56 Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); 57 if (allocateResult != null) { 58 allocateResultSet.addAll(allocateResult); 59 } 60 61 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 62 if (changed) { 63 log.info( 64 "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={} 65 , cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", 66 strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), 67 allocateResultSet.size(), allocateResultSet); 68 this.messageQueueChanged(topic, mqSet, allocateResultSet); 69 } 70 } 71 break; 72 } 73 default: 74 break; 75 } 76 }
我們先重點關注集群模式下,先得到topic的本地路由信息,再通過topic跟這個消費者的組名,調用netty客戶端的同步網絡訪問topic指定的broker,從broker端得到與其連接的且是指定消費組名下訂閱指定topic的消費者id的集合。然后采用默認的分配算法的allocate()進行隊列給消費者平均分配。然后調用updateProcessQueueTableInRebalance()方法判斷是否重新隊列分配。
1 [java] view plain copy 2 private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, 3 final boolean isOrder) { 4 boolean changed = false; 5 6 Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); 7 while (it.hasNext()) { 8 Entry<MessageQueue, ProcessQueue> next = it.next(); 9 MessageQueue mq = next.getKey(); 10 ProcessQueue pq = next.getValue(); 11 12 if (mq.getTopic().equals(topic)) { 13 if (!mqSet.contains(mq)) { 14 pq.setDropped(true); 15 if (this.removeUnnecessaryMessageQueue(mq, pq)) { 16 it.remove(); 17 changed = true; 18 log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); 19 } 20 } else if (pq.isPullExpired()) { 21 switch (this.consumeType()) { 22 case CONSUME_ACTIVELY: 23 break; 24 case CONSUME_PASSIVELY: 25 pq.setDropped(true); 26 if (this.removeUnnecessaryMessageQueue(mq, pq)) { 27 it.remove(); 28 changed = true; 29 log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", 30 consumerGroup, mq); 31 } 32 break; 33 default: 34 break; 35 } 36 } 37 } 38 } 39 40 List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); 41 for (MessageQueue mq : mqSet) { 42 if (!this.processQueueTable.containsKey(mq)) { 43 if (isOrder && !this.lock(mq)) { 44 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); 45 continue; 46 } 47 48 this.removeDirtyOffset(mq); 49 ProcessQueue pq = new ProcessQueue(); 50 long nextOffset = this.computePullFromWhere(mq); 51 if (nextOffset >= 0) { 52 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); 53 if (pre != null) { 54 log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); 55 } else { 56 log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); 57 PullRequest pullRequest = new PullRequest(); 58 pullRequest.setConsumerGroup(consumerGroup); 59 pullRequest.setNextOffset(nextOffset); 60 pullRequest.setMessageQueue(mq); 61 pullRequest.setProcessQueue(pq); 62 pullRequestList.add(pullRequest); 63 changed = true; 64 } 65 } else { 66 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); 67 } 68 } 69 } 70 71 this.dispatchPullRequest(pullRequestList); 72 73 return changed; 74 }
先遍歷processQueueTable,看其topic下的該處理消息隊列是否還是應該處理,由於新分配之后,消息隊列可能會改變,所以原該處理的消息隊列可能沒必要處理,因此沒必要處理的消息隊列移除。當然也有可能多出需要處理的消息隊列,於是需要建立其與processQueue的對應關系,先調用computerPullFromWhere得到該條消息下次拉取數據的位置,在RebalancePullImpl中實現了該方法直接返回0,把該處理的mq封裝成pq后,更新到processQueueTable中。若有更新,無論是增加還是刪除,則changed都設為true。(這個地方講的有點模糊,他是客戶端pull與push區別的關鍵,實際上push不過是在pull之上封裝了下操作,后面我們會重新回來分析。)
方法返回后,如果changed為true,會調用messageQueueChanged方法來通知配置在DefaultMQPullConsumer中的相關messageQueueListener,我們可以看到RebalancePullImpl中的實現。
1 [java] view plain copy 2 @Override 3 public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { 4 MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener(); 5 if (messageQueueListener != null) { 6 try { 7 messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); 8 } catch (Throwable e) { 9 log.error("messageQueueChanged exception", e); 10 } 11 } 12 }
廣播模式則比較簡單,由於所有消費者都要處理,少了隊列分配這個步驟。
本文轉載自:https://blog.csdn.net/panxj856856/article/details/80725630