RocketMQ延遲消息的代碼實戰及原理分析


 

RocketMQ簡介

RocketMQ是一款開源的分布式消息系統,基於高可用分布式集群技術,提供低延時的、高可靠、萬億級容量、靈活可伸縮的消息發布與訂閱服務。

它前身是MetaQ,是阿里基於Kafka的設計使用Java進行自主研發的。在2012年,阿里將其開源, 在2016年,阿里將其捐獻給Apache軟件基金會(Apache Software Foundation,簡稱為ASF),正式成為孵化項目。2017 年,Apache軟件基金會宣布RocketMQ已孵化成為 Apache頂級項目(Top Level Project,簡稱為TLP ),是國內首個互聯網中間件在 Apache上的頂級項目。

延遲消息

生產者把消息發送到消息隊列中以后,並不期望被立即消費,而是等待指定時間后才可以被消費者消費,這類消息通常被稱為延遲消息。

在RocketMQ中,支持延遲消息,但是不支持任意時間精度的延遲消息,只支持特定級別的延遲消息。如果要支持任意時間精度,不能避免在Broker層面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免產生巨大的性能開銷。

消息延遲級別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。在發送消息時,設置消息延遲級別即可,設置消息延遲級別時有以下3種情況:

  1. 設置消息延遲級別等於0時,則該消息為非延遲消息。
  2. 設置消息延遲級別大於等於1並且小於等於18時,消息延遲特定時間,如:設置消息延遲級別等於1,則延遲1s;設置消息延遲級別等於2,則延遲5s,以此類推。
  3. 設置消息延遲級別大於18時,則該消息延遲級別為18,如:設置消息延遲級別等於20,則延遲2h。

延遲消息示例

首先,寫一個消費者,用於消費延遲消息:

public class Consumer { public static void main(String[] args) throws MQClientException { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 實例化消費者  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup"); // 設置NameServer的地址  consumer.setNamesrvAddr("localhost:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息  consumer.subscribe("OneMoreTopic", "*"); // 注冊回調實現類來處理從broker拉取回來的消息  consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { System.out.printf("%s %s Receive New Messages:%n" , sdf.format(new Date()) , Thread.currentThread().getName()); for (MessageExt msg : msgs) { System.out.printf("\tMsg Id: %s%n", msg.getMsgId()); System.out.printf("\tBody: %s%n", new String(msg.getBody())); } // 標記該消息已經被成功消費  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 啟動消費者實例  consumer.start(); System.out.println("Consumer Started."); } } 

再寫一個延遲消息的生產者,用於發送延遲消息:

public class DelayProducer { public static void main(String[] args) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 實例化消息生產者Producer  DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup"); // 設置NameServer的地址  producer.setNamesrvAddr("localhost:9876"); // 啟動Producer實例  producer.start(); Message msg = new Message("OneMoreTopic" , "DelayMessage", "This is a delay message.".getBytes()); //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"  //設置消息延遲級別為3,也就是延遲10s。  msg.setDelayTimeLevel(3); // 發送消息到一個Broker  SendResult sendResult = producer.send(msg); // 通過sendResult返回消息是否成功送達  System.out.printf("%s Send Status: %s, Msg Id: %s %n" , sdf.format(new Date()) , sendResult.getSendStatus() , sendResult.getMsgId()); // 如果不再發送消息,關閉Producer實例。  producer.shutdown(); } } 

運行生產者以后,就會發送一條延遲消息:

10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000

10秒鍾后,消費者收到的這條延遲消息:

10:37:25.026 ConsumeMessageThread_1 Receive New Messages:
    Msg Id: C0A8006D5AB018B4AAC216E0DB690000
    Body: This is a delay message.

延遲消息的原理分析

以下分析的RocketMQ源碼的版本號是4.7.1,版本不同源碼略有差別。

CommitLog

在org.apache.rocketmq.store.CommitLog中,針對延遲消息做了一些處理:

// 延遲級別大於0,就是延時消息 if (msg.getDelayTimeLevel() > 0) { // 判斷當前延遲級別,如果大於最大延遲級別,  // 就設置當前延遲級別為最大延遲級別。  if (msg.getDelayTimeLevel() > this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()); } // 獲取延遲消息的主題,  // 其中RMQ_SYS_SCHEDULE_TOPIC的值為SCHEDULE_TOPIC_XXXX  topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 根據延遲級別獲取延遲消息的隊列Id,  // 隊列Id其實就是延遲級別減1  queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 備份真正的主題和隊列Id  MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 設置延時消息的主題和隊列Id  msg.setTopic(topic); msg.setQueueId(queueId); } 

可以看到,每一個延遲消息的主題都被暫時更改為SCHEDULE_TOPIC_XXXX,並且根據延遲級別延遲消息變更了新的隊列Id。接下來,處理延遲消息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。

ScheduleMessageService

ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore進行初始化的,初始化包括構造對象和調用load方法。最后,再執行ScheduleMessageService的start方法:

public void start() { // 使用AtomicBoolean確保start方法僅有效執行一次  if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); // 遍歷所有延遲級別  for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { // key為延遲級別  Integer level = entry.getKey(); // value為延遲級別對應的毫秒數  Long timeDelay = entry.getValue(); // 根據延遲級別獲得對應隊列的偏移量  Long offset = this.offsetTable.get(level); // 如果偏移量為null,則設置為0  if (null == offset) { offset = 0L; } if (timeDelay != null) { // 為每個延遲級別創建定時任務,  // 第一次啟動任務延遲為FIRST_DELAY_TIME,也就是1秒  this.timer.schedule( new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 延遲10秒后每隔flushDelayOffsetInterval執行一次任務,  // 其中,flushDelayOffsetInterval默認配置也為10秒  this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 持久化每個隊列消費的偏移量  if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore .getMessageStoreConfig().getFlushDelayOffsetInterval()); } } 

遍歷所有延遲級別,根據延遲級別獲得對應隊列的偏移量,如果偏移量不存在,則設置為0。然后為每個延遲級別創建定時任務,第一次啟動任務延遲為1秒,第二次及以后的啟動任務延遲才是延遲級別相應的延遲時間。

然后,又創建了一個定時任務,用於持久化每個隊列消費的偏移量。持久化的頻率由flushDelayOffsetInterval屬性進行配置,默認為10秒。

定時任務

ScheduleMessageService的start方法執行之后,每個延遲級別都創建自己的定時任務,這里的定時任務的具體實現就在DeliverDelayedMessageTimerTask類之中,它核心代碼是executeOnTimeup方法之中,我們來看一下主要部分:

// 根據主題和隊列Id獲取消息隊列 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue( TopicValidator.RMQ_SYS_SCHEDULE_TOPIC , delayLevel2QueueId(delayLevel)); 

如果沒有獲取到對應的消息隊列,則在DELAY_FOR_A_WHILE(默認為100)毫秒后再執行任務。如果獲取到了,就繼續執行下面操作:

// 根據消費偏移量從消息隊列中獲取所有有效消息 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); 

如果沒有獲取到有效消息,則在DELAY_FOR_A_WHILE(默認為100)毫秒后再執行任務。如果獲取到了,就繼續執行下面操作:

// 遍歷所有消息 for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 獲取消息的物理偏移量  long offsetPy = bufferCQ.getByteBuffer().getLong(); // 獲取消息的物理長度  int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); // 省略部分代碼...  long now = System.currentTimeMillis(); // 計算消息應該被消費的時間  long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); // 計算下一條消息的偏移量  nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE) long countdown = deliverTimestamp - now; // 省略部分代碼... } 

如果當前消息不到消費的時間,則在countdown毫秒后再執行任務。如果到消費的時間,就繼續執行下面操作:

// 根據消息的物理偏移量和大小獲取消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); 

如果獲取到消息,則繼續執行下面操作:

// 重新構建新的消息,包括: // 1.清除消息的延遲級別 // 2.恢復真正的消息主題和隊列Id MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}," + " discard the msg. msg={}", msgInner.getTopic(), msgInner); continue; } // 重新把消息發送到真正的消息隊列上 PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner); 

清除了消息的延遲級別,並且恢復了真正的消息主題和隊列Id,重新把消息發送到真正的消息隊列上以后,消費者就可以立即消費了。

總結

經過以上對源碼的分析,可以總結出延遲消息的實現步驟:

  1. 如果消息的延遲級別大於0,則表示該消息為延遲消息,修改該消息的主題為SCHEDULE_TOPIC_XXXX,隊列Id為延遲級別減1。
  2. 消息進入SCHEDULE_TOPIC_XXXX的隊列中。
  3. 定時任務根據上次拉取的偏移量不斷從隊列中取出所有消息。
  4. 根據消息的物理偏移量和大小再次獲取消息。
  5. 根據消息屬性重新創建消息,清除延遲級別,恢復原主題和隊列Id。
  6. 重新發送消息到原主題的隊列中,供消費者進行消費。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM