生產者把消息發送到消息隊列中以后,並不期望被立即消費,而是等待指定時間后才可以被消費者消費,這類消息通常被稱為延遲消息。延遲消息的應用場景其實是非常的廣泛,比如以下的場景:
- 網上直播授課時,在課程開始前15分鍾通知所有學生准備上課。
- 訂單提交成功后1個小時內未支付,訂單需要及時關閉並且釋放對應商品的庫存。
- 用戶超過15天未登錄時,給該用戶發送召回推送。
- 工單提交后超過24小時未處理,向相關責任人發送催促處理的提醒。
針對延遲消息,本文向大家分享五種實現方案,下面我們就來逐一討論各種方案的大致實現和優缺點。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
一、Redis
在Redis中,有一種有序集合(Sorted Set)的數據結構,在有序集合中,所有元素是按照其 Score 進行排序的。我們可以把消息被消費的預期時間戳作為Score,定時任務不斷讀取Score大於當前時間的元素即可。基本流程如下:
- 調用API,傳入執行時間、消息體等數據。
- 生成唯一key,把消息體數據序列化后存入Redis的String結構中。
- 把key和執行時間的時間戳存入Redis的有序集合結構中,有序集合中不存儲具體的消息體數據,而是存儲唯一的key。
- 定時任務不斷讀取時間戳最小的消息。
- 如果時間戳小於當前時間,將key放入作為隊列的Redis的List結構中。
- 另外一個定時任務不斷從隊列中讀取需要消費的消息的key。
- 根據key獲取消息體數據,對消息進行消費。
- 如果消費消息成功,刪除key對應的消息體數據。
- 如果消費消息失敗,重新存入key和時間戳(加60秒)。
具體方案如下圖:

為了避免一個有序集合中存儲過多的延時消息,存入操作以及查詢操作速度變慢的問題,可以建立多個有序集合,通過哈希算法把消息路由到不同的有序集合中去。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
優點
簡單實用,快速落地。
缺點
- 單個有序集合無法支持太大的數據量。
- 定時任務不斷讀取可能造成不必要的請求。
所以,Redis方案並不是一個十分成熟的方案,只是一個支持小消息量可以快速落地的方案。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
二、RabbitMQ
RabbitMQ本身是不支持延遲消息功能的,一般的做法,是通過最大生存時間(Time-To-Live)和死信交換機(Dead Letter Exchanges)兩個特性模擬出延遲消息的功能。消息超過最大生存時間沒有被消費就變成一條死信,便會被重新投遞到死信交換機,然后死信交換機根據綁定規則轉發到對應的死信隊列上,監聽該隊列就可以讓消息被重新消費。
不過,在RabbitMQ的3.5.8版本以后,我們就可以使用官方推薦的rabbitmq delayed message exchange插件很方便地實現延遲消息的功能。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
安裝插件
首先去官方插件列表頁面下載rabbitmq_delayed_message_exchang插件,然后復制到RabbitMQ每個節點的plugins目錄中。使用命令啟用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
一旦插件被啟用,我們就可以開始使用它了。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
使用示例
安裝該插件后會生成支持延遲投遞機制的Exchange類型:x-delayed-message。接收到該類型的消息后不會立即將消息投遞至目標隊列中,而是存儲在mnesia表中,檢測消息達到可投遞時間時再投遞到目標隊列。
使用延遲消息時,需要先聲明一個x-delayed-message類型的交換器機:
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("one-more-exchange", "x-delayed-message", true, false, args);
發送延遲消息,其中在header中添加x-delay,表示延遲的毫秒數:
byte[] messageBodyBytes = "This is a delayed message".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("one-more-exchange", "", props.build(), messageBodyBytes);
優點
大品牌中間件,可靠穩定。
缺點
由於master單節點,導致性能瓶頸,吞吐量受限。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
三、ActiveMQ
ActiveMQ在5.4及以上版本開始支持持久化的延遲消息功能,甚至支持Cron表達式。默認是該功能是不開啟的,如果開啟需要修改配置文件activemq.xml,在broker節點上把schedulerSupport屬性設置為true,如:
<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true">
</broker>
當服務端開啟延遲消息功能以后,客戶端就可以利用下面的屬性發送延遲消息:
- AMQ_SCHEDULED_DELAY:該消息延遲發送的時間,單位為毫秒。
- AMQ_SCHEDULED_PERIOD:每次重新發送該消息的時間間隔,單位為毫秒。
- AMQ_SCHEDULED_REPEAT:重新發送該消息的次數。
- AMQ_SCHEDULED_CRON:使用Cron表達式設置發送該消息的時機。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
使用示例
- 消息延遲60秒發送:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);
producer.send(message);
- 消息延遲60秒發送,並且重復發送5次,每次間隔10秒:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);
producer.send(message);
- 利用Cron表達式,每天的凌晨3點發送一次消息:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, " 0 3 * * *");
producer.send(message);
需要注意的是,Cron表達式是由5位組成的,分別表示分鍾(059)、小時(023)、日(131)、月(112)、星期(0~6,表示星期日到星期六)。
- Cron表達式的優先級高於其他參數,如果在設置了Cron表達式的同時,也設置了其他參數,那么會在每次CRON執行時,再應用其他參數。比如,消息延遲60秒發送,並且重復發送5次,每次間隔10秒,並且每個小時都發送這一系列消息:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);
producer.send(message);
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
優點
大品牌中間件,可靠穩定,甚至支持Cron表達式。
缺點
由於master單節點,導致性能瓶頸,吞吐量受限。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
四、RocketMQ
在RocketMQ中,支持延遲消息,但是不支持任意時間精度的延遲消息,只支持特定級別的延遲消息。如果要支持任意時間精度,不能避免在Broker層面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免產生巨大的性能開銷。
消息延遲級別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。在發送消息時,設置消息延遲級別即可,設置消息延遲級別時有以下3種情況:
- 設置消息延遲級別等於0時,則該消息為非延遲消息。
- 設置消息延遲級別大於等於1並且小於等於18時,消息延遲特定時間,如:設置消息延遲級別等於1,則延遲1s;設置消息延遲級別等於2,則延遲5s,以此類推。
- 設置消息延遲級別大於18時,則該消息延遲級別為18,如:設置消息延遲級別等於20,則延遲2h。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
使用示例
首先,寫一個消費者,用於消費延遲消息:
public class Consumer {
public static void main(String[] args) throws MQClientException {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");
// 設置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe("OneMoreTopic", "*");
// 注冊回調實現類來處理從broker拉取回來的消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s %s Receive New Messages:%n"
, sdf.format(new Date())
, Thread.currentThread().getName());
for (MessageExt msg : msgs) {
System.out.printf("\tMsg Id: %s%n", msg.getMsgId());
System.out.printf("\tBody: %s%n", new String(msg.getBody()));
}
// 標記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 啟動消費者實例
consumer.start();
System.out.println("Consumer Started.");
}
}
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
再寫一個延遲消息的生產者,用於發送延遲消息:
public class DelayProducer {
public static void main(String[] args) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");
// 設置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer實例
producer.start();
Message msg = new Message("OneMoreTopic"
, "DelayMessage", "This is a delay message.".getBytes());
//"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
//設置消息延遲級別為3,也就是延遲10s。
msg.setDelayTimeLevel(3);
// 發送消息到一個Broker
SendResult sendResult = producer.send(msg);
// 通過sendResult返回消息是否成功送達
System.out.printf("%s Send Status: %s, Msg Id: %s %n"
, sdf.format(new Date())
, sendResult.getSendStatus()
, sendResult.getMsgId());
// 如果不再發送消息,關閉Producer實例。
producer.shutdown();
}
}
運行生產者以后,就會發送一條延遲消息:
10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000
10秒鍾后,消費者收到的這條延遲消息:
10:37:25.026 ConsumeMessageThread_1 Receive New Messages:
Msg Id: C0A8006D5AB018B4AAC216E0DB690000
Body: This is a delay message.
優點
分布式、高吞吐量、高性能、高可靠。
缺點
僅支持18個特定級別的延時,無法自定義延時時間。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
五、定制RocketMQ
上面所說的不支持自定義延時時間的RocketMQ是開源版的,但是在阿里雲中商業版的RocketMQ是支持的,可能因為是業務需求不強或商業因素考慮,究竟什么原因不得而知。有條件的可以直接上阿里雲;沒有條件的可以修改開源版RocketMQ的源碼,實現自己的需求。知己知彼,百戰不殆,先看看RocketMQ開源版本中,支持18個時間級別是怎么實現的:
原理分析
以下分析的RocketMQ源碼的版本號是4.7.1,版本不同源碼略有差別。
CommitLog
在CommitLog中,針對延遲消息做了一些處理:
// 延遲級別大於0,就是延時消息
if (msg.getDelayTimeLevel() > 0) {
// 判斷當前延遲級別,如果大於最大延遲級別,
// 就設置當前延遲級別為最大延遲級別。
if (msg.getDelayTimeLevel() > this.defaultMessageStore
.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore
.getScheduleMessageService().getMaxDelayLevel());
}
// 獲取延遲消息的主題,
// 其中RMQ_SYS_SCHEDULE_TOPIC的值為SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 根據延遲級別獲取延遲消息的隊列Id,
// 隊列Id其實就是延遲級別減1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 備份真正的主題和隊列Id
MessageAccessor.putProperty(msg
, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg
, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 設置延時消息的主題和隊列Id
msg.setTopic(topic);
msg.setQueueId(queueId);
}
可以看到,每一個延遲消息的主題都被暫時更改為SCHEDULE_TOPIC_XXXX,並且根據延遲級別延遲消息變更了新的隊列Id。接下來,處理延遲消息的就是ScheduleMessageService。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
ScheduleMessageService
ScheduleMessageService是由DefaultMessageStore進行初始化的,初始化包括構造對象和調用load方法。最后,再執行ScheduleMessageService的start方法:
public void start() {
// 使用AtomicBoolean確保start方法僅有效執行一次
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 遍歷所有延遲級別
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
// key為延遲級別
Integer level = entry.getKey();
// value為延遲級別對應的毫秒數
Long timeDelay = entry.getValue();
// 根據延遲級別獲得對應隊列的偏移量
Long offset = this.offsetTable.get(level);
// 如果偏移量為null,則設置為0
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
// 為每個延遲級別創建定時任務,
// 第一次啟動任務延遲為FIRST_DELAY_TIME,也就是1秒
this.timer.schedule(
new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 延遲10秒后每隔flushDelayOffsetInterval執行一次任務,
// 其中,flushDelayOffsetInterval默認配置也為10秒
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 持久化每個隊列消費的偏移量
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore
.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
遍歷所有延遲級別,根據延遲級別獲得對應隊列的偏移量,如果偏移量不存在,則設置為0。然后為每個延遲級別創建定時任務,第一次啟動任務延遲為1秒,第二次及以后的啟動任務延遲才是延遲級別相應的延遲時間。
然后,又創建了一個定時任務,用於持久化每個隊列消費的偏移量。持久化的頻率由flushDelayOffsetInterval屬性進行配置,默認為10秒。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
定時任務
ScheduleMessageService的start方法執行之后,每個延遲級別都創建自己的定時任務,這里的定時任務的具體實現就在DeliverDelayedMessageTimerTask類之中,它核心代碼是executeOnTimeup方法之中,我們來看一下主要部分:
// 根據主題和隊列Id獲取消息隊列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC
, delayLevel2QueueId(delayLevel));
如果沒有獲取到對應的消息隊列,則在DELAY_FOR_A_WHILE(默認為100)毫秒后再執行任務。如果獲取到了,就繼續執行下面操作:
// 根據消費偏移量從消息隊列中獲取所有有效消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
如果沒有獲取到有效消息,則在DELAY_FOR_A_WHILE(默認為100)毫秒后再執行任務。如果獲取到了,就繼續執行下面操作:
// 遍歷所有消息
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 獲取消息的物理偏移量
long offsetPy = bufferCQ.getByteBuffer().getLong();
// 獲取消息的物理長度
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
// 省略部分代碼...
long now = System.currentTimeMillis();
// 計算消息應該被消費的時間
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
// 計算下一條消息的偏移量
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)
long countdown = deliverTimestamp - now;
// 省略部分代碼...
}
如果當前消息不到消費的時間,則在countdown毫秒后再執行任務。如果到消費的時間,就繼續執行下面操作:
// 根據消息的物理偏移量和大小獲取消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
如果獲取到消息,則繼續執行下面操作:
// 重新構建新的消息,包括:
// 1.清除消息的延遲級別
// 2.恢復真正的消息主題和隊列Id
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// 重新把消息發送到真正的消息隊列上
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
清除了消息的延遲級別,並且恢復了真正的消息主題和隊列Id,重新把消息發送到真正的消息隊列上以后,消費者就可以立即消費了。
由於篇幅限制,其中源碼的細節不做過多展開,有興趣的小伙伴可以去GitHub上下載源碼仔細閱讀。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
定制化方案
經過以上對源碼的分析,可以總結出延遲消息的實現步驟:
- 如果消息的延遲級別大於0,則表示該消息為延遲消息,修改該消息的主題為SCHEDULE_TOPIC_XXXX,隊列Id為延遲級別減1。
- 消息進入SCHEDULE_TOPIC_XXXX的隊列中。
- 定時任務根據上次拉取的偏移量不斷從隊列中取出所有消息。
- 根據消息的物理偏移量和大小再次獲取消息。
- 根據消息屬性重新創建消息,清除延遲級別,恢復原主題和隊列Id。
- 重新發送消息到原主題的隊列中,供消費者進行消費。
概括起來如下圖:

