RocketMQ(五)——消費模式


 

一、集群消費

之前的博客中,啟動的都是單個Consumer,如果啟動多個呢?

 

 RocketMQ-集群消費
其實,對於RocketMQ而言,通過ConsumeGroup的機制,實現了天然的消息負載均衡!通俗點來說,RocketMQ中的消息通過ConsumeGroup實現了將消息分發到C1/C2/C3/…的機制,這意味着我們將非常方便的通過加機器來實現水平擴展!
我們考慮一下這種情況:比如C2發生了重啟,一條消息發往C3進行消費,但是這條消息的處理需要0.1S,而此時C2剛好完成重啟,那么C2是否可能會收到這條消息呢?答案是肯定的,也就是consume broker的重啟,或者水平擴容,或者不遵守先訂閱后生產消息,都可能導致消息的重復消費!關於去重的話題會在后續中予以介紹!
至於消息分發到C1/C2/C3,其實也是可以設置策略的:

 

 

RocketMQ-消息負載策略

使用哪種策略,只需要實例化對應的對象即可,如:

AllocateMessageQueueStrategy aqs = new AllocateMessageQueueAveragelyByCircle();
consumer.setAllocateMessageQueueStrategy(aqs);

上面內容,其實是一種消費模式——集群消費。
RocketMQ的消費模式有2種,查看一下源碼:

public enum MessageModel {
    /**
     * broadcast
     */
    BROADCASTING,
    /**
     * clustering
     */
    CLUSTERING;
}

在默認情況下,就是集群消費(CLUSTERING),也就是上面提及的消息的負載均衡消費。另一種消費模式,是廣播消費(BROADCASTING)。

二、廣播消費

 

廣播消費,類似於ActiveMQ中的發布訂閱模式,消息會發給Consume Group中的每一個消費者進行消費。

 

 

 

RocketMQ-廣播消費模式設置

/**
 * Consumer,訂閱消息
 */
public class Consumer2 {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
        consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
        
        // 設置為廣播消費模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        System.out.println(" Receive New Messages: " + msg);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;    // 重試
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        // 成功
            }
        });

        consumer.start();
        System.out.println("Consumer Started.");
    }
}

內容補充
《RocketMQ(三)——HelloWorld》那篇博客的最后提到了單批次消息消費數量 ,本文既然提到了集群消費,那就針對這兩個內容再進行一次補充吧。
如果我們有2台節點,Producerw往MQ上寫入20條數據 其中Consumer1中拉取了12條 。Consumer2中拉取了8 條,這種情況下,假如Consumer1宕機,那么我們消費數據的時候,只能消費到Consumer2中的8條,Consumer1中的12條已經持久化了。需要Consumer1恢復之后這12條數據才能繼續被消費。其實這種先啟動producer往MQ上寫數據,然后再啟動Consumer的情況本來就是違規操作,正確的情況應該是先啟動Consumer后再啟動producer

 

 我這兒整理了比較全面的JAVA相關的面試資料,


需要領取面試資料的同學,請加群:473984645


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM