rocketMQ(七) rocketMQ 消息端配置


一.消息隊列RocketMQ4.X消費者核心配置講解

  1. consumeFromWhere配置(某些情況失效:參考 https://blog.csdn.net/a417930422/article/details/83585397)

    1. CONSUME_FROM_FIRST_OFFSET: 初次從消息隊列頭部開始消費,即歷史消息(還儲存在broker的)全部消費一遍,后續再啟動接着上次消費的進度開始消費1
    2. CONSUME_FROM_LAST_OFFSET: 默認策略,初次從該隊列最尾開始消費,即跳過歷史消息,后續再啟動接着上次消費的進度開始消費
    3. CONSUME_FROM_TIMESTAMP : 從某個時間點開始消費,默認是半個小時以前,后續再啟動接着上次消費的進度開始消費
  2. allocateMessageQueueStrategy:

    1. 負載均衡策略算法,即消費者分配到queue的算法,默認值是AllocateMessageQueueAveragely即取模平均分配
  3. offsetStore:消息消費進度存儲器
    offsetStore 有兩個策略:
    廣播模式默認使用LocalFileOffsetStore
    集群模式默認使用RemoteBrokerOffsetStore

  4. consumeThreadMin 最小消費線程池數量

  5. consumeThreadMax 最大消費線程池數量

  6. pullBatchSize: 消費者去broker拉取消息時,一次拉取多少條。可選配置

  7. consumeMessageBatchMaxSize: 單次消費時一次性消費多少條消息,批量消費接口才有用,可選配置

  8. messageModel : 消費者消費模式, CLUSTERING——默認是集群模式CLUSTERING BROADCASTING——廣播模式

二.講解集群模式下消費端消費消息流程

  1. Topic下隊列的奇偶數會影響Customer個數里面的消費數量
    如果是4個隊列,8個消息,4個節點則會各消費2條,如果不對等,則負載均衡會分配不均,
    如果consumer實例的數量比message queue的總數量還多的話,多出來的consumer實例將無法分到queue,也就無法消費到消息,也就無法起到分攤負載的作用,所以需要控制讓queue的總數量大於等於consumer的數量

  2. 集群模式(默認):
    Consumer實例平均分攤消費生產者發送的消息
    例子:訂單消息,一般是只被消費一次

  3. 廣播模式:
    廣播模式下消費消息:投遞到Broker的消息會被每個Consumer進行消費,一條消息被多個Consumer消費,廣播消費中ConsumerGroup暫時無用
    例子:群公告,每個人都需要消費這個消息

  4. 怎么切換模式:通過setMessageModel()

三.RocketMQ4.X里面的標簽Tag實戰和消息過濾原理

  1. 一個Message只有一個Tag,tag是二級分類
    訂單:數碼類訂單、食品類訂單

  2. 過濾分為Broker端和Consumer端過濾
    Broker端過濾,減少了無用的消息的進行網絡傳輸,增加了broker的負擔
    Consumer端過濾,完全可以根據業務需求進行實習,但是增加了很多無用的消息傳輸

  3. 一般是監聽 * ,或者指定 tag,|| 運算 , SQL92 , FilterServer等;
    tag性能高,邏輯簡單
    SQL92 性能差點,支持復雜邏輯(只支持PushConsumer中使用) MessageSelector.bySql
    語法:> , < = ,IS NULL, AND, OR, NOT 等,sql where后續的語法即可(大部分)

  4. 注意:消費者訂閱關系要一致,不然會消費混亂,甚至消息丟失
    訂閱關系一致:訂閱關系由 Topic和 Tag 組成,同一個 group name,訂閱的 topic和tag 必須是一樣的

  5. 在Broker 端進行MessageTag過濾,遍歷message queue存儲的 message tag和 訂閱傳遞的tag 的hashcode不一樣則跳過,符合的則傳輸給Consumer,在consumer queue存儲的是對應的hashcode, 對比也是通過hashcode對比; Consumer收到過濾消息后也會進行匹配操作,但是是對比真實的message tag而不是hashcode
    consume queue存儲使用hashcode定長,節約空間
    過濾中不訪問commit log,可以高效過濾
    如果存在hash沖突,Consumer端可以進行再次確認

6.如果想使用多個Tag,可以使用sql表達式,但是不建議,單一職責,多個隊列

常見錯誤

The broker does not support consumer to filter message by SQL92

解決:broker.conf 里面配置如下
enablePropertyFilter=true

備注,修改之后要重啟Broker
master節點配置:vim conf/2m-2s-async/broker-a.properties
slave節點配置:vim conf/2m-2s-async/broker-a-s.properties

四.PushConsumer、PullConsumer消費模式分析

1.Push和Pull優缺點分析

  1. Push
    1. 實時性高;但增加服務端負載,消費端能力不同,如果Push推送過快,消費端會出現很多問題
  2. Pull
    1. 消費者從Server端拉取消息,主動權在消費者端,可控性好;但 間隔時間不好設置,間隔太短,則空請求,浪費資源;間隔時間太長,則消息不能及時處理
  3. 長輪詢
    1. Client請求Server端也就是Broker的時候, Broker會保持當前連接一段時間 默認是15s,如果這段時間內有消息到達,則立刻返回給Consumer.沒消息的話 超過15s,則返回空,再進行重新請求;主動權在Consumer中,Broker即使有大量的消息 也不會主動提送Consumer, 缺點:服務端需要保持Consumer的請求,會占用資源,需要客戶端連接數可控 否則會一堆連接

2.PushConsumer本質是長輪訓

  1. 系統收到消息后自動處理消息和offset,如果有新的Consumer加入會自動做負載均衡,
  2. 在broker端可以通過longPollingEnable=true來開啟長輪詢
  3. 消費端代碼:DefaultMQPushConsumerImpl->pullMessage->PullCallback
  4. 服務端代碼:broker.longpolling
  5. 雖然是push,但是代碼里面大量使用了pull,是因為使用長輪訓方式達到Push效果,既有pull有的,又有Push的實時性
  6. 優雅關閉:主要是釋放資源和保存Offset, 調用shutdown()即可 ,參考 @PostConstruct、@PreDestroy

3.PullConsumer需要自己維護Offset(參考官方例子)

  1. 官方例子路徑:org.apache.rocketmq.example.simple.PullConsumer
  2. 獲取MessageQueue遍歷
  3. 客戶維護Offset,需用用戶本地存儲Offset,存儲內存、磁盤、數據庫等
  4. 處理不同狀態的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4種狀態
  5. 靈活性高可控性強,但是編碼復雜度會高
  6. 優雅關閉:主要是釋放資源和保存Offset,需用程序自己保存好Offset,特別是異常處理的時候


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM