【不同類型的消費者】
DefaultMQPushConsumer
由系統控制讀取操作,收到消息后自動調用傳入的處理方法來處理。
DefaultMQPullConsumer
讀取操作中的大部分功能由使用者自動控制。
【DefaultMQPushConsumer的使用】
[特點]
1.系統收到消息后自動調用處理方法來處理消息,自動保存Offset。
2.加入的新的DefaultMQPushConsumer會自動做負載均衡。
public class QuickStart { /** * DefaultMQPushConsumer需要配置三個參數 * 1.Consumer的GroupName * 2.NameServer的地址和端口號 * 3.Topic的名稱 */ public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("unique_group_name_1"); //1.GroupName Consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.2:9876"); //2.NameServer的地址和端口號 Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); Consumer.setMessageModel(MessageModel.BROADCASTING); Consumer.subscribe("TopicTest", "*"); //3.Topic的名稱 Consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + "Receive New Messages:" + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); Consumer.start(); } }
[ 注意1:關於GroupName ]
RocketMQ支持兩種消息模式:Clustering(集群消費) 和 Broadcasting(廣播消費)。
1.Clustering模式 (即P2P模式)
同一個ConsumerGroup(即相同的GroupName)里的每個Consumer只消費訂閱消息的一部分,同一個ConsumerGroup里所有的Consumer消費的內容合起來才是所訂閱的Topic內容的整體,從而達到負載均衡的目的。
2.Broading模式(即發布-訂閱模式)
同一個ConsumerGroup里的每個Consumer都能消費到所訂閱的Topic的消息,就是一個消息會被多次分發,被多個Consumer消費。
[ 注意2:關於NameServer配置 ]
NameServer的地址和端口號,可以填寫多個,用 ";" 隔開,達到消除單點故障的目的。如"ip1:port;ip2:"
[ 注意3:Topic的配置 ]
Topic的名稱用來標識消息類型,需要填創建。如果不需要消費某個Topic下的所有消息,可以通過指定消息的Tag進行消息過濾,比如:
//表示這個Consumer只消費"TopicTest"下的帶有tag1、tag2、tag3的消息 Consumer.subscribe("TopicTest", "tag1||tag2||tag3");
Tag是發消息時設置的標簽,在填寫Tag參數的位置,用null或"*"表示要消費這個Topic的所有消息。
【DefaultMQPushConsumer的處理流程】
DefaultMQPushConsumer的主要功能實現在DefaultMQPushConsumerImpl中,消息處理在pullMessage的PullCallBack中。
在PullCallBack中有個Switch語句,根據Broker返回的消息類型做對應的處理。
DefaultMQPushConsumer的源碼中有很多PullRequest語句,比如
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest)
為什么Push的Consumer中會出現PullRequest相關的代碼呢?
通過長輪詢的方式達到Push效果的方法,長輪詢的方式既有Pull的優點,也有Push方式的實時性。
[ 補充——Push方式和Pull方式的區別 ]
1.Push方式
過程:Server收到消息后,主動把消息推送給Client端。
優點:實時性高。
缺點:
加大了Server端的工作量,會影響Server的性能。
Client端處理能力各不相同,Client的狀態不收Server控制,如果Client端不能及時處理Server推送過來的消息,會造成各種潛在的問題。
2.Pull方式
過程:Client端循環地從Server端拉取消息,主動權在Client手里,自己拉取到一定量的消息后,處理完成之后繼續取。
缺點:
循環拉取的消息間隔時間不好設定,間隔太短就處於忙等狀態,浪費資源;間隔太長,消息不能被及時處理。
[ 長輪詢的方式 ]
長輪詢的方式通過Client端和Server端的配合,達到了既有Pull方式的優點,也能達到保證實時性的目的。
[ 長輪詢的發送Pull消息的代碼片段 ]
拼接PullMessageRequestHeader,然后作為消息參數發送。
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.ConsumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueid()); requestHeader.setQueueOffset(Offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlaginner); requestHeader.setCommitOffset(commitOffset); //設置了Broker的最長阻塞時間,默認15秒,Broker沒有消息時才阻塞,有消息會立刻返回。 requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType);
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
[ 長輪詢的Broker服務端代碼 ]
//如果隊列里沒有消息 if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); //等待5S } else { this.waitForRunning(this.brokerController.getBrokerConfig(). getShortPollingTimeMills()); long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { Log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } }
從Broker的源碼中看,服務端收到新消息請求時,如果隊列里沒有消息,並不急於返回,而是循環不斷的查看狀態,每次waitForRunning一段時間(默認5S),然后再check。默認情況下當Broker一直沒有新消息,第三次check的時候,等待時間超過了RequestHeader里面的SuspendTimeoutMillis,就會返回空結果。
在等待的過程中,Broker收到了新的消息后會直接調用notifyMessageAriving方法返回的請求結果。
[ 長輪詢小結 ]
長輪詢的核心是,Broker端HOLD住客戶端發送過來的請求一小段時間,在這個時間里有新的消息到達,就利用現有的連接立即返回給Consumer。
長輪詢的主動權還是掌握在Consumer手中,Broker即使有大量的消息積壓,也不會主動推送給Consumer。
局限性:
在HOLD住Consumer請求的時候需要占用資源,它適合用在消息隊列這種客戶端連接可控的場景。
【DefaultMQPush的流量控制】
PushConsumer的核心還是Pull方式,所以采用這種方式的客戶端能根據自身的處理速度調整獲取消息的操作速度。
PushConsumer有一個線程池,消息處理邏輯在各個線程里同時執行,線程池定義如下:
this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryimpl("ConsumeMessageThread") );
Pull拉的消息,如果直接提交到線程池,很難監控和控制,比如當前消息堆積數量、消息是否重復執行、如何延遲處理某些消息。這些問題,都用一個快照類ProcessQueue來解決,在PushConsumer運行的時候,每個Message Queue都有個對應的ProcessQueue對象,保存這個Message Queue消息處理狀態的快照。
[ ProcessQueue對象 ]
主要組成:一個TreeMap + 一個讀寫鎖。
[ PushConsumer的流量控制 ]
有了ProcessQueue對象,流量的控制就方便多了。
PushConsumer會判斷下面三個數據:
1.獲取但還未處理的消息個數;
2.消息的總大小;
3.Offset的跨度;
任何一個值超過設定的大小就會隔一段時間再拉取,從而達到流量控制的目的。
【 DefaultMQPullConsumer 】
使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一樣需要設置各種參數,寫處理消息的方法等。
[ PullConsumer示例代碼 ]
public class PullConsumer { private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer Consumer = new DefaultMQPullConsumer("group_name_A"); Consumer.start(); Set<MessageQueue> mqs = Consumer.fetchSubscribeMessageQueues("TopicTest"); /** 1.獲取MessageQueue,並遍歷 **/ for (MessageQueue mq : mqs) { /** 2.維護Offsetstore **/ long Offset = Consumer.fetchConsumeOffset(mq, true); System.out.printf("Consume from the Queue:" + mq + "%n"); SINGLE_MQ: while (true) { try { PullResult pullResult = Consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); /** 3.根據不同的消息狀態做不同的處理 **/ switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; default: break; } } catch (Exception e) { e.printStackTrace(); } Consumer.shutdown(); } } } private static long getMessageQueueOffset(MessageQueue mq){ Long Offset = OFFSE_TABLE.get(mq); if(Offset!=null){ return Offset; } return 0; } private static void putMessageQueueOffset(MessageQueue mq,long Offset){ OFFSE_TABLE.put(mq,Offset); } }
[ PullConsumer注意點 ]
示例代碼是逐個讀取某個Topic下的所有MessageQueue的內容,主要做了三件事:
1.獲取MessageQueue,並遍歷
一個Topic包含多個MessageQueue。
如果這個Consumer需要獲取這個Topic下的所有消息,就要遍歷所有的MessageQueue。如果有特殊情況,也可以選擇某些特定的MessageQueue來讀取消息。
2.維護Offsetstore
從一個MessageQueue里拉取消息時,要傳入Offset參數,隨着不斷的讀取消息,Offset不斷怎張。此時需要由用戶負責把Offset存儲下來,可以根據具體情況存到內存、磁盤或者數據中。
3.根據不同的消息狀態做不同的處理
拉取消息的請求發出后,會返回下面4種狀態碼:
/** * 拉取消息狀態碼 */ public enum PullStatus { /** * Founded */ FOUND, /** * No new message can be pull */ NO_NEW_MSG, /** * Filtering results can not match */ NO_MATCHED_MSG, /** * Illegal offset,may be too big or too small */ OFFSET_ILLEGAL }
比較重要的是這2個狀態碼:
FOUND 獲取到消息;
NO_NEW_MESSAGE 沒有新的消息;
【Consumer的啟動、關閉流程】
[1.PullConsumer]
PullConsumer的主動權很高,可以根據實際需要暫停、停止、啟動消費者。
[ 注意 ]
PullConsumer的重點是Offset的保存,需要再代碼中異常處理部分增加這樣的處理:把Offset寫入磁盤,記住每個MessageQueue的Offset,才能保證消息消費的准確性。
[ 2.PushConsumer ]
DefaultMQPushConsumer的退出:
調用shutdown()方法,以便釋放資源,保存Offset等。(這個調用要加到Consumer所在應用的處理邏輯中)
PushConsumer啟動:
PushConsumer啟動的時候,會做各種配置的檢查,然后連接NameServer獲取Topic信息,啟動時如遇到異常,如無法連接NameServer,程序依然可以正常啟動不報錯(日志里會有Warn信息)。
【為什么DefaultPushMQConsumer在無法連接NameServer時不報錯?】
和分布式系統的設計有關,RocketMQ集群可以有多個NameServer、Broker,某個機器出異常后整體服務依然可用。
所以DefaultMQPushConsumer被設計成當發現某個連接異常時,不立即退出,而是不斷嘗試重新連接。
【如果要在DefaultMQPushConsumer啟動的時候,及時暴露配置問題(及時報錯),如何處理?】
可以在Consumer.start()語句后調用:Consumer.fetchSubscribeMessageQueues("TopicName"),這時如果配置信息不准確,或者當前服務不可用,會報MQClientException異常。