一.為什么選擇RocketMQ消息隊列?
- 首先RocketMQ是阿里巴巴自研出來的,也已開源。其性能和穩定性從雙11就能看出來,借用阿里的一句官方介紹:歷年雙 11 購物狂歡節零點千萬級 TPS、萬億級數據洪峰,創造了全球最大的業務消息並發以及流轉紀錄(日志類消息除外);
- 在始終保證高性能前提下,支持億級消息堆積,不影響集群的正常服務,在削峰填谷(蓄洪)、微服務解耦的場景下尤為重要;這,就能說明RocketMQ的強大。
二.RocketMQ的特點和優勢(可跳過看三的整合代碼)
- 削峰填谷(主要解決諸如秒殺、搶紅包、企業開門紅等大型活動時皆會帶來較高的流量脈沖,或因沒做相應的保護而導致系統超負荷甚至崩潰,或因限制太過導致請求大量失敗而影響用戶體驗,海量消息堆積能力強)
- 異步解耦(高可用松耦合架構設計,對高依賴的項目之間進行解耦,當下游系統出現宕機,不會影響上游系統的正常運行,或者雪崩)
- 順序消息(順序消息即保證消息的先進先出,比如證券交易過程時間優先原則,交易系統中的訂單創建、支付、退款等流程,航班中的旅客登機消息處理等)
-
分布式事務消息(確保數據的最終一致性,大量引入 MQ 的分布式事務,既可以實現系統之間的解耦,又可以保證最終的數據一致性,減少系統間的交互)
三.SpringBoot 整合RocketMQ(商業雲端版)
- 首先去阿里雲控制台創建所需消息隊列資源,包括消息隊列 RocketMQ 的實例、Topic、Group ID (GID),以及鑒權需要的 AccessKey(AK)。
- 在springboot項目pom.xml添加需要的依賴 ons-client v1.8.0.Final
<!-- RocketMQ --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.0.Final</version> </dependency>
- 在對應環境的application-xx.properties文件配置參數
##-------鑒權需要的 AccessKey(AK)(實際項目,這里填寫阿里雲自己的賬號信息)--- rocketmq.accessKey=xxAxxxxxxxxxx rocketmq.secretKey=xxxxxxxxxiHxxxxxxxxxxxxxx
## 實例TCP 協議公網接入地址(實際項目,填寫自己阿里雲MQ的公網地址) rocketmq.nameSrvAddr=http://MQ_INST_***********85_BbM********************yuncs.com:80 #普通消息topic (實際項目,填寫自己阿里雲MQ中的topic名稱和groupid) rocketmq.topic=common rocketmq.groupId=GID-message rocketmq.tag=* #定時/延時消息 rocketmq.timeTopic=time-lapse rocketmq.timeGroupId=GID-message rocketmq.timeTag=*
- 封裝MQ配置類:MqConfig
/** * MQ配置加載 * @author laifuwei */ @Configuration @ConfigurationProperties(prefix = "rocketmq") public class MqConfig { private String accessKey; private String secretKey; private String nameSrvAddr; private String topic; private String groupId; private String tag; private String timeTopic; private String timeGroupId; private String timeTag; public Properties getMqPropertie() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); //設置發送超時時間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "4000"); return properties; } public String getAccessKey() { return accessKey; } public void setAccessKey(String accessKey) { this.accessKey = accessKey; } public String getSecretKey() { return secretKey; } public void setSecretKey(String secretKey) { this.secretKey = secretKey; } public String getNameSrvAddr() { return nameSrvAddr; } public void setNameSrvAddr(String nameSrvAddr) { this.nameSrvAddr = nameSrvAddr; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } public String getTimeTopic() { return timeTopic; } public void setTimeTopic(String timeTopic) { this.timeTopic = timeTopic; } public String getTimeGroupId() { return timeGroupId; } public void setTimeGroupId(String timeGroupId) { this.timeGroupId = timeGroupId; } public String getTimeTag() { return timeTag; } public void setTimeTag(String timeTag) { this.timeTag = timeTag; } }
- 給消息生產者注入配置信息,
ProducerBean
用於將Producer
集成至Spring Bean中/** * MQ配置注入生成消息實例 */ @Configuration public class ProducerClient { @Autowired private MqConfig mqConfig; @Bean(initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { //ProducerBean用於將Producer集成至Spring Bean中 ProducerBean producer = new ProducerBean(); producer.setProperties(mqConfig.getMqPropertie()); return producer; } }
- 為了方便使用,我封裝了一個發送消息的類,消息的Message參數和配置,看代碼注釋,很容易理解
/** * MQ發送消息助手 * @author laifuwei */ @Component public class ProducerUtil { private Logger logger = LoggerFactory.getLogger(ProducerUtil.class); @Autowired private MqConfig config; @Autowired private ProducerBean producer; /** * 同步發送消息 * @param msgTag 標簽,可用於消息小分類標注 * @param messageBody 消息body內容,生產者自定義內容 * @param msgKey 消息key值,建議設置全局唯一,可不傳,不影響消息投遞 * @return success:SendResult or error:null */ public SendResult sendMsg(String msgTag,byte[] messageBody,String msgKey) { Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody); return this.send(msg,Boolean.FALSE); } /** * 同步發送定時/延時消息 * @param msgTag 標簽,可用於消息小分類標注,對消息進行再歸類 * @param messageBody 消息body內容,生產者自定義內容,二進制形式的數據 * @param msgKey 消息key值,建議設置全局唯一值,可不設置,不影響消息收發 * @param delayTime 服務端發送消息時間,立即發送輸入0或比更早的時間 * @return success:SendResult or error:null */ public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) { Message msg = new Message(config.getTimeTopic(),msgTag,msgKey,messageBody); msg.setStartDeliverTime(delayTime); return this.send(msg,Boolean.FALSE); } /** * 發送單向消息 */ public void sendOneWayMsg(String msgTag,byte[] messageBody,String msgKey) { Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody); this.send(msg,Boolean.TRUE); } /** * 普通消息發送發放 * @param msg 消息 * @param isOneWay 是否單向發送 */ private SendResult send(Message msg,Boolean isOneWay) { try { if(isOneWay) { //由於在 oneway 方式發送消息時沒有請求應答處理,一旦出現消息發送失敗,則會因為沒有重試而導致數據丟失。 //若數據不可丟,建議選用同步或異步發送方式。 producer.sendOneway(msg); success(msg, "單向消息MsgId不返回"); return null; }else { //可靠同步發送 SendResult sendResult = producer.send(msg); //獲取發送結果,不拋異常即發送成功 if (sendResult != null) { success(msg, sendResult.getMessageId()); return sendResult; }else { error(msg,null); return null; } } } catch (Exception e) { error(msg,e); return null; } } //對於使用異步接口,可設置單獨的回調處理線程池,擁有更靈活的配置和監控能力。 //根據項目需要,服務器配置合理設置線程數,線程太多有OOM 風險, private ExecutorService threads = Executors.newFixedThreadPool(3); //僅建議執行輕量級的Callback任務,避免阻塞公共線程池 引起其它鏈路超時。 /** * 異步發送普通消息 * @param msgTag * @param messageBody * @param msgKey */ public void sendAsyncMsg(String msgTag,byte[] messageBody,String msgKey) { producer.setCallbackExecutor(threads); Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody); try { producer.sendAsync(msg, new SendCallback() { @Override public void onSuccess(final SendResult sendResult) { assert sendResult != null; success(msg, sendResult.getMessageId()); } @Override public void onException(final OnExceptionContext context) { //出現異常意味着發送失敗,為了避免消息丟失,建議緩存該消息然后進行重試。 error(msg,context.getException()); } }); } catch (ONSClientException e) { error(msg,e); } } //--------------日志打印---------- private void error(Message msg,Exception e) { logger.error("發送MQ消息失敗-- Topic:{}, Key:{}, tag:{}, body:{}" ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody())); logger.error("errorMsg --- {}",e.getMessage()); } private void success(Message msg,String messageId) { logger.info("發送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}" ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody())); } }
- 前面已經配置好了將Producer集成至Spring Bean中,直接注入Producer,在業務系統需要的地方調用來發送消息即可
//普通消息的Producer 已經注冊到了spring容器中,后面需要使用時可以 直接注入到其它類中 @Autowired private ProducerBean producer; /** * 演示方法,可在自己的業務系統方法中進行發送消息 */ public String mqTest() { /* 使用前面封裝的方法,傳入對應的參數即可發送消息 * msgTag 標簽,可用於消息小分類標注 * messageBody 消息body內容,生產者自定義內容,任何二進制數據,生產者和消費者協定數據的序列化和反序列化 * msgKey 消息key值,建議設置全局唯一,比如訂單號,用戶id這種,可不傳,不影響消息投遞 */ //body內容自定義 JSONObject body = new JSONObject(); body.put("userId", "this is userId"); body.put("notice", "同步消息"); //同步發送消息 producer.sendMsg("userMessage", body.toJSONString().getBytes(), "messageId"); //單向消息 producer.sendOneWayMsg("userMessage", "單向消息".getBytes(), "messageId"); //異步消息 producer.sendAsyncMsg("userMessage", "異步消息".getBytes(), "messageId"); //定時/延時消息,當前時間的30秒后推送。時間自己定義 producer.sendTimeMsg("userMessage", "延時消息".getBytes(), "messageId", System.currentTimeMillis()+30000); //順序消息(全局順序 / 分區順序)、分布式事務消息 目前沒用到,可看官網說明操作 return "ok"; }
- 接下來是消息消費者的配置和接收消息(一般在下游系統或者相關聯的系統),接收消息的項目照舊,添加依賴jar包 ons-client v1.8.0.Final 、配置mq參數鏈接(mq的配置文件參數要和生產者項目配置的一樣)、添加MqConfig類(上面有寫)
- 注入配置、訂閱消息、添加消息處理的方法
@Configuration public class ConsumerClient { @Autowired private MqConfig mqConfig; //普通消息監聽器,Consumer注冊消息監聽器來訂閱消息. @Autowired private MqMessageListener messageListener; //定時消息監聽器,Consumer注冊消息監聽器來訂閱消息. @Autowired private MqTimeMessageListener timeMessageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); //將消費者線程數固定為20個 20為默認值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); //訂閱消息 Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(); //訂閱普通消息 Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getTopic()); subscription.setExpression(mqConfig.getTag()); subscriptionTable.put(subscription, messageListener); //訂閱定時/延時消息 Subscription subscriptionTime = new Subscription(); subscriptionTime.setTopic(mqConfig.getTimeTopic()); subscriptionTime.setExpression(mqConfig.getTimeTag()); subscriptionTable.put(subscriptionTime, timeMessageListener); consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } }
- 對定時/延時消息監聽類進行實現,處理接收到的消息
/** * 定時/延時MQ消息監聽消費 * @author laifuwei */ @Component public class MqTimeMessageListener extends AutowiredService implements MessageListener { private Logger logger = LoggerFactory.getLogger(this.getClass()); //實現MessageListtener監聽器的消費方法 @Override public Action consume(Message message, ConsumeContext context) {
logger.info("接收到MQ消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody()));try {
String msgTag = message.getTag();//消息類型 String msgKey = message.getKey();//業務唯一id switch (msgTag) { //----通過生產者傳的tag標簽進行消息分類和過濾處理 case "userMessage": //通過唯一key的,比如前面key傳的值是訂單號或者用戶id這種唯一值,來進行數據的查詢或處理 //由於RocketMQ能重復推送消息,處理消息的時候做好數據的冪等,防止重復處理 if(//如訂單系統需要判斷訂單是否被處理過等,通過傳的msgKey即訂單號去查詢數據庫進行判斷) {
break; } //驗證通過,處理業務 //do something break; } //消費成功,繼續消費下一條消息 return Action.CommitMessage; } catch (Exception e) { logger.error("消費MQ消息失敗! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage()); //消費失敗,告知服務器稍后再投遞這條消息,繼續消費其他消息 return Action.ReconsumeLater; } } } - 對普通消息進行監聽,消費消息
/** * 普通(默認同步)MQ消息監聽消費 * @author laifuwei */ @Component public class MqMessageListener extends AutowiredService implements MessageListener { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public Action consume(Message message, ConsumeContext context) { logger.info("接收到MQ消息. Topic :" + message.getTopic() + ", tag :" + message.getTag()+ " msgId : " + message.getMsgID()+", Key :" + message.getKey()+", body:" + new String(message.getBody())); try { String msgTag = message.getTag();//消息類型 String msgKey = message.getKey();//唯一key switch (msgTag) { //--------普通通知 case "userMessage": break; } return Action.CommitMessage; } catch (Exception e) { logger.error("消費MQ消息失敗! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage()); return Action.ReconsumeLater; } } }
四.最后運行消費者項目和生產者項目,調用生產者項目發送消息驗證效果:
- 生產者發送消息結果日志:消息發送正常
2019-08-17 15:11:06.837 INFO 9996 --- [nio-8080-exec-9] com.dyj.shop.mq.ProducerUtil : 發送MQ消息成功 -- Topic:common ,msgId:C0A86532270C2A139A5555A7E5DD0000 , Key:messageId, tag:userMessage, body:{"userId":"this is userId","notice":"同步消息"} 2019-08-17 15:11:06.841 INFO 9996 --- [nio-8080-exec-9] com.dyj.shop.mq.ProducerUtil : 發送MQ消息成功 -- Topic:common ,msgId:單向消息MsgId不返回 , Key:messageId, tag:userMessage, body:單向消息 2019-08-17 15:11:06.901 INFO 9996 --- [pool-6-thread-1] com.dyj.shop.mq.ProducerUtil : 發送MQ消息成功 -- Topic:common ,msgId:C0A86532270C2A139A5555A7E6630004 , Key:messageId, tag:userMessage, body:異步消息 2019-08-17 15:11:07.060 INFO 9996 --- [nio-8080-exec-9] com.dyj.shop.mq.ProducerUtil : 發送MQ消息成功 -- Topic:time-lapse ,msgId:C0A86532270C2A139A5555A7E69F0006 , Key:messageId, tag:userMessage, body:定時/延時消息
- 消費者接收到消息,可以看到普通消息的發送時間和接收到消息的時間,就相差幾毫秒,值得注意的是:延時消息按照生產者定義的30秒后消費者才收到。這就是延時消息的好玩之處
2019-08-17 15:11:06.881 INFO 10942 --- [MessageThread_7] com.dyj.timer.mq.MqMessageListener : 接收到MQ消息. Topic :common, tag :userMessage msgId : C0A86532270C2A139A5555A7E5DD0000, Key :messageId, body:{"userId":"this is userId","notice":"同步消息"} 2019-08-17 15:11:06.934 INFO 10942 --- [MessageThread_8] com.dyj.timer.mq.MqMessageListener : 接收到MQ消息. Topic :common, tag :userMessage msgId : C0A86532270C2A139A5555A7E6550002, Key :messageId, body:單向消息 2019-08-17 15:11:06.947 INFO 10942 --- [MessageThread_9] com.dyj.timer.mq.MqMessageListener : 接收到MQ消息. Topic :common, tag :userMessage msgId : C0A86532270C2A139A5555A7E6630004, Key :messageId, body:異步消息 2019-08-17 15:11:36.996 INFO 10942 --- [essageThread_10] com.dyj.timer.mq.MqTimeMessageListener : 接收到MQ消息. Topic :time-lapse, tag :userMessage msgId : cd900e16f7cba68369ec498ae2f9dd6c, Key :messageId, body:定時/延時消
寫在最后:有不妥或有興趣的可以下方留言,多謝指教(#^-^#)