RocketMQ 消費者核心配置和核心知識


一、RocketMQ4.X 消費者核心配置

  • consumeFromWhere 配置(某些情況失效:參考https://blog.csdn.net/a417930422/article/details/83585397這個配置基本不用改,采用默認配置即可。
    • CONSUME_FROM_FIRST_OFFSET: 初次從消息隊列頭部開始消費,即歷史消息(還儲存在 broker 的)全部消費一遍,后續再啟動接着上次消費的進度開始消費。
    • CONSUME_FROM_LAST_OFFSET: 默認策略,初次從該隊列最尾開始消費,即跳過歷史消息,后續再啟動接着上次消費的進度開始消費。
    • CONSUME_FROM_TIMESTAMP:從某個時間點開始消費,默認是半個小時以前,后續再啟動接着上次消費的進度開始消費。
  • allocateMessageQueueStrategy:負載均衡策略算法,即消費者分配到 queue 的算法
    • 默認值是 AllocateMessageQueueAveragely 即取模平均分配
  • offsetStore:消息消費進度存儲器 offsetStore 有兩個策略:LocalFileOffsetStore 和 RemoteBrokerOffsetStore 
    • 廣播模式默認使用 LocalFileOffsetStore, 集群模式默認使用 RemoteBrokerOffsetStore
  • consumeThreadMax:最大消費線程池數量
  • consumeThreadMin:最小消費線程池數量
  • pullBatchSize:消費者去 broker 拉取消息時,一次拉取多少條。可選配置
  • consumeMessageBatchMaxSize:單次消費時一次性消費多少條消息,批量消費接口才有用,可選配置
  • messageModel:消費者消費模式
    • CLUSTERING:集群模式(默認配置)
    • BROADCASTING:廣播模式

 二、集群和廣播模式下 RocketMQ 消費端處理

Topic 下隊列的奇偶數會影響 Customer 個數里面的消費數量

  • 如果是4個隊列,8個消息,4個節點則會各消費2條,如果不對等,則負載均衡會分配不均。
  • 如果 consumer 實例的數量比 message queue 的總數量還多的話,多出來的 consumer 實例將無法分到 queue,也就無法消費到消息,也就無法起到分攤負載的作用,所以需要控制讓 queue 的總數量大於等於 consumer 的數量。

集群模式(默認):

  • Consumer 實例平均分攤消費生產者發送的消息
  • 例子:訂單消息,一般是只被消費一次(被標記為同一個 ConsumerGroup 組的消費者不會對消息重復消費)

廣播模式:

  • 廣播模式下消費消息:投遞到 Broker 的消息會被每個 Consumer 進行消費,一條消息被多個 Consumer 消費,廣播消費中 ConsumerGroup 暫時無用。
  • 例子:群公告,每個人都需要消費這個消息

怎么切換模式:通過 setMessageModel()

三、RocketMQ 里面的 Tag 作用和消息過濾原理

一個 Message 只有一個 Tag,Tag 是二級分類。過濾分為 Broker 端和 Consumer 端過濾。

  • Broker 端過濾,減少了無用的消息的進行網絡傳輸,增加了 broker 的負擔
  • Consumer 端過濾,完全可以根據業務需求進行過濾,但是增加了很多無用的消息傳輸

一般是監聽 * ,或者指定 tag,|| 運算,SLQ92,FilterServer 等;

  • Tag 性能高,邏輯簡單
  • SQL92 性能差點,支持復雜邏輯(只支持 PushConsumer 中使用) MessageSelector.bySql
    • 語法:> ,<,=,IS NULL,AND,OR,NOT 等,sql where 后續的語法即可(大部分)

 生產者

@RequestMapping("/api/v1/pay_cb")
public Object callback( String tag, String amount) throws Exception {

    Message message = new Message(JmsConfig.TOPIC,tag, "",tag.getBytes());
    // 設置屬性,用於sql過濾
    message.putUserProperty("amount",amount);
    
    SendResult sendResult =  payProducer.getProducer().send(message);
    System.out.printf("發送結果=%s, sendResult=%s \n", sendResult.getSendStatus(), sendResult.toString());
    return new HashMap<>();
}

消費者

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

@Component
public class PayConsumer {


    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_consumer_group";

    public  PayConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //默認是集群方式,可以更改為廣播,但是廣播方式不支持重試
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //多標簽訂閱
        //consumer.subscribe(JmsConfig.TOPIC, "order_pay || order_finish || order_create"); //根據sql語法進行過濾消息
        consumer.subscribe(JmsConfig.TOPIC, MessageSelector.bySql(" amount > 5 "));

        consumer.registerMessageListener( new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);

                try {
                System.out.printf("%s 2 Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            } catch (Exception e) {
                System.out.println("消費異常");
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            }
        });

        consumer.start();
        System.out.println("consumer start ...");
    }

}

