RocketMQ消息支持的模式:
消息支持的模式分為三種:NormalProducer(普通同步),消息異步發送,OneWay。
消息同步發送:
普通消息的發送和接收在前面已經演示過了,在前面的案例中是基於同步消息發送模式。也就是說消息發送出去后,producer會等到broker回應后才能繼續發送下一個消息.
消息異步發送:
異步發送是指發送方發出數據后,不等接收方發回響應,接着發送下個數據包的通訊方式。 MQ 的異步發送,需要用戶實現異步發送回調接口(SendCallback)。消息發送方在發送了一條消息后,不需要等待服務器響應即可返回,進行第二條消息發送。發送方通過回調接口接收服務器響應,並對響應結果進行處理。
producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%s%n",sendResult); } @Override public void onException(Throwable throwable) { throwable.printStackTrace(); } });
OneWay:
單向(Oneway)發送特點為發送方只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答.效率最高。
producer.sendOneway(msg);
MessageListenerOrderly(順序消費):
在學習kafka的時候我們知道了,消息可以通過自定義分區策略來實現消息的順序發送,實現原理就是把同一類消息都發送到相同的分區上。在RocketMQ中,是基於多個Message Queue來實現類似於kafka的分區效果。如果一個Topic 要發送和接收的數據量非常大, 需要能支持增加並行處理的機器來提高處理速度,這時候一個Topic 可以根據需求設置一個或多個Message Queue。Topic 有了多個Message Queue 后,消息可以並行地向各個Message Queue 發送,消費者也可以並行地從多個Message Queue 讀取消息並消費。要了解RocketMQ消息的順序消費,我們先對RocketMQ的整體架構進行了解。
RocketMQ消息發送及消費的基本原理:
這是一個比較宏觀的部署架構圖,rocketmq天然支持高可用,它可以支持多主多從的部署架構,這也是和kafka最大的區別之一。原因是RocketMQ中並沒有master選舉功能,所以通過配置多個master節點來保證rocketMQ的高可用。和所有的集群角色定位一樣,master節點負責接受事務請求、slave節點只負責接收讀請求,並且接收master同步過來的數據和slave保持一直。當master掛了以后,如果當前rocketmq是一主多從,就意味着無法接受發送端的消息,但是消費者仍然能夠繼續消費。所以配置多個主節點后,可以保證當其中一個master節點掛了,另外一個master節點仍然能夠對外提供消息發送服務。
當存在多個主節點時,一條消息只會發送到其中一個主節點,rocketmq對於多個master節點的消息發送,會做負載均衡,使得消息可以平衡的發送到多個master節點上。一個消費者可以同時消費多個master節點上的消息,在下面這個架構圖中,兩個master節點恰好可以平均分發到兩個消費者上,如果此時只有一個消費者,那么這個消費者會消費兩個master節點的數據。由於每個master可以配置多個slave,所以如果其中一個master掛了,消息仍然可以被消費者從slave節點消費到。可以完美的實現rocketmq消息的高可用。
站在topic的角度來看看消息是如何分發和處理的,假設有兩個master節點的集群,創建了一個TestTopic,並且對這個topic創建了兩個隊列(可以通過producer進行設置producer.setDefaultTopicQueueNums(2),默認是4),也就是分區。消費者定義了兩個分組,分組的概念也是和kafka一樣,通過分組可以實現消息的廣播。
自定義消息發送規則:
通過自定義發送策略來實現消息只發送到同一個隊列因為一個Topic 會有多個Message Queue ,如果使用Producer 的默認配置,這個Producer 會輪流向各個Message Queue 發送消息。Consumer 在消費消息的時候,會根據負載均衡策略,消費被分配到的Message Queue如果不經過特定的設置,某條消息被發往哪個Message Queue ,被哪個Consumer 消費是未知的如果業務需要我們把消息發送到指定的Message Queue 里,比如把同一類型的消息都發往相同的Message Queue。那是不是可以實現順序消息的功能呢?
和kafka一樣,rocketMQ也提供了消息路由的功能,我們可以自定義消息分發策略,可以實現MessageQueueSelector,來實現自己的消息分發策略
SendResult sendResult=producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { int key=o.hashCode(); int size = list.size(); int index = key%size; return list.get(index);// list.get(0); } },"key_"+i);
在消息分發的時候如果消息發送到topic多個MessageQueue,假設設置2個寫隊列以及2個讀隊列,如果讀和寫隊列不一致,會存在消息無法消費到的問題,如果消費隊列為2,啟動一個消費者,那么這個消費者會消費者兩個隊列,如果兩個消費者消費這個隊列,那么意味着消息會均衡分攤到這兩個消費者中,如果消費者數大於readQueueNumbs,那么會有一些消費者消費不到消息,浪費資源。
消息的順序消費:
首先,需要保證順序的消息要發送到同一個messagequeue中;其次,一個messagequeue只能被一個消費者消費,這點是由消息隊列的分配機制來保證的;最后,一個消費者內部對一個mq的消費要保證是有序的。我們要做到生產者 - messagequeue - 消費者之間是一對一對一的關系。
通過分區規則可以實現同類消息在rocketmq上的順序存儲。但是對於消費端來說,如何保證消費的順序?我們前面寫的消息消費代碼使用的是MessageListenerConcurrently並發監聽,也就是基於多個線程並行來消費消息。這個無法保證消息消費的順序。RocketMQ中提供了MessageListenerOrderly 一個類來實現順序消費
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { MessageExt messageExt=list.get(0); if(messageExt.getReconsumeTimes()==3){ //消息重發了三次 //持久化 消息記錄表 return ConsumeOrderlyStatus.SUCCESS; //簽收 } return ConsumeOrderlyStatus.SUCCESS; //簽收 } });
順序消費會帶來一些問題,
- 遇到消息失敗的消息,無法跳過,當前隊列消費暫停
- 降低了消息處理的性能
消費端的負載均衡:
和kafka一樣,消費端也會針對Message Queue做負載均衡,使得每個消費者能夠合理的消費多個分區的消息。
- 消費端會通過RebalanceService線程,10秒鍾做一次基於topic下的所有隊列負載
- 消費端遍歷自己的所有topic,依次調rebalanceByTopic
- 根據topic獲取此topic下的所有queue
- 選擇一台broker獲取基於group的所有消費端(有心跳向所有broker注冊客戶端信息)
- 選擇隊列分配策略實例AllocateMessageQueueStrategy執行分配算法
什么時候觸發負載均衡:
- 消費者啟動之后
- 消費者數量發生變更
- 每10秒會觸發檢查一次rebalance
分配算法,RocketMQ提供了6中分區的分配算法:
- AllocateMessageQueueAveragely :平均分配算法(默認)
- AllocateMessageQueueAveragelyByCircle:環狀分配消息隊列
- AllocateMessageQueueByConfig:按照配置來分配隊列: 根據用戶指定的配置來進行負載
- AllocateMessageQueueByMachineRoom:按照指定機房來配置隊列
- AllocateMachineRoomNearby:按照就近機房來配置隊列:
- AllocateMessageQueueConsistentHash:一致性hash,根據消費者的cid進行
消息的的可靠性原則:
在實際使用RocketMQ的時候我們並不能保證每次發送的消息都剛好能被消費者一次性正常消費成功,可能會存在需要多次消費才能成功或者一直消費失敗的情況,那作為發送者該做如何處理呢?
消息消費端的確認機制:RocketMQ提供了ack機制,以保證消息能夠被正常消費。發送者為了保證消息肯定消費成功,只有使用方明確表示消費成功,RocketMQ才會認為消息消費成功。中途斷電,拋出異常等都不會認為成功
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回消息消費狀態 } });
所有消費者在設置監聽的時候會提供一個回調,業務實現消費回調的時候,當回調方法中返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才會認為這批消息(默認是1條)是消費完成的。如果這時候消息消費失敗,例如數據庫異常,余額不足扣款失敗等一切業務認為消息需要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為這批消息消費失敗了。
消息的衰減重試:
為了保證消息肯定至少被消費一次,RocketMQ會把這批消息重新發回到broker,在延遲的某個時間點(默認是10秒,業務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續失敗到一定次數(默認16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工干預。可以修改broker-a.conf文件messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 。一般情況下我們在實際生產中是不需要重試16次,這樣既浪費時間又浪費性能,理論上當嘗試重復次數達到我們想要的結果時如果還是消費失敗,那么我們需要將對應的消息進行記錄,並且結束重復嘗試。
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeOrderlyContext) -> { for (MessageExt messageExt : list) { if(messageExt.getReconsumeTimes()==3) { //可以將對應的數據保存到數據庫,以便人工干預 System.out.println(messageExt.getMsgId()+","+messageExt.getBody()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
RocketMQ中的延遲消息
開源RocketMQ支持延遲消息,但是不支持秒級精度。默認支持18個level的延遲消息,這是通過broker端的messageDelayLevel配置項確定的,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker在啟動時,內部會創建一個內部主題:SCHEDULE_TOPIC_XXXX,根據延遲level的個數,創建對應數量的隊列,也就是說18個level對應了18個隊列。注意,這並不是說這個內部主題只會有18個隊列,因為Broker通常是集群模式部署的,因此每個節點都有18個隊列。延遲級別的值可以進行修改,以滿足自己的業務需求,可以修改/添加新的level。例如:你想支持2天的延遲,修改最后一個level的值為2d,這個時候依然是18個level;也可以增加一個2d,這個時候總共就有19個level。