開源版本中,只有RocketMQ支持延遲消息,且只支持18個特定級別的延遲
付費版本中,阿里雲和騰訊雲上的MQ產品都支持精度為秒級別的延遲消息
定時消息:Producer將消息發送到消息隊列RocketMQ版服務端,但並不期望立馬投遞這條消息,而是推遲到在當前時間點之后的某一個時間投遞到Consumer進行消費,該消息即定時消息。
延時消息:Producer將消息發送到消息隊列RocketMQ版服務端,但並不期望立馬投遞這條消息,而是延遲一定時間后才投遞到Consumer進行消費,該消息即延時消息。
定時消息與延時消息在代碼配置上存在一些差異,但是最終達到的效果相同:消息在發送到消息隊列RocketMQ版服務端后並不會立馬投遞,而是根據消息中的屬性延遲固定時間后才投遞給消費者。
實現原理(4種實現方案)
1.代理實現 鏈接
2.時間輪和delay-commit-log實現 鏈接
3.時間輪和時間file實現 鏈接
4.基於rocketmq 18個等級來改造 鏈接
適用場景
定時消息和延時消息適用於以下一些場景:
消息生產和消費有時間窗口要求,例如在電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條延時消息。
這條消息將會在30分鍾以后投遞給消費者,消費者收到此消息后需要判斷對應的訂單是否已完成支付。
如支付未完成,則關閉訂單。如已完成支付則忽略。
通過消息觸發一些定時任務,例如在某一固定時間點向用戶發送提醒消息。
使用方式 定時消息和延時消息的使用在代碼編寫上存在略微的區別:
發送定時消息需要明確指定消息發送時間點之后的某一時間點作為消息投遞的時間點。
發送延時消息時需要設定一個延時時間長度,消息將從當前發送時間點開始延遲固定時間之后才開始投遞。
注意事項
定時消息的精度會有1s~2s的延遲誤差。
定時和延時消息的msg.setStartDeliverTime參數需要設置成當前時間戳之后的某個時刻(單位毫秒)。
如果被設置成當前時間戳之前的某個時刻,消息將立刻投遞給消費者。
定時和延時消息的msg.setStartDeliverTime參數可設置40天內的任何時刻(單位毫秒),超過40天消息發送將失敗。
StartDeliverTime是服務端開始向消費端投遞的時間。如果消費者當前有消息堆積,那么定時和延時消息會排在堆積消息后面,將不能嚴格按照配置的時間進行投遞。
由於客戶端和服務端可能存在時間差,消息的實際投遞時間與客戶端設置的投遞時間之間可能存在偏差。
如何使用
推薦使用阿里雲提供的rocketmq版本的pom
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.4.Final</version> </dependency>
消息發送
import com.aliyun.openservices.ons.api.*; import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils; import java.util.Date; import java.util.Properties; public class ProducerDelayTest { public static void main(String[] args) { Properties properties = new Properties(); // AccessKey ID阿里雲身份驗證,在阿里雲RAM控制台創建。 properties.put(PropertyKeyConst.AccessKey, "XXX"); // AccessKey Secret阿里雲身份驗證,在阿里雲RAM控制台創建。 properties.put(PropertyKeyConst.SecretKey, "XXX"); // 設置TCP接入域名,進入消息隊列RocketMQ版控制台實例詳情頁面的接入點區域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "localhost:9876"); Producer producer = ONSFactory.createProducer(properties); // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。 producer.start(); for(int i=0;i<1;i++) { Message msg = new Message( // 您在消息隊列RocketMQ版控制台創建的Topic。 "TopicTest", // Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版服務器過濾。 "TagA", // Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。 "演示15秒鍾>>> ".getBytes()); // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。 // 以方便您在無法正常收到消息情況下,可通過控制台查詢消息並補發。 // 注意:不設置也不會影響消息正常收發。 msg.setKey("ORDERID_100e"); try { // 延時消息,單位毫秒(ms),在指定延遲時間(當前時間之后)進行投遞,例如消息在15秒后投遞。 long delayTime = System.currentTimeMillis() + 15000; System.out.println("發送時間>>" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")); // 設置消息需要被投遞的時間。 msg.setStartDeliverTime(delayTime); SendResult sendResult = producer.send(msg); // 同步發送消息,只要不拋異常就是成功。 if (sendResult != null) { System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss") + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.