注意:消費者訂閱關系要一致,不然會消費混亂,甚至消息丟失。訂閱關系一致:訂閱關系由 Topic 和 Tag 組成,同一個 group name,訂閱的 Topic 和 Tag 必須是一樣的。

在 Broker 端進行 MessageTag 過濾原理:遍歷 message queue 存儲的 message tag 和 訂閱傳遞的 tag 的 hashcode 是否一樣,不一樣則跳過,符合的則傳輸給 Consumer,在 consumer queue 存儲的是對應的 hashcode,對比也是通過 hashcode 對比;Consumer 收到過濾消息后也會進行匹配操作,但是是對比真實的 message tag 而不是 hashcode。

  • consume queue 存儲使用 hashcode 定長,節約空間
  • 過濾中不訪問 commit log,可以高效過濾
  • 如果存在 hash 沖突,Consumer 端可以進行再次確認

建議:單一職責,多個隊列;如果想使用多個 Tag,可以使用 sql 表達式,但是不建議。

常見錯誤:

The broker does not support consumer to filter message by SQL92

解決:broker.conf 里面配置如下
enablePropertyFilter=true

備注,修改之后要重啟 Broker
master 節點配置:vim conf/2m-2s-async/broker-a.properties
slave 節點配置:vim conf/2m-2s-async/broker-a-s.properties

 

四、PushConsumer/PullConsumer 消費消息模式

Push 和 Pull 優缺點分析

  • Push:實時性高;但增加服務端負載,消費端能力不同,如果 Push 推送過快,消費端會出現很多問題
  • Pull:消費者從 Server 端拉取消息,主動權在消費者端,可控性好;但間隔時間不好設置,間隔太短,則空請求,浪費資源;間隔時間太長,則消息不能及時處理
  • 長輪詢: Client 請求 Server 端也就是 Broker 的時候, Broker 會保持當前連接一段時間,默認是15s,如果這段時間內有消息到達,則立刻返回給 Consumer;沒消息的話,超過15s,則返回空,再進行重新請求;主動權在 Consumer 中,Broker 即使有大量的消息也不會主動推送給 Consumer。 缺點:服務端需要保持 Consumer 的請求,會占用資源,需要客戶端連接數可控,否則會存在一堆連接

PushConsumer 本質是長輪訓

  • 系統收到消息后自動處理消息和 offset,如果有新的 Consumer 加入會自動做負載均衡,
  • 在 broker 端可以通過 longPollingEnable=true 來開啟長輪詢
  • 雖然是 push,但是代碼里面大量使用了pull,是因為使用長輪訓方式達到 push 效果,既有 pull 有的,又有 push 的實時性
  • 優雅關閉:主要是釋放資源和保存 Offset, 調用 shutdown() 即可 ,參考 @PostConstruct、@PreDestroy

PullConsumer 需要自己維護 Offset(參考官方例子)

  • 官方源碼包例子路徑:org.apache.rocketmq.example.simple.PullConsumer
  • 獲取 MessageQueue 遍歷
  • 客戶維護 Offset,需用用戶本地存儲 Offset,存儲內存、磁盤、數據庫等
  • 處理不同狀態的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4種狀態
  • 靈活性高可控性強,但是編碼復雜度會高
  • 優雅關閉:主要是釋放資源和保存 Offset,需用程序自己保存好 Offset,特別是異常處理的時候

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM