一、集群消費
之前的博客中,啟動的都是單個Consumer,如果啟動多個呢?
RocketMQ-集群消費
其實,對於RocketMQ而言,通過ConsumeGroup的機制,實現了天然的消息負載均衡!通俗點來說,RocketMQ中的消息通過ConsumeGroup實現了將消息分發到C1/C2/C3/…的機制,這意味着我們將非常方便的通過加機器來實現水平擴展!
我們考慮一下這種情況:比如C2發生了重啟,一條消息發往C3進行消費,但是這條消息的處理需要0.1S,而此時C2剛好完成重啟,那么C2是否可能會收到這條消息呢?答案是肯定的,也就是consume broker的重啟,或者水平擴容,或者不遵守先訂閱后生產消息,都可能導致消息的重復消費!關於去重的話題會在后續中予以介紹!
至於消息分發到C1/C2/C3,其實也是可以設置策略的:
RocketMQ-消息負載策略
使用哪種策略,只需要實例化對應的對象即可,如:
AllocateMessageQueueStrategy aqs = new AllocateMessageQueueAveragelyByCircle(); consumer.setAllocateMessageQueueStrategy(aqs);
上面內容,其實是一種消費模式——集群消費。
RocketMQ的消費模式有2種,查看一下源碼:
public enum MessageModel { /** * broadcast */ BROADCASTING, /** * clustering */ CLUSTERING; }
在默認情況下,就是集群消費(CLUSTERING),也就是上面提及的消息的負載均衡消費。另一種消費模式,是廣播消費(BROADCASTING)。
二、廣播消費
廣播消費,類似於ActiveMQ中的發布訂閱模式,消息會發給Consume Group中的每一個消費者進行消費。
RocketMQ-廣播消費模式設置
/** * Consumer,訂閱消息 */ public class Consumer2 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name"); consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876"); consumer.setConsumeMessageBatchMaxSize(10); // 設置為廣播消費模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println(" Receive New Messages: " + msg); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 重試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 成功 } }); consumer.start(); System.out.println("Consumer Started."); } }
內容補充
《RocketMQ(三)——HelloWorld》那篇博客的最后提到了單批次消息消費數量 ,本文既然提到了集群消費,那就針對這兩個內容再進行一次補充吧。
如果我們有2台節點,Producerw往MQ上寫入20條數據 其中Consumer1中拉取了12條 。Consumer2中拉取了8 條,這種情況下,假如Consumer1宕機,那么我們消費數據的時候,只能消費到Consumer2中的8條,Consumer1中的12條已經持久化了。需要Consumer1恢復之后這12條數據才能繼續被消費。其實這種先啟動producer往MQ上寫數據,然后再啟動Consumer的情況本來就是違規操作,正確的情況應該是先啟動Consumer后再啟動producer
我這兒整理了比較全面的JAVA相關的面試資料,
需要領取面試資料的同學,請加群:473984645