在CommitLog中,我們可以根據自定義的延遲時間選擇一個最大的延遲級別,比如:延遲15分鍾消費的消息,那么最大的延遲級別就是10分鍾。在ScheduleMessageService中,判斷消息是否真的到了消費的時間,如果已到了消費的時間,則恢復原主題和隊列Id;如果未到消費的時間,則選擇最大延遲級別重新修改主題和隊列ID。如下圖:

消息主體以及元數據都存儲在CommitLog中,隊列中只存放了在CommitLog中的起始物理偏移量、消息大小和消息Tag的哈希值,雖然需要重新把消息放入隊列中,但空間浪費還是比較有限的。
文章持續更新,微信搜索「萬貓學社」第一時間閱讀,關注后回復「電子書」,免費獲取12本Java必讀技術書籍。
優點
分布式、高吞吐量、高性能、高可靠,支持自定義延時時間。
缺點
定制RocketMQ不易維護,無法升級新版本。
總結
從延遲消息的概念和應用場景出發,我們逐一討論了五種不同的實現方案,分別是:
- 使用Redis的Sorted Set結構。
- 使用RabbitMQ的rabbitmq delayed message exchange插件。
- 使用ActiveMQ的5.4及以上版本的延遲消息功能。
- 使用RocketMQ僅支持特定級別的延遲消息。
- 定制RocketMQ,以重新計算延遲級別的方式實現自定義延時。
以上每個方案都是各自的優點和缺點,所以說延遲消息沒有一個放之四海而皆准的方案,需要根據數據規模和業務需求的實際情況才能確定最適合的方案。
微信公眾號:萬貓學社
微信掃描二維碼
關注后回復「電子書」
獲取12本Java必讀技術書籍
