新的閱讀體驗:http://www.zhouhong.icu/post/157
一、業務需求
需要實現一個提前二十分鍾通知用戶去做某件事的一個業務,拿到這個業務首先想到的最簡單得方法就是使用Redis監控Key值:在排計划時候計算當前時間與提前二十分鍾這個時間差,然后使用一個唯一的業務Key壓入Redis中並設定好過期時間,然后只需要讓Redis監控這個Key值即可,當這個Key過期后就可以直接拿到這個Key的值然后實現發消息等業務。
關於Redis實現該業務的具體實現在之前我已經記過一篇筆記,有興趣的可以直接去瞅瞅,但是現在感覺有好多不足之處。
Redis實現定時: http://www.zhouhong.icu/post/144
二、Redis實現定時推送等功能的不足之處
由於Redis不止你一個使用,其他業務也會使用Redis,那么最容易想到的一個缺點就是:1、如果在提醒的那一刻有大量的其他業務的Key也過期了,那么就會很長時間都輪不到你的這個Key,就會出現消息推送延遲等缺點;2、還有一個缺點就是像阿里雲他們的Redis根本就不支持對 Redis 的 Key值得監控(我也是因為公司使用阿里雲的Redis沒法對Key監控才從之前使用Redis監控轉移到使用RocketMQ的延時消息推送的。。。)
三、阿里雲RocketMQ定時/延遲消息隊列實現
其實在實現上非常簡單
1、首先去阿里雲控制台創建所需消息隊列資源,包括消息隊列 RocketMQ 的實例、Topic、Group ID (GID),以及鑒權需要的 AccessKey(AK),一般公司都有現成的可以直接使用。
2、在springboot項目pom.xml添加需要的依賴。
<!--阿里雲MQ TCP--> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.7.1.Final</version> </dependency>
3、在對應環境的application.properties文件配置參數
console:
rocketmq:
tcp:
accessKey: XXXXXXXX使用自己的
secretKey: XXXXXXXXXXXXX使用自己的
nameSrvAddr: XXXXXXXXXXXXXXXX使用自己的
topic: XXXXXXX使用自己的
groupId: XXXXXXX使用自己的
tag: XXXXXXXXX使用自己的
4、封裝MQ配置類
import com.aliyun.openservices.ons.api.PropertyKeyConst; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import java.util.Properties; /** * @Description: MQ配置類 * @Author: zhouhong * @Date: 2021/8/4 */ @Configuration @EnableConfigurationProperties({PatrolMqConfig.class}) @ConfigurationProperties(prefix = "console.rocketmq.tcp") @Primary public class PatrolMqConfig { private String accessKey; private String secretKey; private String nameSrvAddr; private String topic; private String groupId; private String tag; private String orderTopic; private String orderGroupId; private String orderTag; public Properties getMqPropertie() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); return properties; } public String getAccessKey() { return accessKey; } public void setAccessKey(String accessKey) { this.accessKey = accessKey; } public String getSecretKey() { return secretKey; } public void setSecretKey(String secretKey) { this.secretKey = secretKey; } public String getNameSrvAddr() { return nameSrvAddr; } public void setNameSrvAddr(String nameSrvAddr) { this.nameSrvAddr = nameSrvAddr; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } public String getOrderTopic() { return orderTopic; } public void setOrderTopic(String orderTopic) { this.orderTopic = orderTopic; } public String getOrderGroupId() { return orderGroupId; } public void setOrderGroupId(String orderGroupId) { this.orderGroupId = orderGroupId; } public String getOrderTag() { return orderTag; } public void setOrderTag(String orderTag) { this.orderTag = orderTag; } }
5、配置生產者
import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class PatrolProducerClient { @Autowired private PatrolMqConfig mqConfig; @Bean(name = "ConsoleProducer", initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(mqConfig.getMqPropertie()); return producer; } }
6、消費者訂閱
import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.ConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import java.util.Properties; //項目中加上 @Configuration 注解,這樣服務啟動時consumer也啟動了 @Configuration @Slf4j public class PatrolConsumerClient { @Autowired private PatrolMqConfig mqConfig; @Autowired private MqTimeMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); //將消費者線程數固定為20個 20為默認值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); //訂閱關系 Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getTopic()); subscription.set); subscriptionTable.put(subscription, messageListener); //訂閱多個topic如上面設置 consumerBean.setSubscriptionTable(subscriptionTable); System.err.println("訂閱成功!"); return consumerBean; } }
7、定時延時MQ消息監聽消費
/** * @Description: 定時/延時MQ消息監聽消費 * @Author: zhouhong * @Create: 2021-08-03 09:16 **/ @Component public class MqTimeMessageListener implements MessageListener { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public Action consume(Message message, ConsumeContext context) { System.err.println("收到消息啦!!"); logger.info("接收到MQ消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}", message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody())); try { String msgTag = message.getTag(); // 消息類型 String msgKey = message.getKey(); // 業務唯一id switch (msgTag) { case "XXXX": // TODO 具體業務實現,比如發消息等操作 System.err.println("推送成功!!!!"); break; } return Action.CommitMessage; } catch (Exception e) { logger.error("消費MQ消息失敗! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage()); //消費失敗,告知服務器稍后再投遞這條消息,繼續消費其他消息 return Action.ReconsumeLater; } } }
8、封裝一個發延時/定時消息的工具類
/** * @Description: MQ發送消息助手 * @Author: zhouhong * @Create: 2021-08-03 09:06 **/ @Component public class ProducerUtil { private Logger logger = LoggerFactory.getLogger(ProducerUtil.class); @Autowired private PatrolMqConfig config; @Resource(name = "ConsoleProducer") ProducerBean producerBean; public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) { Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody); msg.setStartDeliverTime(delayTime); return this.send(msg,Boolean.FALSE); } /** * 普通消息發送發放 * @param msg 消息 * @param isOneWay 是否單向發送 */ private SendResult send(Message msg,Boolean isOneWay) { try { if(isOneWay) { //由於在 oneway 方式發送消息時沒有請求應答處理,一旦出現消息發送失敗,則會因為沒有重試而導致數據丟失。 //若數據不可丟,建議選用同步或異步發送方式。 producerBean.sendOneway(msg); success(msg, "單向消息MsgId不返回"); return null; }else { //可靠同步發送 SendResult sendResult = producerBean.send(msg); //獲取發送結果,不拋異常即發送成功 if (sendResult != null) { success(msg, sendResult.getMessageId()); return sendResult; }else { error(msg,null); return null; } } } catch (Exception e) { error(msg,e); return null; } } private ExecutorService threads = Executors.newFixedThreadPool(3); private void error(Message msg,Exception e) { logger.error("發送MQ消息失敗-- Topic:{}, Key:{}, tag:{}, body:{}" ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody())); logger.error("errorMsg --- {}",e.getMessage()); } private void success(Message msg,String messageId) { logger.info("發送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}" ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody())); } }
9、接口測試(10000表示延遲10秒,可以根據自己的業務計算出)
// 測試MQ延時 @Autowired ProducerUtil producerUtil; @PostMapping("/patrolTaskTemp/mqtest") public void mqTime(){ producerUtil.sendTimeMsg( "SMARTPATROL", "你好鴨!!!".getBytes(), "紅紅火火恍恍惚惚!!", System.currentTimeMillis() + 10000 ); }
10、結果
2021-08-04 22:07:12.677 INFO 17548 --- [nio-8498-exec-2] c.h.i.i.s.m.common.util.ProducerUtil : 發送MQ消息成功 -- Topic:TID_COMMON ,msgId:C0A80168448C2F0E140B14322CB30000 , Key:紅紅火火恍恍惚惚!!, tag:SMARTPATROL, body:你好鴨!!!
收到消息啦!!
推送成功!!!!
2021-08-04 22:07:22.179 INFO 17548 --- [MessageThread_1] c.h.i.i.s.m.m.t.n.MqTimeMessageListener : 接收到MQ消息 -- Topic:TID_COMMON, tag:SMARTPATROL,msgId:0b17f2e71ebd1b054c2c156f6d1d1655 , Key:紅紅火火恍恍惚惚!!, body:你好鴨!!!