rocketmq實現延遲隊列(精確到秒級)


開源版本中,只有RocketMQ支持延遲消息,且只支持18個特定級別的延遲

付費版本中,阿里雲和騰訊雲上的MQ產品都支持精度為秒級別的延遲消息

定時消息:Producer將消息發送到消息隊列RocketMQ版服務端,但並不期望立馬投遞這條消息,而是推遲到在當前時間點之后的某一個時間投遞到Consumer進行消費,該消息即定時消息。

延時消息:Producer將消息發送到消息隊列RocketMQ版服務端,但並不期望立馬投遞這條消息,而是延遲一定時間后才投遞到Consumer進行消費,該消息即延時消息。

定時消息與延時消息在代碼配置上存在一些差異,但是最終達到的效果相同:消息在發送到消息隊列RocketMQ版服務端后並不會立馬投遞,而是根據消息中的屬性延遲固定時間后才投遞給消費者。

實現原理(4種實現方案)

1.代理實現 鏈接

2.時間輪和delay-commit-log實現 鏈接

3.時間輪和時間file實現 鏈接

4.基於rocketmq 18個等級來改造 鏈接

適用場景

定時消息和延時消息適用於以下一些場景:

消息生產和消費有時間窗口要求,例如在電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條延時消息。

這條消息將會在30分鍾以后投遞給消費者,消費者收到此消息后需要判斷對應的訂單是否已完成支付。

如支付未完成,則關閉訂單。如已完成支付則忽略。

通過消息觸發一些定時任務,例如在某一固定時間點向用戶發送提醒消息。

使用方式 定時消息和延時消息的使用在代碼編寫上存在略微的區別:

發送定時消息需要明確指定消息發送時間點之后的某一時間點作為消息投遞的時間點。

發送延時消息時需要設定一個延時時間長度,消息將從當前發送時間點開始延遲固定時間之后才開始投遞。

注意事項

定時消息的精度會有1s~2s的延遲誤差。

定時和延時消息的msg.setStartDeliverTime參數需要設置成當前時間戳之后的某個時刻(單位毫秒)。

如果被設置成當前時間戳之前的某個時刻,消息將立刻投遞給消費者。

定時和延時消息的msg.setStartDeliverTime參數可設置40天內的任何時刻(單位毫秒),超過40天消息發送將失敗。

StartDeliverTime是服務端開始向消費端投遞的時間。如果消費者當前有消息堆積,那么定時和延時消息會排在堆積消息后面,將不能嚴格按照配置的時間進行投遞。

由於客戶端和服務端可能存在時間差,消息的實際投遞時間與客戶端設置的投遞時間之間可能存在偏差。

如何使用

推薦使用阿里雲提供的rocketmq版本的pom

       <dependency>  <groupId>com.aliyun.openservices</groupId>  <artifactId>ons-client</artifactId>  <version>1.8.4.Final</version>  </dependency> 
 

消息發送


import com.aliyun.openservices.ons.api.*; import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils;  import java.util.Date; import java.util.Properties;  public class ProducerDelayTest {  public static void main(String[] args) {  Properties properties = new Properties();  // AccessKey ID阿里雲身份驗證,在阿里雲RAM控制台創建。  properties.put(PropertyKeyConst.AccessKey, "XXX");  // AccessKey Secret阿里雲身份驗證,在阿里雲RAM控制台創建。  properties.put(PropertyKeyConst.SecretKey, "XXX");  // 設置TCP接入域名,進入消息隊列RocketMQ版控制台實例詳情頁面的接入點區域查看。  properties.put(PropertyKeyConst.NAMESRV_ADDR, "localhost:9876");    Producer producer = ONSFactory.createProducer(properties);  // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。  producer.start();   for(int i=0;i<1;i++) {  Message msg = new Message(  // 您在消息隊列RocketMQ版控制台創建的Topic。  "TopicTest",  // Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版服務器過濾。  "TagA",  // Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。  "演示15秒鍾>>> ".getBytes());  // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。  // 以方便您在無法正常收到消息情況下,可通過控制台查詢消息並補發。  // 注意:不設置也不會影響消息正常收發。  msg.setKey("ORDERID_100e");  try {  // 延時消息,單位毫秒(ms),在指定延遲時間(當前時間之后)進行投遞,例如消息在15秒后投遞。  long delayTime = System.currentTimeMillis() + 15000;  System.out.println("發送時間>>" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));   // 設置消息需要被投遞的時間。  msg.setStartDeliverTime(delayTime);   SendResult sendResult = producer.send(msg);  // 同步發送消息,只要不拋異常就是成功。  if (sendResult != null) {  System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss") + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());  }  } catch (Exception e) {  // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。  System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());  e.printStackTrace();  }  }  // 在應用退出前,銷毀Producer對象。  // 注意:如果不銷毀也沒有問題。  producer.shutdown();  } }  
 

消息接收


import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt;  import java.util.Date; import java.util.List;  /**  * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.  */ public class Consumer {   public static void main(String[] args) throws InterruptedException, MQClientException {   /*  * Instantiate with specified consumer group name.  */   final int[] totals = {0};  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");   consumer.setNamesrvAddr("localhost:9876");  /*  * Specify name server addresses.  * <p/>  *  * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR  * <pre>  * {@code  * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");  * }  * </pre>  */   /*  * Specify where to start in case the specified consumer group is a brand new one.  */  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);   /*  * Subscribe one more more topics to consume.  */  consumer.subscribe("TopicTest", "*");   /*  * Register callback to execute on arrival of messages fetched from brokers.  */  consumer.registerMessageListener(new MessageListenerConcurrently() {   @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeConcurrentlyContext context) {  System.out.println("接收到消息:"+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));  System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);  for(MessageExt m: msgs){  System.out.println(">>>"+new String( m.getBody()));  }  totals[0] +=1;  System.out.println(">>>>>total="+ totals[0]);  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  });   /*  * Launch the consumer instance.  */  consumer.start();   System.out.printf("Consumer Started.%n");  } }  
 

如何使用社區版本的rocketmq 發送延遲消息

 /*設置為您在消息隊列RocketMQ版控制台創建的Topic。*/  Message msg = new Message("YOUR TOPIC",  /*設置消息的Tag。*/  "YOUR MESSAGE TAG",  /*消息內容。*/  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));  /*發送延時消息,需要設置延時時間,單位毫秒(ms),消息將在指定延時時間后投遞,例如消息將在3秒后投遞。*/  long delayTime = System.currentTimeMillis() + 3000;  msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));   /**  *若需要發送定時消息,則需要設置定時時間,消息將在指定時間進行投遞,例如消息將在2021-08-10 18:45:00投遞。  *定時時間格式為:yyyy-MM-dd HH:mm:ss,若設置的時間戳在當前時間之前,則消息將被立即投遞給Consumer。  * long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();  * msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));  */  SendResult sendResult = producer.send(msg);  System.out.printf("%s%n", sendResult);

 

 

 

開源rocketmq延遲隊列實現: 

https://gitee.com/venus-suite/rocketmq-with-delivery-time.git

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM