一、RocketMQ4.X 消費者核心配置
- consumeFromWhere 配置(某些情況失效:參考https://blog.csdn.net/a417930422/article/details/83585397)這個配置基本不用改,采用默認配置即可。
- CONSUME_FROM_FIRST_OFFSET: 初次從消息隊列頭部開始消費,即歷史消息(還儲存在 broker 的)全部消費一遍,后續再啟動接着上次消費的進度開始消費。
- CONSUME_FROM_LAST_OFFSET: 默認策略,初次從該隊列最尾開始消費,即跳過歷史消息,后續再啟動接着上次消費的進度開始消費。
- CONSUME_FROM_TIMESTAMP:從某個時間點開始消費,默認是半個小時以前,后續再啟動接着上次消費的進度開始消費。
- allocateMessageQueueStrategy:負載均衡策略算法,即消費者分配到 queue 的算法
- 默認值是 AllocateMessageQueueAveragely 即取模平均分配
- offsetStore:消息消費進度存儲器 offsetStore 有兩個策略:LocalFileOffsetStore 和 RemoteBrokerOffsetStore
- 廣播模式默認使用 LocalFileOffsetStore, 集群模式默認使用 RemoteBrokerOffsetStore
- consumeThreadMax:最大消費線程池數量
- consumeThreadMin:最小消費線程池數量
- pullBatchSize:消費者去 broker 拉取消息時,一次拉取多少條。可選配置
- consumeMessageBatchMaxSize:單次消費時一次性消費多少條消息,批量消費接口才有用,可選配置
- messageModel:消費者消費模式
- CLUSTERING:集群模式(默認配置)
- BROADCASTING:廣播模式
二、集群和廣播模式下 RocketMQ 消費端處理
Topic 下隊列的奇偶數會影響 Customer 個數里面的消費數量
- 如果是4個隊列,8個消息,4個節點則會各消費2條,如果不對等,則負載均衡會分配不均。
- 如果 consumer 實例的數量比 message queue 的總數量還多的話,多出來的 consumer 實例將無法分到 queue,也就無法消費到消息,也就無法起到分攤負載的作用,所以需要控制讓 queue 的總數量大於等於 consumer 的數量。
集群模式(默認):
- Consumer 實例平均分攤消費生產者發送的消息
- 例子:訂單消息,一般是只被消費一次(被標記為同一個 ConsumerGroup 組的消費者不會對消息重復消費)
廣播模式:
- 廣播模式下消費消息:投遞到 Broker 的消息會被每個 Consumer 進行消費,一條消息被多個 Consumer 消費,廣播消費中 ConsumerGroup 暫時無用。
- 例子:群公告,每個人都需要消費這個消息
怎么切換模式:通過 setMessageModel()
三、RocketMQ 里面的 Tag 作用和消息過濾原理
一個 Message 只有一個 Tag,Tag 是二級分類。過濾分為 Broker 端和 Consumer 端過濾。
- Broker 端過濾,減少了無用的消息的進行網絡傳輸,增加了 broker 的負擔
- Consumer 端過濾,完全可以根據業務需求進行過濾,但是增加了很多無用的消息傳輸
一般是監聽 * ,或者指定 tag,|| 運算,SLQ92,FilterServer 等;
- Tag 性能高,邏輯簡單
- SQL92 性能差點,支持復雜邏輯(只支持 PushConsumer 中使用) MessageSelector.bySql
- 語法:> ,<,=,IS NULL,AND,OR,NOT 等,sql where 后續的語法即可(大部分)
生產者
@RequestMapping("/api/v1/pay_cb") public Object callback( String tag, String amount) throws Exception { Message message = new Message(JmsConfig.TOPIC,tag, "",tag.getBytes()); // 設置屬性,用於sql過濾 message.putUserProperty("amount",amount); SendResult sendResult = payProducer.getProducer().send(message); System.out.printf("發送結果=%s, sendResult=%s \n", sendResult.getSendStatus(), sendResult.toString()); return new HashMap<>(); }
消費者
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //默認是集群方式,可以更改為廣播,但是廣播方式不支持重試 consumer.setMessageModel(MessageModel.CLUSTERING); //多標簽訂閱 //consumer.subscribe(JmsConfig.TOPIC, "order_pay || order_finish || order_create"); //根據sql語法進行過濾消息 consumer.subscribe(JmsConfig.TOPIC, MessageSelector.bySql(" amount > 5 ")); consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); try { System.out.printf("%s 2 Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { System.out.println("消費異常"); e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
注意:消費者訂閱關系要一致,不然會消費混亂,甚至消息丟失。訂閱關系一致:訂閱關系由 Topic 和 Tag 組成,同一個 group name,訂閱的 Topic 和 Tag 必須是一樣的。
在 Broker 端進行 MessageTag 過濾原理:遍歷 message queue 存儲的 message tag 和 訂閱傳遞的 tag 的 hashcode 是否一樣,不一樣則跳過,符合的則傳輸給 Consumer,在 consumer queue 存儲的是對應的 hashcode,對比也是通過 hashcode 對比;Consumer 收到過濾消息后也會進行匹配操作,但是是對比真實的 message tag 而不是 hashcode。
- consume queue 存儲使用 hashcode 定長,節約空間
- 過濾中不訪問 commit log,可以高效過濾
- 如果存在 hash 沖突,Consumer 端可以進行再次確認
建議:單一職責,多個隊列;如果想使用多個 Tag,可以使用 sql 表達式,但是不建議。
常見錯誤:
The broker does not support consumer to filter message by SQL92 解決:broker.conf 里面配置如下 enablePropertyFilter=true 備注,修改之后要重啟 Broker master 節點配置:vim conf/2m-2s-async/broker-a.properties slave 節點配置:vim conf/2m-2s-async/broker-a-s.properties
四、PushConsumer/PullConsumer 消費消息模式
Push 和 Pull 優缺點分析
- Push:實時性高;但增加服務端負載,消費端能力不同,如果 Push 推送過快,消費端會出現很多問題
- Pull:消費者從 Server 端拉取消息,主動權在消費者端,可控性好;但間隔時間不好設置,間隔太短,則空請求,浪費資源;間隔時間太長,則消息不能及時處理
- 長輪詢: Client 請求 Server 端也就是 Broker 的時候, Broker 會保持當前連接一段時間,默認是15s,如果這段時間內有消息到達,則立刻返回給 Consumer;沒消息的話,超過15s,則返回空,再進行重新請求;主動權在 Consumer 中,Broker 即使有大量的消息也不會主動推送給 Consumer。 缺點:服務端需要保持 Consumer 的請求,會占用資源,需要客戶端連接數可控,否則會存在一堆連接
PushConsumer 本質是長輪訓
- 系統收到消息后自動處理消息和 offset,如果有新的 Consumer 加入會自動做負載均衡,
- 在 broker 端可以通過 longPollingEnable=true 來開啟長輪詢
- 雖然是 push,但是代碼里面大量使用了pull,是因為使用長輪訓方式達到 push 效果,既有 pull 有的,又有 push 的實時性
- 優雅關閉:主要是釋放資源和保存 Offset, 調用 shutdown() 即可 ,參考 @PostConstruct、@PreDestroy
PullConsumer 需要自己維護 Offset(參考官方例子)
- 官方源碼包例子路徑:org.apache.rocketmq.example.simple.PullConsumer
- 獲取 MessageQueue 遍歷
- 客戶維護 Offset,需用用戶本地存儲 Offset,存儲內存、磁盤、數據庫等
- 處理不同狀態的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4種狀態
- 靈活性高可控性強,但是編碼復雜度會高
- 優雅關閉:主要是釋放資源和保存 Offset,需用程序自己保存好 Offset,特別是異常處理的時候