RocketMQ之Consumer


一、Consumer 介紹

1.1 核心參數

* consumerGroup:消費者組名  
* MessageModel:消息模型,定義了消息傳遞到消費者的方式,默認是 MessageModel.CLUSTERING
    * MessageModel.BROADCASTING:廣播
    * MessageModel.CLUSTERING:集群
* consumeFromWhere: 消費者開始消費的位置,默認值是 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
	* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:從隊列最后的位置開始消費
    * ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET:從隊列前面最開始消費
    * ConsumeFromWhere.CONSUME_FROM_TIMESTAMP: 從指定時間開始消費,之前的消息將會被忽略    
* consumeTimestamp:
* allocateMessageQueueStrategy:消息分配策略
* subscription:訂閱關系
* offsetStore:存儲消息偏移量
* consumeThreadMin:線程池最小值,默認值是20
* consumeThreadMax:線程池最大值,默認值是20
* consumeConcurrentlyMaxSpan:單個隊列並行消費最大的跨度,默認2000
* pullThresholdForQueue:一個隊列最大的消費個數,默認1000
* pullInterval:消息拉取的時間間隔
* pullBatchSize:消息拉取的個數,默認32啊
* consumeMessageBatchMaxSize:批量消費量,默認1  
* messageListener:消息監聽器,用來處理消息,它有兩個實現類
    * MessageListenerOrderly:按順序一個個消費    
    * MessageListenerConcurrently:並行消費

二、消費模式

2.1 集群模式

* 同一個 consumerGroup 里,並且訂閱的 tag 也必須是一樣的,這樣的 consumer 實例才能組成 consumer 集群;
* 當 consumer 使用集群消費時,每條消息只會被 consumer 集群內的任意一個 consumer 實例消費一次;
* 默認的消費模式就是集群模式;
* 集群模式天然實現負載均衡機制

2.2 廣播模式

* 同一個 consumerGroup 里的 Consumer 會消費訂閱 Topic 的全部消息
* 通過 consumer.setMessageModel(MessageModel.BROADCASTING) 方法設置    

三、Offset 介紹

3.1 Offset 是什么

* 在 RocketMQ 中,相同類型的消息會放到一個 Topic 里,為了可以並行操作,一個 Topic 會有多個 MessageQueue。  
* Offset 是指某個 Topic 下的一條消息在某個 MessageQueue 里的位置;
* 通過 Offset 的值可以定位到這條消息

3.2 Offset 類結構

從類結構可以看出 Offset 分為本地文件類型和遠程文件類型。

3.2 消費模式采用的 Offset 類型

* 集群模式下因為每個 Consumer 消費所訂閱主題的一部分,所以采用遠程文件存儲 Offset;
* 廣播模式下,由於每個 Consumer 需要消費所有的消息,所以采用本地文件存儲 Offset。

3.3 Offset 文件存儲格式

OffseStore 使用 Json 格式存儲,例如:

{
    "OffsetTable":{
        1:{
            "brokeName":"localhost",
            "QueueId":1,
            "Topic":"broker1"
        },
       2:{
            "brokeName":"localhost",
            "QueueId":2,
            "Topic":"broker2"
        }
    }
}

四、不同類型的消費者

根據對讀取操作的控制情況,可以消費者分為兩種類型。一個是 DefaultMQPushConsumer,由系統控制讀取操作,收到消息后自動調用傳入的處理方法來處理;另一個是 DefaultMQPullConsumer ,讀取操作中的大部分功能由使用者自主控制。

4.1 DefaultMQPushConsumer

DefaultMQPushConsumer 只需要設置好各種參數和設置傳入處理消息的回調函數即可,系統收到消息后會自動調用處理函數來處理消息,而且加入新的 DefaultMQPushConsumer 后會自動做負載均衡。

4.1.1 實例

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 創建消費者對象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupName");
        // 設置服務器地址
        consumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 訂閱指定主題
        consumer.subscribe("topicTest","*");
        // 注冊消息監聽事件
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("msg:"+msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者
        consumer.start();
    }

}

4.2 DefaultMQPullConsumer

4.2.1 消費步驟

1) 讀取 topic 的消息隊列 message queue 的信息;
2) 按隊列去拉取一定數目的消息;
3) 持久化message queue的消費進度 offset;
4) 根據不同的消息狀態做不同的處理

4.2.2 拉取結果狀態

public enum PullStatus {
	// 拉取成功
    FOUND,
	// 沒有消息可以拉取
    NO_NEW_MSG,
	// 過濾結果不匹配
    NO_MATCHED_MSG,
	// 偏移量非法,太大或太小
    OFFSET_ILLEGAL
}

4.2.3 實例

public class PullConsumer {
	// 本地 offset 存儲容器,生產環境可以放到數據庫或 Redis 中
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("DefaultMQPullConsumer");
        // 設置服務器地址
        pullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 啟動消費者
        pullConsumer.start();
        // 從指定 topic 獲取所有的隊列
        Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues("topicTest");
        // 遍歷隊列,拉取消息
        for (MessageQueue mq : messageQueues) {
            System.out.printf("從隊列中消費: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    // 獲取 offset
                    Long offset = getMessageQueueOffset(mq);
                    // 拉取32個消息
                    PullResult pullResult =
                            pullConsumer.pullBlockIfNotFound(mq, null, offset, 32);
                    System.out.printf("%s%n", pullResult);
                    // 保存 offset
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        pullConsumer.shutdown();
    }

    // 保存上次消費的消息下標
    private static void putMessageQueueOffset(MessageQueue mq,
                                              long nextBeginOffset) {
        OFFSE_TABLE.put(mq, nextBeginOffset);
    }

    // 獲取上次消費的消息的下標
    private static Long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0l;
    }
}

4.3 DefaultLitePullConsumer

**DefaultMQPullConsumer ** 已經被標識為廢棄,替代的是 DefaultLitePullConsumer,下面我們就直接使用 DefaultLitePullConsumer 來操作。

public class LitePullConsumer {

    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer");
        // 設置服務器地址
        litePullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 關閉自動提交偏移量
        litePullConsumer.setAutoCommit(false);
        // 啟動消費者
        litePullConsumer.start();
        // 獲取隊列
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("topicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        litePullConsumer.assign(assignList);
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                litePullConsumer.commitSync();
            }
        } finally {
            litePullConsumer.shutdown();
        }

    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM