一.消息隊列RocketMQ4.X消費者核心配置講解
-
consumeFromWhere配置(某些情況失效:參考 https://blog.csdn.net/a417930422/article/details/83585397)
- CONSUME_FROM_FIRST_OFFSET: 初次從消息隊列頭部開始消費,即歷史消息(還儲存在broker的)全部消費一遍,后續再啟動接着上次消費的進度開始消費1
- CONSUME_FROM_LAST_OFFSET: 默認策略,初次從該隊列最尾開始消費,即跳過歷史消息,后續再啟動接着上次消費的進度開始消費
- CONSUME_FROM_TIMESTAMP : 從某個時間點開始消費,默認是半個小時以前,后續再啟動接着上次消費的進度開始消費
-
allocateMessageQueueStrategy:
- 負載均衡策略算法,即消費者分配到queue的算法,默認值是AllocateMessageQueueAveragely即取模平均分配
-
offsetStore:消息消費進度存儲器
offsetStore 有兩個策略:
廣播模式默認使用LocalFileOffsetStore
集群模式默認使用RemoteBrokerOffsetStore -
consumeThreadMin 最小消費線程池數量
-
consumeThreadMax 最大消費線程池數量
-
pullBatchSize: 消費者去broker拉取消息時,一次拉取多少條。可選配置
-
consumeMessageBatchMaxSize: 單次消費時一次性消費多少條消息,批量消費接口才有用,可選配置
-
messageModel : 消費者消費模式, CLUSTERING——默認是集群模式CLUSTERING BROADCASTING——廣播模式
二.講解集群模式下消費端消費消息流程
-
Topic下隊列的奇偶數會影響Customer個數里面的消費數量
如果是4個隊列,8個消息,4個節點則會各消費2條,如果不對等,則負載均衡會分配不均,
如果consumer實例的數量比message queue的總數量還多的話,多出來的consumer實例將無法分到queue,也就無法消費到消息,也就無法起到分攤負載的作用,所以需要控制讓queue的總數量大於等於consumer的數量 -
集群模式(默認):
Consumer實例平均分攤消費生產者發送的消息
例子:訂單消息,一般是只被消費一次 -
廣播模式:
廣播模式下消費消息:投遞到Broker的消息會被每個Consumer進行消費,一條消息被多個Consumer消費,廣播消費中ConsumerGroup暫時無用
例子:群公告,每個人都需要消費這個消息 -
怎么切換模式:通過setMessageModel()
三.RocketMQ4.X里面的標簽Tag實戰和消息過濾原理
-
一個Message只有一個Tag,tag是二級分類
訂單:數碼類訂單、食品類訂單 -
過濾分為Broker端和Consumer端過濾
Broker端過濾,減少了無用的消息的進行網絡傳輸,增加了broker的負擔
Consumer端過濾,完全可以根據業務需求進行實習,但是增加了很多無用的消息傳輸 -
一般是監聽 * ,或者指定 tag,|| 運算 , SQL92 , FilterServer等;
tag性能高,邏輯簡單
SQL92 性能差點,支持復雜邏輯(只支持PushConsumer中使用) MessageSelector.bySql
語法:> , < = ,IS NULL, AND, OR, NOT 等,sql where后續的語法即可(大部分) -
注意:消費者訂閱關系要一致,不然會消費混亂,甚至消息丟失
訂閱關系一致:訂閱關系由 Topic和 Tag 組成,同一個 group name,訂閱的 topic和tag 必須是一樣的 -
在Broker 端進行MessageTag過濾,遍歷message queue存儲的 message tag和 訂閱傳遞的tag 的hashcode不一樣則跳過,符合的則傳輸給Consumer,在consumer queue存儲的是對應的hashcode, 對比也是通過hashcode對比; Consumer收到過濾消息后也會進行匹配操作,但是是對比真實的message tag而不是hashcode
consume queue存儲使用hashcode定長,節約空間
過濾中不訪問commit log,可以高效過濾
如果存在hash沖突,Consumer端可以進行再次確認
6.如果想使用多個Tag,可以使用sql表達式,但是不建議,單一職責,多個隊列
常見錯誤
The broker does not support consumer to filter message by SQL92
解決:broker.conf 里面配置如下
enablePropertyFilter=true
備注,修改之后要重啟Broker
master節點配置:vim conf/2m-2s-async/broker-a.properties
slave節點配置:vim conf/2m-2s-async/broker-a-s.properties
四.PushConsumer、PullConsumer消費模式分析
1.Push和Pull優缺點分析
- Push
- 實時性高;但增加服務端負載,消費端能力不同,如果Push推送過快,消費端會出現很多問題
- Pull
- 消費者從Server端拉取消息,主動權在消費者端,可控性好;但 間隔時間不好設置,間隔太短,則空請求,浪費資源;間隔時間太長,則消息不能及時處理
- 長輪詢
- Client請求Server端也就是Broker的時候, Broker會保持當前連接一段時間 默認是15s,如果這段時間內有消息到達,則立刻返回給Consumer.沒消息的話 超過15s,則返回空,再進行重新請求;主動權在Consumer中,Broker即使有大量的消息 也不會主動提送Consumer, 缺點:服務端需要保持Consumer的請求,會占用資源,需要客戶端連接數可控 否則會一堆連接
2.PushConsumer本質是長輪訓
- 系統收到消息后自動處理消息和offset,如果有新的Consumer加入會自動做負載均衡,
- 在broker端可以通過longPollingEnable=true來開啟長輪詢
- 消費端代碼:DefaultMQPushConsumerImpl->pullMessage->PullCallback
- 服務端代碼:broker.longpolling
- 雖然是push,但是代碼里面大量使用了pull,是因為使用長輪訓方式達到Push效果,既有pull有的,又有Push的實時性
- 優雅關閉:主要是釋放資源和保存Offset, 調用shutdown()即可 ,參考 @PostConstruct、@PreDestroy
3.PullConsumer需要自己維護Offset(參考官方例子)
- 官方例子路徑:org.apache.rocketmq.example.simple.PullConsumer
- 獲取MessageQueue遍歷
- 客戶維護Offset,需用用戶本地存儲Offset,存儲內存、磁盤、數據庫等
- 處理不同狀態的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4種狀態
- 靈活性高可控性強,但是編碼復雜度會高
- 優雅關閉:主要是釋放資源和保存Offset,需用程序自己保存好Offset,特別是異常處理的時候